5.Flink对接Kafka入门

2023-05-16

Flink Connector Kafka

  • 1. Kafka
    • 1.1. [Kafka官网](http://kafka.apache.org/)
    • 1.2. Kafka 简述
    • 1.3. Kafka特性
    • 1.4. kafka的应用场景
    • 1.5. kafka-manager的部署
    • 1.6. `使用Kafka Connect导入/导出数据`
    • 1.7. [Kafka日志存储原理](https://blog.csdn.net/shujuelin/article/details/80898624)
  • 2. Kafka与Flink的融合
    • 2.1. kafka连接flink流计算,实现flink消费kafka的数据
    • 2.2. flink 读取kafka并且自定义水印再将数据写入kafka中
  • 3. Airbnb 是如何通过 balanced Kafka reader 来扩展 Spark streaming 实时流处理能力的
  • 4. 寄语:海阔凭鱼跃,天高任鸟飞

1. Kafka

1.1. Kafka官网

1.2. Kafka 简述

在这里插入图片描述
在这里插入图片描述

  • Kafka 是一个分布式消息系统:具有生产者、消费者的功能。它提供了类似于JMS 的特性,但是在设计实现上完全不同,此外它并不是JMS 规范的实现。

1.3. Kafka特性

  • 消息持久化:基于文件系统来存储和缓存消息
  • 高吞吐量
  • 多客户端支持:核心模块用Scala 语言开发,Kafka 提供了多种开发语言的接入,如Java 、Scala、C 、C++、Python 、Go 、Erlang 、Ruby 、Node. 等
  • 安全机制
    • 通过SSL 和SASL(Kerberos), SASL/PLA时验证机制支持生产者、消费者与broker连接时的身份认证;
    • 支持代理与ZooKeeper 连接身份验证
    • 通信时数据加密
    • 客户端读、写权限认证
    • Kafka 支持与外部其他认证授权服务的集成
  • 数据备份
  • 轻量级
  • 消息压缩

1.4. kafka的应用场景

  • Kafka作为消息传递系统
    在这里插入图片描述
  • Kafka 作为存储系统
    在这里插入图片描述
  • Kafka用做流处理
    在这里插入图片描述
  • 消息,存储,流处理结合起来使用
    在这里插入图片描述

1.5. kafka-manager的部署

Kafka Manager 由 yahoo 公司开发,该工具可以方便查看集群 主题分布情况,同时支持对 多个集群的管理、分区平衡以及创建主题等操作。

  • Centos7安装kafka-manager

  • 启动脚本

    • bin/cmak -Dconfig.file=conf/application.conf -java-home /usr/lib/jdk-11.0.6 -Dhttp.port=9008 &
  • 界面效果
    在这里插入图片描述
    在这里插入图片描述

  • 注意

1.6. 使用Kafka Connect导入/导出数据

  • 替代Flume——Kafka Connect
  • 集群模式
    • 注意: 在集群模式下,配置并不会在命令行传进去,而是需要REST API来创建,修改和销毁连接器。
    • 通过一个示例了解kafka connect连接器
    • kafka connect简介以及部署

1.7. Kafka日志存储原理

Kafka的Message存储采用了分区(partition)分段(LogSegment)稀疏索引这几个手段来达到了高效性

  • 查看分区.index文件

     bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files kafka-logs/t2-2/00000000000000000000.index 
    
  • 查看log文件

    /bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files t1-1/00000000000000000000.log --print-data-log
    
  • 查看TimeIndex文件

    bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files t1-2/00000000000000000000.timeindex --verify-index-only
    
  • 引入时间戳的作用

2. Kafka与Flink的融合

Flink 提供了专门的 Kafka 连接器,向 Kafka topic 中读取或者写入数据。Flink Kafka Consumer 集成了 Flink 的 Checkpoint 机制,可提供 exactly-once 的处理语义。为此,Flink 并不完全依赖于跟踪 Kafka 消费组的偏移量,而是在内部跟踪和检查偏移量。

2.1. kafka连接flink流计算,实现flink消费kafka的数据

  • 创建flink项目
    sbt new tillrohrmann/flink-project.g8

  • 配置sbt

    ThisBuild / resolvers ++= Seq(
        "Apache Development Snapshot Repository" at "https://repository.apache.org/content/repositories/snapshots/",
        Resolver.mavenLocal
    )
    
    name := "FlinkKafkaProject"
    
    version := "1.0"
    
    organization := "com.xiaofan"
    
    ThisBuild / scalaVersion := "2.12.6"
    
    val flinkVersion = "1.10.0"
    val kafkaVersion = "2.2.0"
    
    val flinkDependencies = Seq(
      "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
      "org.apache.kafka" %% "kafka" % kafkaVersion % "provided",
      "org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
      "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided")
    
    lazy val root = (project in file(".")).
      settings(
        libraryDependencies ++= flinkDependencies
      )
    
    assembly / mainClass := Some("com.xiaofan.Job")
    
    // make run command include the provided dependencies
    Compile / run  := Defaults.runTask(Compile / fullClasspath,
                                       Compile / run / mainClass,
                                       Compile / run / runner
                                      ).evaluated
    
    // stays inside the sbt console when we press "ctrl-c" while a Flink programme executes with "run" or "runMain"
    Compile / run / fork := true
    Global / cancelable := true
    
    // exclude Scala library from assembly
    assembly / assemblyOption  := (assembly / assemblyOption).value.copy(includeScala = false)
    
    
  • 源代码

    package com.xiaofan
    
    import java.util.Properties
    
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
    import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
    
    /**
     * 用flink消费kafka
     *
     * @author xiaofan
     */
    object ReadingFromKafka {
      val ZOOKEEPER_HOST = "192.168.1.23:2181,192.168.1.24:2181,192.168.1.25:2181"
      val KAFKA_BROKER = "192.168.1.23:9091,192.168.1.24:9091,192.168.1.25:9091"
      val TRANSACTION_GROUP = "com.xiaofan.flink"
    
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.enableCheckpointing(1000)
        env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    
        // configure kafka consumer
        val kafkaProps = new Properties()
        kafkaProps.setProperty("zookeeper.connect", ZOOKEEPER_HOST)
        kafkaProps.setProperty("bootstrap.servers", KAFKA_BROKER)
        kafkaProps.setProperty("group.id", TRANSACTION_GROUP)
    
        val transaction: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String]("xiaofan01", new SimpleStringSchema(), kafkaProps))
        transaction.print
    
        env.execute()
    
      }
    }
    
    
  • 启动kafka集群,运行结果
    在这里插入图片描述

