Flink消费Rabbit数据,写入HDFS - 使用 BucketingSink

2023-11-12

一、应用场景:

Flink 消费 Kafka 数据进行实时处理,并将结果写入 HDFS。

二、Bucketing File Sink

由于流数据本身是无界的,所以,流数据将数据写入到分桶(bucket)中。默认使用基于系统时间(yyyy-MM-dd--HH,0时区)的分桶策略。在分桶中,又根据滚动策略,将输出拆分为 part 文件。

1、Flink 提供了两个分桶策略,分桶策略实现了 

org.apache.flink.streaming.connectors.fs.bucketing.Bucketer 接口:

  • BasePathBucketer,不分桶,所有文件写到根目录;
  • DateTimeBucketer,基于系统时间(yyyy-MM-dd--HH)分桶。

除此之外,还可以实现Bucketer接口,自定义分桶策略。

2、Flink 提供了两种writer方式,它们实现了 

org.apache.flink.streaming.connectors.fs.Writer 接口:

  • StringWriter 是系统默认的写入方式,调用toString()方法,同时换行写入;

  • SequenceFileWriter 是Hadoop序列文件写入方式,可配置压缩。

除此之外,还可以实现Writer接口,自定义Writer方式。

具体说明见Flink官网

三、编码。

1、pom添加依赖。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-filesystem_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

 2、添加Source。

// 源数据
val kafkaProps = new Properties()
kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.DATA_KAFKA_BROKER)
kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "user-login-info")
val consumer010 = new FlinkKafkaConsumer010[String](Constant.DATA_KAFKA_TOPIC, new SimpleStringSchema(), kafkaProps)
val transaction = env.addSource(consumer010)

val sourceStream = transaction.map(s => MemberLogInfo.buildMemberLogInfo(s))
  .uid("source-map")
  .filter(s => s != null)
  .uid("source-filter")

3、添加Sink。

val sink = new BucketingSink[MemberLogInfo](Constant.HDFS_SAVE_PATH_LOGIN)
  .setBucketer(new MemberBucket()) // 自定义桶名称
  .setWriter(new MemberWriter()) // 自定义输出
  .setBatchSize(120*1024*1024) // 设置每个文件的最大大小 ,默认是384M。这里设置为120M
  .setBatchRolloverInterval(Long.MaxValue) // 滚动写入新文件的时间,默认无限大
  .setInactiveBucketCheckInterval(60*1000) // 1分钟检查一次不写入的文件
  .setInactiveBucketThreshold(5*60*1000) // 5min不写入,就滚动写入新的文件
  .setPartSuffix(".log") // 文件后缀

loginStream.addSink(sink)

4、自定义分桶名称

public class MemberBucket implements Bucketer<MemberLogInfo> {
    private static final long serialVersionUID = 10000L;

    @Override
    public Path getBucketPath(Clock clock, Path path, MemberLogInfo memberLogInfo) {
        String day = DateUtil.format(new Date(memberLogInfo.getTimestamp()), "yyyy-MM-dd");
        return new Path(path + "/" + day);
    }
}

5、自定义writer写入

@Override
public void write(MemberLogInfo element) throws IOException {
    // 输出字符串内容
    String content = element.getIggid() + "\t" + element.getType() + "\t" + element.getGameId() + "\t" +
            element.getUrlDeviceName() + "\t" + element.getDeviceId() + "\t" + element.getTimestamp() + "\t" +
            element.getSourceIp() + "\t" + element.getEventType();
    byte[] s = content.getBytes(charsetName);
    FSDataOutputStream outputStream = this.getStream();
    outputStream.write(s);
    outputStream.write(10); // 换行符
}

四、总结。

截止目前,Flink 的 Bucketing File Sink 仍存在不少问题,如:

  • 不支持写入到 Hive。
  • 写入HDFS时,会产生大量的小文件。
  • 当程序突然停止时,文件仍处于inprogress状态。
  • 默认桶下的文件名是 part-{parallel-task}-{count}。当程序重启时,从上次编号值加1继续开始。前提是程序是正常停止
  • 除了使用BucketingSink外,还可以使用StreamingFileSink,见 这里
  • BucketingSink API 参见 这里
  • 间隔检查时间, 是以运行时间开始算的。如10:00:10运行程序,inactiveBucketThreshold=300s,inactiveBucketCheckInterval=60s,最后一次写入时间是10:05:30。则在10:11:10时,将inprocess文件转为正式文件。

