Kafka工具类

2023-11-10

package com.cnic.utils;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.Properties;

public class MyKafkaUtil {

    private static Properties properties = new Properties();
    private static final String BOOTSTRAP_SERVERS = "hadoop102:9092";

    static {
        properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    }

    public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String group_id) {

        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, group_id);

        return new FlinkKafkaConsumer<String>(topic,
                new KafkaDeserializationSchema<String>() {
                    @Override
                    public boolean isEndOfStream(String nextElement) {
                        return false;
                    }

                    @Override
                    public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
                        if (record == null || record.value() == null) {
                            return "";
                        } else {
                            return new String(record.value());
                        }
                    }

                    @Override
                    public TypeInformation<String> getProducedType() {
                        return BasicTypeInfo.STRING_TYPE_INFO;
                    }
                },
                properties);
    }

    public static FlinkKafkaProducer<String> getKafkaProducer(String topic) {
        return new FlinkKafkaProducer<String>(topic,
                new SimpleStringSchema(),
                properties);
    }

    /**
     * Kafka-Source DDL 语句
     *
     * @param topic   数据源主题
     * @param groupId 消费者组
     * @return 拼接好的 Kafka 数据源 DDL 语句
     */
    public static String getKafkaDDL(String topic, String groupId) {
        return " with ('connector' = 'kafka', " +
                " 'topic' = '" + topic + "'," +
                " 'properties.bootstrap.servers' = '" + BOOTSTRAP_SERVERS + "', " +
                " 'properties.group.id' = '" + groupId + "', " +
                " 'format' = 'json', " +
                " 'scan.startup.mode' = 'latest-offset')";
    }

    /**
     * Kafka-Sink DDL 语句
     *
     * @param topic 输出到 Kafka 的目标主题
     * @return 拼接好的 Kafka-Sink DDL 语句
     */
    public static String getUpsertKafkaDDL(String topic) {

        return "WITH ( " +
                "  'connector' = 'upsert-kafka', " +
                "  'topic' = '" + topic + "', " +
                "  'properties.bootstrap.servers' = '" + BOOTSTRAP_SERVERS + "', " +
                "  'key.format' = 'json', " +
                "  'value.format' = 'json' " +
                ")";
    }

    public static String getTopicDbDDL(String groupId) {
        return "CREATE TABLE topic_db ( " +
                "  `database` String, " +
                "  `table` String, " +
                "  `type` String, " +
                "  `data` Map<String,String>, " +
                "  `old` Map<String,String>, " +
                "  `pt` AS PROCTIME() " +
                ")" + MyKafkaUtil.getKafkaDDL("topic_db", groupId);
    }


}

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka工具类 的相关文章

  • Netbeans 8.1 Gnome 3 GTK+ UI 字体和选项卡高度

    我刚刚在运行 GNOME 3 桌面的 Ubuntu 16 04 上安装了 NetBeans 8 1 如果可能的话 我想继续使用 IDE 的 GTK 外观和感觉 但 UI 上的字体 尤其是选项卡中的字体 太小且重叠 我尝试添加 fontsiz
  • 有没有创建 Cron 表达式的 Java 代码? [关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 我需要一个 Java 代码来根据用户输入创建一个 cron 表达式 用户输入是时间 频率和执行次数 只需从评论中添加 自己创建 即可
  • Java Logger 未记录到 Netbeans 中的输出

    我正在 Netbeans 中使用 Maven 启动一个 Java 项目 我编写了一些代码来使用 Logger 类进行日志记录 但是 日志记录似乎不起作用 在程序开始时 我运行 Logger getLogger ProjectMainClas
  • eclipse行号状态行贡献项是如何实现的?

    我需要更新状态行编辑器特定的信息 我已经有了自己的实现 但我想看看 eclipse 贡献项是如何实现的 它显示状态行中的行号 列位置 谁能指点一下 哪里可以找到源代码 提前致谢 亚历克斯 G 我一直在研究它 它非常复杂 我不确定我是否了解完
  • Android:文本淡入和淡出

    我已阅读此 stackoverflow 问题和答案 并尝试实现文本淡入和淡出 Android中如何让文字淡入淡出 https stackoverflow com questions 8627211 how to make text fade
  • 如何在单个查询中搜索 RealmObject 的 RealmList 字段

    假设我有一堂课 public class Company extends RealmObject private String companyId private RealmList
  • 如何在 Spring 中使 @PropertyResource 优先于任何其他 application.properties ?

    我正在尝试在类路径之外添加外部配置属性资源 它应该覆盖任何现有的属性 但以下方法不起作用 SpringBootApplication PropertySource d app properties public class MyClass
  • 如何在java中将日期格式从YYMMDD更改为YYYY-MM-DD? [关闭]

    Closed 这个问题不符合堆栈溢出指南 help closed questions 目前不接受答案 我从机器可读代码中获取日期格式为 YYMMDD 如何将其更改为 YYYY MM DD 例如我收到 871223 YYMMDD 我想把它改成
  • 如何在字段值无效的情况下更改 Struts2 验证错误消息?

    我在 Web 表单上使用 Struts2 验证 如果字段假设为整数或日期 则
  • 如何仅从 Firestore 获取最新更新的数据?

    在 Firestore 上发现任何更改时始终获取整个文档 如何只获取最近更新的数据 这是我的数据 我需要在第一次加载时在聊天中按对象顺序 例如 2018 09 17 30 40 msg和sendby 并且如果数据更新则仅获取新的msg和se
  • 如何在 ant 中为 junit 测试设置 file.encoding?

    我还没有完全完成file encoding 和 ant https stackoverflow com questions 1339352 how do i set dfile encoding within ants build xml
  • Akka 与现有 java 项目集成的示例

    如果我已经有现有的javaWeb 应用程序使用spring and servlet容器 将 Akka 集成到其中的正确方法是什么 就像我将会有Actor1 and Actor2互相沟通的 开始使用这些演员的切入点是什么 例如 1 把它放在那
  • Java继承,扩展类如何影响实际类

    我正在查看 Sun 认证学习指南 其中有一段描述了最终修饰符 它说 如果程序员可以自由地扩展我们所知的 String 类文明 它可能会崩溃 他什么意思 如果可以扩展 String 类 我是否不会有一个名为 MyString 的类继承所有 S
  • 蓝牙发送和接收文本数据

    我是 Android 开发新手 我想制作一个使用蓝牙发送和接收文本的应用程序 我得到了有关发送文本的所有内容逻辑工作 但是当我尝试在手机中测试它时 我看不到界面 这是Main Activity Code import android sup
  • 如何在JPanel中设置背景图片

    你好 我使用 JPanel 作为我的框架的容器 然后我真的想在我的面板中使用背景图片 我真的需要帮助 这是我到目前为止的代码 这是更新 请检查这里是我的代码 import java awt import javax swing import
  • 如何区分从 Saxon XPathSelector 返回的属性节点和元素节点

    给定 XML
  • Spring @Cacheable 和 @Async 注解

    我需要缓存一些异步计算的结果 具体来说 为了克服这个问题 我尝试使用 Spring 4 3 缓存和异步计算功能 作为示例 我们采用以下代码 Service class AsyncService Async Cacheable users C
  • 在 Spring 上下文中查找方法级自定义注释

    我想知道的是 所有的类 方法Spring http en wikipedia org wiki Spring Framework注释为 Versioned的bean 我创建了自定义注释 Target ElementType METHOD E
  • Android View Canvas onDraw 未执行

    我目前正在开发一个自定义视图 它在画布上绘制一些图块 这些图块是从多个文件加载的 并将在需要时加载 它们将由 AsyncTask 加载 如果它们已经加载 它们只会被绘制在画布上 这工作正常 如果加载了这些图片 AsyncTask 就会触发v
  • Java RMI - 客户端超时

    我正在使用 Java RMI 构建分布式系统 它必须支持服务器丢失 如果我的客户端使用 RMI 连接到服务器 如果该服务器出现故障 例如电缆问题 我的客户端应该会收到异常 以便它可以连接到其他服务器 但是当服务器出现故障时 我的客户端什么也

随机推荐

  • python search用法,Python-re中search()函数的用法详解(查找ip)

    1 首先来看一下search 和find 的区别 import re s1 2221155 search 字符串第一次出现的位置 print re search 1 s1 print s1 find 1 它们的输出分别是 search 函数
  • 苹果系统itunes连iphone连不上服务器,itunes不识别iphone,iPhone连接不上iTunes怎么解决?连接不上iTunes怎么办?...

    今天一网友求助 itunes不识别iphone iPhone手机插上电脑后可以弹出设备 在电脑里面可以显示并能打开手机的相册 怎么样iphone也连不上iTunes 而换另外的一个iPhone连接又很正常 iPhone连接不上iTunes怎
  • 6-17 使用函数实现字符串部分复制 (20 分)

    6 17 使用函数实现字符串部分复制 20 分 本题要求编写函数 将输入字符串t中从第m个字符开始的全部字符复制到字符串s中 函数接口定义 void strmcpy char t int m char s 裁判测试程序样例 include
  • 使用JSONP解决跨域

    1 首先需要知道什么是跨域 浏览器从一个域名的网页去请求另一个域名的资源时 域名 端口 协议任一不同 都是跨域 出于浏览器的同源策略限制 同源策略 Sameoriginpolicy 是一种约定 它是浏览器最核心也最基本的安全功能 如果缺少了
  • c语言中auto、register、extern、static用法

    转载地址 http www 111cn net net c 38998 htm 四种存储类别说明符有两种存储期 自动存储期和静态存储期 auto和register对应自动存储期 具有自动存储期的变量在进入声明该变量的程序块是被建立 它在该程
  • 删除system/app下的apk

    要删除系统system app目录下的的APK 由于 system app目录默认是只读 所以 想要删除这些APK 必须首先获得system目录的删除权限 通过如下步骤删除system app下的apk文件 1 连接设备 如果是手机则需要打
  • Linux 基础笔记 权限与文件管理

    Linux 基础笔记 权限与文件管理 字符界面中退出登录可用哪种方法 exit 命令或 Ctrl D 组合键 pwd命令的功能是什么 显示当前目录的绝对路径 当前目录为 home 使用以下哪个命令后可进入 home Studd test目录
  • React中常见的TypeScript定义使用

    前言 在我学习typescript时 想在react中使用typescript写代码 从头开始的时候是懵逼的 因为官方文档并没有使用typescript的教程 多是自己在网上查 自己看定义摸索 所以今天把我用过的 总结归纳一下 希望能帮助到
  • MySQL查询合并结果去重_MySQL数据表合并去重的简单实现方法

    场景 爬取的数据生成数据表 结构与另一个主表相同 需要进行合并 去重 解决 直接举例 首先创建两个表pep pep2 其中pep是主表 CREATE TABLE IF NOT EXISTS pep pep2 id INT UNSIGNED
  • 「从零开始造 RPC 轮子系列」01 我为什么要去造一个轮子?

    目录 好日子 为什么你需要学习造轮子 投资自己 不要成为调包侠 通过造轮子你能学到什么 下一步计划 好日子 宣布一件事情 好日子 12月有个好日子 20211202 从左往右读 接着从右往左读你会发现居然是对称的 这是属于程序员的 浪漫 身
  • android系统网络管理,详解安全管家Android平台上网管理功能

    Android系统在这几年忽然兴起 并成为如今最受欢迎的智能手机操作系统 却始终没有能够很好地对自身的上网管理功能做出很好的优化 小编也和很多人一样 是Symbian转战Android的 而作为一名伪资深智能手机用户 当感受到Android
  • C语言的union联合体,可实现不同类型数据的转换

    结构体和共用体的区别在于 结构体的各个成员会占用不同的内存 互相之间没有影响 而共用体的所有成员占用同一段内存 修改一个成员会影响其余所有成员 结构体占用的内存大于等于所有成员占用的内存的总和 成员之间可能会存在缝隙 共用体占用的内存等于最
  • python pd pandas.cut 出现 NAN的问题

    原因 pandas cut 设置lebel的时候是 0 n 只包含n不包含0 解决方法 重新设置一个lebel 使其包含特殊的格式 产生问题的例子 源代码 d2 pd cut DaysList bins 0 1700 1800 1900 2
  • 最新AI系统ChatGPT源码+支持OpenAI全模型+国内AI模型+AI绘画

    一 SparkAI智能创作系统 SparkAi创作系统是基于国外很火的ChatGPT进行开发的Ai智能问答系统 本期针对源码系统整体测试下来非常完美 可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统 那么如何搭建部
  • Linux新手入门必须学会的相关知识

    Linux新手入门必须学会的相关知识 Linux基本知识 1 Linux基本介绍 Linux是什么 Linux的诞生过程 Linux内核与Linux内核发行版 常见的Linux发行版 2 Linux基础安全介绍 SSH登录方式介绍 SSH基
  • 嵌入式系统开发入门(一)

    1 开发工具概览 1 1硬件 1 1 1开发平台 基础系统 树莓派raspberry 选用4B版本 单片机系统 arduino 进阶系统 JETSON nano 1 1 2 配件 摄像头 开发板专用摄像头 树莓派 JETSON nano
  • python代码~满屏小练习

    完整代码如下 import tkinter as tk import random import threading import time def boom window tk Tk width window winfo screenwi
  • debian 11搭建ftp

    配置本地用户 创建用户 adduser user1 编辑配置文件 vim etc vsftpd conf grep v listen yes local enable YES 本地用户登陆限制 write enable YES 本地用户写权
  • QT教程:基本控件及相关类的介绍和使用

    一 QString类 Qt提供了自己的字符串处理类 不用担心内存分配以及关于 0结尾的这些注意事项 QSting会自动对占用的内存进行扩充 链接非常的迅速 实例 QString str hello str world 拼接字符串操作 对象函
  • Kafka工具类

    package com cnic utils import org apache flink api common serialization SimpleStringSchema import org apache flink api c