2.2. flink 读取kafka并且自定义水印再将数据写入kafka中

  • 需求说明(自定义窗口,每分钟的词频统计)

    • 从kafka中读取数据(topic:t1)
    • kafka中有event time时间值,通过该时间戳来进行时间划分,窗口长度为10秒,窗口步长为5秒
    • 由于生产中可能会因为网络或者其他原因导致数据延时,比如 00:00:10 时间的数据可能 00:00:12 才会传入kafka中,所以在flink的处理中应该设置延时等待处理,这里设置的2秒,可以自行修改。
    • 结果数据写入kafka中(topic:t2)(数据格式 time:时间 count:每分钟的处理条数
  • 准备环境flink1.10.0 + kafka2.2.0

  • 创建topic

    bin/kafka-topics.sh --create --bootstrap-server 192.168.1.25:9091 --replication-factor 2 --partitions 3 --topic t1
    
    bin/kafka-topics.sh --create --bootstrap-server 192.168.1.25:9091 --replication-factor 2 --partitions 3 --topic t2
    
  • 向t1中生产数据

    package com.xiaofan
    
    import java.util.Properties
    
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    
    object ProduceData {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("bootstrap.servers", "192.168.1.25:9091")
        props.put("acks", "1")
        props.put("retries", "3")
        props.put("batch.size", "16384") // 16K
    
        props.put("linger.ms", "1")
        props.put("buffer.memory", "33554432") // 32M
    
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    
        val producer = new KafkaProducer[String, String](props)
        var i = 0
        while (true) {
          i += 1
          // 模拟标记事件时间
          val record = new ProducerRecord[String, String]("t1", i + "," + System.currentTimeMillis())
          //  只管发送消息,不管是否发送成功
          producer.send(record)
          Thread.sleep(300)
        }
    
      }
    }
    
    
  • 消费t1数据,处理后再次传入kafka t2

    package com.xiaofan
    
    
    import java.text.SimpleDateFormat
    import java.util.{Date, Properties}
    
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
    import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
    import org.apache.flink.streaming.api.watermark.Watermark
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper
    import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
    
    /**
     * Watermark 案例
     * 根据自定义水印定义时间,计算每秒的消息数并且写入 kafka中
     */
    object StreamingWindowWatermarkScala {
    
      def main(args: Array[String]): Unit = {
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.setParallelism(1)
    
        val topic = "t1"
        val prop = new Properties()
        prop.setProperty("bootstrap.servers","192.168.1.25:9091")
        prop.setProperty("group.id","con1")
    
    
        val myConsumer = new FlinkKafkaConsumer[String](topic,new SimpleStringSchema(),prop)
        // 添加源
        val text = env.addSource(myConsumer)
    
        val inputMap = text.map(line=>{
          val arr = line.split(",")
          (arr(0),arr(1).trim.toLong)
        })
        // 添加水印
        val waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String, Long)] {
          var currentMaxTimestamp = 0L
          var maxOutOfOrderness = 3000L// 最大允许的乱序时间是10s
    
          val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    
          override def getCurrentWatermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness)
    
          override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long) = {
            val timestamp = element._2
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
            val id = Thread.currentThread().getId
            println("currentThreadId:"+id+",key:"+element._1+",eventtime:["+element._2+"|"+sdf.format(element._2)+"],currentMaxTimestamp:["+currentMaxTimestamp+"|"+ sdf.format(currentMaxTimestamp)+"],watermark:["+getCurrentWatermark().getTimestamp+"|"+sdf.format(getCurrentWatermark().getTimestamp)+"]")
            timestamp
          }
        })
    
        val window = waterMarkStream.map(x=>(x._2,1)).timeWindowAll(Time.seconds(1),Time.seconds(1)).sum(1).map(x=>"time:"+tranTimeToString(x._1.toString)+"  count:"+x._2)
        // .window(TumblingEventTimeWindows.of(Time.seconds(3))) //按照消息的EventTime分配窗口,和调用TimeWindow效果一样
    
    
        val topic2 = "t2"
        val props = new Properties()
        props.setProperty("bootstrap.servers","192.168.1.25:9091")
        //使用支持仅一次语义的形式
        val myProducer = new FlinkKafkaProducer[String](topic2,new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
    
        window.addSink(myProducer)
        env.execute("StreamingWindowWatermarkScala")
    
      }
    
    
      def tranTimeToString(timestamp:String) :String={
        val fm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        val time = fm.format(new Date(timestamp.toLong))
        time
      }
    
    
    }
    
  • 运行效果

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

