Kafka接入

2023-10-30

Kafka接入

1、引入依赖

<!-- kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.2</version>
</dependency>

2、配置kafka

# Spring
spring: 
  kafka:
    bootstrap-servers: 192.168.133.69:9092
    producer:
      retries: 0
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: mf-group
      auto-offset-reset: earliest
      enable-auto-commit: true
      auto-commit-interval: 100
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

3、kafka生产者

package com.unionman.iot.platform.system.kafka;

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * @description 消息生产者
 * @author junlin.ru
 * @date 2022/4/29 9:49
 **/
@Component
@Slf4j
public class KafkaProducer<T> {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * kafka 发送消息
     * @param obj 消息对象
     */
    public void send(T obj, String topic) {

        String jsonObj = JSON.toJSONString(obj);
        log.info("------------ message = {}", jsonObj);

        //发送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, jsonObj);

        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.info("Produce: The message failed to be sent:" + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                //TODO 业务处理
                log.info("Produce: The message was sent successfully:");
                log.info("Produce: _+_+_+_+_+_+_+ result: " + stringObjectSendResult.toString());
            }
        });
    }
}

4、kafka消费者

package com.unionman.iot.platform.system.kafka;

import com.alibaba.fastjson.JSONObject;
import com.unionman.iot.platform.entity.DeviceMessageMonitor;
import com.unionman.iot.platform.es.DeviceMessageMonitorRepository;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * @description kafka生产者
 * @author junlin.ru
 * @date 2022/4/29 10:11
 **/
@Component
@Slf4j
public class KafkaConsumer {

    @Autowired
    private DeviceMessageMonitorRepository deviceMessageMonitorRepository;

    @KafkaListener(topics = {"monitor_msg"})
    public void listen(ConsumerRecord<?, String> record){
        log.debug(record.toString());
        Optional<String> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            String message = kafkaMessage.get();
            DeviceMessageMonitor deviceMessageMonitor = JSONObject.parseObject(message, DeviceMessageMonitor.class);
            deviceMessageMonitorRepository.save(deviceMessageMonitor);
        }
    }

}

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka接入 的相关文章

随机推荐

  • linux桌面小程序开发日记_1(pyqt5 + yolov5)

    linux桌面小程序开发日记1 从零开始配置VM虚拟机 安装必要的软件vscode anaconda和pyqt5环境 最后一篇博客地址 https blog csdn net Liuchengzhizhi article details 1
  • Au cs6怎样才能导入和导出m4a或者就是aac格式的文件呢?

    求解 Au cs6怎样才能导入和导出m4a或者就是aac格式的文件呢 汉化版 解决发法 首选项 常规 媒体与暂存盘 动态链接媒体下面的启动DLMS格式支持打钩 在媒体浏览器内启用DLMS预览打钩 英文版edit preference med
  • C++ 逻辑与或非 逻辑与逻辑或 逻辑非

    文章目录 逻辑非 逻辑与 逻辑或 逻辑运算符 与 或 非 作用 根据表达式的值返回真值或者假值 逻辑非 include
  • 链接库介绍

    什么是库 计算机中 有些文件专门用于存储可以重复使用的代码块 例如功能实用的函数或者类 我们通常将它们称为库文件 简称 库 Library 以 C 语言为例 如下展示一个函数库 myMath c int add int a int b re
  • jdbc连接SQLite操作

    项目导入sqlite jdbc 3 7 2 jar 地址 http pan baidu com s 1kVHAGdD 示例 package test import java sql Connection import java sql Dr
  • 腾讯视频TS文件转MP4

    代码 我的 GitHub 仓库 https github com ylsislove ts2mp4 提取下载目录 腾讯视频缓存目录 Android data com tencent qqlive files videos XXXXX 目录下
  • 常用LVDS接口LCD屏

    1 常用LVDS接口LCD屏 具体请参考网站 LCD显示 配套软硬件模块 英创 各种屏实物介绍和手册 英创公司ESMARC系列中的ESM6802 ESM7000等主板型号可以直接引出LVDS信号和显示屏连接 我们提供了一些现成的屏幕模块 客
  • Pytorch中如何加载数据、Tensorboard、Transforms的使用

    一 Pytorch中如何加载数据 在Pytorch中涉及到如何读取数据 主要是两个类一个类是Dataset Dataloader Dataset 提供一种方式获取数据 及其对应的label 主要包含以下两个功能 如何获取每一个数据以及lab
  • RHCE-----------配置DNS服务------实操练习

    安装bind 关闭防火墙和selinux 启动named服务 关闭防火墙 重启named服务 编辑配置文件 重启服务 删除客户端本地hosts文件域名解析配置 将DNS服务器设为本机IP
  • 论文添加引用遇到问题

    应该选择红圈里的
  • 剑指 Offer 36. 二叉搜索树与双向链表

    剑指 Offer 36 二叉搜索树与双向链表 难度中等285 输入一棵二叉搜索树 将该二叉搜索树转换成一个排序的循环双向链表 要求不能创建任何新的节点 只能调整树中节点指针的指向 为了让您更好地理解问题 以下面的二叉搜索树为例 我们希望将这
  • 机器学习实战2(决策树篇)

    目录 1 决策树 2 决策树的构造 3 决策树的可视化 4 测试和存储决策树 1 决策树 你是否玩过二十个问题的游戏 游戏的规则很简单 参与游戏的一方在脑海里想某个事物 其他参与者向他提问题 只允许提20个问题 问题的答案也只能用对或错回答
  • QList(增删改查)示例

    特点 支持随机访问 其界面也是基于索引的 在中间的任意一端插入或移除项都是非常快速的 注 访问QList中的值时 尽量采用value int i 因为value查不到此值时会返回一个默认值0 而at int i 则会引起崩溃 并且at返回的
  • QT字符串以16进制接收再转化为固定位数的二进制(QT系列11)

    代码 bool OK QString str 1E QString str1 16进制转化 int val str toInt OK 16 qDebug lt
  • GitHub Copilot 体验

    LHS 475 b NASA s Webb Confirms Its First Exoplanet NASA 介绍 什么是GitHub Copilot GitHub Copilot是由GitHub和OpenAI公司共同开发的基于云的AI编
  • Volatile的其他特性

    2 1 volatile总体概览 在上一节中 我们已经研究完了volatile可以实现并发下共享变量的可见性 volatile除了保证可见性外 volatile还具备如下一些突出的特性 volatile的原子性问题 volatile不能保证
  • 编写程序对给定的有向图(不一定连通)进行深度优先遍历_TypeScript 实战算法系列(七):实现图的遍历...

    本文由图雀社区认证作者 神奇的程序员 写作而成 图雀社区将连载其TypeScript 实战算法系列 点击阅读原文查看作者的掘金链接 感谢作者的优质输出 让我们的技术世界变得更加美好 前言 有一个图 我们想访问它的所有顶点 就称为图的遍历 遍
  • JSBinding iOS与JS交互(When-iOS-loves-JS)

    What s JSBinding It s Not Hybrid It s NOT a new technology JSBinding 绑定JS和Native JSBinding和HyBrid的对比 原生OC语音和脚本JS语言对比 API
  • v-html 不识别\n解决方法

    在 html 中的一些特殊场景中需要某一段文字遇到 n 就行换行 但是往往加了后没有效果 那是因为html标签不识别 n 认为 n 只是一个普通的文本 解决这种问题通常有以下几种方案 1 利用正则将html的 n换成 br div div
  • Kafka接入

    Kafka接入 1 引入依赖