具有 kerberos 的 Kafka Java 生产者

2023-12-10

在 kerberos 环境中向 kafka 主题发送消息时出现错误。我们在 hdp 2.3 上有集群

我跟着这个http://henning.kropponline.de/2016/02/21/secure-kafka-java- Producer-with-kerberos/

但是为了发送消息,我必须首先显式执行 kinit,然后只有我才能将消息发送到 kafka 主题。 我尝试通过 java 类进行编织,但这也不起作用。 PFB代码:

package com.ct.test.kafka;

import java.util.Date;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer {

    public static void main(String[] args) {

        String principalName = "ctadmin";
        String keyTabPath = "/etc/security/keytabs/ctadmin.keytab";
        boolean authStatus = CTSecurityUtil.loginUserFromKeytab(principalName, keyTabPath);

        if (!authStatus) {
            System.out.println("Authntication fails, try something else  "  + authStatus);
        } else {
            System.out.println("Authntication successfull " + authStatus);
        }

        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
        System.setProperty("java.security.auth.login.config", "/etc/kafka/2.3.4.0-3485/0/kafka_jaas.conf");
        System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
        System.setProperty("sun.security.krb5.debug", "true");

        try {
            long events = Long.parseLong("3");
            Random rnd = new Random();

            Properties props = new Properties();
            System.out.println("After broker list- " + args[0]);

            props.put("metadata.broker.list", args[0]);
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("request.required.acks", "1");
            props.put("security.protocol", "PLAINTEXTSASL");

            //props.put("partitioner.class", "com.ct.test.kafka.SimplePartitioner");


            System.out.println("After config prop -1");

            ProducerConfig config = new ProducerConfig(props);

            System.out.println("After config prop -2 config" + config);

            Producer<String, String> producer = new Producer<String, String>(config);

            System.out.println("After config prop -3");

            for (long nEvents = 0L; nEvents < events; nEvents += 1L) {
                Date runtime = new Date();
                String ip = "192.168.2" + rnd.nextInt(255);
                String msg = runtime + " www.example.com, " + ip;
                KeyedMessage<String, String> data = new KeyedMessage<String, String>("test_march4", ip, msg);

                System.out.println("After config prop -1 data" + data);

                producer.send(data);
            }
            producer.close();

        } catch (Throwable th) {
            th.printStackTrace();

        }
    }
}

Pom.xml :从 hortonworks 存储库下载的所有依赖项。

        <dependencies>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.10</artifactId>
                <version>0.9.0.2.3.4.0-3485</version>
            </dependency>

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.9.0.2.3.4.0-3485</version>
            </dependency>

            <dependency>
                <groupId>org.jasypt</groupId>
                <artifactId>jasypt-spring31</artifactId>
                <version>1.9.2</version>
                <scope>compile</scope>
            </dependency>

            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>2.7.1.2.3.4.0-3485</version>
            </dependency>

        </dependencies>

错误 : Case1:当我指定 myuser kafka_jass.conf 时

log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
After config prop -2 configkafka.producer.ProducerConfig@643293ae
java.lang.SecurityException: Configuration Error:
        Line 6: expected [controlFlag]
        at com.sun.security.auth.login.ConfigFile.<init>(ConfigFile.java:110)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at java.lang.Class.newInstance(Class.java:379)
        at javax.security.auth.login.Configuration$2.run(Configuration.java:258)
        at javax.security.auth.login.Configuration$2.run(Configuration.java:250)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.login.Configuration.getConfiguration(Configuration.java:249)
        at org.apache.kafka.common.security.kerberos.Login.login(Login.java:291)
        at org.apache.kafka.common.security.kerberos.Login.<init>(Login.java:104)
        at kafka.common.security.LoginManager$.init(LoginManager.scala:36)
        at kafka.producer.Producer.<init>(Producer.scala:50)
        at kafka.producer.Producer.<init>(Producer.scala:73)
        at kafka.javaapi.producer.Producer.<init>(Producer.scala:26)
        at com.ct.test.kafka.TestProducer.main(TestProducer.java:51)
Caused by: java.io.IOException: Configuration Error:
        Line 6: expected [controlFlag]
        at com.sun.security.auth.login.ConfigFile.match(ConfigFile.java:563)
        at com.sun.security.auth.login.ConfigFile.parseLoginEntry(ConfigFile.java:413)
        at com.sun.security.auth.login.ConfigFile.readConfig(ConfigFile.java:383)
        at com.sun.security.auth.login.ConfigFile.init(ConfigFile.java:283)
        at com.sun.security.auth.login.ConfigFile.init(ConfigFile.java:219)
        at com.sun.security.auth.login.ConfigFile.<init>(ConfigFile.java:108)

