java版kafka producer实现

2023-11-19

需求:

1. kafka server已经配置完全,且设定了访问限制

       基于这一点,必须要设定认证,及预先分配的账号密码

2. 由于项目开发环境是java,且不允许使用LogStash

       基于这一点,必须实现一个java版的producer

 

先贴一份代码,本地运行通过,祛除不相干部分之后的代码(未验证):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class Producer {
    private static Pruducer instance = null;
    private final KafkaProducer<String, String> producer;
    private final static String TOPIC = "your_topic";

    private Producer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "kafkaserver:9110");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("security.protocol", "SASL_PLAINTEXT");
        properties.setProperty ("sasl.mechanism", "PLAIN");
        properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required\n" +
                "username=\"count\"\n" +
                "password=\"countxxxxx\";");

        producer = new KafkaProducer<String, String>(properties);
    }

    public static KafkaProducerClient getInstance() {
        if (instance == null) {
            instance = new KafkaProducerClient();
        }
        return instance;
    }

    public void send(String key, String msg) {
        producer.send(new ProducerRecord<String, String>(TOPIC, key, msg));
    }

}

consumer 实现和procuder差不多,很容易实现。

写这篇文章的主要目的在于:认证。

根据apache官网文档看,有两种认证方式,其一是设置java.security.auth.login.config属性,其二则是设置sasl.jaas.config属性(如代码示例)。

第一种方式运行JVM时添加参数:-Djava.security.auth.login.config=/tmp/kafka_client_jaas.conf

kafka_client_jaas.conf 文件内容如下:

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    username="count"
    password="countxxxx";
};

注意:配置文件中的关键词不可随意更改,而且大小写敏感。

网上好多说System.setProperty("java.security.auth.login.config", "/tmp/kafka_client_jaas.conf")也可以达到设置目的。但经过测试无奈的发现,sdk可以获取到设置的property,但不知为何总是报告无法解析文件。不知道到同行们怎么做到的。

但由于这种方式时全局设置,可能会对服务器造成不可预估的影响,故而舍弃。

第二种方式不需要添加其他任何东西,只需要在代码中设置即可。填写sasl.jaas.config的值时一定要注意分号的转义和换行符的使用,错误的话解析依然失败。

同时若两种都设置了,那么以sasl.jaas.config为主。官方解析如下:

If both static JAAS configuration system property java.security.auth.login.config and client property sasl.jaas.config are specified, the client property will be used.

 

由于时初次接触这部分,着实浪费了不少时间,特写此文章,以示警戒。

 

参考网址:

https://kafka.apache.org/0110/documentation.html#security_sasl

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

