利用Nacos服务获取配置逻辑的特点,实现动态配置kafak认证

2023-11-15

我要做什么?

实现Nacos动态配置kafka认证信息,使每个微服务读取同一个kafka配置,并生成文件注入到环境变量中。

为什么要这么做?

首先我们看下

Kafka-java接入demo,如图:

1.prod_client_jaas.conf文件

KafkaClient{
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="kafka_1"
  password="密码";
  };

2.cons_client_jaas.conf

KafkaClient{
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="kafka_1"
  password="密码";
  };

3.producer

package com.sensetime.kafka;
 
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
 
 
 
/**
 * Producer
 *
 */
public class App
{
    public static void main( String[] args )
    {
 
 
        String fsPath=System.getProperty("user.dir");
        System.out.println(fsPath);
        System.setProperty("java.security.auth.login.config", ""+fsPath+"/conf/prod_client_jaas.conf"); // 环境变量添加,需要输入配置文件的路径
        System.out.println("===================配置文件地址"+fsPath+"/conf/prod_client_jaas.conf");
        Properties props = new Properties();
        props.put("bootstrap.servers", "ip:9092,ip:9092,ip:9092");           //此处为kafka接入点
        props.put("acks", "1");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty ("security.protocol", "SASL_PLAINTEXT");
        props.setProperty ("sasl.mechanism", "PLAIN");
        Producer producer = null;
        try {
            producer = new KafkaProducer<>(props);
            for (int i = 0; i < 10; i++) {
                String msg = "Message " + i;
                producer.send(new ProducerRecord("testtime", msg));                  //此处为创建的topic
                System.out.println("Sent:" + msg);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

4.consumer

package com.sensetime.kafka;
 
import java.util.Arrays;import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
 
 
 
public class Consumer {
    public static void main(String[] args) {
    
        String fsPath=System.getProperty("user.dir");
        System.setProperty("java.security.auth.login.config", ""+fsPath+"/conf/cons_client_jaas.conf"); // 环境变量添加,需要输入配置文件的路径
        System.out.println("===================配置文件地址"+fsPath+"/conf/cons_client_jaas.conf");
        Properties props = new Properties();
        props.put("bootstrap.servers", "ip:9092,ip:9092,ip:9092");           //kafka接入点
        props.put("group.id", "group1");                                                                //创建的group
        props.put("group.name", "1");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty ("security.protocol", "SASL_PLAINTEXT");
        props.setProperty ("sasl.mechanism", "PLAIN");
        KafkaConsumer kafkaConsumer = new Kafkansumer<>(props);
        kafkaConsumer.subscribe(Arrays.asList("testtime"));                                           //此处为订阅的topic
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("线程1"+":"+"Partition: " + record.partition() + " Offset: " + record.offset() + " Value: " + record.value() + " ThreadID: " + Thread.currentThread().getId());
            }
 
        }
    }
 
}

这个配置认证方式的痛点在于1: 需要在springboot刚启动还未进行kafka建立连接之前,将认证信息注入到环境变量里边

2:需要每个微服务都要配置认证信息的文件。考虑到我们使用Nacos作为配置中心,我的想法是利用Nacos进行配置认证信息,并在springboot启动后kafka实例化前,读取认证信息,设置到环境变量里。

我是怎么做的?

首先看图所示,springboot启动时候会在refreshContext(context)之前执行初始化applyInitializers,当spring执行这个类PropertySourceBootstrapConfiguration的时候,会执行Nacos的相关获取配置解析配置的方法,所以,我只也搞一个同样的initalizer,并且改initalizer排序在Nacos执行之后,不就解决了吗,对的,就是这样的

 

第一种:手动设置initializer方式实现代码如下:

注意:如果我们自己定义启动执行前的类需如下防范
public class MyApiApplication {
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(MyApiApplication.class);
        application.addInitializers(new KafkaSaslConfiguration());
        application.run(args);
    }
}

public class KafkaSaslConfiguration implements
        ApplicationContextInitializer<ConfigurableApplicationContext>, Ordered {

    private static Logger log = LoggerFactory.getLogger(KafkaSaslConfiguration.class);

    private static final String STORE_CONS_CLIENT_JAAS_PATH = "./conf/cons_client_jaas.conf";

    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE + 11;
    }

