flink kafka生产者在检查点恢复时以一次模式发送重复消息

2024-02-25

我正在写一个案例来测试 flink 两步提交,下面是概述。

sink kafka曾经是kafka生产者。sink stepmysql接收器是否扩展two step commit. sink comparemysql接收器是否扩展two step commit,并且这个接收器偶尔会抛出异常来模拟检查点失败。

当检查点失败并恢复时,我发现mysql两步提交可以正常工作,但是kafka消费者将读取上次成功的偏移量,并且kafka生产者会生成消息,即使他在这个检查点失败之前已经完成了.

在这种情况下如何避免重复消息?

感谢帮助。

env:

  • 弗林克1.9.1

  • java 1.8

  • 卡夫卡2.11

卡夫卡生产者代码:

        dataStreamReduce.addSink(new FlinkKafkaProducer<>(
                "flink_output",
                new KafkaSerializationSchema<Tuple4<String, String, String, Long>>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(Tuple4<String, String, String, Long> element, @Nullable Long timestamp) {
                        UUID uuid = UUID.randomUUID();
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("uuid", uuid.toString());
                        jsonObject.put("key1", element.f0);
                        jsonObject.put("key2", element.f1);
                        jsonObject.put("key3", element.f2);
                        jsonObject.put("indicate", element.f3);
                        return new ProducerRecord<>("flink_output", jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8));
                    }
                },
                kafkaProps,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        )).name("sink kafka");

检查点设置:

        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.enableCheckpointing(10000);
        executionEnvironment.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
        executionEnvironment.getCheckpointConfig().setPreferCheckpointForRecovery(true);

mysql 接收器:

dataStreamReduce.addSink(
                new TwoPhaseCommitSinkFunction<Tuple4<String, String, String, Long>,
                        Connection, Void>
                        (new KryoSerializer<>(Connection.class, new ExecutionConfig()), VoidSerializer.INSTANCE) {

                    int count = 0;
                    Connection connection;

                    @Override
                    protected void invoke(Connection transaction, Tuple4<String, String, String, Long> value, Context context) throws Exception {
                        if (count > 10) {
                            throw new Exception("compare test exception.");
                        }
                        PreparedStatement ps = transaction.prepareStatement(
                                " insert into test_two_step_compare(slot_time, key1, key2, key3, indicate) " +
                                        " values(?, ?, ?, ?, ?) " +
                                        " ON DUPLICATE KEY UPDATE indicate = indicate + values(indicate) "
                        );
                        ps.setString(1, context.timestamp().toString());
                        ps.setString(2, value.f0);
                        ps.setString(3, value.f1);
                        ps.setString(4, value.f1);
                        ps.setLong(5, value.f3);
                        ps.execute();
                        ps.close();
                        count += 1;
                    }

                    @Override
                    protected Connection beginTransaction() throws Exception {
                        LOGGER.error("compare in begin transaction");
                        try {
                            if (connection.isClosed()) {
                                throw new Exception("mysql connection closed");
                            }
                        }catch (Exception e) {
                            LOGGER.error("mysql connection is error: " + e.toString());
                            LOGGER.error("reconnect mysql connection");
                            String jdbcURI = "jdbc:mysql://";
                            Class.forName("com.mysql.jdbc.Driver");
                            Connection connection = DriverManager.getConnection(jdbcURI);
                            connection.setAutoCommit(false);
                            this.connection = connection;
                        }
                        return this.connection;
                    }

                    @Override
                    protected void preCommit(Connection transaction) throws Exception {
                        LOGGER.error("compare in pre Commit");
                    }

                    @Override
                    protected void commit(Connection transaction) {
                        LOGGER.error("compare in commit");
                        try {
                            transaction.commit();
                        } catch (Exception e) {
                            LOGGER.error("compare Commit error: " + e.toString());
                        }
                    }

                    @Override
                    protected void abort(Connection transaction) {
                        LOGGER.error("compare in abort");
                        try {
                            transaction.rollback();
                        } catch (Exception e) {
                            LOGGER.error("compare abort error." + e.toString());
                        }
                    }

                    @Override
                    protected void recoverAndCommit(Connection transaction) {
                        super.recoverAndCommit(transaction);
                        LOGGER.error("compare in recover And Commit");
                    }

                    @Override
                    protected void recoverAndAbort(Connection transaction) {
                        super.recoverAndAbort(transaction);
                        LOGGER.error("compare in recover And Abort");
                    }
                })
                .setParallelism(1).name("sink compare");

