spring集成kafka并对消息进行监听

2023-11-02

spring集成kafka


需要依赖zookeeper,需提前启动

在server.properties文件中配置kafka连接zookeeper相关信息

############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000

在zookeeper.properties中配置zookeeper所需配置

# 数据文件保存地址
dataDir=/tmp/zookeeper
# 客户端端口
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# 设置此功能端口将不在冲突
admin.enableServer=false
# admin.serverPort=8080

kafka本地安装启动

windows下载kafka二进制包到本机:http://kafka.apache.org/downloads
2、在config下面的server.properties文件,修改:
listeners=PLAINTEXT://localhost:9092
log.dirs=F:\kafka_2.13-2.5.0\logs
3、在bin同级目录下打开shell窗口,启动kafka:
.\bin\windows\kafka-server-start.bat .\config\server.properties
4、创建主题 查看可用主题
.\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
5、删除指定topic
.\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --delete --topic topic_kedacom_icms_alarm_jy_3725
5.1、如果出现临时存储的topic需要到zookeeper删除指定的topic
#查看存储的topic
ls /brokers/topics
#删除指定的topic
rmr /brokers/topics/topicName
6、另起窗口,开启指定topic
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic_kedacom_icms_alarm_jy_3725
7、另起窗口、开启生产端
.\bin\windows\kafka-console-producer.bat --broker-list 189.1.0.55:9092 --topic topic_kedacom_icms_spdc_jy_3725
8、另起窗口,开启消费端
chcp 65001
.\bin\windows\kafka-console-consumer.bat --bootstrap-server 189.1.0.55:9092 --topic topic_kedacom_icms_spdc_sj_3725 --from-beginning
如果遇到文本过长 指令识别错误,是因为存放目录过长不规范引起

pom文件

#在选择版本,高版本会提示缺少anntnationprocess...
   <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>2.1.8.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.8.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>connect-api</artifactId>
      <version>2.6.0</version>
    </dependency>
    <dependency>
      <groupId>commons-httpclient</groupId>
      <artifactId>commons-httpclient</artifactId>
      <version>3.1</version>
    </dependency>
  </dependencies>

生产配置

/**
 * @Auther: lyp
 * @Date: 2021/11/22 15:46
 */
 @Configuration
 @EnableKafka
 public class KafkaProducerConfig {

     @Value("${bootstrap.servers}")
     private String bootstrapServers;

     public KafkaProducerConfig(){
         System.out.println("kafka--------------------------------生产配置");
     }

     /**
     * 创建生产值消息工厂
     */
     @Bean
     public ProducerFactory<Integer, String> producerFactory() {
         return new DefaultKafkaProducerFactory(producerProperties());
     }

    /**
     * 生产基本配置
     */
    @Bean
     public Map<String, Object> producerProperties() {
         Map<String, Object> props = new HashMap<String, Object>();
         //设置kafka访问地址
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
         //消息转化
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
         //重试次数
         props.put(ProducerConfig.RETRIES_CONFIG,1);
         //分批处理内存设置
         props.put(ProducerConfig.BATCH_SIZE_CONFIG,1048576);
         props.put(ProducerConfig.LINGER_MS_CONFIG,1);
         //使用内存配置
         props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432L);
         //确认标志符使用配置
         props.put(ProducerConfig.ACKS_CONFIG,"all");
         return props;
     }
 
     @Bean
     public KafkaTemplate<Integer, String> kafkaTemplate() {
         KafkaTemplate kafkaTemplate = new KafkaTemplate<Integer, String>(producerFactory(),true);
         kafkaTemplate.setDefaultTopic(KafkaSendEnum.ALARM_WARN_PUSH.getTopic());
         return kafkaTemplate;
     }
 
 }

消费者配置

package com.huating.jfp.msg.api.kafka.config;

import com.huating.jfp.msg.api.kafka.construct.KafkaConsumerEnum;
import com.huating.jfp.msg.api.kafka.listener.KafkaConsumerListener;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

