卡夫卡主题_授权_失败

2023-11-23

我实际上正在努力使用 SASL 纯文本设置简单的 Kafka 身份验证并添加 ACL 授权。但当我尝试使用数据时遇到问题。

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.0.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : b8642491e78c5a13
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 1 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 2 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 3 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 4 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 5 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 6 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 7 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 8 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 9 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 10 : {test-topic=TOPIC_AUTHORIZATION_FAILED}

接下来,你可以看到我的配置文件。

服务器属性

listeners=SASL_PLAINTEXT://localhost:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

生产者属性

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
bootstrap.servers=localhost:9092
compression.type=none

消费者属性

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
zookeeper.connect=127.0.0.1:2181
zookeeper.connection.timeout.ms=6000
group.id=test-consumer-group

kafka_server_jaas.conf

KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin-secret"
  user_admin="admin-secret"
  user_alice="alice-secret";
};

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="alice"
  password="alice-secret";
};

环境变量:

export KAFKA_OPTS="-Djava.security.auth.login.config=/home/user/kafka_2.10-0.10.0.1/kafka_server_jaas.conf"

Commands

Set ACL:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:alice --operation All --group test-consumer-group --topic test-topic

启动卡夫卡服务器:

./bin/kafka-server-start.sh config/server.properties

启动制作人:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic --producer.config=config/producer.properties

启动消费者:

bin/kafka-console-consumer.sh --new-consumer --zookeeper localhost:2181 --topic test-topic --from-beginning --consumer.config=config/consumer.properties  --bootstrap-server=localhost:9092

当我尝试启动消费者时,我遇到了上述问题。另外,在卡夫卡日志中,我有这个:

[2016-10-22 20:17:14,091] ERROR [KafkaApi-0] Error when handling request {group_id=test-consumer-group} (kafka.server.KafkaApis)
kafka.admin.AdminOperationException: replication factor: 3 larger than available brokers: 1
    at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:117)
    at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:403)
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$createTopic(KafkaApis.scala:629)
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$createGroupMetadataTopic(KafkaApis.scala:651)
    at kafka.server.KafkaApis$$anonfun$getOrCreateGroupMetadataTopic$1.apply(KafkaApis.scala:657)
    at kafka.server.KafkaApis$$anonfun$getOrCreateGroupMetadataTopic$1.apply(KafkaApis.scala:657)
    at scala.Option.getOrElse(Option.scala:121)
    at kafka.server.KafkaApis.getOrCreateGroupMetadataTopic(KafkaApis.scala:657)
    at kafka.server.KafkaApis.handleGroupCoordinatorRequest(KafkaApis.scala:818)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:86)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
    at java.lang.Thread.run(Thread.java:745)

我怎样才能解决这个问题?


通过分离 jaas 客户端和 jaas 服务器解决了问题。

kafka_server_jaas.conf

KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin-secret"
  user_admin="admin-secret"
  user_alice="alice-secret";
};

kafka_client_jaas.conf

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="alice"
  password="alice-secret";
};

在同一终端上,导出 jaas 服务器conf文件并启动kafka代理:

$ export KAFKA_OPTS="-Djava.security.auth.login.config=/home/user/kafka_2.10-0.10.0.1/kafka_server_jaas.conf"
$ ./bin/kafka-server-start.sh config/server.properties

在客户端上,导出客户端 jaas conf 文件并启动消费者:

$ export KAFKA_OPTS="-Djava.security.auth.login.config=/home/user/kafka_2.10-0.10.0.1/kafka_client_jaas.conf"
$ ./bin/kafka-console-consumer.sh --new-consumer --zookeeper localhost:2181 --topic test-topic --from-beginning --consumer.config=config/consumer.properties  --bootstrap-server=localhost:9092

如果您还想生成,请在另一个终端窗口上执行此操作:

$ export KAFKA_OPTS="-Djava.security.auth.login.config=/home/user/kafka_2.10-0.10.0.1/kafka_client_jaas.conf"
$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic --producer.config=config/producer.properties
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