我不太确定我是否正确理解了这个问题:

当检查点失败并恢复时,我发现mysql两步提交可以正常工作,但是kafka生产者将读取上次成功的偏移量并生成消息,即使他在这个检查点失败之前已经完成了。

Kafka 生产者没有读取任何数据。因此,我假设您的整个管道重新读取旧的偏移量并产生重复项。如果是这样,你需要了解 Flink 如何确保恰好一次。

  1. 创建定期检查点是为了在发生故障时保持一致的状态。
  2. 这些检查点包含检查点时最后一次成功读取记录的偏移量。
  3. 恢复后,Flink 将从上次成功检查点中存储的偏移量重新读取所有记录。因此,将重放上次检查点和失败之间生成的相同记录。
  4. 重播记录将恢复故障前的状态。
  5. 它将产生复制源自重播输入记录的输出。
  6. 接收器有责任确保不会将重复项有效写入目标系统。

对于最后一点,有两种选择:

  • 仅在写入检查点时输出数据,这样目标中就不会出现有效的重复项。这种简单的方法非常通用(独立于接收器),但会增加检查点间隔到延迟。
  • 让接收器对输出进行重复数据删除。

后一个选项用于 Kafka 接收器。它使用 Kafka 事务来删除重复数据。为了避免消费者端重复,您需要确保它没有读取文档中提到的未提交数据 https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance。还要确保事务超时足够大,以免在故障和恢复之间丢弃数据。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

