通过使用 Docker-Compose 和 Spring Boot 运行的 IP 地址访问远程主机中的 Kafka

2024-03-01

我有一个 docker-compose.yml 在其中运行动物园管理员, Kafka, 卡夫卡连接, and KafDrop,问题是,当我在本地运行时,我可以从我的春季启动应用程序来消费一些主题消息。

我需要的是在 Linux 机器上运行相同的配置,并能够以相同的方式从 Spring Boot 应用程序进行连接。

当在 Linux 机器上远程运行它时,一切似乎都运行正常,但是当我尝试从 Spring Boot 应用程序连接时,我收到一些错误,表明连接中出现了问题。

我将尝试逐步解释,看看是否有人可以对此进行“说明”:

docker-compose.yml:

version: '3'

services:

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    networks: 
      - broker-kafka
    ports:
      - 2181:2181
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    networks: 
      - broker-kafka
    restart: unless-stopped
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: 
         INTERNAL://kafka:29092,
         EXTERNAL://localhost:9092
      KAFKA_ADVERTISED_LISTENERS: 
         INTERNAL://kafka:29092,
         EXTERNAL://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 
         INTERNAL:PLAINTEXT,
         EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_LOG_RETENTION_HOURS: 12
    
  connect:
    image: cdc:latest
    networks: 
      - broker-kafka
    depends_on:
      - zookeeper
      - kafka
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:29092
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: connect-1
      CONNECT_CONFIG_STORAGE_TOPIC: connect-1-config
      CONNECT_OFFSET_STORAGE_TOPIC: connect-1-offsets
      CONNECT_STATUS_STORAGE_TOPIC: connect-1-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_OFFSET.STORAGE.REPLICATION.FACTOR: 1
      CONNECT_CONFIG.STORAGE.REPLICATION.FACTOR: 1
      CONNECT_OFFSET.STORAGE.PARTITIONS: 1
      CONNECT_STATUS.STORAGE.REPLICATION.FACTOR: 1
      CONNECT_STATUS.STORAGE.PARTITIONS: 1
      CONNECT_REST_ADVERTISED_HOST_NAME: localhost
      
  kafdrop:
    image: obsidiandynamics/kafdrop:latest
    networks: 
      - broker-kafka
    depends_on:
      - kafka
    ports:
      - 19000:9000
    environment:
      KAFKA_BROKERCONNECT: kafka:29092
      
networks: 
  broker-kafka:
    driver: bridge

我需要的是向我的网络公开此 IP 机器,以便我的 Spring Boot 应用程序访问。 假设这台 Linux 机器的 IP 为 10.12.54.99。 如何使 Kafka 可以通过以下方式访问: 10.12.54.99:9090 ?

这是我的 application.properties:

spring.kafka.bootstrap-servers=10.12.54.99:9092

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.max-poll-records=10
spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.group-id=connect-sql-server
spring.kafka.consumer.auto-offset-reset=earliest

spring.kafka.listener.ack-mode=manual-immediate
spring.kafka.listener.poll-timeout=3000
spring.kafka.listener.concurrency=3

spring.kafka.properties.spring.deserializer.key.delegate.class=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.properties.spring.deserializer.value.delegate.class=org.apache.kafka.common.serialization.StringDeserializer

这是唯一特定于消费者的应用程序(此处不使用生产者)。

当我运行应用程序时:

2020-12-07 10:59:40.361  WARN 58716 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-connect-sql-server-1, groupId=connect-sql-server] Connection to node -1 (/10.12.54.99:9092) could not be established. Broker may not be available.
2020-12-07 10:59:40.362  WARN 58716 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-connect-sql-server-1, groupId=connect-sql-server] Bootstrap broker 10.12.54.99:9092 (id: -1 rack: null) disconnected

Linux 防火墙机器中的所有防火墙端口均已启用。

任何启发将不胜感激。


您需要绑定服务器的公共IP才能远程访问代理。但是,如果您不想硬编码 IP,则可以使用 .env 文件。

