Why can't kafka-python connect to the Bluemix Message Hub service?

2024-02-25

I am trying to connect to a Bluemix Message Hub instance (http://bluemix.net). This simple script

#!/usr/bin/env python 

from kafka import KafkaProducer 
from kafka.errors import KafkaError 

kafka_brokers_sasl = [
  "kafka01-prod01.messagehub.services.us-south.bluemix.net:9093",
  "kafka02-prod01.messagehub.services.us-south.bluemix.net:9093",
  "kafka03-prod01.messagehub.services.us-south.bluemix.net:9093",
  "kafka04-prod01.messagehub.services.us-south.bluemix.net:9093",
  "kafka05-prod01.messagehub.services.us-south.bluemix.net:9093" ] 
sasl_plain_username = "xxxxxxxxxxxxxxx" 
sasl_plain_password = "xxxxxxxxxxxxxxxxxxxxxxxxx" 
sasl_mechanism = 'SASL_PLAINTEXT' 

producer = KafkaProducer(bootstrap_servers = kafka_brokers_sasl,
                         sasl_plain_username = sasl_plain_username,
                         sasl_plain_password = sasl_plain_password,
                         sasl_mechanism = sasl_mechanism ) 

ends with the following exception:

Traceback (most recent call last): 
  File "./test-mh.py", line 12, in <module> 
    producer = KafkaProducer(bootstrap_servers = kafka_brokers_sasl, sasl_plain_username = sasl_plain_username, sasl_plain_password = sasl_plain_password, sasl_mechanism = sasl_mechanism ) 
  File "/usr/local/lib/python2.7/dist-packages/kafka/producer/kafka.py", line 328, in __init__ 
    **self.config) 
  File "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py", line 202, in __init__ 
    self.config['api_version'] = self.check_version(timeout=check_timeout) 
  File "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py", line 791, in check_version 
    raise Errors.NoBrokersAvailable() 
kafka.errors.NoBrokersAvailable: NoBrokersAvailable

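I took kafka_brokers_sasl, sasl_plain_username, and sasl_plain_password from the messagehub service credentials object. I am using kafka-python 1.3.1, which appears to support the SASL authentication mechanism. Any idea what I am doing wrong? Thanks.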


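Message Hub requires clients to connect over TLS 1.2. That means passing a security_protocol parameter to KafkaProducer, along with an ssl.SSLContext via the ssl_context parameter; it appears that the Python Kafka client creates an SSLv23 context by default.

Here are the changes needed to connect: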

import ssl
from kafka import KafkaProducer 
from kafka.errors import KafkaError 

kafka_brokers_sasl = [
    "kafka01-prod01.messagehub.services.us-south.bluemix.net:9093",
    "kafka02-prod01.messagehub.services.us-south.bluemix.net:9093",
    "kafka03-prod01.messagehub.services.us-south.bluemix.net:9093",
    "kafka04-prod01.messagehub.services.us-south.bluemix.net:9093",
    "kafka05-prod01.messagehub.services.us-south.bluemix.net:9093" ] 
sasl_plain_username = "xxxxxxxxxxxxxxx" 
sasl_plain_password = "xxxxxxxxxxxxxxxxxxxxxxxxx" 

sasl_mechanism = 'PLAIN'       # <-- changed from 'SASL_PLAINTEXT'
security_protocol = 'SASL_SSL'

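# Create a new context using system defaults, then disable TLS 1.0 and 1.1.
# The OP_NO_* values are bit flags, so they must be OR'ed into options;
# AND-ing them (&=) would clear every other option instead of adding these.
context = ssl.create_default_context()
context.options |= ssl.OP_NO_TLSv1
context.options |= ssl.OP_NO_TLSv1_1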

producer = KafkaProducer(bootstrap_servers = kafka_brokers_sasl,
                         sasl_plain_username = sasl_plain_username,
                         sasl_plain_password = sasl_plain_password,
                         security_protocol = security_protocol,
                         ssl_context = context,
                         sasl_mechanism = sasl_mechanism)
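
On newer Python versions (3.7 and later), the same TLS restriction can probably be expressed through SSLContext.minimum_version instead of the OP_NO_* flags. The following is a minimal sketch under that assumption and is not part of the original answer; the helper name make_tls12_context is hypothetical, and the commented-out KafkaProducer call simply reuses the variables defined above.

```python
import ssl


def make_tls12_context():
    """Return a client-side SSLContext that refuses anything older than TLS 1.2.

    Hypothetical helper for illustration; assumes Python 3.7+.
    """
    # System defaults: certificate verification and hostname checking enabled.
    context = ssl.create_default_context()
    # Reject TLS 1.0 and 1.1 (SSLv2/SSLv3 are already disabled by default).
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    return context


context = make_tls12_context()

# Assumed usage, mirroring the producer call above:
# producer = KafkaProducer(bootstrap_servers=kafka_brokers_sasl,
#                          sasl_plain_username=sasl_plain_username,
#                          sasl_plain_password=sasl_plain_password,
#                          security_protocol='SASL_SSL',
#                          ssl_context=context,
#                          sasl_mechanism='PLAIN')
```

Because the context comes from ssl.create_default_context(), the broker certificate is still verified against the system trust store; only the minimum protocol version changes.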