flink kafka生产者在检查点恢复时以一次模式发送重复消息 的相关文章

  • 在 Biztalk WCF 适配器中启用流式处理

    我想从 biztalk 发送一个带有一些元数据的大 blob 到 WCF 服务 我可以自由控制服务设置和 biztalk 设置 尽管我必须使用 http 来访问该服务 该 blob 很可能在 GB 范围内 因此使用 BasicHttp 或
  • Flink TaskManager 超时?

    我正在运行 Flink 应用程序 通过 Yarn 似乎有时任务管理器会随机超时 这是错误 java util concurrent TimeoutException Heartbeat of TaskManager with id some
  • Apache Flink 环境中的 AWS SDK 冲突

    我正在尝试将我的作业部署到 Flink 环境 但总是收到错误 java lang NoSuchMethodError com amazonaws AmazonWebServiceRequest putCustomQueryParameter
  • 如何从 Google 云端硬盘视频获取redirector.googlevideo.com 链接

    我有一个谷歌驱动器视频文件 例如https drive google com file d FILE ID view https drive google com file d FILE ID view 并且我想获取其redirector
  • Apache Flink、JDBC 和 fat jar 是否存在类加载问题?

    使用 Apache Flink 1 8 并尝试运行RichAsyncFunction 我得到No Suitable Driver Found初始化 Hikari 池时出错RichAsyncFunction open 在 IDE 中它运行得很
  • 如何在流式传输之前知道音频歌曲的持续时间?

    我正在制作一个流音频歌曲的应用程序 在自定义媒体播放器中 我必须显示该音频文件的总持续时间 如果一首音频歌曲是 SDCard 我可以使用以下方法知道它的持续时间 MediaPlayer player public double durati
  • HTML5 将 png 缓冲区加载到画布中(用于流式传输)

    通过 websocket 我检索 PNG 格式的图像的二进制缓冲区 类似的东西 http blog nihilogic dk 2008 05 compression using canvas and png html 我想将此 PNG 缓冲
  • C# 中的 StreamReader 和缓冲区

    我对 StreamReader 的缓冲区使用有疑问 这里 http msdn microsoft com en us library system io streamreader aspx http msdn microsoft com e
  • 结构化 Spark 流指标检索

    我有一个具有结构化 Spark 流的应用程序 我想获取一些指标 例如调度延迟 延迟等 通常 此类指标可以在 Spark UI Streaming 选项卡中找到 但是 结构化流不存在此类功能我知道 那么如何获取这些指标值呢 目前 我尝试使用查
  • Flink 流顺序

    Flink 能保证流的执行顺序吗 我有两个 Kafka 主题 每个主题都有一个分区 流 1 和流 2 并使用keyBy 流由一个处理coprocess功能 在我的测试过程中 我可以看到两个流的内容并不总是按顺序执行 我可以将并行度设置为 1
  • python-twitter 流 api 支持/示例

    我正在与python twitter http code google com p python twitter 并意识到 Twitter 提供流媒体api http dev twitter com pages streaming api实
  • 使用 slick/scala 进行流式传输

    我正在研究 scala slick 流 并试图了解它是如何工作的 这是我的测试代码 val bigdata TableQuery BigData val x db stream bigdata result transactionally
  • 找到实际的 RTMP 流 URL?

    让我举个例子 这是视频嵌入代码 div Loading the player div
  • 在 Chrome 和 IE11 上流式传输可观看的 .mjpeg 视频

    我在本地托管了一个 mjpeg 文件http 127 0 0 1 web Images Stream somevideo mjpeg http 127 0 0 1 web Images Stream somevideo mjpeg 我在我的
  • 尝试升级到 flink 1.3.1 时出现异常

    我尝试将集群中的 flink 版本升级到 1 3 1 以及 1 3 2 但我的任务管理器中出现以下异常 2018 02 28 12 57 27 120 ERROR org apache flink streaming runtime tas
  • 播放 video.js ustream m3u8 文件流

    我尝试在网页中播放带有 video js 的 m3u8 文件流 但我无法做到这一点 我不知道错误在哪里
  • Cassandra Pojo Sink Flink 中的动态表名称

    我是 Apache Flink 的新手 我正在使用 Pojo Sink 将数据加载到 Cassandra 中 现在 我在以下命令的帮助下指定表和键空间名称 Table注解 现在 我想在运行时动态传递表名称和键空间名称 以便可以将数据加载到用
  • Apache Flink - “keyBy”中的异常处理

    由于代码错误或缺乏验证 进入 Flink 作业的数据可能会触发异常 我的目标是提供一致的异常处理方式 我们的团队可以在 Flink 作业中使用这种方式 而不会导致生产中出现任何停机 重启策略似乎不适用于此处 因为 简单的重启无法解决问题 我
  • Azure 和直播

    我正在尝试使用 Azure 制作实时视频流 但我真的不知道应该先尝试什么 首先 有一个网站 上面有最新版本的 Flowplayer 现在我需要知道如何将网络摄像头的视频流传输到 Azure 我需要什么样的软件 我正在尝试使用 Express
  • Python 2.7:支持一个端口上多个连接的流式 HTTP 服务器

    我正在寻找一个标准的Python 2 7包 提供一个同时执行的HTTP服务器流媒体同一端口号上的连接 嘿 各位版主 请停止将我的问题标记为想要以非流媒体方式提供服务的问题的重复项 例如 python 中的多线程 Web 服务器 https

