在建立安全 TLS 连接之前,客户端网络套接字已断开连接。如何在 Node js 中使用 Kafka JS 连接到 kafka 集群?

2024-02-13

我获得的凭据:-(我也有kafka.keystore.jks and kafka.truststore.jks)

host: xxxxx-xxxxx-x.cloudclusters.net
port: xxxxx
ip: xxx.xxx.xxx.xx
trustore pw: xxxxxxxx
keystore pw: xxxxxxxx

我认为我没有使用提供的所有凭据。

import * as dotenv from 'dotenv' 
import express from 'express'
import { Kafka } from 'kafkajs';
import { Partitioners } from 'kafkajs';
import jks from 'jks-js';
import fs from 'fs';

const keystore = jks.toPem(
    fs.readFileSync('./kafka.keystore.jks'),
    'mypassword'
);

const trustore = jks.toPem(
    fs.readFileSync('./kafka.truststore.jks'),
    'mypassword'
);

const { 
  caroot: {ca},
   localhost: {key,cert} } = keystore;
// const { caroot: {ca} } = trustore;

console.log("**************** kafka.keystore.jks ****************");
// console.log(keystore)

console.log("ca ===>", ca);
console.log("key ===>", key);
console.log("cert ===>", cert);

console.log("**************** kafka.truststore.jks ****************");


// setting up kafka
const kafka = new Kafka({
  clientId: 'qa-topic',
  brokers: ['xxxxxxx.cloudclusters.net:xxxxx'], //HOST:PORT
  ssl: {
    rejectUnauthorized: false,
    ca: ca,
    key: key,
    cert: cert
  },
 
})


const producer = kafka.producer({ createPartitioner: Partitioners.DefaultPartitioner })

producer.on('producer.connect', () => {
  console.log(`KafkaProvider: connected`);
});
producer.on('producer.disconnect', () => {
  console.log(`KafkaProvider: could not connect`);
});
producer.on('producer.network.request_timeout', (payload) => {
  console.log(`KafkaProvider: request timeout ${payload.clientId}`);
});

const run = async () => {
  // Producing
  await producer.connect()
  await producer.send({
    topic: 'supplier-ratings',
    messages: [
      {
        value: Buffer.from(JSON.stringify(
          {
            "event_name": "QA",
            "external_id": user_uuiD,
            "payload": {
              "supplier_id": i.supplier_id,
              "assessment": {
                "performance": 7,
                "quality": 7,
                "communication": 7,
                "flexibility": 7,
                "cost": 7,
                "delivery": 6
              }
            },
            "metadata": {
              "user_uuid": "5a12cba8-f4b5-495b-80ea-d0dd5d4ee17e"
            }
          }
        ))
      },
    ],
  })

  //Consuming
  await consumer.connect()
  await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        value: message.value.toString(),
      })
    },
  })
}

const port = process.env.PORT || 5000;
app.listen(port, () => {
  console.log(`I am listening at ${port}`);
});

我已经得到了ca,key,cert从我的kafka.keystore.jks。我将它们传递给SSL对象根据文档 https://kafka.js.org/docs/configuration#ssl。但仍然得到Client network socket disconnected before secure TLS connection was established error.

我无法与 kafka 集群建立连接。我相信我丢失了一些钥匙。我正在关注Kafka.js 的文档 https://kafka.js.org/docs/getting-started.


None

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在建立安全 TLS 连接之前,客户端网络套接字已断开连接。如何在 Node js 中使用 Kafka JS 连接到 kafka 集群? 的相关文章

随机推荐