MyUser_Kafka_jass.conf

KafkaClient {
   com.sun.security.auth.module.Krb5LoginModule required
   doNotPrompt=true
   useTicketCache=true
   renewTicket=true
   principal="ctadmin/[email protected]";
   useKeyTab=true
   serviceName="kafka"
   keyTab="/etc/security/keytabs/ctadmin.keytab"
   client=true;
};
Client {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/etc/security/keytabs/ctadmin.keytab"
   storeKey=true
   useTicketCache=true
   serviceName="zookeeper"
   principal="ctadmin/[email protected]";
};

case2:当我指定Kafkas自己的jaas文件时

Java config name: /etc/krb5.conf
Loaded from Java config
javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. Make sure -Djava.security.auth.login.config property passed to JVM and the client is configured to use a ticket cache (using the JAAS configuration setting 'useTicketCache=true)'. Make sure you are using FQDN of the Kafka broker you are trying to connect to. not available to garner  authentication information from the user
        at com.sun.security.auth.module.Krb5LoginModule.promptForPass(Krb5LoginModule.java:899)
        at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:719)
        at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:584)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at javax.security.auth.login.LoginContext.invoke(LoginContext.java:762)
        at javax.security.auth.login.LoginContext.access$000(LoginContext.java:203)
        at javax.security.auth.login.LoginContext$4.run(LoginContext.java:690)
        at javax.security.auth.login.LoginContext$4.run(LoginContext.java:688)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:687)
        at javax.security.auth.login.LoginContext.login(LoginContext.java:595)
        at org.apache.kafka.common.security.kerberos.Login.login(Login.java:298)
        at org.apache.kafka.common.security.kerberos.Login.<init>(Login.java:104)
        at kafka.common.security.LoginManager$.init(LoginManager.scala:36)
        at kafka.producer.Producer.<init>(Producer.scala:50)
        at kafka.producer.Producer.<init>(Producer.scala:73)
        at kafka.javaapi.producer.Producer.<init>(Producer.scala:26)
        at com.ct.test.kafka.TestProducer.main(TestProducer.java:51)

如果我在运行此应用程序之前执行 kinit,则效果很好,否则它将出现上述错误。 我无法在我的生产环境中执行此操作,如果我们的应用程序本身有任何方法可以执行此操作,请帮助我。 如果您需要更多详细信息,请告诉我。

Thanks:)


该错误位于 jaas 文件中的分号中,正如您在这段输出中看到的那样:

Line 6: expected [controlFlag]

该行不能有分号:

principal="ctadmin/[email protected]";

它只能存在于最后一行:

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

