Kafka/Spark消费topic到写出到topic

2023-11-17

1 Kafka的工具类

1.1 从kafka消费数据的方法

  1. 消费者代码
  def getKafkaDStream(ssc : StreamingContext , topic: String  , groupId:String  ) ={
    consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG , groupId)

    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array(topic), consumerConfigs))
    kafkaDStream
  }
  1. 注意点
  • consumerConfigs是定义的可变的map的类型的,具体如下
private val consumerConfigs: mutable.Map[String, Object] = mutable.Map[String,Object](
    // kafka集群位置

    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> MyPropsUtils(MyConfig.KAFKA_BOOTSTRAP_SERVERS),

    // kv反序列化器
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
    // groupId
    // offset提交  自动 手动
    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "true",
    //自动提交的时间间隔
    //ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG
    // offset重置  "latest"  "earliest"
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest"
    // .....
  )
  • consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG , groupId)是为了不限制groupId特意写的传参

  • 是使用自带的kafka工具类createDirectStream方法去消费kafak 的数据,详细参数解释如下

在`KafkaUtils.createDirectStream`方法中,后续传递的参数的含义如下:

1. `ssc`:这是一个`StreamingContext`对象,用于指定Spark Streaming的上下文。
2. `LocationStrategies.PreferConsistent`:这是一个位置策略,用于指定Kafka消费者的位置策略。`PreferConsistent`表示优先选择分区分布均匀的消费者。
3. `ConsumerStrategies.Subscribe[String, String]`:这是一个消费者策略,用于指定Kafka消费者的订阅策略。`Subscribe[String, String]`表示按照指定的泛型主题字符串数组订阅消息,键和值的类型都为`String`。
4. `Array(topic)`:这是一个字符串数组,用于指定要订阅的Kafka主题。
5. `consumerConfigs`:这是一个`java.util.Properties`类型的对象,其中配置了一些Kafka消费者的属性。

总之,在`KafkaUtils.createDirectStream`方法中,这些参数组合被用于创建一个Kafka直连流(Direct Stream),该流可以直接从Kafka主题中消费消息,并将其转换为`InputDStream[ConsumerRecord[String, String]]`类型的DStream。

在这里插入图片描述

  • Subscribe传参需要指定泛型,这边指定string,表示指定主题的键和值的类型,即Array(topic), consumerConfigs传参是string

在这里插入图片描述

  • 最后方法返回一个kafkaDStream

1.2 kafka的生产数据的方法

  1. 生产者代码
  • 创建与配置
/**
    * 生产者对象
    */
  val producer : KafkaProducer[String,String] = createProducer()

  /**
    * 创建生产者对象
    */
  def createProducer():KafkaProducer[String,String] = {
    val producerConfigs: util.HashMap[String, AnyRef] = new util.HashMap[String,AnyRef]
    //生产者配置类 ProducerConfig
    //kafka集群位置
    //producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092")
    //producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,MyPropsUtils("kafka.bootstrap-servers"))
    producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,MyPropsUtils(MyConfig.KAFKA_BOOTSTRAP_SERVERS))
    //kv序列化器
    producerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG , "org.apache.kafka.common.serialization.StringSerializer")
    producerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , "org.apache.kafka.common.serialization.StringSerializer")
    //acks
    producerConfigs.put(ProducerConfig.ACKS_CONFIG , "all")
    //batch.size  16kb
    //linger.ms   0
    //retries
    //幂等配置
    producerConfigs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG , "true")

    val producer: KafkaProducer[String, String] = new KafkaProducer[String,String](producerConfigs)
    producer
  }
  • 生产方法
  /**
    * 生产(按照默认的黏性分区策略)
    */
  def send(topic : String  , msg : String ):Unit = {
    producer.send(new ProducerRecord[String,String](topic , msg ))
  }

  /**或者!
    * 生产(按照key进行分区)
    */
  def send(topic : String  , key : String ,  msg : String ):Unit = {
    producer.send(new ProducerRecord[String,String](topic , key ,  msg ))
  }
  • 关闭生产
/**
    * 关闭生产者对象
    */
  def close():Unit = {
    if(producer != null ) producer.close()
  }

  /**
    * 刷写 ,将缓冲区的数据刷写到磁盘
    *
    */
  def flush(): Unit ={
    producer.flush()
  }