/**
 * @author lyp
 * @ClassName KafkaConsumerConfig
 * @description: 消费者配置
 * @datetime 2022年 07月 20日 9:15
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${bootstrap.servers}")
    private String bootstrapServers;

    public KafkaConsumerConfig() {
        System.out.println("kafka消费者配置加载...");
    }

    public Map<String, Object> consumerProperties() {
        Map<String, Object> props = new HashMap<String, Object>();
        //Kafka服务地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        //消费组
        props.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConsumerEnum.SD_SJ.getGroupId());
        //关闭自动提交位移
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        //设置间隔时间,默认5000ms
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
        //Key反序列化类
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 					                                             "org.apache.kafka.common.serialization.StringSerializer");
        //Value反序列化
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,                                                                   "org.apache.kafka.common.serialization.StringSerializer");
        //earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
	   //latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区		下的数据
	   //none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的			offset,则抛出异常
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<String, String>(consumerProperties());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>                                                                  kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new                                                                          ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public KafkaConsumerListener kafkaConsumerListener() {
        return new KafkaConsumerListener();
    }
}

创建topic工具类

/**
 * @author lyp
 */
public class KafkaTopicUtil {

    private static final Logger logger = LoggerFactory.getLogger(KafkaTopicUtil.class);

    /**
     * 功能描述:创建topic,并返回创建结果
     * @param: topicName
     * @return: boolean
     * @auther: lyp
     * @date: 2021/11/12 16:06
     */
    public static boolean createTopics(String bootstrapServers,String topicName,int partitions,short replication) {
        boolean res = false;
        try {
            Properties properties = new Properties();
            properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            properties.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";");
            AdminClient adminClient = KafkaAdminClient.create(properties);
            NewTopic newTopic = new NewTopic(topicName, partitions, replication);
            adminClient.createTopics(Arrays.asList(newTopic));
            logger.info("创建Topic:"+topicName+"成功!");
            res = true;
        } catch (Exception e) {
            e.printStackTrace();
            logger.info("创建异常!");
        }
        return res;
    }

    /**
     * 功能描述:获取当前kafka所存在的topic列表
     * @return: set
     * @auther: lyp
     * @date: 2021/11/12 16:07
     */
    public static Set<String> getTopics(String bootstrapServers){
        Set<String> nameSet = new HashSet<>();
        try {
            Properties properties = new Properties();
            properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            AdminClient adminClient = KafkaAdminClient.create(properties);
            ListTopicsResult listTopicsResult = adminClient.listTopics();
            KafkaFuture<Set<String>> names = listTopicsResult.names();
            nameSet = names.get();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return nameSet;
    }
}

生产业务

public interface KafkaProduceService {

    /**设备报警消息发送*/
    boolean sendWarnMessage(DeviceWarnInfo deviceWarnInfo);
}
/**
 * @author lyp
 */
@Service("kafkaProducerService")
public class KafkaProducerServiceImpl implements KafkaProduceService {

    private static final Logger logger = LoggerFactory.getLogger(KafkaProduceService.class);

    @Value("${bootstrap.servers}")
    private String bootstrapServers;

    @Value("${topic.name}")
    private String topicName;

    @Value("${srcUnit.code}")
    private String srcUnitCode;

    @Value("${srcUnit.name}")
    private String srcUnitName;

    @Override
    public boolean sendWarnMessage(DeviceWarnInfo deviceWarnInfo) {
        boolean res = false;
        Map<String, Object> reportData = new HashMap<>();
        reportData.put("command","reportAlarm");
        deviceWarnInfo.setSrcUnitCode(srcUnitCode);
        deviceWarnInfo.setSrcUnitName(srcUnitName);
        reportData.put("data",deviceWarnInfo);
        //判断是否存在当前主题
        Set<String> topics = KafkaTopicUtil.getTopics(bootstrapServers);
        if (!topics.contains(KafkaSendEnum.ALARM_WARN_PUSH.getTopic())){
            if (!KafkaTopicUtil.createTopics(bootstrapServers,topicName,1,(short)1)){
                logger.info("topic创建失败,消息发送不成功!");
                return res;
            }
        }

        KafkaTemplate kafkaTemplate = SpringContextUtil.getBean("kafkaTemplate");
        ListenableFuture send = kafkaTemplate.sendDefault(topicName, JSONArray.toJSONString(reportData));
        send.addCallback(new ListenableFutureCallback() {
            @Override
            public void onFailure(Throwable ex) {
                logger.error(ex.getMessage()+"发送失败!原因:"+ex.getCause());
                System.out.println("发送失败!");
            }

            @Override
            public void onSuccess(Object result) {
                logger.info("消息发送成功"+result.toString());
                System.out.println("发送成功!");
            }
        });
        return res;
    }

}