请执行下列操作:

  1. 创建 config.env 文件。

  2. 在 config.env 中添加这一行并添加您的主机 IP,如下所示:

    DOCKER_HOST_IP=111.111.11.111

  3. 更新你的 docker-compose:

version: '3'

services:

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    networks: 
      - broker-kafka
    ports:
      - ${DOCKER_HOST_IP:-127.0.0.1}:2181:2181
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    networks: 
      - broker-kafka
    restart: unless-stopped
    depends_on:
      - zookeeper
    ports:
      - ${DOCKER_HOST_IP:-127.0.0.1}:9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: 
         INTERNAL://kafka:29092,
         EXTERNAL://localhost:9092
      KAFKA_ADVERTISED_LISTENERS: 
         INTERNAL://kafka:29092,
         EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 
         INTERNAL:PLAINTEXT,
         EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_LOG_RETENTION_HOURS: 12
    
  connect:
    image: cdc:latest
    networks: 
      - broker-kafka
    depends_on:
      - zookeeper
      - kafka
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:29092
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: connect-1
      CONNECT_CONFIG_STORAGE_TOPIC: connect-1-config
      CONNECT_OFFSET_STORAGE_TOPIC: connect-1-offsets
      CONNECT_STATUS_STORAGE_TOPIC: connect-1-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_OFFSET.STORAGE.REPLICATION.FACTOR: 1
      CONNECT_CONFIG.STORAGE.REPLICATION.FACTOR: 1
      CONNECT_OFFSET.STORAGE.PARTITIONS: 1
      CONNECT_STATUS.STORAGE.REPLICATION.FACTOR: 1
      CONNECT_STATUS.STORAGE.PARTITIONS: 1
      CONNECT_REST_ADVERTISED_HOST_NAME: localhost
      
  kafdrop:
    image: obsidiandynamics/kafdrop:latest
    networks: 
      - broker-kafka
    depends_on:
      - kafka
    ports:
      - 19000:9000
    environment:
      KAFKA_BROKERCONNECT: kafka:29092
      
networks: 
  broker-kafka:
    driver: bridge

如果未找到 DOCKER_HOST_IP,它将绑定到 127.0.0.1。

  1. 运行以下命令:
sudo docker-compose -f path-to-docker-compose.yml --env-file path-to-config.env up -d --force-recreate
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

通过使用 Docker-Compose 和 Spring Boot 运行的 IP 地址访问远程主机中的 Kafka 的相关文章

