使用Kafka Connect时如何转换所有时间戳字段?

2023-11-22

我正在尝试将所有时间戳字段转换为格式为字符串类型yyyy-MM-dd HH:mm:ss.

要转换多个字段,我必须为每个字段单独创建一个转换。

...
"transforms":"tsFormat1,tsFormat2,...,tsFormatN",
"transforms.tsFormat1.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.tsFormat1.target.type": "string",
"transforms.tsFormat1.field": "ts_col1",
"transforms.tsFormat1.format": "yyyy-MM-dd HH:mm:ss",
"transforms.tsFormat2.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.tsFormat2.target.type": "string",
"transforms.tsFormat2.field": "ts_col2",
"transforms.tsFormat2.format": "yyyy-MM-dd HH:mm:ss",
...
"transforms.tsFormatN.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.tsFormatN.target.type": "string",
"transforms.tsFormatN.field": "ts_colN",
"transforms.tsFormatN.format": "yyyy-MM-dd HH:mm:ss",
...

 

有没有办法对所有时间戳列应用单个转换?

我努力了,

...
"transforms":"tsFormat",
"transforms.tsFormat.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.tsFormat.target.type": "string",
"transforms.tsFormat.field": "ts_col1, ts_col2,..., ts_colN",
"transforms.tsFormat.format": "yyyy-MM-dd HH:mm:ss",
...

and

...
"transforms":"tsFormat",
"transforms.tsFormat.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.tsFormat.target.type": "string",
"transforms.tsFormat.field": "ts_col1",
"transforms.tsFormat.field": "ts_col2",
...
"transforms.tsFormat.field": "ts_colN",
"transforms.tsFormat.format": "yyyy-MM-dd HH:mm:ss",
...

 


更好的是数字类型匹配之类的东西"numeric.mapping": "best_fit"。就像如何numeric.mapping适用于所有数字字段(无需手动指定字段名称)以尝试找到最佳数字类型,是否有类似的东西可以对所有时间戳字段应用转换或字符串格式?