消费业务

消息接收类

package com.huating.jfp.msg.api.kafka.entity;

/**
 * @author lyp
 * @ClassName MesBody
 * @description: 消息实体
 * @datetime 2022年 07月 21日 14:48
 */
@Data
public class MesBody {
	//类型标记字段
    private String command;
	//消息实体字段
    private String data;
}


监听类

package com.huating.jfp.msg.api.kafka.listener;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;

import com.alibaba.fastjson.JSONObject;
import com.huating.jfp.msg.api.kafka.construct.KafkaMesType;
import com.huating.jfp.msg.api.kafka.construct.KafkaTopics;
import com.huating.jfp.msg.api.kafka.consumer.service.KafkaConsumerService;
import com.huating.jfp.msg.api.kafka.entity.InspectorIssue;
import com.huating.jfp.msg.api.kafka.entity.MesBody;
import com.huating.jfp.msg.api.kafka.entity.Notice;

/**
 * @author lyp
 * @ClassName KafkaConsumerListener
 * @description: 主题监听
 * @datetime 2022年 07月 20日 9:27
 */

public class KafkaConsumerListener {

    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerListener.class);
    @Autowired
    private KafkaConsumerService consumerService;

    /**
     * 功能描述: 监听指定topic,多个使用,
     * groupId:分组id
     * topics:监听当前topic数组
     * topic:监听单个topic
     */
    @KafkaListener(groupId = "${group.id}",topics = "#{'${consumer.topics}'.split(',')}",containerFactory = "")
    public void listener(ConsumerRecord<String, String> consumerRecord) {
        logger.info("开始消费" + KafkaTopics.SD_JY_DUTY_TOPIC.getTopicName() + "的消息{}", consumerRecord.value());
        MesBody mesBody = JSONObject.parseObject(consumerRecord.value(), MesBody.class);
        logger.error("kafka监听-当前消息类型:"+mesBody.getCommand());
        //督查督办
        if (mesBody.getCommand().equals(KafkaMesType.SD_INSPECTOR_ISSUE.getMesCode())
                || mesBody.getCommand().equals(KafkaMesType.SD_INSPECT_DISPOSE.getMesCode())
                || mesBody.getCommand().equals(KafkaMesType.SD_INSPECT_RES.getMesCode())) {
            logger.error("督查督办监听消息处理开始----->----->");
            InspectorIssue inspectorIssue = JSONObject.parseObject(mesBody.getData(), InspectorIssue.class);
            consumerService.inspectorListener(inspectorIssue);
        }

        //通知通报
        if (mesBody.getCommand().equals(KafkaMesType.SD_NOTICE_ISSUE.getMesCode())) {
            logger.error("通知通报开始监听");
            Notice notice = JSONObject.parseObject(mesBody.getData(), Notice.class);
            consumerService.noticeListener(notice);
        }
    }
}

业务处理

package com.huating.jfp.msg.api.kafka.consumer.service;


import com.huating.jfp.msg.api.kafka.entity.InspectorIssue;
import com.huating.jfp.msg.api.kafka.entity.Notice;

/**
 * @author lyp
 */
public interface KafkaConsumerService {

    /**
     * 功能描述: 督查下发 督查办结监听处理
     *
     * @param inspectorIssue
     */
    void inspectorListener(InspectorIssue inspectorIssue);

    /**
     * 功能描述: 通知通报下发监听
     *
     * @param notice
     */
    void noticeListener(Notice notice);
}


