Kafka - 无法建立与节点-1的连接

2024-01-25

我正在尝试使用 Apache Flink 对 Kafka 主题进行流处理，但遇到了以下问题。

2018-04-10 02:55:59,856|- ProducerConfig values: 
    acks = 1
    batch.size = 16384
    bootstrap.servers = [localhost:9092]
    buffer.memory = 33554432
    client.id = 
    compression.type = none
    connections.max.idle.ms = 540000
    enable.idempotence = false
    interceptor.classes = null
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 1
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2018-04-10 02:56:00,052|- The configuration 'auto.create.topics.enable' was supplied but isn't a known config.
2018-04-10 02:56:00,064|- Kafka version : 1.0.0
2018-04-10 02:56:00,064|- Kafka commitId : aaa7af6d4a11b29d
2018-04-10 02:56:40,064|- ConsumerConfig values: 
    auto.commit.interval.ms = 1000
    auto.offset.reset = latest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.id = 
    connections.max.idle.ms = 540000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = engine-kafka-consumer
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 30000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

2018-04-10 02:56:40,073|- ConsumerConfig values: 
    auto.commit.interval.ms = 1000
    auto.offset.reset = latest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.id = 
    connections.max.idle.ms = 540000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = engine-kafka-consumer
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 30000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

2018-04-10 02:56:40,079|- ConsumerConfig values: 
    auto.commit.interval.ms = 1000
    auto.offset.reset = latest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.id = 
    connections.max.idle.ms = 540000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = engine-kafka-consumer
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 30000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

2018-04-10 02:56:40,082|- ConsumerConfig values: 
    auto.commit.interval.ms = 1000
    auto.offset.reset = latest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.id = 
    connections.max.idle.ms = 540000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = engine-kafka-consumer
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 30000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

2018-04-10 02:56:40,783|- Kafka version : 1.0.0
2018-04-10 02:56:40,783|- Kafka commitId : aaa7af6d4a11b29d
2018-04-10 02:56:40,818|- Kafka version : 1.0.0
2018-04-10 02:56:40,819|- Kafka commitId : aaa7af6d4a11b29d
2018-04-10 02:56:40,819|- Kafka version : 1.0.0
2018-04-10 02:56:40,819|- Kafka commitId : aaa7af6d4a11b29d
2018-04-10 02:56:40,820|- Kafka version : 1.0.0
2018-04-10 02:56:40,820|- Kafka commitId : aaa7af6d4a11b29d
2018-04-10 02:56:41,906|- [Consumer clientId=consumer-2, groupId=engine-kafka-consumer] Connection to node -1 could not be established. Broker may not be available.
2018-04-10 02:56:41,925|- [Consumer clientId=consumer-4, groupId=engine-kafka-consumer] Connection to node -1 could not be established. Broker may not be available.
2018-04-10 02:56:41,931|- [Consumer clientId=consumer-1, groupId=engine-kafka-consumer] Connection to node -1 could not be established. Broker may not be available.
2018-04-10 02:56:41,948|- [Consumer clientId=consumer-3, groupId=engine-kafka-consumer] Connection to node -1 could not be established. Broker may not be available.
2018-04-10 02:56:42,013|- [Consumer clientId=consumer-2, groupId=engine-kafka-consumer] Connection to node -1 could not be established. Broker may not be available.

以下是我使用的版本。

// kafka
 "org.apache.kafka" %% "kafka" % "1.0.0"
 "net.manub" %% "scalatest-embedded-kafka" % "0.14.0" % Test

//flink
"org.apache.flink" %% "flink-scala" % "1.4.2"
"org.apache.flink" %% "flink-streaming-scala" % "1.4.2"
"org.apache.flink" %% "flink-connector-kafka-0.9" % "1.4.0"
"org.apache.flink" %% "flink-connector-cassandra" % "1.4.2"

我遇到了同样的问题。当消费者运行在另一台计算机上（而不是运行 Kafka 服务器的那台机器）时，就会发生这种情况。请修改 Kafka 的
`config/server.properties` 配置文件，将其中的监听地址设置为：listeners=PLAINTEXT://serverip:9092。

注意：serverip 不能设置为 127.0.0.1 或 localhost，应设置为消费者能够连接到的 IP 地址。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka - 无法建立与节点-1的连接 的相关文章