卡夫卡主题_授权_失败 的相关文章

  • 由于 jaas.conf 不正确而导致 Kafka TopicAuthorizationException

    我指的是JAAS登录配置文件 https docs oracle com javase 7 docs technotes guides security jgss tutorials LoginConfigFile html 它讨论了两种指
  • Kafka中如何使用事务以及如何使用abortTransaction?

    我是 kafka 新手 我使用 Kafka Producer Java api 面对Kafka的这个问题 Kafka Invalid transition attempted from state COMMITTING TRANSACTIO
  • 通过 CMD 获取启用 SSL 的 Kafka 中的最新偏移量

    我一直在使用下面的 CMD 从打开纯文本端口的 Kafka 队列中获取最新的偏移量 kafka run class sh kafka tools GetOffsetShell broker list server 9092 topic sa
  • 安装 confluence-kafka 时“文件名或扩展名太长”?

    我在使用 pip install confluence kafka 安装 confluence kafka 时遇到一些问题 但我收到此错误 文件名或扩展名太长 详细信息如下 Collecting confluent kafka Using
  • 有没有办法重新分区 Kafka 流中的输入主题?

    我有一个由 byte 键控的主题 我想对其进行重新分区并通过消息正文中字段中的另一个键处理该主题 我发现有KGroupedStream and groupby功能 但它需要一个聚合函数来转换为 KTable KStream 我不需要聚合 我
  • Apache Kafka 与 Apache Storm

    Apache Kafka 分布式消息系统Apache Storm 实时消息处理 我们如何在实时数据管道中使用这两种技术来处理事件数据 就实时数据管道而言 在我看来 两者的工作都是相同的 我们如何在数据管道上使用这两种技术 您可以使用 Apa
  • Kafka 适合运行公共 API 吗?

    我有一个想要发布的事件流 它被划分为主题 不断更新 需要水平扩展 并且没有 SPOF 很好 并且可能需要在某些情况下重播旧事件 所有的功能似乎都与 Kafka 的功能相匹配 我想通过任何人都可以连接并获取事件的公共 API 将其发布到全世界
  • 找不到 io.confluence:kafka-protobuf-serializer:6.0.0

    直接的问题是 为什么 Gradle 没有解决我添加的这个依赖关系 dependencies kafka protobuf serializer implementation io confluent kafka protobuf seria
  • Kafka Streams 内部数据管理

    在我的公司 我们广泛使用 Kafka 但出于容错的原因 我们一直使用关系数据库来存储多个中间转换和聚合的结果 现在我们正在探索 Kafka Streams 作为一种更自然的方式来做到这一点 通常 我们的需求非常简单 其中一个例子是 监听输入
  • 如何使用rest api设置kafka连接auto.offset.reset

    我创建了一个接收器 kafka 连接 将数据转换为其他存储 我想设置auto offset reset as latest当新连接器创建时kafka connect rest api 我已经设定consumer auto offset re
  • 生产者程序中的 kafka 网络处理器错误(ArrayIndexOutOfBoundsException:18)

    我有下面的 kafka Producer Api 程序 我对 kafka 本身是新手 下面的代码从 API 之一获取数据并将消息发送到 kafka 主题 package kafka Demo import java util Propert
  • Apache Kafka Streams 将 KTable 物化到主题似乎很慢

    我正在使用 kafka 流 并试图将 KTable 具体化为一个主题 它有效 但似乎每 30 秒左右完成一次 Kafka Stream 如何 何时决定将 KTable 的当前状态具体化为主题 有没有什么办法可以缩短这个时间 让其更加 实时
  • 使用 kafka java api 的 Avro 序列化器和反序列化器

    Kafka Avro 序列化器和反序列化器无法工作 我尝试使用 kafka 控制台消费者消费消息 我可以看到发布的消息 public class AvroProducer
  • 将数据从 Kafka 存储传输到 Kafka 主题

    我想在卡夫卡做这样的事情 继续将数据存储在 KStream Ktable Kafka store 中 当我的应用程序收到特定事件 数据时 仅将上述存储中的特定数据集发送到主题 我们可以在卡夫卡中做到这一点吗 我认为单独使用 Kafka 消费
  • 卡夫卡主题查看器? [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我想调试一些 Kafka 主题 这样我就知道消费者或生产者是否有问题 Kafka 是否有一个 UI 我
  • 在 JAX-WS 中使用安全性的最佳实践是什么

    这是场景 我有一些需要保护的 Web 服务 JAX WS 目前 为了身份验证需求 我提供了额外的 SecurityWService 它为授权用户提供了一些需要在请求其他服务时描述的 userid 和 sessionid 使用一些java安全
  • 了解Kafka流groupBy和window

    我无法理解 kafka 流中的 groupBy groupById 和窗口的概念 我的目标是聚合一段时间内 例如 5 秒 的流数据 我的流数据看起来像 value 0 time 1533875665509 value 10 time 153
  • 当我重新运行 Flink 消费者时,Kafka 再次消费最新消息

    我在用 Scala 编写的 Apache Flink API 中创建了一个 Kafka 消费者 每当我从某个主题传递一些消息时 它就会及时接收它们 但是 当我重新启动使用者时 它不会接收新的或未使用的消息 而是使用发送到该主题的最新消息 这
  • 卡夫卡流:RocksDB TTL

    据我了解 默认 TTL 设置为无穷大 非正数 但是 如果我们需要在存储中保留数据最多 2 天 我们可以使用 RocksDBConfigSetter 接口实现 即 options setWalTtlSeconds 172800 进行覆盖吗 或
  • Kafka不启动空白输出

    我正在努力安装 Kafka 和 Zookeeper 我已经运行了 Zookeeper 并且它当前正在运行 我将所有内容设置为 https dzone com articles running apache kafka on windows

随机推荐

  • 如何在录制时将音频添加到视频中[连续捕获活动] [Grafika]

    我使用实现视频录制ContinuousCaptureActivity java 工作完美 现在我想在此视频中添加音频 我知道使用媒体混合器可以在视频中添加音频 但问题是我不知道如何使用MediaMuxer 另外 如果您有任何其他解决方案而无
  • R,将多行文本数据框合并到一个单元格中

    我有一个如下所示的文本数据框 gt nrow gettext df 1 3 gt gettext df gettext 1 hello 2 Good to hear back from you 3 I ve currently writte
  • git github无法推送到原点

    我可能遗漏了一些东西 但我确信我已经检查了所有内容 我分叉了一个存储库并将其克隆到我的系统上 做了一些改变 提交后 做过git push origin master it says fatal remote error You can t
  • 如何让AngularJS编译指令生成的代码?

    请帮助我 我们如何让 AngularJS 编译指令生成的代码 您甚至可以在这里找到相同的代码 http jsbin com obuqip 4 edit HTML div names 0 names 1 br div
  • 异常 android.support.multidex.MultiDexApplication 无法转换类

    我有一个问题 我的应用程序生成此异常 但我不明白 我已经在 build gradle 中实现了 multiDexEnabled Caused by java lang ClassCastException android support m
  • 将 Log4j 输出写入 HDFS

    有没有人尝试过write log4j日志档案直接地to Hadoop分布式文件系统 如果是 请回复如何实现这一点 我想我必须为它创建一个Appender 是这样吗 我的需要是以特定的时间间隔将日志写入文件 并在稍后阶段查询该数据 我建议使用
  • 从 C# 中的字符串文件路径中删除多余的反斜杠“\”

    如何转换 String path C Abc Omg Why Me into String path C Abc Omg Why Me 我的做法是首先reverse the string进而移除所有 the 直到我们得到first char
  • IntelliJ Idea 中无副作用方法的未使用结果的警告

    当我不分配结果时BigDecimal divide 方法到变量时 我从 IntelliJ Idea 收到了一个很好的警告 BigDecimal divide 的结果被忽略 我可以以某种方式为我自己的 无副作用 函数获得相同的警告吗 比如为我
  • 创建常量字典对象

    我想完成一些类似于这篇文章中所做的事情 Objective C 中的常量 但是 我想构造一个 NSDictionary 如果我做类似的事情 常量 h extern NSArray const mFooKeys extern NSArray
  • 如何让matplotlib准确放置线条?

    默认情况下 matplotlib 绘图可能会非常不准确地放置线条 例如 请参阅附图中左端点的位置 至少有一整像素的空气是不应该存在的 事实上 我认为线中心偏离了 2 个像素 如何让matplotlib绘图准确 我不介意性能是否会受到影响 m
  • 通过成员数据在向量中搜索结构体项

    我对 C 非常陌生 我正在尝试找到一种方法来搜索结构向量以查找具有特定成员数据的结构 我知道这适用于向量中的简单类型 std find vector begin vector end item vector end 但假设我有一个这样的结构
  • 使用 JavaScript 更改标签文本

    为什么以下内容对我不起作用
  • Angular2 中的更改不会更新视图

    我已经开始探索 Angular2 我带着 Angular1 和一些 React 背景而来 但我遇到了一个问题 我想将某些击键绑定到组件中的操作 因此我决定使用 Angular2 生命周期来绑定 取消绑定操作 但是 如果我在 Mousetra
  • 如何比较两个 NSMutableArray?

    如何比较两个 NSMutableArray 如果两者相同则返回 true 否则返回 false 谢谢 return array1 isEqualToArray array2 returns YES如果数组相等 则返回NO
  • 将 matplotlib 图形嵌入到 iPython HTML 中

    我想在 Jupyter Notebook 中使用代码单元动态编写和显示 HTML 目标是生成 HTML 以我选择的某种方式显示 table div img 标签 我想捕获 img 数据并将其放置在自动生成的 HTML 中我想要的位置 到目前
  • 摆脱内联块图像下方的空间[重复]

    这个问题在这里已经有答案了 如何消除图像底部和包装器之间的空间 同时保持图像作为内联块 为什么会发生这种情况 http jsfiddle net dJVxb 2 HTML div img src https twimg0 a akamaih
  • Jquery 在按钮上打开弹出窗口,单击以进行引导

    当我点击下面的按钮时
  • 使用 Selenium IDE 访问 JavaScript 变量

    我想知道是否可以使用 Selenium 访问页面 JavaScript 变量 我有一个应用程序使用附加到窗口对象的变量 它具有全球范围 我可以通过以下方式访问它window myvar window myvar myvar this myv
  • 如何获取Azure网站中的本地文件系统路径

    我在磁盘上托管了一个文件以及我想要读取的网站 当我使用 System Environment CurrentDirectory 时 不确定如何访问该文件 它指向 D 驱动器位置 有人可以告诉我如何才能访问该文件吗 访问存储在我的网站托管位置
  • 卡夫卡主题_授权_失败

    我实际上正在努力使用 SASL 纯文本设置简单的 Kafka 身份验证并添加 ACL 授权 但当我尝试使用数据时遇到问题 main INFO org apache kafka common utils AppInfoParser Kafka