SparkStreaming与Kafka010之05之01 Consumer

2023-10-27

package Kafka010

import Kafka010.Utils.MyKafkaUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Created by Shi shuai RollerQing on 2019/12/24 19:47
 *
 * kakfa的API 0-10版本的Consumer测试
 */
//TODO :  kakfa的API 0-10版本的Consumer测试
object Kafka010Demo01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName(s"${this.getClass.getCanonicalName}")
    val ssc = new StreamingContext(conf, Seconds(5))

    val topic = List("topicA") //后面的ConsumerStrategies的参数要求topic为集合的形式 可能不止一个topic
    val kafkaParams = MyKafkaUtils.getKafkaConsumerParams("SparkKafka010")

    val ds: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topic, kafkaParams)
    )

    ds.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        println(rdd.count())
      }
    })
    ssc.start()
    ssc.awaitTermination()

  }
}
   //这个KafkaUtils.createDirectStream要规定kafka的k v的类型 然后三个参数 一个ssc 另外两个位置策略和消费者策略点进去看看
   // PreferConsistent: Use this in most cases, it will consistently distribute partitions across all executors.
   // PreferBrokers: Use this only if your executors are on the same nodes as your Kafka brokers.
   // PreferFixed: Use this to place particular TopicPartitions on particular hosts if your load is uneven.
   // Any TopicPartition not specified in the map will use a consistent location.

// Assign: 消费部分分区

工具类

package Kafka010.Utils

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

/**
 * Created by Shi shuai RollerQing on 2019/12/24 19:20
 */
object MyKafkaUtils {

  def getKafkaConsumerParams(grouid: String = "SparkStreaming010", autoCommit: String = "true"): Map[String, String] = {
    val kafkaParams = Map[String, String] (
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop01:9092,hadoop02:9092,hadoop03:9092",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> autoCommit,
      //ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",//earliest、 none 、latest 具体含义可以点进去看
      ConsumerConfig.GROUP_ID_CONFIG -> grouid,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer].getName,
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer].getName
    )
    kafkaParams
  }

  /**
   * 这个是官网的写kafka配置的写法,不过还是推荐使用第一种,这样不用自己写参数,避免手误
   *
   * 这个没有经过测试 要是使用也要改下 传参数进来 比如跟上面一样的groupid 要不就使用默认的
   * @return
   */
  def getKafkaConsumerParams2(): Map[String, Object] = {
    val kafkaParams = Map[String, Object] {
      "bootstrap.servers" -> "hadoop01:9092,hadoop02:9092,hadoop03:9092"
      "key.deserializer" -> classOf[StringDeserializer]
      "value.deserializer" -> classOf[StringDeserializer]
      "auto.offset.reset" -> "latest"
      "group.id" -> "topicA"
      "enable.auto.commit" -> (true: java.lang.Boolean)
    }
    kafkaParams
  }

  def main(args: Array[String]): Unit = {
    println(classOf[StringDeserializer].getName) //org.apache.kafka.common.serialization.StringDeserializer
    println(classOf[StringDeserializer].getClass) //class java.lang.Class
    println(classOf[StringDeserializer]) //class org.apache.kafka.common.serialization.StringDeserializer

  }
}

结果没错 求的就是每5s的批次的数据条数
在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