如果有写的不对的地方,欢迎大家指正。有什么疑问,欢迎加QQ群:176098255

一起学习
 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink消费Rabbit数据,写入HDFS - 使用 BucketingSink 的相关文章

  • Flink实战之实时风控规则引擎

    问题导读 1 怎样构建一个风控业务架构 2 风控规则模型有哪些 3 怎样实现Flink CEP 动态更新 一 项目背景 目前钱大妈基于云原生大数据组件 DataWorks MaxCompute Flink Hologres 构建了离线和实时
  • 【Flink系列1】flink与spark的区别

    Flink简介 spark基本架构 flink基本架构 Spark提出的最主要抽象概念是弹性分布式数据集 RDD flink支持增量迭代计算 基于流执行引擎 Flink提供了诸多更高抽象层的API以方便用户编写分布式任务 1 DataSet
  • Flink之IntervalJoin介绍

    InterValJoin算子 间隔流 一条流去join另一条流去过去一段时间内的数据 该算子将keyedStream与keyedStream转化为DataStream 再给定的时间边界内 默认包含边界 相当于一个窗口 按指定的key对俩个K
  • 作为一个编程新手,我再也不怕Flink迷了我的眼!

    欢迎大家前往腾讯云 社区 获取更多腾讯海量技术实践干货哦 本文由kyledong 发表于云 社区专栏 使用 Flink 编写处理逻辑时 新手总是容易被林林总总的概念所混淆 为什么 Flink 有那么多的类型声明方式 BasicTypeInf
  • Flink 1.17教程:基本合流操作

    基本合流操作 在实际应用中 我们经常会遇到来源不同的多条流 需要将它们的数据进行联合处理 所以Flink中合流的操作会更加普遍 对应的API也更加丰富 联合 Union 最简单的合流操作 就是直接将多条流合在一起 叫作流的 联合 union
  • 大数据技术Flink详解

    一 有状态的流式处理 Apache Flink 是一个分布式流处理器 具有直观和富有表现力的API 可实现有状态的流处理应用程序 它以容错的方式有效地大规模运行这些应用程序 Flink 于2014 年4 月加入Apache 软件基金会作为孵
  • 微众银行DSS部署单机-普通版

    DSS 普通版部署 我的服务器 我的配置 vim conf config sh vim conf db sh QA 我的服务器 centos 7 0 8C16G 100G机械硬盘 我的配置 bashrc文件内容 JDK export JAV
  • flink源码阅读---Flink intervalJoin 使用和原理分析

    1 前言 Flink中基于DataStream的join 只能实现在同一个窗口的两个数据流进行join 但是在实际中常常会存在数据乱序或者延时的情况 导致两个流的数据进度不一致 就会出现数据跨窗口的情况 那么数据就无法在同一个窗口内join
  • 206.Flink(一):flink概述,flink集群搭建,flink中执行任务,单节点、yarn运行模式,三种部署模式的具体实现

    一 Flink概述 1 基本描述 Flink官网地址 Apache Flink Stateful Computations over Data Streams Apache Flink Flink是一个框架和分布式处理引擎 用于对无界和有界
  • [1132]Flink与Kafka版本对应关系

    以下为Flink和Kafka的版本对照表 Flink版本 Kafka版本 1 12 X 2 4 1 1 11 X 2 4 1 1 10 X 2 2 1 1 9 X 2 2 0 1 8 X 2 0 1 1 7 X 2 0 1 0 10 x 0
  • flink中AggregateFunction 执行步骤以及含义全网详细解释

    package operator import org apache flink api common functions AggregateFunction import org apache flink api common funct
  • 【Flink Rest-ful API 】

    Flink 有了一些查询job状态指标的API 这些监控 API is a REST ful API 接受 HTTP 请求并返回JSON data 这些监控API以jobManager中web server 为基础 默认其监听端口为8081
  • 浅谈Hadoop体系和MPP体系

    浅谈Hadoop体系和MPP体系 引言 如题 在大数据发展至今 为了应对日益繁多的数据分析处理 和解决客户各种奇思妙 怪 想需求 形形色色的大数据处理的框架和对应的数据存储手段层出不穷 有老当益壮的Hadoop体系 依靠Hadoop巨大的社
  • Flink常用算子总结

    Streaming 算子 Map 将元素处理转换 再输出 map算子对一个DataStream中的每个元素使用用户自定义的Mapper函数进行处理 每个输入元素对应一个输出元素 最终整个数据流被转换成一个新的DataStream 输出的数据
  • Flink checkPoint和SavePoint

    savepoint和checkpoint都是flink为容错提供的强大功能特性 能够自动或手动保存job的运行状态 两者区别 checkpoint 应用定时触发 用户保存状态 会过期 内部应用失败重启的时候启用 但是手动cancel时 会删
  • Flink on Zeppelin-2

    Flink Interpreter类型 首先介绍下Zeppelin中的Flink Interpreter类型 Zeppelin的Flink Interpreter支持Flink的所有API DataSet DataStream Table
  • 【硬刚大数据之学习路线篇】2021年从零到大数据专家的学习指南(全面升级版)

    欢迎关注博客主页 https blog csdn net u013411339 本文由 王知无 原创 首发于 CSDN博客 本文首发CSDN论坛 未经过官方和本人允许 严禁转载 欢迎点赞 收藏 留言 欢迎留言交流 声明 本篇博客在我之前发表
  • 【双流(1)|原理】flink 双流join原理(1)Interval Join:state过大? 回撤现象出现时,sink如何处理,还有如何优化回撤?数据出现shuffle时join是如何处理的?

    先思考几个问题 双流join的基本原理是什么 双流join的分类有哪些 具体的实现是什么 双流join产生的问题 回撤的情况以及优化的可能性 多流join数据倾斜与性能优化思路 多流join的可能性 文章目录 一 流的join和表的join
  • Flink_05_状态(个人总结)

    声明 1 本文为我的个人复习总结 并非那种从零基础开始普及知识 内容详细全面 言辞官方的文章 2 由于是个人总结 所以用最精简的话语来写文章 3 若有错误不当之处 请指出 状态 状态就是一块内存 一个变量 如果要访问历史窗口 或批次 的数据
  • Apache Flink(十五):Flink任务提交模式

    个人主页 IT贫道 大数据OLAP体系技术栈 Apache Doris Clickhouse 技术 CSDN博客 私聊博主 加入大数据技术讨论群聊 获取更多大数据资料 博主个人B栈地址 豹哥教你大数据的个人空间 豹哥教你大数据个人主页 哔哩

