需求:
1. kafka server已经配置完全,且设定了访问限制
基于这一点,必须要设定认证,及预先分配的账号密码
2. 由于项目开发环境是java,且不允许使用LogStash
基于这一点,必须实现一个java版的producer
先贴一份代码,本地运行通过,祛除不相干部分之后的代码(未验证):
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class Producer {
private static Pruducer instance = null;
private final KafkaProducer<String, String> producer;
private final static String TOPIC = "your_topic";
private Producer() {
Properties properties = new Properties();
properties.put("bootstrap.servers", "kafkaserver:9110");
properties.put("acks", "all");
properties.put("retries", 0);
properties.put("batch.size", 16384);
properties.put("linger.ms", 1);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("security.protocol", "SASL_PLAINTEXT");
properties.setProperty ("sasl.mechanism", "PLAIN");
properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required\n" +
"username=\"count\"\n" +
"password=\"countxxxxx\";");
producer = new KafkaProducer<String, String>(properties);
}
public static KafkaProducerClient getInstance() {
if (instance == null) {
instance = new KafkaProducerClient();
}
return instance;
}
public void send(String key, String msg) {
producer.send(new ProducerRecord<String, String>(TOPIC, key, msg));
}
}
consumer 实现和procuder差不多,很容易实现。
写这篇文章的主要目的在于:认证。
根据apache官网文档看,有两种认证方式,其一是设置java.security.auth.login.config属性,其二则是设置sasl.jaas.config属性(如代码示例)。
第一种方式运行JVM时添加参数:-Djava.security.auth.login.config=
/tmp/kafka_client_jaas
.conf
kafka_client_jaas
.conf 文件内容如下:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
username="count"
password="countxxxx";
};
注意:配置文件中的关键词不可随意更改,而且大小写敏感。
网上好多说System.setProperty("java.security.auth.login.config", "/tmp/kafka_client_jaas
.conf
")也可以达到设置目的。但经过测试无奈的发现,sdk可以获取到设置的property,但不知为何总是报告无法解析文件。不知道到同行们怎么做到的。
但由于这种方式时全局设置,可能会对服务器造成不可预估的影响,故而舍弃。
第二种方式不需要添加其他任何东西,只需要在代码中设置即可。填写sasl.jaas.config的值时一定要注意分号的转义和换行符的使用,错误的话解析依然失败。
同时若两种都设置了,那么以sasl.jaas.config为主。官方解析如下:
If both static JAAS configuration system property java.security.auth.login.config
and client property sasl.jaas.config
are specified, the client property will be used.
由于时初次接触这部分,着实浪费了不少时间,特写此文章,以示警戒。
参考网址:
https://kafka.apache.org/0110/documentation.html#security_sasl