随机推荐

  • grails 将 svn 修订版添加到 app.version

    我正在尝试将 svn 修订版添加到我的app version不需要 ant 或其他类似的外部工具 看来我可以加入 Events groovy对此 但文档相对较少 有人知道怎么做吗 This http grails 1312388 n4 na
  • JApplet NoClassDefFoundError

    我正在 Eclipse 上编写 Japplet 它时不时地停止在 html 页面上工作 以下是错误 Exception in thread thread applet main MapGenerator class 1 java lang
  • 有没有一种简单的方法可以从 .NET 用户控件中删除“ct100”前缀?

    长话短说 几十个页面没有使用母版页 对于新模块 我创建了一个带有菜单控件的母版页 菜单控件已经存在 这样我就可以在我现在创建的大约六个页面上获得相同的外观 由于内容页使用母版页 因此菜单控件的名称更改为ct100 Menu1而不仅仅是Men
  • 使用 C# 编辑 DataGridview 并将其保存在数据库表中

    我使用 MYSQL Server 作为我的项目后端 我有一个 DataGridView 它填充了数据库中的数据 当我在 DataGridView 单元格中进行更改并单击保存按钮时 数据需要在 DataGridView 和数据库表中更改 这是
  • 新的CSS样式声明

    我正在尝试使用浏览器的内置类型CSSStyleDeclaration以编程方式传递和修改样式 这很方便 因为 cssText财产 然而 new CSSStyleDeclaration 抛出一个Illegal Constructor错误 所以
  • Gradle 以非零退出值 1 完成

    我刚刚在 libgdx 中生成了一个项目并导入到 eclipse 编译了一些依赖项 现在我得到了 Error Gradle Execution failed for task android compileDebugAidl com and
  • 如何选择自动完成下拉列表中的第一个元素

    如果没有元素 任何人都可以帮助我如何选择自动完成下拉列表的第一个元素 被选中 我尝试使用自动对焦 为键盘事件工作 如果我使用鼠标 第一个元素不会选择自动聚焦的元素 visit here https stackoverflow com a 9
  • 在 Swift 中使用 NSURL 读取文本文件

    我想读取并显示位于 URL 的文本文件的内容 我正在为 Yosemite 编写 Mac 应用程序 我需要使用 Swift 但我坚持这样做 这是我的代码 let messageURL NSURL string http localhost 8
  • 任务并行库 INotifyPropertyChanged 不抛出异常?

    我有一个 wpf 项目 我在绑定到文本框的属性上使用 INotifyPropertyChanged 我正在使用任务 TaskParallelLibrary 在不同的线程上更新此值 它已正确更新并且不会引发异常 我认为它会抛出异常 因为它是在
  • Angular 4 - Http 请求错误:您在需要流的地方提供了“未定义”

    在尝试执行 HTTP Post 请求时 我收到以下错误 auth service ts c694 156 请求新的时出错 密码 错误消息 您在流所在位置提供了 未定义 预期的 您可以提供 Observable Promise Array 或
  • 如何使用uiwebview显示一些网页?

    如何使用 uiwebview 显示某个 url 请求的网页 我不知道该怎么做 谁能告诉我该怎么做 有开源的吗 谢谢 NSString urlAddress http www google com NSURL url NSURL URLWit
  • 如何更加重视机器学习中的某些特征?

    如果使用像 scikit learn 这样的库 如何为 SVM 这样的分类器的输入中的某些特征分配更多权重 这是人们做还是不做的事 首先 你可能不应该这样做 机器学习的整个概念是使用统计分析分配最佳权重 你在这里干扰了整个概念 因此你需要非
  • 将列表传递给 Tcl 过程

    将列表传递给 Tcl 过程的规范方法是什么 如果我能得到它 以便列表自动扩展为可变数量的参数 我真的很喜欢它 所以像这样 set a b c myprocedure option1 option2 a and myprocedure opt
  • 在 IE 和 Chrome 中上传之前预览图像

    我正在尝试设计一个模块 在用户将图像上传到数据库之前 我想在其中向用户显示图像的预览 我找到了一个适用于 Firefox 但不适用于 IE 和 Chrome 的解决方案 有人可以帮助我吗 这是我的代码 function imageURL i
  • 这个空白隐藏在哪里?

    我有一个字符向量 它是一些 PDF 抓取的文件pdftotext 命令行工具 一切都 幸福地 排列得很好 然而 该向量充满了一种空白类型 无法使用正则表达式 gt test 1 Address Clinic Information Stor
  • whereis python 和 python --version 之间的矛盾

    在一个 Python 环境中 我输入whereis python 并得到以下信息 python usr bin python2 6 usr bin python2 6 config usr bin python usr lib python
  • 如何通知用户NPM包版本更新?

    我用 Node JS 编写了一个 CLI 工具并发布到NPM https www npmjs com package rapid react 每次在终端中运行时 我都需要通知用户可用的新版本及其类型 补丁 次要 主要 以便他 她可以相应地更
  • 如何计算时间复杂度为 O(n log n) 的 XOR(二元)卷积

    是按位异或运算 我认为Karatsuba算法可能可以解决该问题 但是当我尝试在Karatsuba算法中使用XOR代替 时 很难得到子问题 The 卷积定理 https en wikipedia org wiki Convolution th
  • 为什么在 Python 中处理已排序数组并不比处理未排序数组快?

    在这篇文章中为什么处理排序数组比处理随机数组更快 https stackoverflow com questions 11227809 why is processing a sorted array faster than an unso
  • flink kafka生产者在检查点恢复时以一次模式发送重复消息

    我正在写一个案例来测试 flink 两步提交 下面是概述 sink kafka曾经是kafka生产者 sink stepmysql接收器是否扩展two step commit sink comparemysql接收器是否扩展two step