SparkStreaming与Kafka010之05之01 Consumer

2023-11-11

package Kafka010

import Kafka010.Utils.MyKafkaUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Created by Shi shuai RollerQing on 2019/12/24 19:47
 *
 * kakfa的API 0-10版本的Consumer测试
 */
//TODO :  kakfa的API 0-10版本的Consumer测试
object Kafka010Demo01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName(s"${this.getClass.getCanonicalName}")
    val ssc = new StreamingContext(conf, Seconds(5))

    val topic = List("topicA") //后面的ConsumerStrategies的参数要求topic为集合的形式 可能不止一个topic
    val kafkaParams = MyKafkaUtils.getKafkaConsumerParams("SparkKafka010")

    val ds: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topic, kafkaParams)
    )

    ds.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        println(rdd.count())
      }
    })
    ssc.start()
    ssc.awaitTermination()

  }
}
   //这个KafkaUtils.createDirectStream要规定kafka的k v的类型 然后三个参数 一个ssc 另外两个位置策略和消费者策略点进去看看
   // PreferConsistent: Use this in most cases, it will consistently distribute partitions across all executors.
   // PreferBrokers: Use this only if your executors are on the same nodes as your Kafka brokers.
   // PreferFixed: Use this to place particular TopicPartitions on particular hosts if your load is uneven.
   // Any TopicPartition not specified in the map will use a consistent location.

// Assign: 消费部分分区

工具类

package Kafka010.Utils

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

/**
 * Created by Shi shuai RollerQing on 2019/12/24 19:20
 */
object MyKafkaUtils {

  def getKafkaConsumerParams(grouid: String = "SparkStreaming010", autoCommit: String = "true"): Map[String, String] = {
    val kafkaParams = Map[String, String] (
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop01:9092,hadoop02:9092,hadoop03:9092",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> autoCommit,
      //ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",//earliest、 none 、latest 具体含义可以点进去看
      ConsumerConfig.GROUP_ID_CONFIG -> grouid,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer].getName,
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer].getName
    )
    kafkaParams
  }

  /**
   * 这个是官网的写kafka配置的写法,不过还是推荐使用第一种,这样不用自己写参数,避免手误
   *
   * 这个没有经过测试 要是使用也要改下 传参数进来 比如跟上面一样的groupid 要不就使用默认的
   * @return
   */
  def getKafkaConsumerParams2(): Map[String, Object] = {
    val kafkaParams = Map[String, Object] {
      "bootstrap.servers" -> "hadoop01:9092,hadoop02:9092,hadoop03:9092"
      "key.deserializer" -> classOf[StringDeserializer]
      "value.deserializer" -> classOf[StringDeserializer]
      "auto.offset.reset" -> "latest"
      "group.id" -> "topicA"
      "enable.auto.commit" -> (true: java.lang.Boolean)
    }
    kafkaParams
  }

  def main(args: Array[String]): Unit = {
    println(classOf[StringDeserializer].getName) //org.apache.kafka.common.serialization.StringDeserializer
    println(classOf[StringDeserializer].getClass) //class java.lang.Class
    println(classOf[StringDeserializer]) //class org.apache.kafka.common.serialization.StringDeserializer

  }
}

结果没错 求的就是每5s的批次的数据条数
在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