具有 kerberos 的 Kafka Java 生产者 的相关文章

  • 无法在 Android 10 中创建目录

    我无法在 android 10 中创建目录 它可以在 android Oreo 之前的设备上运行 我尝试了两种创建文件夹的方法 Using File mkdir File f new File Environment getExternal
  • HashMap不写入数据库

    我尝试在我的数据库中写入 但只写入发件人和消息 我不明白为什么会发生这种情况 我认为问题出在我使用 sendMessage 的地方 我认为问题是我没有什么可以做的读 写其他用户的主键 我在数据库中写入消息的活动 public class M
  • 使用 GWT CellTableBuilder 构建树表

    Is it possible to build a tree table like this http www sencha com examples ExamplePlace basictreegrid with the new Cell
  • “_加载小部件时出现问题”消息

    加载小部件时 如果找不到资源或其他内容 则会显示 加载小部件时出现问题 就这样 惊人的 此消息保留在主屏幕上 甚至没有说明加载时遇到问题的小部件 我通过反复试验弄清楚了这一点 但我想知道发生这种情况时是否有任何地方可以找到错误消息 Andr
  • 添加动态数量的监听器(Spring JMS)

    我需要添加多个侦听器 如中所述application properties文件 就像下面这样 InTopics Sample QUT4 Sample T05 Sample T01 Sample JT7 注意 这个数字可以多一些 也可以少一些
  • Grails 2.3.0 自动重新加载不起作用

    我最近将我们的项目升级到 grails 2 3 0 一切工作正常 除了每当我更改代码时自动重新加载都无法工作的问题 这包括所有项目工件 控制器 域 服务 gsps css 和 javascript 文件 我的旧版本 grails 可以正常工
  • 为什么 java 编译器不报告 Intellij 中多播表达式的未经检查的强制转换警告?

    为什么下面的代码没有报告 Intellij IDEA 的未经检查的警告jdk 1 8 0 121自从Supplier
  • 如何将 android.net.Uri 转换为 java.net.URL? [复制]

    这个问题在这里已经有答案了 有没有办法从Uri to URL 我正在使用的库需要这个 它only接受一个URL但我需要在我的设备上使用图像 如果该方案的Uri is http or https new URL uri toString 应该
  • Java:正则表达式排除空值

    在问题中here https stackoverflow com questions 51359056 java regexp for a separated group of digits 我得到了正则表达式来匹配 1 到 99 之间的一
  • 将表值参数与 SQL Server JDBC 结合使用

    任何人都可以提供一些有关如何将表值参数 TVP 与 SQL Server JDBC 一起使用的指导吗 我使用的是微软提供的6 0版本的SQL Server驱动程序 我已经查看了官方文档 https msdn microsoft com en
  • Java 收集返回顶级项目的映射的嵌套流

    我有以下模型 class Item String name List
  • Cloudfoundry:如何组合两个运行时

    cloundfoundry 有没有办法结合两个运行时环境 我正在将 NodeJS 应用程序部署到 IBM Bluemix 现在 我还希望能够执行独立的 jar 文件 但应用程序失败 APP 0 bin sh 1 java not found
  • 如何配置 WebService 返回 ArrayList 而不是 Array?

    我有一个在 jax ws 上实现的 java Web 服务 此 Web 服务返回用户的通用列表 它运行得很好 Stateless name AdminToolSessionEJB RemoteBinding jndiBinding Admi
  • 尝试使用等于“是”或“否”的字符串变量重新启动 do-while 循环

    计算行程距离的非常简单的程序 一周前刚刚开始 我有这个循环用于解决真或假问题 但我希望它适用于简单的 是 或 否 我为此分配的字符串是答案 public class Main public static void main String a
  • 对象锁定私有类成员 - 最佳实践? (爪哇)

    I asked 类似的问题 https stackoverflow com questions 10548066 multiple object locks in java前几天 但对回复不满意 主要是因为我提供的代码存在一些人们关注的问题
  • 挂钩 Eclipse 构建过程吗?

    我希望在 Eclipse 中按下构建按钮时能够运行一个简单的 Java 程序 目前 当我单击 构建 时 它会运行一些 JRebel 日志记录代码 我有一个程序可以解析 JRebel 日志文件并将统计信息存储在数据库中 是否可以编写一个插件或
  • 哪个集合更适合存储多维数组中的数据?

    我有一个multi dimensional array of string 我愿意将其转换为某种集合类型 以便我可以根据自己的意愿添加 删除和插入元素 在数组中 我无法删除特定位置的元素 我需要这样的集合 我可以在其中删除特定位置的数据 也
  • Java的-XX:+UseMembar参数是什么

    我在各种地方 论坛等 看到这个参数 并且常见的答案是它有助于高并发服务器 尽管如此 我还是找不到 sun 的官方文档来解释它的作用 另外 它是Java 6中添加的还是Java 5中存在的 顺便说一句 许多热点虚拟机参数的好地方是这一页 ht
  • Android - 9 补丁

    我正在尝试使用 9 块图片创建一个新的微调器背景 我尝试了很多方法来获得完美的图像 但都失败了 s Here is my 9 patch 当我用Draw 9 patch模拟时 内容看起来不错 但是带有箭头的部分没有显示 或者当它显示时 这部
  • Java &= 运算符应用 & 或 && 吗?

    Assuming boolean a false 我想知道是否这样做 a b 相当于 a a b logical AND a is false hence b is not evaluated 或者另一方面 这意味着 a a b Bitwi