    @Override
    public void initialize(ConfigurableApplicationContext applicationContext) {
        ConfigurableEnvironment environment = applicationContext.getEnvironment();
        String username = environment.getProperty("kafka-sasl-username");
        String password = environment.getProperty("kafka-sasl-password");
        if (StringUtils.isBlank(username) || StringUtils.isBlank(password)) {
            log.error("kafka sasl need kafka-sasl-username and kafka-sasl-password,please set value for Nacos.");
            System.exit(-1);
        }
        String kafkaClient = "KafkaClient{\n" +
                "  org.apache.kafka.common.security.plain.PlainLoginModule required\n" +
                "  username=\"" + username + "\"\n" +
                "  password=\"" + password + "\";\n" +
                "  };";

        String clientPath = System.getProperty("user.dir") + "/conf/cons_client_jaas.conf";
        System.setProperty("java.security.auth.login.config", clientPath);
        ResourceUtils.readStringToDisk(kafkaClient, STORE_CONS_CLIENT_JAAS_PATH);
        log.info("java.security.auth.login.config: {}", clientPath);
    }

}

第二种方法,是利用springboot自动配置原理,进行配置,自动配置原理https://blog.csdn.net/luman1991/article/details/106668582

# Initializers
org.springframework.context.ApplicationContextInitializer=\
com.项目包.KafkaSaslConfiguration

认真写写博客,写写生活点滴

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

利用Nacos服务获取配置逻辑的特点,实现动态配置kafak认证 的相关文章