SparkStreaming与Kafka010之05之01 Consumer 的相关文章

  • 大数据spark开发入门教程

    大数据是互联网发展的方向 大数据人才是未来的高薪贵族 随着大数据人才的供不应求 大数据人才的薪资待遇也在不断提升 如果你也想进入大数据行业 也想学习大数据技术 大数据讲师认为 可以先从spark技术开始 一 Spark是什么 Spark是一
  • spark-submit 报错 Initial job has not accepted any resources

    spark submit 报这样的错误 WARN scheduler TaskSchedulerImpl Initial job has not accepted any resources check your cluster UI to
  • Spark 从入门到放弃(一)Spark基础概念

    一 Spark基础概念 1 Application Spark应用程序 application 应用 其实就是用spark submit提交的程序 一个application通常包含三部分 从数据源 比方说HDFS 取数据形成RDD 通过R
  • 黑马头条 热点文章实时计算、kafkaStream

    热点文章 实时计算 1 今日内容 1 1 定时计算与实时计算 1 2 今日内容 kafkaStream 什么是流式计算 kafkaStream概述 kafkaStream入门案例 Springboot集成kafkaStream 实时计算 用
  • Flink消费kafka出现空指针异常

    文章目录 出现场景 表现 问题 解决 tombstone Kafka中提供了一个墓碑消息 tombstone 的概念 如果一条消息的key不为null 但是其value为null 那么此消息就是墓碑消息 出现场景 双流join时 采用的是l
  • springboot集成kafka实战项目,kafka生产者、消费者、创建topic,指定消费分区

    springboot集成kafka实战项目 kafka生产者 消费者 创建topic 指定消费分区 前言 本项目代码可直接集成到你现有的springboot项目中 功能包括 1 kafka生产者配置 2 kafka消费者配置 指定分区消费
  • kafka + zookeeper下载/安装/使用(超详细)

    kafka是需要zk来支持 所以先下载zk 1 下载安装zookeeper 下载地址 选择不带source的 下载下来解压2次 进入到 D zookeeper apache zookeeper 3 6 1 bin conf 目录下 把zoo
  • win10系统下安装Kafka 的详细步骤

    Win10 系统下要使用Kafka需要经过以下三个步骤 1 安装JDK 需要安装依赖java JDK 2 安装zookeeper 资源协调 分配管理 3 安装Kafka 一 安装 Java SE Development Kit 13 0 1
  • 附录:kafka源码启动

    本文以源码2 8为例 准备如下 idea 2019 1 4 jdk 1 8 scala 2 12 8 gradle 6 8 1 zookeeper 3 4 10 kafka2 8源码 注意 以下安装都需要装在没有空格的路径上 比如D Pro
  • 11.Linux下Spark的安装配置以及spark-shell的启动和 Spark集群环境搭建

    本案例软件包 链接 https pan baidu com s 1zABhjj2umontXe2CYBW DQ 提取码 1123 若链接失效在下面评论 我会及时更新 目录 1 安装Spark 1 先用xftp将安装包传到home hadoo
  • [Docker]使用Docker部署Kafka

    Kafka 是一个分布式流处理平台 它依赖于 ZooKeeper 作为其协调服务 在 Kafka 集群中 ZooKeeper 负责管理和协调 Kafka 的各个节点 因此 要在 Docker 容器中启动 Kafka 通常需要同时启动一个 Z
  • sparkstreamming 消费kafka(2)

    spark streaming提供了两种获取方式 一种是同storm一样 实时读取缓存到内存中 另一种是定时批量读取 这两种方式分别是 Receiver base Direct 一 Receiver base Spark官方最先提供了基于R
  • kafka系列——KafkaProducer源码分析

    实例化过程 在KafkaProducer的构造方法中 根据配置项主要完成以下对象或数据结构的实例化 配置项中解析出 clientId 用于跟踪程序运行情况 在有多个KafkProducer时 若没有配置 client id则clientId
  • Kafka 监控系统Eagle 使用教程 V1.4.0

    1 下载安装zookeeper 2 下载安装kafka 3 下载安装kafka eagle http download kafka eagle org tar zvxf kafka eagle bin 1 4 0 tar gz 4 配置JA
  • Kafka 权威指南

    Kafka 权威指南 这本书于 2021 年看完 2022 年又看了一遍 感觉书读百遍 其义自现 这本书侧重于 Kafka 的理论知识 虽然书有点老 但是其中关于 Kafka 的基础知识的章节讲得确实不错 适合学习 Kafka 的新手以及
  • Spark 配置

    文章目录 1 Spark 配置 1 1 Spark 属性 1 1 1 动态加载Spark属性 1 1 2 查看Spark属性 1 2 环境变量 2 重新指定配置文件目录 3 继承Hadoop集群配置 4 定制的Hadoop Hive配置 1
  • MQ - KAFKA 高级篇

    kafak是一个分布式流处理平台 提供消息持久化 基于发布 订阅的方式的消息中间件 同时通过消费端配置相同的groupId支持点对点通信 适用场景 构造实时流数据管道 用于系统或应用之间可靠的消息传输 数据采集及处理 例如连接到一个数据库系
  • 消息队列选型:Kafka 如何实现高性能?

    在分布式消息模块中 我将对消息队列中应用最广泛的 Kafka 和 RocketMQ 进行梳理 以便于你在应用中可以更好地进行消息队列选型 另外 这两款消息队列也是面试的高频考点 所以 本文我们就一起来看一下 Kafka 是如何实现高性能的
  • 【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版

    Flink 系列文章 一 Flink 专栏 Flink 专栏 系统介绍某一知识点 并辅以具体的示例进行说明 1 Flink 部署系列 本部分介绍Flink的部署 配置相关基础内容 2 Flink基础系列 本部分介绍Flink 的基础部分 比
  • 阿里技术官亲笔力作:Kafka限量笔记,一本书助你掌握Kafka的精髓

    前言 分布式 堪称程序员江湖中的一把利器 无论面试还是职场 皆是不可或缺的技能 而Kafka 这款分布式发布订阅消息队列的璀璨明珠 其魅力之强大 无与伦比 对于Kafka的奥秘 我们仍需继续探索 要论对Kafka的熟悉程度 恐怕阿里的大佬们