java版kafka producer实现 的相关文章

  • 使用 BlobOutputStream 在 Azure 中上传 blob

    我正在尝试直接从流上传 blob 因为我不知道我决定尝试的流的长度这个答案 https stackoverflow com a 24621538 3695939 这不起作用 即使它从流中读取并且不会抛出任何异常 内容也不会上传到我的容器 我
  • 创建通用数组时出错

    public class TwoBridge implements Piece private HashSet
  • Google App Engine 数据存储写入:如何远程启用/禁用只读模式?

    在阅读备份时GAE 的数据存储 https developers google com appengine docs adminconsole datastoreadmin where 我们强烈建议您在备份或恢复期间将应用程序设置为只读模式
  • 从SQLite列中获取所有数字字符串并进行总和计算

    我是 Android 和 SQLite 的新手 我在 SQLite 中有一个只有数字的 AMOUNT 列 我可以在 ListView 中显示它 但我无法找到任何我理解的方法来将它们全部添加并显示在 TextView 中 这是数据库助手 im
  • selenium 2.0 中的 isElementPresent

    大家好 我正在使用 webdriver 所以如果我想使用 selenium s rc 函数 isElementPresent 我必须模拟 selenium rc 所以我会执行以下操作 import org openqa selenium B
  • Java中的字节和字符转换

    如果我将一个字符转换为byte然后回到char 那个角色神秘地消失了 变成了别的东西 这怎么可能 这是代码 char a line 1 byte b byte a line 2 char c char b line 3 System out
  • Mediaplayer 播放几次后停止播放

    我有一个按钮 按下它会播放一个随机声音剪辑 然后播放另一个声音剪辑 然后通过一个媒体播放器播放另一个声音剪辑 但是多次按下该按钮 15 20 次 后 所有音频都会停止 我在播放最后一个音频剪辑后释放媒体播放器 所以我不认为这是原因 有什么指
  • 如何在android中使用retrofit访问404错误?

    我正在使用改造 2 访问 REST API 以使用原始正文插入 JSON 数据 我从服务器获得成功响应 但在响应时收到 404 错误 我想访问404错误请帮我解决这个问题 ApiUtil getServiceClass sendFinalC
  • Java:等于和==

    让我们看看我们有 2 个对用户定义类实例的引用 即 Java 中的 a 和 b 会不会有一种情况 a b 但 a equals b 返回 false 当然 实施 equals 完全取决于班级 所以我可以写 class Foo public
  • 使用 SSL 和代理设置的 Rest 客户端获取连接超时

    我正在使用带有忽略 ssl 的 Rest 客户端 它工作正常 但在将来我尝试使用客户端证书进行的生产中将无法工作 我有 ca 证书和客户端证书 我用它创建了一个客户端 但我收到错误 Exception in thread main com
  • grails 中的 log4j:如何登录文件?

    我的 grails config groovy 中有这个 log4j 配置 log4j error org codehaus groovy grails web servlet controllers org codehaus groovy
  • 无法删除临时文件夹(有时)

    当我启动应用程序时 我创建一个临时文件夹 public static File createTempDir String name throws IOException File tempDir File createTempFile na
  • @Transactional 注解属于哪里?

    如果您将 Transactional in the DAO类和 或其方法 或者注释使用 DAO 对象调用的服务类是否更好 或者注释两个 层 是否有意义 我认为事务属于服务层 它是了解工作单元和用例的人 如果您将多个 DAO 注入到需要在单个
  • 覆盖Java中的属性[重复]

    这个问题在这里已经有答案了 在 Java 中 我最近有几个项目 我使用了这样的设计模式 public abstract class A public abstract int getProperty public class B exten
  • 有没有办法删除 JShell 中的导入?

    我正在发现 JShell 并且发现默认添加的导入 jshell gt imports import java io import java math import java net import java nio file import j
  • 为什么 CompletableFuture 的 thenAccept() 不在主线程上运行

    我在 CompletableFuture 的 SupplyAsync 中处理长时间运行的操作 并将结果放入 thenAccept 中 有时 thenAccept 在主线程上执行 但有时它在工作线程上运行 但我只想在主线程上运行 thenAc
  • 莫基托。验证方法参数是特定类

    我有一个方法 void putObject
  • 可空日期列合并问题

    我在 Geronimo 应用程序服务器上使用 JPA 和下面的 openjpa 实现 我也在使用MySQL数据库 我在更新具有可为空 Date 属性的对象时遇到问题 当我尝试合并 Date 属性设置为 null 的实体时 不会生成 sql
  • 在edittext android中插入imageview

    我想将 imageview 放在 edittext 中 可能吗 我检查了 evernote 应用程序 它能够将照片放在编辑文本部分 我想让我的应用程序完全相同 我如何才能将从图库中选择的图像视图放入编辑文本中 我首先尝试将 imagevie
  • ImageIO.read(...) - 非常慢,有更好的方法吗?

    我正在加载大量将在我的应用程序中使用的图标 我计划在服务器启动时从 jar 中加载所有这些 然而 由于数百张图像加起来刚刚超过 9MB 执行此任务仍然需要 30 秒多的时间 我现在正在一个单独的线程中执行此操作 但这让我想知道我是否在代码中