package com.huating.jfp.msg.api.kafka.consumer.service.impl;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import com.huating.jfp.common.dao.MsgConfigureDao;
import com.huating.jfp.common.dao.MsgDao;
import com.huating.jfp.common.entity.Msg;
import com.huating.jfp.common.entity.MsgConfigure;
import com.huating.jfp.common.entity.MsgReceive;
import com.huating.jfp.common.service.MsgReceiveService;
import com.huating.jfp.core.base.ViewPublicRewrite;
import com.huating.jfp.msg.api.http.servcie.HttpRequestService;
import com.huating.jfp.msg.api.kafka.consumer.service.KafkaConsumerService;
import com.huating.jfp.msg.api.kafka.dao.InspectorEventMapper;
import com.huating.jfp.msg.api.kafka.dao.NoticeMapper;
import com.huating.jfp.msg.api.kafka.entity.*;
import com.huating.jfp.msg.api.kafka.producer.service.KafkaProducerService;
import com.huating.jfp.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
 * @author lyp
 * @ClassName KafkaConsumerServiceImpl
 * @description: 消费实现
 * @datetime 2022年 07月 20日 9:13
 */
@Service
public class KafkaConsumerServiceImpl implements KafkaConsumerService {

    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerServiceImpl.class);
    private static final String SRC_UNIT_CODE = "gaol_code";
    private static final String SRC_UNIT_NAME = "gaol_name";
    @Autowired
    private InspectorEventMapper inspectorEventMapper;
    @Autowired
    private HttpRequestService httpRequestService;
    @Autowired
    private NoticeMapper noticeMapper;

    @Autowired
    private MsgDao msgMapper;

    @Autowired
    private MsgConfigureDao msgConfigureMapper;
    @Autowired
    private MsgReceiveService msgReceiveService;
    @Autowired
    private ViewPublicRewrite vp;

    @Override
    public void inspectorListener(InspectorIssue inspectorIssue) {
        if (!StrUtil.isEmpty(inspectorIssue.getUuid())) {
            if (!StrUtil.isEmpty(inspectorIssue.getDubanTime())) {
                logger.error("督办下发处理");
                InspectorEventDispose inspectorEventDispose = new InspectorEventDispose();
                //督查督办
                String uuid = StringUtil.getUUID();
                inspectorEventDispose.setIedUuid(uuid);
                inspectorEventDispose.setIedIeUuid(inspectorIssue.getUuid());
                inspectorEventDispose.setIedExpireTime(inspectorIssue.getDubanTime());
                inspectorEventDispose.setIedContent(inspectorIssue.getContent());

                //督办下发持久化
                inspectorEventMapper.insertDispose(inspectorEventDispose);
                logger.error("督办下发数据新增完成");
                //督办文件持久化
                List<FileEntity> files = inspectorIssue.getFiles();
                List<InspectorFile> fileList = new ArrayList<>();
                downloadFile(files, fileList, uuid);
                inspectorEventMapper.insertFiles(fileList);
                logger.error("督办下发完成");
            } else if (!StrUtil.isEmpty(inspectorIssue.getSrcUnitCode())) {
                logger.error("督查下发处理");
                InspectorEvent inspectorEvent = new InspectorEvent();
                //督查下发
                inspectorEvent.setIeUuid(inspectorIssue.getUuid());
                inspectorEvent.setIeAreaCode(vp.getBusinessValue("gaol_code"));
                inspectorEvent.setIeEventType(inspectorIssue.getType());
                inspectorEvent.setIeDescribe(inspectorIssue.getContent());
                inspectorEvent.setIeGrabTime(inspectorIssue.getPublishTime());
                inspectorEvent.setIeExpireTime(inspectorIssue.getQxTime());
                inspectorEvent.setIeCusNunmber(vp.getBusinessValue("base_cus"));
                inspectorEvent.setIeNature(inspectorIssue.getNature());
                inspectorEvent.setIeIsSj(0);
                //督查下发持久化
                inspectorEventMapper.insertSynData(inspectorEvent);
                logger.error("督查下发数据新增成功");
                //督查文件持久化
                List<FileEntity> files = inspectorIssue.getFiles();
                List<InspectorFile> fileList = new ArrayList<>();
                downloadFile(files, fileList, inspectorIssue.getUuid());
                inspectorEventMapper.insertFiles(fileList);
                logger.error("督查文件数据新增成功");
                logger.error("督查下发完成");
            } else {
                //督查办结
                if (inspectorEventMapper.searchIsSj(inspectorIssue.getUuid()) > 0) {
                    //修改督查状态为办结
                    inspectorEventMapper.updateState("3", inspectorIssue.getUuid());
                    logger.error("督查办结完成");
                }
            }
        }
    }

    @Override
    public void noticeListener(Notice notice) {
        logger.error("通知通报下发开始处理");
        //通知通报持久化
        noticeMapper.insertData(notice);

        Msg msg = new Msg();
        String uuid = StringUtil.getUUID();
        msg.setMUuid(uuid);

        MsgConfigure msgConfigure = new MsgConfigure();
        msgConfigure.setMcCode("NOTIC_ISSUE");
        MsgConfigure config = msgConfigureMapper.selectByData(msgConfigure).get(0);

        msg.setMcUuid(config.getMcUuid());
        msg.setMcMsglevel(config.getMcMsglevel());
        msg.setMStatus(Byte.parseByte(notice.getFeedback() == 0 ? "1" : "0"));
        msg.setMParam(notice.getUuid());
        msg.setMContent(notice.getTitle());
        msg.setCreateTime(new Date());
        if (notice.getFeedback() == 0) {
            msg.setMHandleTime(new Date());
            msg.setMHandleUser("当前通知通报无需处置");
        }
        msgMapper.insertMsg(msg);

        MsgReceive msgReceive = new MsgReceive();
        msgReceive.setMrUuid(StringUtil.getUUID());
        msgReceive.setmUuid(uuid);
        msgReceiveService.insertMsgReceive(msgReceive);

        //文件持久化
        List<FileEntity> files = notice.getFiles();
        noticeDownloadFile(files, notice.getUuid());
        noticeMapper.insertFiles(files);

    }

    private void downloadFile(List<FileEntity> files, List<InspectorFile> fileList, String uuid) {
        logger.error("文件下载开始");
        if (!files.isEmpty()) {
            for (FileEntity file : files) {
                InspectorFile inspectorFile = new InspectorFile();
                String fileName = file.getFileName();
                logger.error(fileName);
                inspectorFile.setIfFileName(fileName);
                String last = fileName.substring(fileName.lastIndexOf("."));
                if (last.equals(".jpg") || last.equals(".JPG") || last.equals(".png") || last.equals(".gif") || last.equals(".bmp")) {
                    inspectorFile.setIfFileType(1);
                } else {
                    inspectorFile.setIfFileType(2);
                }
                inspectorFile.setIfSourceType(1);
                inspectorFile.setIfIeUuid(uuid);
                //需要确定省局的其他类型文件详情
                inspectorFile.setIfPath(file.getFileName());
                String fileId = file.getFileId();
                //文件下载
                String token = httpRequestService.sendPostMessage();
                boolean res = httpRequestService.downloadFile(fileId, token, vp.getCusBusinessValue("duty_file_disk_mapping_path", "1000") + "/dutyUpLoad/");
                if (res) {
                    fileList.add(inspectorFile);
                }
            }
        }
    }

    private void noticeDownloadFile(List<FileEntity> files, String uuid) {
        files.stream().forEach((file) -> {
            file.setParentId(uuid);
            String token = httpRequestService.sendPostMessage();
            httpRequestService.downloadFile(file.getFileId(), token, vp.getBusinessValue("notice_file_disk_mapping_path") + "/noticeUpLoad/");
        });
    }

    public boolean checkSj() {
        return vp.getBusinessValue("is_sj") != null &&
                Boolean.parseBoolean(vp.getBusinessValue("is_sj"));
    }
}