3. Airbnb 是如何通过 balanced Kafka reader 来扩展 Spark streaming 实时流处理能力的

  • 参考链接1
  • 参考链接2

4. 寄语:海阔凭鱼跃,天高任鸟飞

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

5.Flink对接Kafka入门 的相关文章

  • Docker入门之二Docker原理、常用命令

    一 底层原理 Docker是什么工作的 xff1f Docker是一个 Client Server 结构的系统 xff0c Docker的守护进程运行在主机上 通过 Socket 从客户端访问 xff01 Dockerserver接收到 D
  • Docker入门之三容器命令

    一 容器命令 说明 xff1a 有了镜像才能创建容器 xff0c 首先下载一个centos镜像测试 1 下载镜像 docker pull centos 2 新建容器并启动 docker run docker run span class t

随机推荐

  • 【stm32f103】认识GPIO

    认识GPIO 什么是GPIOGPIO基本结构保护二极管输出模式推挽输出开漏输出 输出数据寄存器 xff08 参考手册8 2 xff09 GPIOx ODRGPIOx BSRR 输入模式 GPIO工作模式输入模式 xff08 模拟 浮空 上拉
  • 【ubuntu】Windows10远程桌面连接ubuntu20.04【未完待续】【由于特别卡顿,不好用,不续了】

    先上结果 环境 硬件 xff1a Jetson Xavier NX 套件 系统 xff1a Ubuntu 20 04 43 Windows10 软件 xff1a Xorg xrdp xubuntu desktop 解决 0 问题 使用Jet
  • 5G的介绍与个人理解

    1 5G概念 移动通讯自20世纪80年代诞生以来 xff0c 经过了三十多年的爆发式增长 xff0c 已经成为连接人类社会的基础信息网络 随着4G进入规模商用阶段 xff0c 面向2020年及未来的第五代移动通讯 xff0c 已成为全球研发
  • Linux系统安装详解

    前言 xff1a 随着开源软件在世界范围内影响力日益增强 xff0c Linux服务操作系统在整个服务器操作系统市场格局中占据很大的市场份额 xff0c 已经形成大规模市场应用局面 xff0c 尤其在政府 金融 农业 交通 电信等国家关键领
  • Tomcat 服务器的部署

    前言 目录 前言 一 Tomcat介绍 二 Tomcat组件 三 Tomcat部署步骤 Tomcat各目录 四 Tomcat 优化 Tomcat服务器是一个免费的开放源代码的Web应用服务器 xff0c 属于轻量级应用服务器 xff0c 在
  • mysql数据库管理

    目录 一 数据库结构 二 常用的数据类型 三 查看数据库结构 1 查看当前服务器中的数据库 2 查看数据库中包含的表 3 查看表的结构 xff08 字段 xff09 四 SQL语句 xff08 一 xff09 DDL xff08 数据定义语
  • redis实战主从复制和搭建哨兵

    目录 前言 1 搭建主从复制 1 主msater配置 2 从slave配置 3 验证结果 二 搭建哨兵 1 在所有节点修改配置文件 2 启动哨兵 3 模拟故障 三 总结 前言 接着上回这次带来的实战主从复制和搭建哨兵 1 搭建主从复制 环境
  • ELK日志分析系统

    目录 一 ELK日志分析系统简介 1 日志服务器的优缺点 2 ELK是什么 xff1f 2 1 Logstash管理包含四种工具 2 2 日志处理步骤 二 Elasticsearch的基础核心概念 三 Logstash介绍 四 Kibana
  • Docker网络和数据卷

    目录 一 Docker 网络模式 1 docker容器的虚拟网关 2 Docker的网络模式 3 Docker自定义网络 二 Docker数据卷 1 数据卷 2 数据卷容器 3 容器互联 使用centos镜像 一 Docker 网络模式 1
  • C++ string的格式化

    lt sstream gt 库定义了三种类 xff1a istringstream ostringstream和stringstream xff0c 分别用来进行流的输入 输出和输入输出操作 可以利用它实现输入输出的格式化 xff0c 下面
  • nginx日志格式分析

    先随便截取一个nginx标准日志 xff1a 62 173 145 171 12 Jan 2020 17 23 54 43 0800 34 GET vvx 000000000000 cfg HTTP 1 1 34 404 169 34 34
  • Mybatis如何实现分页

    Mybatis如何实现分页 关键字limit实现分页 Interceptor Plugin实现分页 首先定一个拦截器 拦截器会拦截所有以ByPage结尾的方法 xff0c 然后拼接sql 语句的limit关键字实现分页 span class
  • 万用表蜂鸣档使用

    学习笔记 电路调试 万用表的使用 今天焊接 调试学校的51开发学习板 xff0c 真是个血汗的泪程 xff0c 不过知道一些东西 xff0c 现在分享出来 蜂鸣档 大宝贝 此位 xff0c 表示此时红黑表笔所接电路断路 xff08 个人理解
  • AD导出的Pdf原理图显示不全

    打开设置 按如图 操作 若打印出的 PDF字显示不全 按如下操作 61 61 双击TXT文档进入设置 xff0c 将字体改成宋体 xff0c 如下图所示
  • Proteus 网络名的添加

    1 首先先连接出一条线出来 2 接下来单机左边边框的键 xff1a 3 鼠标放入绿色直线上 xff0c 并单击它 xff08 注意 xff01 xff09 不要单击绿点 xff0c 否则线只会身长 然后在string下输入你的网络名即可
  • AD使用笔记1 原理图与PCB布局同步实现

    这里以AD20为例子 xff0c 单击AD上方的设置标件 选择出Sysetm下的Navigation 单击交互选择 xff0c 即可实现AD原理图与PCB同步布局实现
  • 按键消抖程序

    同步时钟域的应用 按键消抖程序 机械按键的抖动示意图 采用20ms按键消抖 FPGA 的晶振时钟一般为 50Mhz xff0c 也就是 每个时钟周期为 20ns xff08 1 50Mhz 61 20ns xff09 xff0c 那么计数值
  • FPGA &&双按键控制&&LED呼吸灯实现

    FPGA amp amp 双按键控制 amp amp LED呼吸灯实现 文章目录 FPGA amp amp 双按键控制 amp amp LED呼吸灯实现1 按键脉冲设计1 1按键脉冲信号代码1 2 按键脉冲Testbench代码1 3 仿真
  • 5.Flink对接Kafka入门

    Flink Connector Kafka 1 Kafka1 1 Kafka官网 http kafka apache org 1 2 Kafka 简述1 3 Kafka特性1 4 kafka的应用场景1 5 kafka manager的部署
  • 利用 Selenium WebDriver + Grid2 实现并行的浏览器端性能测试

    在 Web 2 0 应用中 xff0c 页面装载时间和浏览器渲染时间将成为决定性能的关键因素 我们在测试过程中不仅需要手动触发性能测试工具 xff0c 而且需要模仿不同的用户行为 xff0c 包括不同的浏览器 不同的网络条件和不同的使用习惯