随机推荐

  • Loughran&McDonald金融文本情感分析库

    今天看到一个预测股价的项目 其中用到pysentiment库对金融文本数据进行情感计算 查了下该库的官方文档 发现该库提供了两大情感分析 Harvard IV 4 英文通用情感分析 Loughran MCdonald 英文金融情感分析 py
  • [深度学习]C++调用Python-YOLO模型进行目标检测

    文章目录 前言 C 调用Python的步骤 修改YOLOv5源码 C 读取Python返回值 前言 目前深度学习算法大多数是基于Python实现 但一些项目的框架是用C 搭建 所以就出现了在C 中调用模型的问题 本文主要记录C 调用Pyth
  • Activity使用Dialog样式导致点击空白处自动关闭的问题

    将Activity设置成窗口的样式实现Dialog或者Popupwindow效果在开发中是很常用的一种方式 在AndroidMenifest xml中将需要设置的Activity增加android theme android style T
  • 秒杀系统设计,高并发下的下单功能设计

    来源 常大皮卡丘 blog csdn net u013815546 article details 53928912 如有好文章投稿 请点击 这里了解详情 功能需求 设计一个秒杀系统 初始方案 商品表设计 热销商品提供给用户秒杀 有初始库存
  • Bit、Byte、kb、KB、MB、KiB、MiB各表示什么意思?

    1 Byte bit bit表示 位 是计算机中最小的数据单位 每一位的状态只能是0或1Byte表示 字节 8个二进制位构成1个字节 即1 Byte 8 bit 1个英文字母或者数字占用1个字节的空间 1个汉字占据2个字节的空间 2 Bps
  • [网络安全提高篇] 一一一.ISC会议观后感之网络安全需要新战法和新框架

    这是作者参加第九届互联网安全大会 ISC 2021 未来峰会的观后感 看到了非常多的大佬和师傅 还有许多还有 很多知识都值得我们学习 这篇文章主要介绍360集团创始人 董事长周鸿祎分享的 面向未来的新一代安全能力框架 战略演讲 其大会主题是
  • 什么是index.html? 如何创建和使用index.html?

    Websites provide a lot of web pages with different URLs but there are some defacto pages that exist most of the web site
  • osg学习(四十四)读取earth文件的几种方式

    1 osg Group node MapNodeHelper load arguments viewer 2 osg ref ptr
  • redis数据库hset(有序集合)类型常用命令

    redis数据库hset类型常用命令 1 向有序集合添加一个或多个成员 或者更新已存在成员的分数 zadd key score1 member1 score2 member2 2 获取有序集合的成员数 zcard key 3 计算在有序集合
  • C++之监控文件是否被修改

    软件开发过程中经常会用到配置文件 某些应用场景要求在软件运行时动态修改配置文件 此时就需要监控配置文件是否被修改 如果修改了 重新加载 FileWatcher h ifndef FILEWATCHER H define FILEWATCHE
  • openfire服务器源码,Openfire源码部署以及编译运行.doc

    Openfire源码部署以及编译运行 Openfire源码下载 可以去官方网站 官网地址 projects openfire 也可以利用eclispe自带的SVN插件导入 再次就过多介绍 官网上写的很清楚 源码部署编译 将源码解压到硬盘上
  • git:恢复文件

    如果需要在提交历史中跳转查看某个文件 可以使用 git restore 仅仅恢复工作树为某个提交版本 而不用切换分支 HEAD 仍然保持不变 假设现在git仓库如图 git restore 命令用于从 index 或某个 commit 恢复
  • dedecms的图片轮换

    思路 在dedecms中引进js和css要用 dede global cfg templets skin 引入文件用 dede include filename head htm 当你点图片的时候会到那一篇文章中 他用到的是dede arc
  • 对于女生来说,软件测试和前端,学哪一个更好啊

    其实前端和软件测试都算是对新手比较友好的学科了 而且是两个女生选择相对比较多的学科 简单好学要看你是从哪方面来考虑 至于哪家培训机构好 这个还是要看你自己的综合考量 选择学科还是要综合考量一下 发展前景 学习内容 发展方向 薪资 自己兴趣
  • 华为慧通真相--关联企业迷局

    华为慧通真相 关联企业迷局 回顾我自己已走过的历史 扪心自问 我一生无愧于祖国 无愧于人民 无愧于事业与员工 无愧于朋友 在 我的父亲母亲 一文中 华为的灵魂人物誓言旦旦 但是随着近期一系列真正 华为真相 浮出水面 不得不让人怀疑 这位笼罩
  • git 仓库 端口 prot22 :拒绝连接

    今天新入职一家公司 遇到了git 拉不下代码的问题 http 方式是可以的 但是 ssh 方式是不行的 所以今天记录一下 我是配置了的 生成ssh ssh keygen t rsa C your email example com 然后 配
  • 基于元数据的数据治理分析功能说明

    数据对于企业来说是非常重要的 因为企业数据需要保证其完整性和准确性 所以需要数据治理 MDM基础数据平台是对各个业务系统的主数据进行治理 而各个业务系统中的业务数据则需要在DAP数据分析平台进行治理 DAP数据分析平台通过与ESB应用集成平
  • 华为od机试 Python 【寻找最大距离】

    题目 小明需要在一个沙地上种下一些树木 但是这片沙地上只有特定的一些位置可以种树 小明想要尽可能地增大树之间的距离来更好地防止沙尘暴 你的任务是帮助他找到这样一个距离 使得在这个距离下他可以种下所有的树 而且这个距离是所有可能距离中最大的
  • MySQL查看锁的sql

    MySQL查看锁的sql 查看数据库状态 会显示加锁的信息等等 show engine innodb status 查看正在执行的线程信息 show full processlist 查看正在锁的表 show open tables whe
  • 利用Nacos服务获取配置逻辑的特点,实现动态配置kafak认证

    我要做什么 实现Nacos动态配置kafka认证信息 使每个微服务读取同一个kafka配置 并生成文件注入到环境变量中 为什么要这么做 首先我们看下 Kafka java接入demo 如图 1 prod client jaas conf文件