随机推荐

  • 如何在 C# 中将 XML 元素反序列化为具有属性和文本的元素数组?

    我在尝试反序列化 XML 时遇到问题
  • WPF Prism - 在哪里放置资源?

    我有一个棱镜应用程序和各种模块 我想知道哪里是找到样式 画笔 控件模板 数据模板等资源的最佳位置 我应该制作一个资源字典并将所有内容都放在那里吗 每个模块应该有自己的资源吗 还是每个视图 我想遵循 Prism 保持一切模块化的目标 但我也不
  • 使用 javascript 库跟踪用户活动 [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 是否可以跟踪用户在网页上的每个操作并创建日志 这个想法是通过 AJAX 将用户操作日志传输到服务器并保
  • Maven WAR 依赖项 - 无法解析包?

    我有战争依赖 性
  • 辞去 ScrollView Touch 上的第一响应者职务

    如何在 ScrollView 触摸事件上隐藏键盘 场景是这样的 gt 视图 gt 滚动视图 gt 文本字段 我想在触摸滚动视图时隐藏键盘 我尝试覆盖滚动视图的类 但仍然无法做到 这样做会有所帮助 interface MyClass
  • 纯分裂的熵计算为 NaN

    我编写了一个函数来计算向量的熵 其中每个元素代表一个类的元素数量 function x Entropy a t sum a t repmat t 1 size a 2 x sum a t log2 a t end e g a 4 0 the
  • 如何在 jsonp ajax 调用中使用 type: "POST"

    我正在使用 JQuery ajax jsonp 我有下面jQuery 代码 ajax type GET url Login aspx Send the login info to this page data str dataType js
  • .erb 、 .rhtml 和 .html.erb 之间有什么区别?

    erb rhtml 和 html erb 之间有什么区别 真的没什么 这只是 Rails 1 和 Rails 2 之间理念的改变 在 Rails 2 之前 有 file rhtml file rxml 和 file rjs 在 Rails
  • 如何生成彼此不相交的正方形(随机定位、大小相等、随机旋转)?

    我一直致力于在 1x1 网格上生成一层随机旋转并放置的正方形 我已经能够生成在网格上随机放置和旋转的单个正方形 但我不确定如何改进代码以生成更多彼此不相交的随机正方形 当前代码如下所示 我的一个随机正方形的示例 https i stack
  • 如何正确重写克隆方法?

    我需要在我的一个没有超类的对象中实现深度克隆 处理检查的最佳方式是什么CloneNotSupportedException由超类抛出 即Object 一位同事建议我按以下方式处理 Override public MyObject clone
  • 通过 Groovy XML 解析器使用字符串作为代码

    我是 groovy 的新手 我希望这是一个简单的问题可以解决 我正在读取 xml 文档 然后我可以访问如下数据 def root new XmlParser parseText xmlString println root foo bar
  • 如何禁用 Google 跟踪代码管理器控制台日志记录

    将 Google 跟踪代码管理器添加到项目后 我在控制台中看到了很多日志条目 有办法禁用它吗 控制台日志充满了噪音 GoogleTagManager info Processing logged event vs with paramete
  • Node.js ENOENT 读取 PDF 文件

    我需要阅读 pdf 文件并使用pdf text extract 它在我的本地主机上完美运行 但是当我尝试在服务器上运行该程序时 出现以下错误 spawn called 0 pdftotext 1 layout enc UTF 8 tmp t
  • Keychain 中存储的字符串有长度限制吗?

    我想在iOS上将一些用户信息作为字符串存储在Keychain中 那么Keychain中的字符串有长度限制吗 Thanks 我组装了一个 iOS 应用程序 可以使用以下命令进行二分搜索这个图书馆 https github com kishik
  • twig - 将函数传递到模板中

    目前 我将函数放在一个类中 并将该类的实例传递到模板中 并将所需的函数作为类方法调用 unneededclass blah 我需要像下面这样做 blah 是否可以 更新 2015 年 5 月 14 日 评论者指出我大部分都是错的 如果您确实
  • 如何合并两个UIImage?

    我正在尝试合并两个不同的图像并创建一个新的图像 这就是我想做的方式 我有这张图片 A 这是一张 PNG 图像 我想将其与我从手机中获取的另一张图像 B 合并 以创建如下所示的图像 我需要一个将 A 与 B 合并创建 C 的函数 尺寸必须保留
  • Nuget Pack 不支持程序集版本上的位数

    I need nuget pack生成只有 3 位数字的包版本 我们想对其进行语义版本控制 但是当我在具有AssemblyVersion属性设置为 1 0 0 生成的 nupkg 文件在其元数据 和文件名 中以版本 1 0 0 0 结尾 为
  • 如何查看tomcat的容器日志?

    如何查看tomcat的容器日志 我从 catalina out 日志中收到此错误 SEVERE localhost startStop 1 org apache catalina core StandardContext startInte
  • 在 SQL Server 中,一行的 %%physloc%% 值总是相同吗?

    我一直在探索它的实际用途 physloc 伪列作为行标识符 不幸的是 我一直无法找到官方文档 physloc 我需要知道这个值是否会改变 我做了一些测试 看起来相当静态 即使我创建数据库的备份并在不同的服务器上恢复 physloc 每行保持
  • Kafka - 无法建立与节点-1的连接

    我正在尝试使用 apache flink 流处理 kafka 主题 但我遇到了这个问题 2018 04 10 02 55 59 856 ProducerConfig values acks 1 batch size 16384 bootst