异步 同步 ONEWAY

kafka消息发送方式有同步、异步和ONEWAY三种方式,producer.type参数指定同步或者异步,request.require.acks指定ONEWAY。

producer.type=sync默认同步

设置异步需配套配置

Property Default Description
queue.buffering.max.ms 5000 启用异步模式时,producer缓存消息的时间。比如我们设置成1000时,它会缓存1s的数据再一次发送出去,这样可以极大的增加broker吞吐量,但也会造成时效性的降低。
queue.buffering.max.messages 10000 启用异步模式时,producer缓存队列里最大缓存的消息数量,如果超过这个值,producer就会阻塞或者丢掉消息。
queue.enqueue.timeout.ms -1 当达到上面参数时producer会阻塞等待的时间。如果设置为0,buffer队列满时producer不会阻塞,消息直接被丢掉;若设置为-1,producer会被阻塞,不会丢消息。
batch.num.messages 200 启用异步模式时,一个batch缓存的消息数量。达到这个数值时,producer才会发送消息。(每次批量发送的数量)
以batch的方式推送数据可以极大的提高处理效率,kafka producer可以将消息在内存中累计到一定数量后作为一个batch发送请求。batch的数量大小可以通过producer的参数(batch.num.messages)控制。通过增加batch的大小,可以减少网络请求和磁盘IO的次数,当然具体参数设置需要在效率和时效性方面做一个权衡。在比较新的版本中还有batch.size这个参数。