我在原来的基础上做了一些小小的调整TimestampConverter,如您所愿完美工作:https://github.com/howareyouo/kafka-connect-timestamp-converter

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用Kafka Connect时如何转换所有时间戳字段? 的相关文章

  • Spark:将 bytearray 转换为 bigint

    尝试使用 pyspark 和 Spark sql 将 kafka 键 二进制 字节数组 转换为 long bigint 会导致数据类型不匹配 无法将二进制转换为 bigint 环境详情 Python 3 6 8 Anaconda custo
  • KafkaStreams 同一应用程序中的多个流

    我正在尝试根据 KafkaStreams 的惯例和合理性做出实用的设计决策 假设我想将两个不同的事件放入其中KTables 我有一个制作人将这些消息发送给KStream那就是听那个话题 据我所知 我不能对消息使用条件转发KafkaStrea
  • 如何使用 haproxy 负载均衡器 Kafka Bootstrap?

    我有一个 kafka 集群 由 3 台在 AWS 上运行的机器组成 卡夫卡1到卡夫卡3 我正在使用新型卡夫卡消费者 gt 0 8 我知道kafka客户端连接到其中一台kafka服务器 获取服务器元数据 然后直接连接到代理 我想确保在代理发生
  • Kafka中如何使用事务以及如何使用abortTransaction?

    我是 kafka 新手 我使用 Kafka Producer Java api 面对Kafka的这个问题 Kafka Invalid transition attempted from state COMMITTING TRANSACTIO
  • 在 Confluence 4.1 + Kafka 1.1 中为 Kafka Connect 打包自定义 Java `partitioner.class` 插件?

    我已经成功地将用 Java 编写的简单自定义 Partitioner 类用于 Confluence 3 2 x Kafka 0 10 x 上的 Kafka Connect 接收器 我想升级到 Confluence 4 1 Kafka 1 1
  • 通过 CMD 获取启用 SSL 的 Kafka 中的最新偏移量

    我一直在使用下面的 CMD 从打开纯文本端口的 Kafka 队列中获取最新的偏移量 kafka run class sh kafka tools GetOffsetShell broker list server 9092 topic sa
  • 如何在kafka消费组中动态添加消费者

    我应该如何知道何时必须扩展消费者组中的消费者 当存在快速生产者时 消费者扩大规模的触发因素是什么 一种直接的方法是获取消费者延迟 这可以计算为提交的偏移量和开始偏移量之间的差值 如果最后 n 次计算的延迟正在增加 您可以扩大规模 反之亦然
  • 即使在kafka机器重新启动后,如何保留kafka保留字节和kafka保留段[重复]

    这个问题已经存在了 we set retention bytes价值 104857600对于主题 topic test root confluent01 kafka topics zookeeper localhost 2181 alter
  • 是否可以使用Kafka传输文件?

    我每天都会生成数千个文件 我想使用 Kafka 进行流式传输 当我尝试读取该文件时 每一行都被视为一条单独的消息 我想知道如何将每个文件的内容作为 Kafka 主题中的单个消息 以及消费者如何将 Kafka 主题中的每条消息写入单独的文件中
  • Kafka Streams 如何处理包含不完整数据的分区?

    Kafka Streams 引擎将一个分区映射到一个工作线程 即 Java 应用程序 以便该分区中的所有消息都由该工作线程处理 我有以下场景 并试图了解它是否仍然可行 我有一个主题 A 有 3 个分区 发送给它的消息由 Kafka 随机分区
  • 从副本消费

    Kafka 将主题的每个分区复制到指定的复制因子 据我所知 所有写入和读取请求都会路由到分区的领导者 有没有办法从追随者那里消费而不是从领导者那里消费 Kafka中的复制只是为了故障转移吗 在 Kafka 2 3 及更早版本中 您只能从领导
  • 带有安全 Kafka 抛出的 Spark 结构化流:无权访问组异常

    为了在我的项目中使用结构化流 我正在 hortonworks 2 6 3 环境上测试 Spark 2 2 0 和 Kafka 0 10 1 与 Kerberos 的集成 我正在运行下面的示例代码来检查集成 我能够在 Spark 本地模式下的
  • Kafka Streams - 跳跃窗口 - 去重键

    我正在 4 小时窗口上进行跳跃窗口聚合 每 5 分钟前进一次 由于跳跃窗口重叠 我得到了具有不同聚合值的重复键 TimeWindows of 240 60 1000L advanceBy 5 60 1000L 如何消除具有重复数据的重复键或
  • 在spark-kafka中使用schema将ConsumerRecord值转换为Dataframe

    我正在使用 Spark 2 0 2 和 Kafka 0 11 0 并且 我正在尝试在火花流中使用来自卡夫卡的消息 以下是代码 val topics notes val kafkaParams Map String Object bootst
  • 生产者程序中的 kafka 网络处理器错误(ArrayIndexOutOfBoundsException:18)

    我有下面的 kafka Producer Api 程序 我对 kafka 本身是新手 下面的代码从 API 之一获取数据并将消息发送到 kafka 主题 package kafka Demo import java util Propert
  • Kafka Producer配置重试策略

    需要更改 Kafka Producer 配置的哪些参数 以便生产者应该 1 重试n次 2 n个间隔后 如果代理关闭 也会收到相同的消息 我需要处理与此相关的情况 https github com rsyslog rsyslog issues
  • Spark shell (spark 3.0.0) 添加包 confluence kafka 5.5.1 javax.ws.rs-api 问题

    我本地的win10 WSL回到ubuntu 在ubuntu上 我安装了spark3 0 0 confluence平台5 5 1 手动下载 当我尝试运行spark shell或spark submit时 下面是shell示例 spark sh
  • 从 Apache Kafka 中的主题删除消息

    所以我是 Apache Kafka 的新手 我正在尝试创建一个简单的应用程序 以便我可以更好地理解 API 我知道这个问题在这里被问了很多 但是如何清除存储在主题上的消息 记录 我看到的大多数答案都说要更改消息保留时间或删除并重新创建主题
  • kafka消费者群体正在重新平衡

    我正在使用 Kafka 9 和新的 java 消费者 我正在循环内进行轮询 当代码尝试执行 Consumer commitSycn 时 由于组重新平衡 我收到 commitfailedexcption 请注意 我将 session time
  • 了解Kafka流groupBy和window

    我无法理解 kafka 流中的 groupBy groupById 和窗口的概念 我的目标是聚合一段时间内 例如 5 秒 的流数据 我的流数据看起来像 value 0 time 1533875665509 value 10 time 153