SparkStreaming与Kafka010之05之01 Consumer 的相关文章

  • 大数据spark开发入门教程

    大数据是互联网发展的方向 大数据人才是未来的高薪贵族 随着大数据人才的供不应求 大数据人才的薪资待遇也在不断提升 如果你也想进入大数据行业 也想学习大数据技术 大数据讲师认为 可以先从spark技术开始 一 Spark是什么 Spark是一
  • Spark 从入门到放弃(一)Spark基础概念

    一 Spark基础概念 1 Application Spark应用程序 application 应用 其实就是用spark submit提交的程序 一个application通常包含三部分 从数据源 比方说HDFS 取数据形成RDD 通过R
  • 大数据开发必备面试题Spark篇合集

    1 Hadoop 和 Spark 的相同点和不同点 Hadoop 底层使用 MapReduce 计算架构 只有 map 和 reduce 两种操作 表达能力比较欠缺 而且在 MR 过程中会重复的读写 hdfs 造成大量的磁盘 io 读写操作
  • 大数据技术之Kafka——Kafka入门

    目录 一 概述 1 1 为什么要有Kafka 1 2 定义 1 3 消息队列 1 消息队列的应用场景 2 消息队列的两种模式 1 4 基础架构 二 Producer生产者 2 1 生产者消息发送流程 2 1 1 发送原理 2 2 异步发送A
  • Kafka 顺序消费方案

    Kafka 顺序消费方案 前言 1 问题引入 2 解决思路 3 实现方案 前言 本文针对解决Kafka不同Topic之间存在一定的数据关联时的顺序消费问题 如存在Topic insert和Topic update分别是对数据的插入和更新 当
  • Kafka——集群

    文章目录 集群 1 搭建个集群 2 集群发送消息 3 集群消费 3 1 Procuder 3 2 Consumer 4 消费顺序 集群 对于kafka来说 一个单独的broker意味着kafka集群中只有一个节点 要想增加kafka集群中的
  • 黑马头条 热点文章实时计算、kafkaStream

    热点文章 实时计算 1 今日内容 1 1 定时计算与实时计算 1 2 今日内容 kafkaStream 什么是流式计算 kafkaStream概述 kafkaStream入门案例 Springboot集成kafkaStream 实时计算 用
  • flink 1.4版本flink table方式消费kafka写入hive方式踩坑

    最近在搞flink 搞了一个当前比较新的版本试了一下 当时运行了很长时间 hdfs里面查询有文件 但是hive里面查询这个表为空 后面用了很多种方式 一些是说自己去刷新hive表 如下 第一种方式刷新 alter table t kafka
  • 大数据—— Flink 的优化

    目录 一 Flink内存优化 1 1 Flink 内存配置 二 配置进程参数 2 1 场景 2 2 操作步骤 三 解决数据倾斜 3 1 场景描述 3 2 解决方式 3 2 1 数据源的消费不均匀 调整并发度 3 2 2 数据分布不均匀 四
  • spark内存模型

    Spark 1 6 开始使用了统一内存管理模块 UnifiedMemoryManager 并引入了堆外内存 Off heap memory 1 6之前的内存管理就不进行介绍了 spark堆内和堆外内存模型的示意图 注意 堆外内存是依赖于wo
  • 学习笔记-Spark环境搭建与使用

    一 20 04 Ubuntu安装 清华源ISO源 https mirrors tuna tsinghua edu cn ubuntu releases 20 04 下载链接 https mirrors tuna tsinghua edu c
  • kafka配置内外网访问

    listeners 学名叫监听器 其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务 advertised listeners 和 listeners 相比多了个 advertised Advertised 的
  • spark groupByKey和groupBy,groupByKey和reduceByKey的区别

    1 groupByKey Vs groupBy 用于对pairRDD按照key进行排序 author starxhong object Test def main args Array String Unit val sparkConf n
  • Kafka生产者模式生成10亿条数据

    生产者生产消息 public class MyProducer2 public static void main String args throws InterruptedException 生产者 Properties properti
  • [分布式] zookeeper集群与kafka集群

    目录 一 Zookeeper 概述 1 1 Zookeeper定义 1 2 Zookeeper 工作机制 1 3 Zookeeper 特点 1 4 Zookeeper 数据结构 1 5 Zookeeper 应用场景 1 6 Zookeepe
  • 通过yarn提交作业到spark,运行一段时间后报错。

    加粗样式
  • shell脚本,一次性启动kafka集群

    版本centos6 5 64位操作系统 已配置JDK1 8 三个节点 在s121节点上可以免密登录到另外两个节点 另外kafka0 9 0 1的安装目录相同 修改了主机名 并在每个节点的hosts文件中设置了映射 脚本内容 bin bash
  • JAVA 安装与简单使用

    JAVA简易安装 下载安装 环境变量 进入变量界面 设置变量 验证JAVA环境 运行Java程序 个人站 ghzzz cn 还在备案 很快就能访问了 下载安装 第一步当然是从官网下载安装java了 网上有很多的教程 这里简单的写一下 在这里
  • python+django基于Spark的国漫画推荐系统 可视化大屏分析

    国漫推荐信息是现如今社会信息交流中一个重要的组成部分 本文将从国漫推荐管理的需求和现状进行分析 使得本系统的设计实现具有可使用的价 做出一个实用性好的国漫推荐系统 使其能满足用户的需求 并可以让用户更方便快捷地国漫推荐 国漫推荐系统的设计开
  • 一文弄懂事件Event与Kafka的区别

    事件 Event 和 Apache Kafka 是两个概念层面上有所不同的东西 它们在应用程序中的作用和使用场景也有很大的差异 1 概念和定义 事件 Event 事件是 系统内发生 的特定事情或状态变化的表示 在编程和软件设计中 事件通常被