在代码中如果需要同步发送,可以在每次发送之后使用get方法,因为producer.send方法返回一个Future类型的结果,Future的get方法会一直阻塞直到该线程的任务得到返回值,也就是broker返回发送成功。

kafkaTemplate.send().get("key",value);

异步发送只需要在发送成功获取消息是否成功即可:

ListenableFuture future = kafkaTemplate.send();
future.addCallback(new ListenableFutureCallback() {
            @Override
            public void onFailure(Throwable ex) {
                logger.error(ex.getMessage()+"发送失败!原因:"+ex.getCause());
            }
            @Override
            public void onSuccess(Object result) {
                logger.info("消息发送成功"+result.toString());
            }
        });

消息可靠性

producers可以一步的并行向kafka发送消息,但是通常producer在发送完消息之后会得到一个响应,返回的是offset值或者发送过程中遇到的错误。这其中有个非常重要的参数“request.required.acks",这个参数决定了producer要求leader partition收到确认的副本个数:

  • 如果acks设置为0,表示producer不会等待broker的相应,所以,producer无法知道消息是否发生成功,这样有可能导致数据丢失,但同时,acks值为0会得到最大的系统吞吐量。
  • 若acks设置为1,表示producer会在leader partition收到消息时得到broker的一个确认,这样会有更好的可靠性,因为客户端会等待知道broker确认收到消息。
  • 若设置为-1,producer会在所有备份的partition收到消息时得到broker的确认,这个设置可以得到最高的可靠性保证。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

spring集成kafka并对消息进行监听 的相关文章