随机推荐

  • 如何将 URL 作为路径参数传递给 FastAPI 路由?

    我使用 FastAPI 创建了一个简单的 API 并且尝试将 URL 作为任意地址传递到 FastAPI 路由path范围 from fastapi import FastAPI app FastAPI app post path def
  • 如何保存应用了 CSS 滤镜的图像

    我想知道如何将 CSS 过滤器应用于图像 然后将图像保存到磁盘 例如 我有一个图像标签 我可以通过 CSS 应用棕褐色效果 img sepia filter sepia 20 并将该类应用于 HTML 中的图像标签 img src img
  • “for”开头的分号如何工作?

    我刚刚在 Mozilla 网站上看到了这段代码 虽然对我来说它看起来很糟糕 但我可能不熟悉它的用法 for k lt len k if k in t t k searchElement return k 循环开头的分号如何工作 完整的代码是
  • 如何在 JPA 中从 @EmbeddedId 设置反向引用

    有人知道是否可以在 JPA 内建立反向引用 EmbeddedId 例如 有一个形式的实体 Entity public class Entity1 Id GeneratedValue private String identifier pri
  • Rails active_storage:安装不起作用

    我已将 Rails api 应用程序从 5 1 更新到 5 2 我只使用rails api 我正在尝试使用活动存储 我认为问题出在线路上config api only true in config application rb 我做了很多谷
  • Numpy:对于一个数组中的每个元素,找到另一个数组中的索引

    我有两个一维数组 x 和 y 一个比另一个小 我试图找到 y 在 x 中的每个元素的索引 我发现了两种简单的方法来做到这一点 第一个很慢 第二个需要大量内存 缓慢的方式 indices for iy in y indices np wher
  • 使Jackson在序列化时不输出类名(使用Spring MVC)

    有没有办法强制 Jackson 不要将类名放在 Json 输出中 我问了一个question这导致了这个问题 但我希望问的是一个更有针对性的问题 我在执行此操作时使用 Spring MVC 但我不确定这有多重要 所以 而不是 NamedSy
  • Android TabHost 内 Fragment

    我正在开发一个 Android 应用程序 并且使用了 android FragmentPager 选项卡example来自developer android com 此示例使用片段作为选项卡内容 现在我想在其中一个片段中放置一个tabHos
  • MySQL 中标志的 BIT(1) 或 TINYINT

    我经常有一些表需要存储一个标志 该标志可以是 1 或 0 真或假等 我以前用过 TINYINT 我应该使用 BIT 1 吗 为什么或者为什么不 如果您使用的 mysql 版本高于 5 0 3Bit不再是别名Tinyint但如果你创建一个bi
  • 如何从视图中获取托管活动?

    我有一个Activity with 3 EditText和一个自定义视图 它充当专用键盘以将信息添加到EditTexts 目前我正在通过Activity进入视图 以便我可以获得当前聚焦的编辑文本并从自定义键盘更新内容 有没有办法引用父活动并
  • preg_match 多个单词

    我想测试一个字符串以查看它包含某些单词 i e string The rain in spain is certain as the dry on the plain is over and it is not clear preg mat
  • 将Cocos2D添加到xCode 4中的项目中

    我正在从事一个项目 需要具有相同的原生部分 cocoa touch 和 cocos2d 中的某些部分 我已经把所有的原生都做了 现在我需要在项目中集成cocos2d 我的问题是我无法成功地将 cocos2d 库添加到我的项目中 有什么好的解
  • java.net.SocketException:套接字失败:EPERM(不允许操作)

    我正在开发一个包含多项活动的 Android Studio 项目 我目前正在尝试读取本地主机上 Java Servlet 的输出 但由于套接字权限 它似乎崩溃了 我创建了一个新项目 使用完全相同的代码并且工作完美 所以我不明白为什么不愿意为
  • 对每个数据库运行查询 (mysql)

    我正在寻找一种直接的方法来对我的 mysql 服务器上托管的所有数据库运行查询 我有一堆 Magento 安装 我想截断所有数据库上的所有 Magento 日志表 日志客户 访客日志 日志访客信息 log url 日志地址信息 日志引用 报
  • android模拟器可以播放音频吗

    我想录制并将录制的声音传递到手机扬声器 但我无法使录制代码正常工作 应用程序崩溃 在这里查看我的尝试 所以我现在试图看看模拟器是否可以做任何与音频相关的事情 我将 1 秒的录音复制到了 SD 卡上 格式为 wav 16 位 pcm 44k
  • Color 和 SolidColorBrush 之间的区别澄清

    好吧 这一直困扰着我 我还没有真正找到任何明确的答案作为两者之间差异的原因 原因Color SolidColorBrush所以我想知道是否有人可以在这方面教育我 我已经知道用法上的差异 例如我可以使用SolidColorBrush就像我说的
  • 使用自签名 SSL 证书时出现“SocketException:未实现未连接的套接字”

    我已经向 jmeter user 邮件列表提出了同样的问题 但我也想在这里尝试 所以至少我可以在找到答案后更新它 我使用时遇到问题JMeter使用自签名 SSL 证书测试 Tomcat Web 应用程序 JMeter 抛出带有消息的 Soc
  • Firefox、Chrome、Safari 对于 MP4 HTML5 视频有灰色背景

    任何具有白色背景的视频 我可以制作 在 Firefox Chrome 和 Safari 中都会变成灰色 在 IE 中为白色 嗯 在我的 Windows 机器上它是灰色的 在我的 Android 手机 平板电脑和 Mac 上它是白色的 我正在
  • 如何使用 Node.js 实现安全的 REST API

    我开始计划使用 Node js express 和 mongodb 构建 REST API 该 API 为网站 公共和私人区域 提供数据 之后可能还会为移动应用程序提供数据 前端将使用 AngularJS 开发 几天来 我阅读了很多有关保护
  • 使用Kafka Connect时如何转换所有时间戳字段?

    我正在尝试将所有时间戳字段转换为格式为字符串类型yyyy MM dd HH mm ss 要转换多个字段 我必须为每个字段单独创建一个转换 transforms tsFormat1 tsFormat2 tsFormatN transforms