spring boot2整合kafka及遇到Exception thrown when sending a message with key='null'问题

2023-11-08

spring boot2整合kafka及遇到Exception thrown when sending a message with key=’null’问题

  1. 最近在学习spring boot2和kafka。就用学着使用spring boot2与kafka集成。项目环境

    • 开发工具:IDEA
    • spring kafka :2.1.6.RELEASE
    • spring boot2:2.0.2.RELEASE
    • Apache kafka:2.11-1.0.0

项目的github地址:https://github.com/sweetcczhang/springkafka

  1. 项目目录
    这里写图片描述

  2. application.properties文件

    
    #kafka server address
    
    spring.kafka.bootstrap-servers=10.108.208.51:9092
    
    
    # Provider
    
    spring.kafka.producer.retries=0
    spring.kafka.producer.batch-size=16384
    
    # 指生产者的key和value的编码方式
    
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    
    
    # Consumer
    
    
    #消费者组
    
    spring.kafka.consumer.group-id=test-consumer-group
    spring.kafka.consumer.auto-offset-reset=earliest
    
    # 指定消费者的解码方式
    
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    
    
    # 日志
    
    spring.output.ansi.enabled=DETECT
    debug=true
  3. 生产者配置文件KafkaProviderConfig.java

    @Configuration
    @EnableKafka
    public class KafkaProviderConfig {
    
       @Value("${spring.kafka.bootstrap-servers}")
       private String bootstrapServers;
    
       @Value("${spring.kafka.producer.key-serializer}")
       private String keySerializer;
    
       @Value("${spring.kafka.producer.value-serializer}")
       private String valueSerializer;
    
       @Bean
       public Map<String,Object> producerConfig(){
           Map<String,Object> props = new HashMap<>();
           props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
           props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
           props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
           return props;
       }
    
       @Bean
       public ProducerFactory<String,String> producerFactory(){
           return new DefaultKafkaProducerFactory<>(producerConfig());
       }
    
       @Bean
       public KafkaTemplate<String,String> kafkaTemplate(){
           return new KafkaTemplate<>(producerFactory());
       }
    }
  4. 生产者产生消息发送到kafka

    @Component
    public class KafkaSender {
       private static final Logger logger = LoggerFactory.getLogger(KafkaSender.class);
    
       @Autowired
       private KafkaTemplate<String,String> kafkaTemplate;
    
       private Gson gson = new GsonBuilder().create();
    
       //发送消息的方法
       public void send(){
           Message message = new Message();
           message.setId(System.currentTimeMillis());
           message.setMsg(UUID.randomUUID().toString());
           message.setSendTime(new Date());
           logger.info("+++++++++++++++++++ message = {}", gson.toJson(message));
           kafkaTemplate.send("sweetzcc",gson.toJson(message);
       }
    }
  5. 消费者配置

    @Configuration
    @EnableKafka
    public class KafkaConsumerConfig {
    
       @Value("${spring.kafka.bootstrap-servers}")
       private String bootstrapServer;
    
       @Value("${spring.kafka.consumer.key-deserializer}")
       private String keySerializer;
    
       @Value("${spring.kafka.consumer.value-deserializer}")
       private String valueSerializer;
    
       @Value("${spring.kafka.consumer.group-id}")
       private String groupId;
    
       @Value("${spring.kafka.consumer.auto-offset-reset}")
       private String autoOffsetReset;
    
       @Bean
       public Map<String,Object> consumerConfig(){
           Map<String,Object> props = new HashMap<>();
           props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
           props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keySerializer);
           props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueSerializer);
           props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
           props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
           return props;
       }
    
       @Bean
       public ConsumerFactory<String,String> consumerFactory(){
           return new DefaultKafkaConsumerFactory<>(consumerConfig());
       }
    
       @Bean
       public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(){
           ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
           factory.setConsumerFactory(consumerFactory());
           return factory;
       }
    }
  6. 消费者

    @Component
    public class KafkaReceiver {
       private static final Logger logger = LoggerFactory.getLogger(KafkaReceiver.class);
    
       @KafkaListener(topics = "sweetzcc")
       public void listen(@Payload String message){
           logger.info("received message={}",message);
       }
    }
  7. 运行测试

    @EnableKafka
    @SpringBootApplication
    public class SpringKafkaApplication implements CommandLineRunner {
    
    public static void main(String[] args) {
        SpringApplication.run(SpringKafkaApplication.class, args);
    }
    
    @Autowired
    private KafkaSender kafkaSender;
    
    @Override
    public void run(String... strings) throws Exception {
        for (int i=0;i<10;i++){
            kafkaSender.send();
        }
    }
    }

    运行结果:
    这里写图片描述

  8. 在开发中遇到的问题。

    我在服务上搭建完一个单节点的kafka服务后,在服务器山启动命令行消费者和服务者进行测试是成功的但是总是报如下错误。

    2018-06-09 14:04:13.490  INFO 6268 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=test-consumer-group] Marking the coordinator node2:9092 (id: 2147483646 rack: null) dead
    2018-06-09 14:04:41.305 ERROR 6268 --- [ad | producer-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='{"id":1528524250944,"msg":"056aa258-09ca-49e9-be25-d72383f96e50","sendTime":"Jun 9, 2018 2:04:10 PM"...' to topic sweetzcc:
    
    org.apache.kafka.common.errors.TimeoutException: Expiring 10 record(s) for sweetzcc-0: 30037 ms has passed since batch creation plus linger time

    网上的回答比较奇怪。说是在服务器上配置配置host和名称相对应。我查看服务器,发现已经配置完成。,有的说重启kafka服务就可以了,但是都不行。后来发现:错误中有:Marking the coordinator node2:9092 (id: 2147483646 rack: null) dead。发现是通过节点名去查找服务器的。但是我在本地的hosts中并没有配置10.108.208.51 node2。配置完成后问题就解决了。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

spring boot2整合kafka及遇到Exception thrown when sending a message with key='null'问题 的相关文章

随机推荐

  • 从技术的角度Struts1.1与WebWork2的比较

    从技术的角度Struts1 1与WebWork2的比较 标签 action webwork struts 拦截器 验证 从技术的角度Struts1 1与WebWork2的比较 特 征 Struts1 1 WebWork2 Action类 在
  • rootkit模拟木马病毒

    Rootkit是一种特殊的恶意软件 它的功能是在安装目标上隐藏自身及指定的文件 进程和网络链接等信息 比较多见到的是Rootkit一般都和木马 后门等其他恶意程序结合使用 而我们今天要模拟学习的就是与它很像的恶意软件 Rootkit 其中之
  • 【难受】SpirngBoot-Alibaba-nacos跨服务器访问接口的问题

    原想法 我首先准备了 一个网关 2个服务 分别将两个服务部署到不同的远程服务器当中 实现跨服务器访问接口 网关为本地调用 这里就不一一介绍了 问题 利用gateway做路由时出现服务不可用的情况 看日志发现服务调用的IP是172开头的网卡段
  • Sqoop安装与配置

    Sqoop安装与配置 一 了解Sqoop 二 下载Sqoop安装包 三 安装Sqoop 四 配置Sqoop 五 Sqoop基本命令 六 示例 一 了解Sqoop sqoop 是 Hadoop 和关系数据库服务器之间传送数据的工具 主要用于在
  • centos7 kafka安装并安装web界面监控工具

    kafka自带zookeeper 所以不需要下载zookeeper 1 下载 wget http mirrors shu edu cn apache kafka 2 0 0 kafka 2 12 2 0 0 tgz 2 安装 tar zxv
  • ACMP,二维狄洛尼三角剖分

    ACMP cpp std vector
  • python爬虫文字加密_Python爬虫进阶必备

    此次来分析某个小说网站 aHR0cHM6Ly9nLmhvbmdzaHUuY29tL2NvbnRlbnQvOTM0MTYvMTM4Nzc5MTIuaHRtbA node 分析请求 先来看看页面的请求 图1 1 数组 图1 1 通过查看请求 并
  • Error creating bean with name ‘org.apache.cxf.jaxws.spring.NamespaceHandler$SpringServerFactoryBean

    目录 问题描述 解决过程 总结 问题描述 我是在spring整合jaxws 使用webservice的时候报错的 解决过程 这个问题说实话卡了我很久 一直没找着原因 其实但看这个报错就能看出来 有个文件注入不了容器 我一直以为是配置问题 修
  • 强化学习代码练习q-learning-迷宫

    相比上一个demo 这个练习的环境更加复杂 但是就强化学习智能体而言 其整体是一样的 但是既然环境更加复杂 就需要把智能体和环境单独拉出来写 不能再放一个Python文件中 环境类 环境类总结起来就是定义了初始化的参数 构建迷宫 重置函数
  • 设计模式C++学习笔记之一(Strategy策略模式)

    http www cnblogs com wanggary archive 2011 04 07 2008796 html 无意中 从网上下到一本电子书 24种设计模式介绍与6大设计原则 很好奇这里有24种设计模式 印象中GOF写的 设计模
  • CTFSHOW网络迷踪-低碳环保

    记录一个解过的一道OSINT题目 低碳环保 题目来源 CTFshow 题目 解题 先下载附件 得到如图 首先尝试百度识图 但是识别不到 然后我看到右边建筑上方有 奉献清洁能源 几个红字 尝试搜索 搜索到了各种公司 还是没头绪 但是经过观察
  • 租车骑绿岛【C语言】

    租车骑绿岛 部门组织绿岛骑行团建活动 租用公共双人自行车 每辆自行车最多坐两人 最大载重m 给出部门每个人的体重 请问最多需要租用多少双人自行车 输入描述 第一行两个数字m n 分别代表自行车限重 部门总人数 第二行 n个数字 代表每个人的
  • JPA freemaker动态的拼接SQL

    spring data jpa extra https github com slyak spring data jpa extra spring data jpa template 项目地址 https gitee com silentw
  • cJSON解析JSON字符串

    一 为何选择cJSON 我们在使用JSON格式时 如果只是处理简单的协议 可以依据JSON格式 通过对字符串的操作来进行解析与创建 然而随着协议逐渐复杂起来 经常会遇到一些未考虑周全的地方 需要进一步的完善解析方法 此时 使用比较完善的JS
  • <>读书笔记

    lt
  • MySQL+jdbc理论考试【无答案】

    单选 共15题 每题2分 共30分 1 下面关于mysql的说法正确的是 A 默认的端口号是1521 B 默认的端口号是80 C 默认的端口号是3306 D 默认的端口号是443 2 下面排序的说法正确的是 A 默认是升序排序 B asc是
  • Hbuild点击发行,没有反应

    根目录下有 manifest json pages json 等等 才可以打包 换句话说 打开uniapp的文件时 要打开目录下有manifest json pages json的文件 文件上层不要再套一层文件
  • Linux内核移植

    目录 创建VSCode 工程 NXP官方开发板Linux 内核编译 修改顶层Makefile 配置并编译Linux内核 生成zImage和 dtb Linux 内核启动测试 根文件系统缺失错误 在Linux中添加自己的开发板 添加开发板默认
  • matlab仿真gmid电路,bandgap电路稳定性仿真---频响、相位裕度、环路增益

    仿真需要对原理图稍作修改 需在运放的闭环路径中加入iprobe元件 电路中存在两个反馈电路 一个正反馈 如图1组成路径 一个负反馈 如图2组成路径 两个反馈都经过了运放的输出端 故我这儿加在了输出端 可以同时仿真出两个反馈环路的频率响应 环
  • spring boot2整合kafka及遇到Exception thrown when sending a message with key='null'问题

    spring boot2整合kafka及遇到Exception thrown when sending a message with key null 问题 最近在学习spring boot2和kafka 就用学着使用spring boot