随机推荐

  • Linux下多进程通信(signal,pipe)

    操作系统实验导航 实验一 银行家算法 https blog csdn net weixin 46291251 article details 115384510 实验二 多级队列调度和多级反馈队列调度算法 https blog csdn n
  • GpuMat ROI

    在引用GpuMat数据的ROI时 需要保证该数据在Gpu 内存中存储是连续的 使用gpu createContinuous创建连续空间 cuda GpuMat dst pyr laplace tmp dst pyr laplace gpu
  • LL(1)文法构造FIRST、FOLLOW、分析表并分析

    一 实验目的 学生运用编译原理的知识在实验技能和方法自行设计实验方案并加以实现 二 使用仪器 器材 计算机一台 操作系统 Windows10 编程软件 Intellij IDEA 三 实验内容及原理 1 实验内容 输入任意一个正确的文法G
  • Windows音量变化通知 - 系统音量监控

    Windows音量变化通知 系统音量监控 Endpoint Volume Controls 1 实现IAudioEndpointVolumeCallback接口 2 主函数 总结 参考 Endpoint Volume Controls 本次
  • 婚姻好不好,嫁给谁很重要

    都说幸福的婚姻是相似的 不幸的婚姻各有各的不幸 事实上 那些不幸的婚姻 追根究底不过都是找错了人 婚姻好不好 关键就在于嫁给谁 因为 值得相信的从来不是感情 而是人 人若靠谱 婚姻便可靠 人若靠不住 婚姻迟早生变 这世上 有的夫妻恩恩爱爱
  • 一次发生在JVM新生代和老年代的GC过程简述

    首先 我们假设程序当前的堆空间的情况如下 然后 程序在运行过程中 开始了我们的第一次YoungGC 年轻代GC 得到如下的图 通过这次的GC 我们的2 3 4对象都被回收了 只有1对象得到了保留 进入了S1 幸存者区 然后我们的程序在运行的
  • Java 通过Soap方式调用WebService接口

    import org apache commons lang3 StringEscapeUtils import org apache http HttpEntity import org apache http client config
  • 短视频seo抖音矩阵源码开发搭建技术解析

    一 短视频seo抖音矩阵源码开发需要考虑以下几个方面 技术选型 选择合适的开发语言 框架和数据库 常用的开发语言有Java PHP等 常用的框架有Spring Django等 常用的数据库有MySQL MongoDB等 服务器的选择 根据应
  • 如何在 NodeJs 中上传、处理和存储文件:分步手册

    存储文件有三种基本方法 1 直接将其存储在数据库中 2 将其存储在文件系统中并将路径保存到数据库 3 将其存储在某些云存储中 例如 Amazon S3 Google Cloud Storage 或 Microsoft Azure Blob
  • 去除自定义AlertDialog黑边

    http blog csdn net mwj 88 article details 45482421 1 现象描述 html view plain copy View view LayoutInflater from getActivity
  • java学习笔记——day1

    java笔记 字面量 变量 数据类型 命名规则 类型转换 运算符operator API 程序的流程控制 数组 字面量 变量 字面量 计算机用来处理数据的 字面量就是告诉程序员 数据在程序中的书写格式 字符 单引号 一个字符 字符串 双引号
  • python+selenium自动化软件测试(第3章):unittes

    3 1 unittest简介 前言 python基础比较弱的 建议大家多花点时间把基础语法学好 这里有套视频 可以照着练习下 http pan baidu com s 1i44jZdb 密码 92fs 熟悉java的应该都清楚常见的单元测试
  • 分层测试(一):什么是分层测试?

    什么是分层测试 分层测试是通过对质量问题分类 分层来保证整体系统质量的测试体系 模块内通过接口测试保证模块质量 多模块之间通过集成测试保证通信路径和模块间交互质量 整体系统通过端到端用例对核心业务场景进行验证 用户体验通过手工测试确保无妨碍
  • Unity开发(2)建片草地

    文章目录 1 简述 2 创建 2 1 创建项目 2 2 进入开发窗体 3 建个地面 3 1 新建地面 3 2 调整地面大小 3 3 添加草地 3 3 1 初识Unity图片资源 3 3 2 添加图片资源 3 3 3 修改图片在场景中大小 1
  • C语言入门知识1(零基础新手适用)

    C语言入门知识1 零基础新手适用 程序语言 1 机器语言 机器语言是低级语言 是用01码来编写的二进制代码语言 2 汇编语言 汇编语言也是低级语言 是用英文字母和符号串编写的 3 高级语言 由于汇编语言依赖于硬件体系且符合较多 为了方便高级
  • Go中 defer的使用

    文章目录 简介 示例 使用场景 捕获异常 文件操作 简介 defer 是 Golang 中的一个非常有用的关键字 它用于注册延迟调用 也就是一个函数的执行被延迟到调用它的函数返回之后 常用于资源清理 异常处理等场景 示例 defer 是注册
  • python实现电子邮件编程

    一 几个专业名词 MUA MTA MDA 假设我们自己的电子邮件地址是me 163 com 对方的电子邮件地址是friend sina com 注意地址都是虚构的哈 现在我们用Outlook或者Foxmail之类的软件写好邮件 填上对方的E
  • C++提高8: 类模板成员函数类外实现和类模板分文件编写

    1 类模板成员函数类外实现 类外实现主要有三个关键点 作用域 识别T的数据类型 告诉编译器这是一个类模板 剩下的 就还是基础的类内声明类外定义实现了 直接上代码观察一下 include
  • redis后台实现投票功能

    原创文章 转载请注明出处https blog csdn net qq 41969845 article details 108406059 一 前言 本文以投票功能为例 从实际例子中熟练掌握redis的应用 阅读本文需要有一定的Java基础
  • SparkStreaming与Kafka010之05之01 Consumer

    package Kafka010 import Kafka010 Utils MyKafkaUtils import org apache kafka clients consumer ConsumerRecord import org a