随机推荐

  • javascript数据类型number、string和布尔

    number数字类型 计算机number是有一个范围的 2的53次方 2的53次方 注意书写顺序 例如一个商品17 45 买3个打9折 以下两种写法保留两位小数结果是不相同的 var obj age 20 var num 17 45 con
  • 8-使用QT5的鼠标事件和滚轮事件

    使用QT5的鼠标事件和滚轮事件 完成鼠标左键拖动窗口 双击全屏 滚轮放大缩小窗口大小 这里使用的是QMouseEvent类里面的鼠标事件 通常进行重定义部件的鼠标事件处理函数来实现自定义的内容操作 同样 鼠标滚轮操作是利用QWheelEve
  • 三个可替代“迅雷”的下载软件,速度超快!

    今天推荐可以替代迅雷的软件 那些你使用迅雷无法下载的资源 现在都可以下载了哦 1 qBittorrent 解压提供的安装包 然后双击 qbittorrent exe 找到你要下载的磁力链接 点击右上角 添加下载任务 设置你的下载路径 点击
  • K8S控制器Deployment

    简述 Deployment为Pod和ReplicaSet提供了一个声明式定义 declarative 方法 用来替代以前的ReplicationController来方便的管理应用 典型的应用场景包括 定义Deployment来创建Pod和
  • 关于搭建测试环境(详细)

    简述搭建测试环境 本人呢 是一名测试人员 以前工作的时候我们的测试环境都是网管 运维 帮我们管理的 顶多也就在tomcat下部署项目包 还是操作几个比较简单的指令 前不久就自己搭了套测试环境 然后也没事总结了一下 大致相同 可能存在个别差异
  • k8s lifecycle——poststart和prestop

    1 lifecycle的声明 lifecycle postStart exec command bin sh c sleep 100 preStop exec command bin sh c sleep 100 2 poststart 容
  • Nginx修改文件配置--配置本地网址

  • seata-server-1.5.2的环境搭建

    配置文件位置 使用的是nacos和mysql数据库 简单部署在Win10上 Linux上配置修改相同 启动命令不同 找到 seata server 1 5 2 seata conf目录下的application yml和applicatio
  • win10 vscode+clangd代码提示+cmake+mingw编译器和调试器

    win10 vscode clangd代码提示 cmake mingw编译器和调试器 前言 第一步 把cmake mingw llvm win64安装好 安装好vscode必备的插件 利用cmake构建一个项目 利用vscode的launc
  • 移动端VUE实现一周课程表

    效果图 点击课程弹出课程详情 代码 使用嵌套的v for循环去实现
  • m3u8文件中的 m3u8标签与属性说明

    EXTM3U 每个M3U文件第一行必须是这个tag 请标示作用 EXT X VERSION 3 该属性可以没有 EXT X MEDIA SEQUENCE 140651513 每一个media URI在PlayList中只有唯一的序号 相邻之
  • ubuntu18.04安装cuda、cudnn、pytorch-gpu

    cuda安装 参考博客 https blog csdn net mbdong article details 121926316 https mp weixin qq com s ZTzfC7xp8PVMvOONVIiK6g https b
  • Vue简单实例——Vuex代码实现

    简单介绍 上一篇我们介绍了Vuex的理论 这一章我们开始说明Vuex在代码方面的实现 基本使用 要想使用Vuex首先要进行下载 在下载的时候需要注意 如果你使用的vue2的框架 需要使用vuex的3版本 如果使用的是vue3的框架 才可以使
  • Open3D DbScanClustering聚类算法及聚类分簇可视化及存储

    DBSCAN聚类算法 是基于密度的聚类算法 该算法需要两个参数 labels np array pcd cluster dbscan eps 0 02 min points 10 print progress True 入参 eps 定义到
  • Java实体类与byte数组相互转换

    1 使用ByteArrayStream 和 ObjectStream public abstract class ByteConvert public byte getByte try ByteArrayOutputStream out n
  • Qt技巧:sqlite数据库 判断表是否存在

    m dbTest QSqlDatabase addDatabase QSQLITE m dbTest setDatabaseName sqlite db if m dbTest open qDebug lt lt database succ
  • Android安卓期末大作业 新闻app 实现注册登录增删改查功能

    Android安卓期末大作业 新闻app 文末附下载链接 app情况如下图所示 点我下载 https download csdn net download weixin 43474701 75953692
  • 泛微oa明细表添加按钮_关于E8,这些快捷方式你必须知道

    摘要 快捷方式不只是快 更能带来酣畅用户体验 本期场景为您带来E8快捷方式精选 看着那些大神们手指翻飞 在键盘上灵活起舞 说实话 你有过几分羡慕吗 快捷输入这东西 有时候真是少不了 虽然只是几秒几十秒的差异 但用户体验着实不同 试想 当你正
  • 基于OpenCV的数码管数字识别

    利用OpenCV可实现工业仪表设备的读数识别 仪表一般可分为两 数字式仪表和指针式仪表 本博文主要介绍一下数字式仪表识别的关键技术 下图是用软件模拟的数码管图片 本文识别的也就是图中的数字 一 图像定位 在实际的应用场景中 拍摄到的仪表区域
  • SparkStreaming与Kafka010之05之01 Consumer

    package Kafka010 import Kafka010 Utils MyKafkaUtils import org apache kafka clients consumer ConsumerRecord import org a