随机推荐

  • 计算宽度_桥梁有效宽度计算,看看很有用!

    有效分布宽度实质上是剪力滞效应的反应 由于目前桥梁设计多用二维平面解析 故荷载的有效分布宽度仍需要计算 不过还有很多深层次问题还不能合理解答 有待进一步研究和探讨 各中间跨正弯矩段取该跨计算跨径的0 2倍 边跨正弯矩段取该跨计算跨径的0 2
  • 回溯法-装载问题

    子集树问题 和 子集树的0 1背包问题类似 但是没有考虑价格 include
  • 【Parallels Desktop】解决Sorry, This Application Cannot Be Run Under A Virtual Machine

    问题描述 解决步骤 Win R 或Cmd R 打开 运行 窗口 输入regedit并点击 确定 打开注册表编辑器 依次展开HKEY LOCAL MACHINE HARDWARE ACPI DSDT文件夹 鼠标右键点击PRLS 选择 重命名
  • Redis第二讲 Redis数据持久化AOF和RDB

    RDB快照 snapshot 在默认情况下 Redis 将内存数据库快照保存在名字为 dump rdb 的二进制文件中 你可以对 Redis 进行设置 让它在 N 秒内数据集至少有 M 个改动 这一条件被满足时 自动保存一次数据集 save
  • 【修仙境界】等级划分

    文章目录 一 下境界 1 炼气 一共13层 2 筑基 分初 中 后期和大圆满 3 结丹 分初 中 后期和大圆满 4 元婴 分初 中 后期和大圆满 5 化神 分初 中 后期和大圆满 二 中境界 1 炼虚 分初 中 后期和大圆满 2 合体 分初
  • C++ 编程出错的地方(考试选择题易错点)

    一 int IsSvn int n if n 7 0 return 1 要判断这个数能不能被7整除 你就只返回1吗 那岂不是只返回1 没有0的情况了 应该改为 int IsSvn int n if n 7 0 return 1 else r
  • 2021年电赛模块化程序总结

    文章目录 1 ADC0804 2 LCD1602 3 AD9854 1 ADC0804 集成A D转换器品种繁多 选用时应综合考虑各种因素选取集成芯片 一般逐次比较型A D转换器用的比较多 ADC0804就是这类单片集成A D转换器 ADC
  • 9、HTML:有序列表(ol),无序列表(ul),描述列表(dl、dt、dd)详解

    1 什么是列表 什么是列表 什么是有序列表 什么是无序列表 上面写的 3 句话就是一个列表 你懂得 2 有序列表 有序列表 英文叫做 ordered list 所以标签也是取这个词组的首字母 ol ol标签括起来的范围就是有序列表的范围 而
  • Win11怎么修改c盘用户名?

    Win11怎么修改c盘用户名 不知道的小伙伴们可以学起来了 谨慎操作 以下的方法提供给你 希望对你有所帮助 Win11更改C盘user用户名教程 一 开启Administrator权限并登入 搜索框搜索cmd 右击以管理员身份运行 出现cm
  • C++每日一问:C++ 内存管理——内存泄漏及处理

    2 内存泄漏 2 1 C 中动态内存分配引发问题的解决方案 假设我们要开发一个String类 它可以方便地处理字符串数据 我们可以在类中声明一个数组 考虑到有时候字符串极长 我们可以把数组大小设为200 但一般的情况下又不需要这么多的空间
  • 唯一分解定理(分解质因子)

    唯一分解定理 每个大于一的自然数均可写为质数的积 而且这些素因子按大小排列之后 写法只有一种方式 最简单的写法 include
  • matlab绘制正弦函数、幅度调制初步、Inner matrix dimensions must agree错误

    以sin 2 f t 表达式来绘制正弦图像 必须给定数值序列才能绘制出图像 t必须给定一个数值序列 然后计算出 y sin 函数值序列 以t为横轴 y为纵轴 就绘制出了图像 先给出f 4 在这里是有几个周期 采样率Fs 100 matlab
  • flask从入门到精通,知识讲解+代码演示 day1

    flask从入门到精通 知识讲解 代码演示 day1 文章目录 flask从入门到精通 知识讲解 代码演示 day1 一 flask是什么 二 使用步骤 1 创造flask项目 2 初入flask 3 flask代码初运行 4 flask从
  • Spring Cloud实战(五)-声明式接口模块

    接着上一篇 Spring Cloud实战 四 配置中心 现在开始搭建api模块 一 声明式接口模块api 1 pom xml
  • 数学建模-相关性分析(Matlab)

    注意 代码文件仅供参考 一定不要直接用于自己的数模论文中 国赛对于论文的查重要求非常严格 代码雷同也算作抄袭 如何修改代码避免查重的方法 https www bilibili com video av59423231 清风数学建模 一 基础
  • GPU与GPGPU泛淡

    GPU与GPGPU泛淡 GPU Graphics Processing Unit 也即显卡 是一种专门在个人电脑 工作站 游戏机和一些移动设备 如平板电脑 智能手机等 上作图像运算工作的微处理器 它已经是个人PC和移动设备上不可或缺的芯片
  • C#数据类型之枚举类型

    一 枚举类型的定义 public enum 枚举名称 枚举数据类型 枚举的数据类型可以省略 默认类型为int 枚举项1 枚举项的值 枚举项的值是整数可以自己设置 枚举项2 枚举项3 例如 public enum month ushort 一
  • Clion + mysql (win/Mac + 本地/远程)

    新手教程 那些年我用clion操作mysql的一些经验教训 本文目录 使用clion自带的数据库工具 对数据库进行操作 连接本地数据库 建库 建表 编辑表格 修改字段名 查询数据 插入新的数据 sql常用语句 mysql版 win Clio
  • 口罩检测——数据准备(2)

    文章目录 前言 一 数据介绍 二 数据标注 三 数据转换 总结 前言 上一篇文章中小编讲解了口罩检测的环境要求 在这一篇文章中我们就正式进入项目的讲解 我们从数据准备开始 数据是模型快乐的源泉 没有高质量的数据 再好的模型也白搭 一 数据介
  • Flink消费Rabbit数据,写入HDFS - 使用 BucketingSink

    一 应用场景 Flink 消费 Kafka 数据进行实时处理 并将结果写入 HDFS 二 Bucketing File Sink 由于流数据本身是无界的 所以 流数据将数据写入到分桶 bucket 中 默认使用基于系统时间 yyyy MM