SparkStreaming与Kafka010之05之02 Consumer的offset 自定义设置offset

2023-11-06

package Kafka010

import Kafka010.Utils.MyKafkaUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Created by Shi shuai RollerQing on 2019/12/24 19:47
 *
 * kakfa的API 0-10版本的Consumer测试
 */
//TODO :  kakfa的API 0-10版本的Consumer测试
object Kafka010Demo02 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName(s"${this.getClass.getCanonicalName}")
    val ssc = new StreamingContext(conf, Seconds(5))

    val topics = List("topicB") //后面的ConsumerStrategies的参数要求topic为集合的形式 可能不止一个topic
    val kafkaParams = MyKafkaUtils.getKafkaConsumerParams("SparkKafka010")


    //自定义设置offsets
    val offsets: Map[TopicPartition, Long] = Map(
      new TopicPartition("topicB", 0) -> 300L,
      new TopicPartition("topicB", 1) -> 300L,
      new TopicPartition("topicB", 2) -> 300L
    )


    val ds: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)
    )
    ds.foreachRDD(rdd => {
      rdd.foreach(println)
    })

    ssc.start()
    ssc.awaitTermination()

  }
}

没错 从三百开始的
在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

SparkStreaming与Kafka010之05之02 Consumer的offset 自定义设置offset 的相关文章

  • Spark大数据分析与实战笔记(第一章 Scala语言基础-3)

    文章目录 1 3 Scala的数据结构 1 3 1 数组 数组的遍历 数组转换 1 3 2 元组 创建元组 获取元组中的值 拉链操作 1 3 3 集合 List Set Map 1 3 Scala的数据结构 对于每一门编程语言来说 数组 A
  • 大数据开发必备面试题Spark篇合集

    1 Hadoop 和 Spark 的相同点和不同点 Hadoop 底层使用 MapReduce 计算架构 只有 map 和 reduce 两种操作 表达能力比较欠缺 而且在 MR 过程中会重复的读写 hdfs 造成大量的磁盘 io 读写操作
  • windows python kafka 初级使用

    今天花了点时间在这个kafka上 因为我们工作中也用到了kafka 我这边对于kafka的理解是能用或者知道基本原理就行 实现在自己的windows环境搭建一次kafka 然后使用python进行数据的生产和消费 如果之后工作中对于kafk
  • 附录:kafka源码启动

    本文以源码2 8为例 准备如下 idea 2019 1 4 jdk 1 8 scala 2 12 8 gradle 6 8 1 zookeeper 3 4 10 kafka2 8源码 注意 以下安装都需要装在没有空格的路径上 比如D Pro
  • ELK配置记录(filebeat+kafka+Logstash+Elasticsearch+Kibana)

    一 简介 elk日志平台 日志收集 分析和展示的解决方案 满足用户对 志的查询 排序 统计需求 elk架构 filebeat 采集 kafka Logstash 管道 Elasticsearch 存储 搜索 Kibana 日志应用 各组件功
  • 学习笔记-Spark环境搭建与使用

    一 20 04 Ubuntu安装 清华源ISO源 https mirrors tuna tsinghua edu cn ubuntu releases 20 04 下载链接 https mirrors tuna tsinghua edu c
  • kafka配置内外网访问

    listeners 学名叫监听器 其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务 advertised listeners 和 listeners 相比多了个 advertised Advertised 的
  • kafka问题解决:org.apache.kafka.common.errors.TimeoutException

    记录使用kafka遇到的问题 1 Caused by java nio channels UnresolvedAddressException null 2 org apache kafka common errors TimeoutExc
  • 大数据手册(Spark)--Spark基本概念

    文章目录 Spark 基本概念 Hadoop 生态 Spark 生态 Spark 基本架构 Spark运行基本流程 弹性分布式数据集 RDD Spark安装配置 Spark基本概念 Spark基础知识 PySpark版 Spark机器学习
  • kafka(三)重平衡

    历史文章 kafka 一 kafka的基础与常用配置 文章目录 一 kafka消费者组 二 重平衡 Rebalance 2 1 重平衡触发条件 2 2 重平衡策略 2 2 1 Range 平均分配 2 2 2 RoundRobin 轮询分配
  • WebSocket + kafka实时推送数据(springboot纯后台)

    逻辑 kafka订阅消费者主题 消费后通过webSocket推送到前端 kafka vue financial webSocket 学习引用 SpringBoot2 0集成WebSocket 实现后台向前端推送信息 World Of Mos
  • [分布式] zookeeper集群与kafka集群

    目录 一 Zookeeper 概述 1 1 Zookeeper定义 1 2 Zookeeper 工作机制 1 3 Zookeeper 特点 1 4 Zookeeper 数据结构 1 5 Zookeeper 应用场景 1 6 Zookeepe
  • 【硬刚大数据之学习路线篇】2021年从零到大数据专家的学习指南(全面升级版)

    欢迎关注博客主页 https blog csdn net u013411339 本文由 王知无 原创 首发于 CSDN博客 本文首发CSDN论坛 未经过官方和本人允许 严禁转载 欢迎点赞 收藏 留言 欢迎留言交流 声明 本篇博客在我之前发表
  • sparkstreamming 消费kafka(2)

    spark streaming提供了两种获取方式 一种是同storm一样 实时读取缓存到内存中 另一种是定时批量读取 这两种方式分别是 Receiver base Direct 一 Receiver base Spark官方最先提供了基于R
  • sparkstreamming 消费kafka(1)

    pom
  • kafka系列——KafkaProducer源码分析

    实例化过程 在KafkaProducer的构造方法中 根据配置项主要完成以下对象或数据结构的实例化 配置项中解析出 clientId 用于跟踪程序运行情况 在有多个KafkProducer时 若没有配置 client id则clientId
  • 在windows系统下使用IDEA对kafka源码进行编译环境搭建以及配置

    目录 一 前期准备工作 step1 安装JDK1 8 step2 安装zookeeper单机版 step3 安装Gradle 5 4 step4 安装scala 2 11 12 二 将kafka源代码部署到编辑器IDEA并测试 step1
  • shell脚本,一次性启动kafka集群

    版本centos6 5 64位操作系统 已配置JDK1 8 三个节点 在s121节点上可以免密登录到另外两个节点 另外kafka0 9 0 1的安装目录相同 修改了主机名 并在每个节点的hosts文件中设置了映射 脚本内容 bin bash
  • Kafka——Mac搭建kafka环境

    1 下载Kafka安装包 下载地址 将压缩包移动到 usr local mv kafka 2 12 3 1 0 tgz usr local 解压 tar zxvf kafka 2 12 3 1 0 tgz 2 启动 启动zookeeper
  • 一文弄懂事件Event与Kafka的区别

    事件 Event 和 Apache Kafka 是两个概念层面上有所不同的东西 它们在应用程序中的作用和使用场景也有很大的差异 1 概念和定义 事件 Event 事件是 系统内发生 的特定事情或状态变化的表示 在编程和软件设计中 事件通常被