随机推荐

  • 基于Selenium+Python的web自动化测试框架(附框架源码+项目实战)

    目录 一 什么是Selenium 二 自动化测试框架 三 自动化框架的设计和实现 四 需要改进的模块 五 总结 总结感谢每一个认真阅读我文章的人 重点 配套学习资料和视频教学 一 什么是Selenium Selenium是一个基于浏览器的自
  • Android接入三方登录——QQ

    facebook错误1 feature unavailable facebook login is currently unavailable for this app since we are updating additional de
  • 记 ReactiveWebServerFactory bean defined in the context. 导致的异常

    异常内容 APPLICATION FAILED TO START Description Web application could not be started as there was no org springframework bo
  • chatgpt赋能python:Python多行注释

    Python 多行注释 在 Python 中 我们经常需要写注释来解释代码或者用于调试 Python 的注释分为单行注释和多行注释 本文主要介绍 Python 中如何多行注释 单行注释 在 Python 中 单行注释以符号 开头 可以写在代
  • Windows的密码生成算法——NTLM算法破解

    文章目录 方法一 Python代码暴力破解 方法二 hashcat工具破解 NTLM CDABE1D16CE42A13B8A9982888F3E3BE hint 密码长度不超过5 数字和符号组成 Windows下NTLM Hash生成原理
  • mysql5.7 免安装版配置

    文章目录 my ini 安装为服务 启动服务命令 移除服务命令 修改 root 账号的密码 初始化数据 my ini 复制 my default ini 或者新建 client port 3306 default character set
  • <mirrorOf>标签用于指定哪些仓库或仓库组需要使用该镜像源

    在Maven配置镜像源时 确实会使用到mirrorOf
  • 使用内核API函数找到I2C和串口控制器发送数据

    我们一般操作I2C或者串口都是编写应用程序调用内核硬件提供的设备节点操作这些硬件的 但是在某个项目中 需要在shutdown的时候往i2c和tty发送数据 发送数据给外置的mcu mcu几秒内就会给cpu断电 所以 这个动作无法在应用层中完
  • 靠谱的车 算法

    靠谱的车 程序员小明打了一辆出租车去上班 出于职业敏感 他注意到这辆出租车的计费表有点问题 总是偏大 出租车司机解释说他不喜欢数字4 所以改装了计费表 任何数字位置遇到数字4就直接跳过 其余功能都正常 比如 23再多一块钱就变为25 39再
  • Amazon S3 REST方式获取Object

    Amazon S3 用REST方式获取文件 具体参见API http docs aws amazon com AmazonS3 latest API RESTObjectGET html 利用C 构建下面的请求 GET ObjectName
  • MSCAN:Learning Deep Context-aware Features over Body and Latent Parts for Person ReID阅读笔记

    Learning Deep Context aware Features over Body and Latent Parts for Person Re identification 作者 DangWei Li等人 CVPR 2017 1
  • DOS常用命令(从入门到精通)

    DOS命令学习 一 DOS使用常识 DOS的概况 DOS Disk Operating System 是一个使用得十分广泛的磁盘操作系统 就连眼下流行的Windows9x ME系统都是以它为基础 常见的DOS有两种 IBM公司的PC DOS
  • 朴素贝叶斯理论推导与三种常见模型

    朴素贝叶斯 Naive Bayes 是一种简单的分类算法 它的经典应用案例为人所熟知 文本分类 如垃圾邮件过滤 很多教材都从这些案例出发 本文就不重复这些内容了 而把重点放在理论推导 其实很浅显 别被 理论 吓到 三种常用模型及其编码实现
  • C++ 和 OpenCV 实现卷积神经网络并加载 Keras 训练好的参数进行预测

    C 和 OpenCV 实现卷积神经网络并加载 Keras 训练好的参数进行预测 一 背景 二 Keras 定义神经网络结构 channels first 与 channels last channels first 与 channels l
  • Sourcetree 打开闪退怎么处理

    只需要把箭头指向的SourceTree exe Url 3vynpq3lkfkc3vf35ldq2wva2cs3o2zs文件删除 如果是多个一并删除 只留一个SourceTree文件夹即可
  • WebService 与 SOAP、WSDL、UDDI

    什么是WebService Web Services 是一种基于组件的软件平台 是面向服务的Internet 应用 Web Services 是应用于Internet 的 而不是限于局域网或试验环境 这要求提出的Web Services 框
  • 微服务2-nacos 配置中心

    1什么是配置中心 在微服务架构中 当系统从一个单体应用 被拆分成分布式系统上一个个服务节点后 配置文件也必须跟着迁移 分割 这样配置就分散了 不仅如此 分散中还包含着冗余 如下图 2为什么要使用配置中心 配置中心将配置从各应用中剥离出来 对
  • Unity 游戏框架搭建 2017 (五) 简易消息机制

    什么是消息机制 23333333 让我先笑一会 为什么用消息机制 三个字 解 耦 合 我的框架中的消息机制用例 1 接收者 Receiver cs using UnityEngine namespace QFramework Example
  • 如何运行ImageMagick的命令行工具

    在http www imagemagick org script index php网站下载相应的执行文件 这里以下载ImageMagick 6 6 5 10 Q16 windows static exe为例说明 将ImageMagick
  • spring集成kafka并对消息进行监听

    spring集成kafka 文章目录 spring集成kafka kafka本地安装启动 pom文件 生产配置 消费者配置 创建topic工具类 生产业务 消费业务 消息接收类 监听类 业务处理 异步 同步 ONEWAY 需要依赖zooke