随机推荐

  • 凛冬已至 冰凌垂挂 岁末年初

    时光荏苒 岁月蹉跎 时间一分一秒从我们身边流过 岁月的脚步声也是越来越小 还没来得及跟眼前的2022挥手道别 2023已经出现在我们的眼前向我们问好 2023 就是新的一年 总会给我们带来无数的幻想和憧憬 虽然现在的我还没有一个真正的新年
  • QT基础学习(12)---事件过滤

    文章目录 事件过滤 一 事件过滤 实现该功能的方法就是在目标部件 自定义的图片显示部件 上注册事件过滤器 此时的事件过滤器就是我们所说的监视对象 完成这些步骤之后 当目标部件有事件产生后 首先会传递给监视对象 事件过滤器 进行处理而不是该事
  • 华为OD机试 - 猜字谜(Java)

    题目描述 小王设计了一个简单的猜字谜游戏 游戏的谜面是一个错误的单词 比如nesw 玩家需要猜出谜底库中正确的单词 猜中的要求如下 对于某个谜面和谜底单词 满足下面任一条件都表示猜中 变换顺序以后一样的 比如通过变换w和e的顺序 nwes
  • Kibana报错:Kibana server is not ready yet解决方法

    环境及版本 elasticsearch和kibana均为包安装的7 6 2 系统为unbutu22 04 1 部署完后访问kibana的web界面 出现kibana server is not ready yet 遇到这个问题后也是搜索了一
  • python注意事项

    python注意事项 1 缩进问题 每一个缩进都可能会导致有bug 因此要格外注意缩进 对齐 空格问题 尤其是循环体下的空格 一定要对齐 一般是缩进4格 2 标点符号的使用小结 逗号后面要有空格 冒号也是 等号前后都要有空格 3 字符串使用
  • MBA-day31 绝对值的几何意义

    绝对值的几何意义 1 x 2 x 4 由图可知 x 有 3 处取值区间 x gt 2 无最大值 x gt 4 无最大值 2 lt x lt 4 当x取值为 2和4时 存在几何意义中的最小值为 6 2 x 2 x 4 3 是否有根 由题1中
  • JAVAWEB编程题

    1 登陆验证代码
  • 人工智能大模型加速数据库存储模型发展 行列混合存储下的破局

    数据存储模型 专栏内容 postgresql内核源码分析 手写数据库toadb 并发编程 toadb开源库 个人主页 我的主页 座右铭 天行健 君子以自强不息 地势坤 君子以厚德载物 概述 在数据库的发展过程中 关系型数据库是一个里程碑式的
  • STL标准模板库学习笔记三(STL哈希容器)

    关联式容器 排序 的底层实现采用的树存储结构 更确切的说是红黑树结构 无序容器 哈希 的底层实现采用的是哈希表的存储结构 基于底层实现采用了不同的数据结构 因此和关联式容器相比 无序容器具有以下 2 个特点 无序容器内部存储的键值对是无序的
  • 欢迎访问阿里云Go Module代理仓库服务

    简介 go module公共代理仓库 代理并缓存go模块 你可以利用该代理来避免DNS污染导致的模块拉取缓慢或失败的问题 加速你的构建 地址 https mirrors aliyun com goproxy 使用帮助 1 使用go1 11以
  • leetcode刷题__删除有序数组中的重复项

    文章目录 题目描述 Java解决方法 题目描述 Java解决方法 class Solution public int removeDuplicates int nums int len nums length if len 0 return
  • Day81-爱心代码音乐版

    项目地址 截止到2023 9月有效 点击跳转 先看效果 素材来自网络 自己加了播放音乐的效果 直接上代码
  • 快速排序算法及其改进算法实现

    快速排序算法不稳定 快速排序算法在大多数的计算机上运行得都比其他排序算法快 而且排序算法消耗资源少 就平均时间而言快排是所有内部排序中最好的一个 对于已经排好的数组 最速排序有最坏时间复杂度为o n 2 当数组长度很小时 快排往往比其他排序
  • 【openGL2021版】链接FBX模型库

    openGL2021版 链接FBX模型库 大家好 我是Lampard猿奋 欢迎来到船新的openGL基础系列的博客 今天主要实现的是链接FBX模型第三方库 一 demo回顾 上周我已经实现了粒子系统 可以看到场景中已经存在着像萤火虫一样的闪
  • Wsl 开发环境配置

    Apt换源 切换清华源 sudo sed i s http archive ubuntu com https mirrors tuna tsinghua edu cn g etc apt sources list sudo sed i s
  • 图文详解 排序算法对比以及能解决的问题

    我们会使用的排序算法 总结源自 极客时间 王争老师 地址 https time geekbang org column intro 126 能看到所有的算法和数据结构的网站 https visualgo net en 这里是关于排序算法的链
  • 六大设计原则--开闭原则

    定义 software entities like classes modules and functions should be open for extension but closed for modifications 一个软件实体
  • php background,CSS BACKGROUND定位背景上下左右偏移

    css中我们经常使用background加载图片作为背景 这个背景图片可以通过具体属性的设置调整其上下左右偏移 调整背景图的上下左右偏移可以通过设置background属性或background position属性来实现 我们以div加背
  • Linux:非root、普通用户安装rpm(fontconfig、fc-cache命令)、中文字体

    前提 非root帐号 普通帐号 不能使用yum rpm命令 没root权限 安装fontconfig 测试是否安装 安装字体后需要使用fontconfig包的fc cache fv命令刷新字体缓存 可以使用下面命令测试 二选一就行 fc l
  • java版kafka producer实现

    需求 1 kafka server已经配置完全 且设定了访问限制 基于这一点 必须要设定认证 及预先分配的账号密码 2 由于项目开发环境是java 且不允许使用LogStash 基于这一点 必须实现一个java版的producer 先贴一份