随机推荐

  • 使用 C# 确定谁打开了文件

    使用 C 如何获取有关谁打开了文件的信息 用户名和机器名就足够了 以防万一 我有 Windows 工作站通过 Samba 访问 Linux 文件服务器上的文件 我在工作站上运行的程序中需要此信息 核心 NET 库没有任何方法可以做到这一点
  • 访问指针指向的整数数组时,“sizeof”对不完整类型“int[]”的无效应用

    我正在尝试学习 C 中的指针 并正在编写这个小整数数组指针练习 但遇到了无效的应用程序sizeof不完整类型int 问题 请告诉我哪里出了问题以及如何解决 谢谢 include
  • 在非标准位置安装带有库的 sf 包

    所需的库位于非标准位置 我可以通过以下命令安装 rgdal install packages rgdal type source configure args c with gdal config home programs anacond
  • 您会使用 实现轻量级 XML 解析器吗?

    如果您必须实现一个轻量级 XML 解析器 您会选择使用正则表达式吗 在我的例子中 XML 解析是最简单的 只有标签和文本内容 没有命名空间 没有属性 没有模式支持 当然是在一开始 但也许 我认为学习新的 C 0x 库对我来说是一个很好的练习
  • 将日期中的 NA 替换为另一个日期

    Data DB1 lt data frame orderItemID 1 10 orderDate c 2013 01 21 2013 03 31 2013 04 12 2013 06 01 2014 01 01 2014 02 19 20
  • WAMP服务器呈绿色但只得到404

    好吧 女士们先生们 我有一个很令人困惑的问题 我在工作中的 WIN7 机器上安装了 WAMP 服务器 一切都工作正常 有几个星期没有使用它 因为我被其他事情吸引了 有一天 我尝试启动它 图标是绿色的 我认为我们做得很好 然后我尝试打开 lo
  • 邮件脚本 - 解析错误:语法错误,意外的“=”[关闭]

    Closed 这个问题不符合堆栈溢出指南 help closed questions 目前不接受答案 我正在尝试从包含下拉列表的 html 表单设置一个简单的邮件脚本 但在声明所选变量的行上我收到一条错误消息 解析错误 语法错误 意外的 我
  • 如何手动解密 EncryptedAssertion

    我想解密 EncryptedAssertion 我尝试使用 OpenSaml Decrypter 但它对我不起作用 我无法解密 EncryptedData 我已经问过这个问题了 EncryptedAssertion 解密失败 https s
  • Selenium:遍历元素列表

    我正在使用 XPath CSS 和 Selenium 来定位网站上的元素 我想创建一种方法 在该方法中迭代定位器列表 XPath CSS 程序选择有效的一个 换句话说 它从定位器一开始 如果定位器存在 则返回 true 并存在循环 否则 它
  • winHTTP GET 请求 C++

    我就开门见山吧 这就是浏览器请求的样子 获取 index html HTTP 1 1 这就是 winHTTP 的作用 GET http site com index html http site com index htmlHTTP 1 1
  • Java用户类

    如何解析java用户类和JDBC用户类 问题是当我用完 put 时 sql 中有数据 例如 public User authenctication String eMail String password try con DriverMan
  • String 是关于 switch 的数字类型并且总是编译为 LookupSwitch 吗?

    以下代码返回是否给定String s等于任何其他硬编码字符串 该方法使用switch 这样做的声明 public class SwitchOnString public static boolean equalsAny String s s
  • 在 Laravel 中使用自动控制器路由是一个坏主意

    我从 CodeIgniter 转到 Laravel 那么 使用自动路由到所有控制器是一个坏主意吗 Route controller Controller detect 我应该使用它而不是在routes php 中创建路由吗 是的 这很糟糕
  • 应该如何使用 std::Optional?

    我正在阅读以下文档std experimental optional http en cppreference com w cpp utility optional我很清楚它的作用 但我不明白when我应该使用它或者我应该如何使用它 该网站
  • 如何在perl中将十六进制转换为字符字符串

    我需要将 xx 十六进制字符更改为字符 我正在尝试使用此代码 但它不起作用 usr bin perl w my cadena 40 61 62 print cadena n cadena s g print cad cadena n my
  • 使用 opencv 的 Android 文档扫描仪

    我正在尝试在我的应用程序中开发文档扫描仪 作为我想要实现的目标的一个示例 您可以看一下 Google 云端硬盘应用程序中内置的 Google 文档扫描仪 这允许 检测边缘 操纵透视 显示文档的自上而下视图 我一直在研究一些第三方 api 看
  • android:媒体记录器:启动失败:-38

    简介 如何检查录音是否已在其他应用程序的后台运行 详细信息 如果录音已在本机应用程序的后台运行 录音机 现在我已将录音作为我的应用程序中的功能之一 问题 当我尝试同时在我的应用程序中录制时 出现错误 E MediaRecorder star
  • 子类化 swift 通用可解码类型

    EDIT As 罗布 纳皮尔写道 https stackoverflow com a 49540809 9565342 Xcode 9 2 中存在该问题 在 Xcode 9 3 中 该问题不再相关 我的服务器 json 响应都打包在里面da
  • 检查类的实例是否存在,如果不存在则创建实例

    我想知道是否可以创建一个函数并向其传递一个类名 然后该函数检查当前类的实例是否存在 如果不存在 则创建该类的实例 此外 如果可能的话 将该变量设置为全局变量并要求将其返回 我意识到 回国可能是唯一的选择 function class nam
  • 通过使用 Docker-Compose 和 Spring Boot 运行的 IP 地址访问远程主机中的 Kafka

    我有一个 docker compose yml 在其中运行动物园管理员 Kafka 卡夫卡连接 and KafDrop 问题是 当我在本地运行时 我可以从我的春季启动应用程序来消费一些主题消息 我需要的是在 Linux 机器上运行相同的配置