随机推荐

  • 使用自定义属性有效吗?

    我想取消任何链接并为每个链接添加额外的属性 下面是我是如何实现这一目标的 function anularEnlaces nav a each function var href this attr href var id this attr
  • C# 按字母顺序和长度对 Arraylist 字符串进行排序

    我正在尝试排序ArrayList of String Given A C AA B CC BB Arraylist Sort gives A AA B BB C CC 我需要的是 A B C AA BB CC ArrayList list
  • +0和-0一样吗?

    阅读通过ECMAScript 5 1 规范 0 and 0是杰出的 那么为什么呢 0 0评估为true JavaScript 使用IEEE 754 标准来表示数字 从维基百科 签名零为零并带有相关符号 在普通算术中 0 0 0 然而 在计算
  • Android 应用程序 - 如何获取联系人的生日

    我正在开发一个 Android 应用程序 我需要将每个联系人的生日与当前日期进行匹配 如果是的话 则处理一些业务逻辑 这需要完整的联系人详细信息 我找到了分别读取联系人生日或联系人本身的方法 但对如何将两者结合起来感到困惑 有人可以提供一些
  • 如何用段落标签包围所有文本片段? [关闭]

    Closed 这个问题需要多问focused 目前不接受答案 我想在任何文本项周围放置段落标签 因此应该避免表格和其他元素 我怎么做 我想它可以用某种方式制成preg replace 以下是一些可以帮助您完成您想做的事情的函数 nl2p T
  • Yii2 gridview - 模式仅在单击第一行时显示

    我正在使用 kartik grid GridView 和 kartik grid ExpandRowColumn 来显示摘要信息 在 ExpandRowColumn 上 我有另一个 gridview 来列出详细信息 这部分工作正常 在 Ex
  • swift 3 Playground 中的回调[重复]

    这个问题在这里已经有答案了 您好 我正在尝试在操场上执行这行代码 但得到任何响应输出 我的代码如下 func testCallbackEmpty callback escaping gt Void DispatchQueue main as
  • 如何在 WordPress 插件中使用多媒体上传器?

    我尝试在 WordPress 插件中添加多重上传选项 我在插件中重复了此代码 两次 仅更改了 id 名称
  • 我可以使用反射访问 ItemsControl 的 ItemsHost 吗?

    我正在创建自定义ItemsControl这是源自DataGrid 我需要访问 ItemsHost 这是Panel实际上包含行DataGrid 我见过一些丑陋的技巧来做到这一点 但我认为它们比使用反射更糟糕 那么我可以使用反射访问 Items
  • 如何使用/导入http模块?

    我一直在玩Angular 2 快速入门 如何在 Angular 2 中使用 导入 http 模块 我看过Angular 2 Todo s js 但它不使用 http 模块 我已经添加 ngHttp angular http to depen
  • to_number Oracle SQL 中数字格式的动态长度

    我有一个表 其中的数字存储为varchar2和 作为小数点分隔符 例如 5 92843 我想使用 来计算这些数字 因为这是系统默认值 并且使用了以下内容to number去做这个 TO NUMBER number 99999D9999 NL
  • Neo4j 桌面打开时出错 - “无法读取未定义的属性“名称”

    我有一个运行 Windows Server 2016 的盒子 我在这里找到了有关 MAC 的问题的答案 neo4j 初始化错误 TypeError 无法读取未定义的属性 名称 这个答案似乎在 Windows 中不起作用 因为删除 AppDa
  • PHP 通过键和值创建数组

    我在下面的 PHP 代码中有一个数组 我想将该数组转换为按数据值分组 简化数组总是很困难 Array 0 gt Array video id gt 14 video title gt test1 video category name gt
  • 如何从聚合物组件访问父模型

    我将 my app 作为 index html 文件中的主要应用程序组件 并使用 model dart 作为其模型 这是我的应用程序模型 my app 以 my component 作为其内容 当用户与 my component 交互时 我
  • 如何在Python中获取字符串的大小(长度)

    例如 我得到一个字符串 str please answer my question 我想把它写入一个文件 但在将字符串写入文件之前我需要知道字符串的大小 我可以使用什么函数来计算字符串的大小 如果你正在谈论字符串的长度 你可以使用len g
  • 如何自动将当前路由中的特定值添加到所有生成的链接?

    我的 URL 中有网站文化 如下所示 routes MapRoute Default language controller action id languageDefaults languageConstraints 它的工作方式就像一个
  • 结合 lapply、svyby、svyratio 计算许多具有置信区间的比率

    我正在使用surveyR 中的包可与美国人口普查局的 PUMS 人口数据集配合使用 我为每个广泛的行业创建了一个布尔值和一个字符变量MigrationStatus具有三个值 Stayed Left Entered 我想按移民身份检查每个行业
  • 将数据从 Rails 视图传递到 webpacker 中的 VueJS 组件

    我正在尝试摆弄 Rails 5 1 的新 webpacker gem 以及 VueJS 但无法让我的 erb 视图将数据传递给 VueJS 组件 假设我有一个用户显示视图 view users show html erb 还有我的 Java
  • 使用VBA生成短哈希字符串

    我正在寻找一个 VBA 函数 它可以根据字符串内容生成非常短的哈希码 例如 3 个字符 From http www lammertbies nl forum viewtopic php t 302 Sub CRC16 Dim x As Lo
  • 具有 kerberos 的 Kafka Java 生产者

    在 kerberos 环境中向 kafka 主题发送消息时出现错误 我们在 hdp 2 3 上有集群 我跟着这个http henning kropponline de 2016 02 21 secure kafka java Produce