Kafka JDBC 连接器中的自定义分区分配

2024-03-03

我有一个用例,我需要编写自定义逻辑来根据消息中的某些关键参数分配分区。我对此做了一些研究,发现卡夫卡转换支持覆盖转换接口中的某些方法,但我无法在 git hub 或其他地方执行一些示例代码。有人可以分享示例代码或 git hub 链接来在 kafka JDBC 源连接器中进行自定义分区分配吗?

提前致谢!。


卡夫卡连接默认情况下分配分区使用:DefaultPartitioner (org.apache.kafka.clients.producer.internals.DefaultPartitioner)

如果您需要使用某些自定义覆盖默认值,这是可能的,但您必须记住,覆盖适用于所有源连接器。 为此,您必须设置producer.partitioner.class财产,前producer.partitioner.class=com.example.CustomPartitioner。 此外,您必须使用分区器将 jar 复制到 Kafka Connect 库的目录中。

改造方式:

在 Transformation 中也可以设置分区,但这不是正确的方法。 从Transformation您无权访问主题元数据,这对于分配分区至关重要。

无论如何,如果您想为记录设置分区,代码应如下所示:

public class AddPartition <R extends ConnectRecord<R>> implements Transformation<R> {

    public static final ConfigDef CONFIG_DEF = new ConfigDef();

    @Override
    public void configure(Map<String, ?> props) {
        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
    }

    @Override
    public R apply(R record) {
        return record.newRecord(record.topic(), calculatePartition(record), record.keySchema(), record.key(), record.valueSchema(), record.value(), record.timestamp());
    }

    private Integer calculatePartition(R record) {
        // Partitions calcuation based on record information
        return 0;
    }

    @Override
    public void close() {
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka JDBC 连接器中的自定义分区分配 的相关文章

  • SPNEGO 密码身份验证问题

    我已将我的应用程序配置为通过 SPNEGO 与 Websphere 使用 Kerberos 身份验证 这是详细信息 krb5 conf libdefaults default realm ABC MYCOMPANY COM default
  • java.lang.VerifyError:JVMVRFY012堆栈形状不一致;

    在 WAS 8 5 5 中部署 Maven 项目时出现以下错误 我在WAS中安装了JDK 1 6和1 7 错误500 org springframework web util NestedServletException 处理程序处理失败
  • 从文本文件中读取阿拉伯字符

    我完成了一个项目 在该项目中我读取了用记事本编写的文本文件 我的文本文件中的字符是阿拉伯语 文件编码类型是UTF 8 当在 Netbeans 7 0 1 中启动我的项目时 一切似乎都正常 但是当我将项目构建为 jar 文件时 字符以这种方式
  • 在 Java 中使用 Batik 检查和删除 SVG 中的属性

    这个问题基本上说明了一切 如何检查 SVG 是否具有 viewBox 属性 我正在使用蜡染库 我需要这个 因为我需要 至少 通知用户有一个 viewBox 属性 我可以删除它吗 使用 org w3c dom 类 您可以按照以下方式做一些事情
  • Java 卡布局。多张卡中的一个组件

    一个组件 例如JLabel 在多张卡中使用CardLayout 目前看来该组件仅出现在它添加到的最后一张卡上 如果有办法做到这一点 我应该吗 这是不好的做法吗 或者有其他选择吗 你是对的 它只出现在 添加到的最后一张卡 中 但这与CardL
  • 迭代函数可以调用自身吗?

    当观看下面的 MIT 6 001 课程视频时 讲师在 28 00 将此算法标记为迭代 但是 在 30 27 他说这个算法和实际的 递归 算法都是递归的 该函数正在使用基本情况调用自身 那么这次迭代情况如何 private int itera
  • 记录共享和映射的诊断上下文

    据我所知 其他人做了什么来解决 Commons Logging 项目 针对 NET 和 Java 不支持映射或嵌套诊断上下文这一事实 执行摘要 我们选择直接使用实现者日志框架 在我们的例子中为 log4j 长答案 您是否需要一个抽象日志框架
  • 无法从资源加载图片

    So I am trying to load a image file from a resource so that when I export my application into a jar file it could be use
  • 在grails控制器中识别ajax请求或浏览器请求

    我正在开发一个使用大量ajax的grails应用程序 如果请求是ajax调用 那么它应该给出响应 这部分正在工作 但是如果我在浏览器中输入URL 它应该带我到主页 索引页面而不是请求的页面 下面是ajax调用的示例gsp代码
  • 为什么无法从 WEB-INF 文件夹内加载 POSModel 文件?

    我在我的 Web 项目中使用 Spring MVC 我将模型文件放在 WEB INF 目录中 String taggerModelPath WEB INF lib en pos maxent bin String chunkerModelP
  • 使用单独的线程在java中读取和写入文件

    我创建了两个线程并修改了 run 函数 以便一个线程读取一行 另一个线程将同一行写入新文件 这种情况会发生直到整个文件被复制为止 我遇到的问题是 即使我使用变量来控制线程一一执行 但线程的执行仍然不均匀 即一个线程执行多次 然后控制权转移
  • BadPaddingException:无效的密文

    我需要一些帮助 因为这是我第一次编写加密代码 加密代码似乎工作正常 但解密会引发错误 我得到的错误是 de flexiprovider api exceptions BadPaddingException 无效的密文 in the 解密函数
  • 如何以编程方式创建 CardView

    我正在开发一个 Android 应用程序Java Android Studio 我想在活动中创建CardView以编程方式 我想将以下属性设置为CardView layout width wrap content layout row 0
  • javax.media.jai 类的公共下载?

    这是一个非常简单的问题 我一直在寻找可以下载 javax media jai 库的地方 我找到了 jai imageio 库 但是我发现的所有其他 jai 内容要么已经过时 2008 年及之前 然后我遇到了登录屏幕 是否有 javax me
  • 如何制作一个makefile只用于编译一些java文件?

    我有三个java文件 名为A java B java C java A将创建对象B B将创建对象C 但我以前从未构建过makefile 有谁可以帮我构建一个 makefile 来编译这三个 java 文件吗 我应该使用什么工具来制作 mak
  • 受信任的 1.5 小程序可以执行系统命令吗?

    如果是的话 这个能力有什么限制吗 具体来说 我需要以 Mac OSX 为目标 我以前用过这个在 Windows 系统上启动东西 但从未在 Mac 上尝试过 public void launchScript String args Strin
  • 如何建立与 FileZilla Server 1.2.0 的 FTPS 数据连接

    使用 Apache commons net 的 Java FTPSClient 进行会话恢复是一个已知问题 会话恢复是 FTPS 服务器数据连接所需的一项安全功能 Apache FTPSClient 不支持会话恢复 并且 JDK API 使
  • Errors/BindingResult 参数应在模型属性、@RequestBody 或 @RequestPart 参数之后立即声明

    我通过剖析示例应用程序来自学 Spring 然后到处添加代码来测试我在剖析过程中开发的理论 在测试添加到 Spring 应用程序中的一些代码时 我收到以下错误消息 An Errors BindingResult argument is ex
  • 决策树和规则引擎 (Drools)

    In the application that I m working on right now I need to periodically check eligibility of tens of thousands of object
  • mybatis:使用带有 XML 配置的映射器接口作为全局参数

    我喜欢使用 XML 表示法来指定全局参数 例如连接字符串 我也喜欢 Mapper 注释 当我尝试将两者结合起来时 我得到这个例外 https stackoverflow com questions 4263832 type interfac

随机推荐