随机推荐

  • NUC980开源项目11-启动方式

    上面是我的微信和QQ群 欢迎新朋友的加入 项目码云地址 国内下载速度快 https gitee com jun626 nuc980 open source project 项目github地址 https github com Jun117
  • Windows7/10上快速搭建Tesseract-OCR开发环境操作步骤

    之前在https blog csdn net fengbingchun article details 51628957 中描述过如何在Windows上搭建Tesseract OCR开发环境 那时除了需要clone https github
  • MySQL——事务和视图

    2023 9 17 本章开始介绍TCL语言 Transaction Control Language 事务控制语言 事务 事务的概念 一个或一组sql语句组成一个执行单元 这个执行单元要么全部执行 要么全部不执行 事务的特性 ACID 原子
  • scala---spark本地调式远程获取hdfs数据注意事项

    文章目录 前言 一 Hadoop配置注意事项 1 1 core site xml 1 2 core site xml 二 本地hadoop环境配置注意事项 三 本地scala项目spark代码调试 总结 前言 这篇文章主要帮大家绕开一些本地
  • 异常关机后Oracle无法正常连接,使用 conn /as sysdba 出现 ORA-01034 和 ORA-27101: shared memory realm does not exist...

    最近异常关机导致oracle无法连接 一直提示ORA 01034和ORA 27101的错误 打开cmd后 输入 sqlplus npolog conn as sysdba 提示 ORA 01034 Oracle not available
  • windows10使用WSL安装Linux(以ubuntu为例)

    1 安装工具WSL 适用于 Linux 的 Windows 子系统 WSL 可让开发人员直接在 Windows 上按原样运行 GNU Linux 环境 包括大多数命令行工具 实用工具和应用程序 且不会产生传统虚拟机或双启动设置开销 是win
  • 浙大水业oa系统服务器地址,OA系统

    OA系统功能定位于知识管理 企业决策支持 资源共享和企业协同工作 它由单纯的办公自动化向提升到协助管理整个企业为目标 表现在以下四个方面 把协同工作融入业务流程中 团队中通过及时的交流 准确的任务分派从而实现高绩效管理 E OFFICE办公
  • 通过js修改网页内容

    js可以通过文本所在标签的id获取该标签对象 然后修改其内容 如 document getElementById 标签id innerHTML 要修改的文本内容 该方法可以在要修改的文本内容中加html标签 如果只是纯文本的话 可以使用in
  • 严重性 代码 说明 项目 文件 行 禁止显示状态

    严重性 代码 说明 项目 文件 行 禁止显示状态 错误 LNK2019 无法解析的外部符号 public void thiscall LinkedList
  • 解决ubuntu无法输入中文标点

    使用Ctrl 切换
  • ListBox控件 滚动条

    今天在使用LISTBOX控件中遇到的一点小问题 主要是两个问题 水平滚动条不显示内容 垂直滚动条没有自动滚动 在网上查了一下找到了解决办法 原来只需要向控件发送消息就行了 具体代码如下 以下都是在Dialog类中的函数操作 如果是使用 Se
  • C++编程规范(101条规则、准则与最佳实践)

    C 编程规范 101条规则 准则与最佳实践 虽然是书本的目录 但也是高度的概括和总结 组织和策略问题 第0条 不要拘泥于小节 了解哪些东西不应该标准化 第1 条 在高警告级别干净利落地进行编译 第2 条 使用自动构建系统 第3 条 使用版本
  • 解决uniapp在微信小程序显示图片/数据,h5不显示图片/数据。

    配置跨域 首先在mainifest json中的源码视图中配置跨域 h5 devServer port 8080 disableHostCheck true proxy dpc target https www edonguoji cn c
  • Linux系统编程之常用线程同步的三种方法

    Linux系统编程之线程同步高效率编程 Linux系统中线程最大的特点就是共享性 线程同步问题较为困难也很重要 最常用的三种是 条件变量 互斥锁 无名信号量 ps 有名信号量可用于进程同步 无名信号量只能用于线程同步 是轻量级的 一 互斥锁
  • Google Guava

    转载自并发编程网 ifeve com 本文链接地址 Google Guava官方教程 中文版 中文文档 http ifeve com google guava 开源地址 https github com google guava 今天偶然发
  • swagger3或者swagger报nullpointexception

    很简单这个问题就是版本不匹配 就是2 6 0以上版本的springbootmvc扫描方法和老版本不同 在springboot配置 application yml 里面加上如果是properties则是加上 spring mvc pathma
  • 配置Spark on YARN集群内存

    在这里插入代码片 运行文件有几个G大 默认的spark的内存设置就不行了 需要重新设置 还没有看Spark源码 只能先搜搜相关的博客解决问题 按照Spark应用程序中的driver分布方式不同 Spark on YARN有两种模式 yarn
  • 利用CNN进行人脸年龄预测

    很久之前做的东西了 最近做了一个人脸相似度检测 里面用到了这里的一个模型 所以抽个空把人脸年龄检测的思路总结一下 与其他CNN分类问题类似 人脸年龄预测无非就是将人脸分为多个类别 然后训练卷积神经网络 最后利用训练好的卷积神经网络进行分类即
  • 数据结构: 线性表(无哨兵位单链表实现)

    文章目录 1 线性表的链式表示 链表 1 1 顺序表的优缺点 1 2 链表的概念 1 3 链表的优缺点 1 4 链表的结构 2 单链表的定义 2 1 单链表的结构体 2 2 接口函数 3 接口函数的实现 3 1 动态申请一个结点 BuySL
  • SparkStreaming与Kafka010之05之02 Consumer的offset 自定义设置offset

    package Kafka010 import Kafka010 Utils MyKafkaUtils import org apache kafka clients consumer ConsumerRecord import org a