2 消费数据

2.1 消费到数据

单纯的使用返回的ConsumerRecord不支持序列化,没有实现序列化接口

在这里插入图片描述

因此需要转换成通用的jsonobject对象

//3. 处理数据
    //3.1 转换数据结构
    val jsonObjDStream: DStream[JSONObject] = offsetRangesDStream.map(
      consumerRecord => {
        //获取ConsumerRecord中的value,value就是日志数据
        val log: String = consumerRecord.value()
        //转换成Json对象
        val jsonObj: JSONObject = JSON.parseObject(log)
        //返回
        jsonObj
      }
    )

2.2 数据分流发送到对应topic

  1. 提取错误数据并发送到对应的topic中
jsonObjDStream.foreachRDD(
      rdd => {

        rdd.foreachPartition(
          jsonObjIter => {
            for (jsonObj <- jsonObjIter) {
              //分流过程
              //分流错误数据
              val errObj: JSONObject = jsonObj.getJSONObject("err")
              if(errObj != null){
                //将错误数据发送到 DWD_ERROR_LOG_TOPIC
                MyKafkaUtils.send(DWD_ERROR_LOG_TOPIC ,  jsonObj.toJSONString )
              }else{
                  
              }
            }
          }
        }	
  1. 将公共字段和页面数据发送到DWD_PAGE_DISPLAY_TOPIC
    在这里插入图片描述
else{
                // 提取公共字段
                val commonObj: JSONObject = jsonObj.getJSONObject("common")
                val ar: String = commonObj.getString("ar")
                val uid: String = commonObj.getString("uid")
                val os: String = commonObj.getString("os")
                val ch: String = commonObj.getString("ch")
                val isNew: String = commonObj.getString("is_new")
                val md: String = commonObj.getString("md")
                val mid: String = commonObj.getString("mid")
                val vc: String = commonObj.getString("vc")
                val ba: String = commonObj.getString("ba")
                //提取时间戳
                val ts: Long = jsonObj.getLong("ts")
                // 页面数据
                val pageObj: JSONObject = jsonObj.getJSONObject("page")
                if(pageObj != null ){
                  //提取page字段
                  val pageId: String = pageObj.getString("page_id")
                  val pageItem: String = pageObj.getString("item")
                  val pageItemType: String = pageObj.getString("item_type")
                  val duringTime: Long = pageObj.getLong("during_time")
                  val lastPageId: String = pageObj.getString("last_page_id")
                  val sourceType: String = pageObj.getString("source_type")

                  //封装成PageLog,这边还写了bean实体类去接收
                  var pageLog =
                    PageLog(mid,uid,ar,ch,isNew,md,os,vc,ba,pageId,lastPageId,pageItem,pageItemType,duringTime,sourceType,ts)
                  //发送到DWD_PAGE_LOG_TOPIC
                  MyKafkaUtils.send(DWD_PAGE_LOG_TOPIC , JSON.toJSONString(pageLog , new SerializeConfig(true)))//scala中bean没有set和get方法,这边是直接操作字段
                }

  1. 其他曝光、事件、启动数据如下
                  //提取曝光数据
                  val displaysJsonArr: JSONArray = jsonObj.getJSONArray("displays")
                  if(displaysJsonArr != null && displaysJsonArr.size() > 0 ){
                    for(i <- 0 until displaysJsonArr.size()){
                      //循环拿到每个曝光
                      val displayObj: JSONObject = displaysJsonArr.getJSONObject(i)
                      //提取曝光字段
                      val displayType: String = displayObj.getString("display_type")
                      val displayItem: String = displayObj.getString("item")
                      val displayItemType: String = displayObj.getString("item_type")
                      val posId: String = displayObj.getString("pos_id")
                      val order: String = displayObj.getString("order")

                      //封装成PageDisplayLog
                      val pageDisplayLog =
                        PageDisplayLog(mid,uid,ar,ch,isNew,md,os,vc,ba,pageId,lastPageId,pageItem,pageItemType,duringTime,sourceType,displayType,displayItem,displayItemType,order,posId,ts)
                      // 写到 DWD_PAGE_DISPLAY_TOPIC
                      MyKafkaUtils.send(DWD_PAGE_DISPLAY_TOPIC , JSON.toJSONString(pageDisplayLog , new SerializeConfig(true)))
                    }
                  }
                  //提取事件数据(课下完成)
                  val actionJsonArr: JSONArray = jsonObj.getJSONArray("actions")
                  if(actionJsonArr != null && actionJsonArr.size() > 0 ){
                    for(i <- 0 until actionJsonArr.size()){
                      val actionObj: JSONObject = actionJsonArr.getJSONObject(i)
                      //提取字段
                      val actionId: String = actionObj.getString("action_id")
                      val actionItem: String = actionObj.getString("item")
                      val actionItemType: String = actionObj.getString("item_type")
                      val actionTs: Long = actionObj.getLong("ts")

                      //封装PageActionLog
                      var pageActionLog =
                        PageActionLog(mid,uid,ar,ch,isNew,md,os,vc,ba,pageId,lastPageId,pageItem,pageItemType,duringTime,sourceType,actionId,actionItem,actionItemType,actionTs,ts)
                      //写出到DWD_PAGE_ACTION_TOPIC
                      MyKafkaUtils.send(DWD_PAGE_ACTION_TOPIC , JSON.toJSONString(pageActionLog , new SerializeConfig(true)))
                    }
                  }
                }
                // 启动数据(课下完成)
                val startJsonObj: JSONObject = jsonObj.getJSONObject("start")
                if(startJsonObj != null ){
                  //提取字段
                  val entry: String = startJsonObj.getString("entry")
                  val loadingTime: Long = startJsonObj.getLong("loading_time")
                  val openAdId: String = startJsonObj.getString("open_ad_id")
                  val openAdMs: Long = startJsonObj.getLong("open_ad_ms")
                  val openAdSkipMs: Long = startJsonObj.getLong("open_ad_skip_ms")

                  //封装StartLog
                  var startLog =
                    StartLog(mid,uid,ar,ch,isNew,md,os,vc,ba,entry,openAdId,loadingTime,openAdMs,openAdSkipMs,ts)
                  //写出DWD_START_LOG_TOPIC
                  MyKafkaUtils.send(DWD_START_LOG_TOPIC , JSON.toJSONString(startLog ,new SerializeConfig(true)))

2.3 精确一次消费

  1. 背景

发送kafka的是自动提交,如果提交有误,会出现漏消费或者重复消费

  1. 相关语义
  • 至少一次消费:数据不会丢失,但存在数据重复
  • 最多一次消费:数据不会重复,但可能丢失数据
  • 精确一次消费:不多不少一次消费
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka/Spark消费topic到写出到topic 的相关文章

  • QML实现文件十六进制数据展示

    前言 将一个二进制文件直接拖放到Qt Creator中可以直接查看到以十六进制显示的数据格式 如 要实现一个这样的效果 还是要花不少时间的 在网上找了挺多示例 其中一个开源代码效果不错 参考这里 但是是在QWidget中实现的 通过继承QA

随机推荐

  • 小知识:随机生成26个字母中(一个或多个)的字母

    小知识 就直接上代码了 不多说 String str for int i 0 i lt 1 i str str char Math random 26 A 特别注意的2点 1 A 是随机生成大写的26个随机字母 2 a 是随机生成小写的26
  • LangChain之Output parsers

    LangChain之Output parsers Output parsers将LLM输出的文本 转换为structured data CommaSeparatedListOutputParser 解析结果为List 提示词如下 def g
  • 用python怎样实现滑动验证码呢?

    手把手带大家实现Bilibili模拟登陆 滑动验证码 项目 来肝 1 为什么要处理滑动验证码 在很多时候我们在做模拟登陆的时候会遇到滑动验证码 这个时候就必须要处理 2 目标网站 bilibili视频网站的滑动验证码 外链图片转存失败 源站
  • 头文件 sting.h 和 cstring 还有 string 区别

  • 显示器颜色不正常的原因是什么

    显示器是电脑的重要部件之一 显示器颜色不正常 会对我们看电脑造成很大影响 也很容易感觉到眼睛疲劳 显示器颜色不正常是怎么回事 应该怎么处理呢 下面为大家一一道来 显示器颜色不正常的根源 显示器是属于电脑的I O设备 即输入输出设备 它可以分
  • echarts之饼图制作+标示线

    1 安装echarts组件 npm install echarts s 2 在main js中全局引入以及挂载 import echarts from echarts 引入 Vue prototype echarts echarts 挂载
  • 对于Scanner类中next()和nextLine()的区别

    对于键盘录入对象Scanner对象的两个录入字符串方法的区别 Scanner sc new Scanner System in sc next 和 sc nextLine 的区别 next 对于录入的字符串碰到空格就会停止录入 nextLi
  • CentOS7 yum源修改为阿里,配置阿里epel源

    一 概念 区分 yum源 什么是yum源 yum是一个在CentOS RedHat和Fedora操作系统中使用的Shell前端软件包管理器 yum主要管理基于rpm的软件包 Centos先将发布的软件放置到YUM服务器内 然后分析这些软件的
  • 自定义分页标签

    原文地址 http blog csdn net wjt1989wjt article details 4720350 步骤一 编写分页标签处理类 分页标签处理类 public class PagerTag extends TagSuppor
  • vue使用的百度地图的天气查询功能

    首先需要在 Vue js 项目中安装百度地图 JavaScript API SDK 并获取相应的密钥 ak 然后 可以按照以下步骤使用百度地图的天气查询功能并使用 Axios 进行请求 官网文档 https lbsyun baidu com
  • Vue3 Cannot read properties of undefined (reading ‘use‘)

    在用vue3脚手架搭建项目的时候 配置路由 一直报错 错误代码不显示了 正确代码 min js import createApp from vue import App from App vue import Router from rou
  • Linux系统使用 NetworkManager 工具来管理网络

    使用 NetworkManager 工具来管理网络 其在命令行下对应的命令是 nmcli 要连接WiFi 相关的命令如下 1 查看网络设备列表 sudo nmcli dev 注意 如果列出的设备状态全部是 unmanaged 的 说明这些网
  • 单纯记录一下主题色样式——笔记

    单纯想 记录一下这个主题样式的设置 HTML代码
  • Prometheus: 通过ConfigMap来添加Grafana仪表盘

    如果你通过kube prometheus stack部署了Prometheus Grafana 那么Grafana中的仪表盘就是通过边车 sidecar 来动态获取的 Sidecar的观察对象是ConfigMap 当ConfigMap中配置
  • JavaFx如何打成exe包并设置exe的图标

    JavaFx如何打成exe包并设置exe的图标 javaFx在本地运行没问题后 想打成exe包 并在没有jdk的环境下使用 可参考以下操作 在pom中添加如下maven插件
  • 利用qt 信号槽传递自定义结构体--借助QVariant

    在前面的博客里 我介绍了利用Q DECLARE METATYPE和qRegsterMetaType来传递自定义的结构体 但是这样做有个缺点 qRegisterMetaType 只能在main 函数里才能发挥作用 https blog csd
  • Towards Open Set Deep Networks:开放世界的目标检测

    文章发表于2016年 文章链接 1 概述 随着深度网络在目标检测领域的发展 网络的性能和准确率都在不断提升 但是存在的一个问题 深度网络很容易被一些图片 在人类看来没有意义 所欺骗 即使我们觉得该图像并不属于某一类别 但是深度网络还是会以高
  • yii2+ueditor百度富文本编辑器+七牛云单图多图均可

    ueditor百度富文本版本 1 4 3 yii2七牛云SDK yii2安装及使用七牛云文件上传 第一步 打开 web ueditor php Uploader class php文件在最顶部引入 yii2安装及使用七牛云文件上传 内com
  • SQL Server 列转行函数 UNPIVOT(大数据)

    SQL Server 列转行函数 UNPIVOT 大数据 在 SQL Server 中 UNPIVOT 是一种用于将列转换为行的函数 它可以帮助我们重新组织和分析数据 本文将详细介绍 UNPIVOT 函数的使用方法以及如何在处理大数据时进行
  • Kafka/Spark消费topic到写出到topic

    1 Kafka的工具类 1 1 从kafka消费数据的方法 消费者代码 def getKafkaDStream ssc StreamingContext topic String groupId String consumerConfigs