I have a GCP Dataproc cluster, and I am trying to deploy a PySpark job that produces to a topic over SSL.
The pem files are stored in the bucket gs://dataproc_kafka_code/code, and I am accessing them with the code shown below.
However, the code is unable to find the pem files, and the error is shown below:
%3|1638738651.097|SSL|rdkafka#producer-1| [thrd:app]: error:02001002:system library:fopen:No such file or directory: fopen('gs://dataproc_kafka_code/code/caroot.pem','r')
%3|1638738651.097|SSL|rdkafka#producer-1| [thrd:app]: error:2006D080:BIO routines:BIO_new_file:no such file
Traceback (most recent call last):
  File "/tmp/my-job6/KafkaProducer.py", line 21, in <module>
    producer = Producer(conf)
cimpl.KafkaException: KafkaError{code=_INVALID_ARG,val=-186,str="Failed to create producer: ssl.ca.location failed: error:0B084002:x509 certificate routines:X509_load_cert_crl_file:system lib"}
Code:

from confluent_kafka import Producer

kafkaBrokers = '<host>:<port>'
# CA root certificate (ca.crt)
caRootLocation = 'gs://dataproc_kafka_code/code/caroot.pem'
# user public certificate (user.crt)
certLocation = 'gs://dataproc_kafka_code/code/my-bridge-user-crt.pem'
# user private key (user.key)
keyLocation = 'gs://dataproc_kafka_code/code/user-with-certs.pem'
password = '<password>'

conf = {
    'bootstrap.servers': kafkaBrokers,
    'security.protocol': 'SSL',
    'ssl.ca.location': caRootLocation,
    'ssl.certificate.location': certLocation,
    'ssl.key.location': keyLocation,
    'ssl.key.password': password,
}

topic = 'my-topic'
producer = Producer(conf)

for n in range(100):
    producer.produce(topic, key=str(n), value=" val -> " + str(n * (-1)) + " on dec 5 from dataproc ")

producer.flush()
What needs to be done to fix this?
Also, is this the right way to give the code access to the SSL certificates?
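One idea I have been wondering about, in case it is relevant: the fopen('gs://...') line in the error makes it look like librdkafka treats these settings as local filesystem paths, so perhaps the pem files need to be copied out of GCS onto the node first. A rough sketch of what I mean, assuming the google-cloud-storage client library is available on the cluster; the /tmp paths and the download_blob helper are placeholders of my own, not part of the job above:

from google.cloud import storage

def download_blob(bucket_name, blob_name, local_path):
    # Hypothetical helper: copy one object from GCS to the local filesystem
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    bucket.blob(blob_name).download_to_filename(local_path)
    return local_path

# Fetch each pem to local disk before building the producer config,
# then point the ssl.* settings at the local copies instead of gs:// URIs
caRootLocation = download_blob('dataproc_kafka_code', 'code/caroot.pem', '/tmp/caroot.pem')
certLocation = download_blob('dataproc_kafka_code', 'code/my-bridge-user-crt.pem', '/tmp/my-bridge-user-crt.pem')
keyLocation = download_blob('dataproc_kafka_code', 'code/user-with-certs.pem', '/tmp/user-with-certs.pem')

Is something like this the intended pattern, or is there a better way?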
tia!