记一次Kafka消费能力低,重新分配节点问题优化

2023-11-02

         目前在做一个车联网APP项目。 项目中历史轨迹的处理模式为kafka推送给我车辆报文,然后我自行判断车辆熄火点火来进行历史轨迹行程的保存。

       项目开始车辆较少,每次kafka推送到我的后端,然后我进行处理轨迹开始的插入和轨迹结束的保存就行了,但是最近发现生产环境的kafka老是出现这样的错误:

2018-07-12 00:06:59.910 [messageListenerContainer_realtime-kafka-consumer-1] INFO  o.a.k.c.consumer.internals.AbstractCoordinator:  Marking the coordinator 2147483645 dead.

2018-07-12 00:07:00.014 [messageListenerContainer_realtime-kafka-consumer-1] INFO  o.a.k.c.consumer.internals.AbstractCoordinator:  Attempt to join group broker4 failed due to unknown member id, resetting and retrying.

2018-07-12 00:07:01.204 [messageListenerContainer_open-kafka-consumer-1] INFO  o.a.k.c.consumer.internals.AbstractCoordinator:  Attempt to heart beat failed since the group is rebalancing, try to re-join group.

大概意思就是:在kafka超时时间内,有些消息没有处理完成,consumer coordinator 会由于没有接受到心跳而挂掉  然后自动提交offset失败,然后重新分配partition给客户端 。接下来导致的问题是:

      1.由于自动提交offset失败,导致重新分配了partition的客户端又重新消费之前的一批数据

     2.接着consumer重新消费,又出现了消费超时,无限循环下去。

  然后我修改了kafka的配置(spring-kafka)

    1.enable.auto.commit=false; (关闭自动提交)

    2.session.timeout.ms=100000(增大session超时时间)

   3.request.timeout.ms=110000(socket握手超时时间,默认是3000 但是kafka配置要求大于session.timeout.ms时间)

  修改了以上配置以后,我认为不能从根本上解决消费能力低的问题,因为我这边后端处理涉及到 mongoDB查询和redis的频繁交互,而且已知生产环境mongoDB的内存较低,经过分析以后,认为消费能力低的原因在于mongoDB查询。

  所以我对代码做了一些修改,在收到kafka的推送以后,将消息加入队列,这样的话kafka服务端会认为此消息已经被消费,然后我再开启一个线程从队列连拿数据进行异步处理。

代码如下:

import com.alibaba.fastjson.JSONObject;
import com.jmev.driveData.service.SynDriveDataService;
import com.jmev.web.util.SpringUtils;
import com.jmev.web.view.DriveHistory;

import java.util.Date;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Created by FM-Qws on 2018/7/12.
 */
public class HistoryHolder {

    private static final ConcurrentLinkedQueue<DriveHistory> queue = new ConcurrentLinkedQueue<>();

    private static class HolderClass{
        //静态内部类用到的时候再加载
        private final static   HistoryHolder instance = new HistoryHolder();
    }
    public  static HistoryHolder getInstance() {
        return HolderClass.instance;
    }


    private HistoryHolder() {
        //开启线程(我注入的spring bean)
        SynDriveDataService synDriveDataService = SpringUtils.getBean("synDriveDataService");
        new Thread(new HistoryThread(synDriveDataService)).start();
    }

    public  void addSchedule(String vin, Date time, Integer carId, JSONObject carInfoObj) {
        DriveHistory driveHistory = new DriveHistory(vin,time,carId,carInfoObj);
        queue.offer(driveHistory);
    }

    ConcurrentLinkedQueue<DriveHistory> getQueue(){
        return  queue;
    }

}


import com.jmev.common.redis.RedisTool;
import com.jmev.driveData.service.SynDriveDataService;
import com.jmev.web.util.SpringUtils;
import com.jmev.web.view.DriveHistory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Created by FM-Qws on 2018/7/12.
 */
public class HistoryThread  implements  Runnable {

    protected  final Logger logger = LoggerFactory.getLogger(this.getClass());

    private  SynDriveDataService  synDriveDataService;
   HistoryThread(SynDriveDataService synDriveDataService){
        this.synDriveDataService=synDriveDataService;
   }

    @Override
    public void run() {
        RedisTool redisTool = SpringUtils.getBean("redisTool");
        while (true) {
            DriveHistory  driveHistory = HistoryHolder.getInstance().getPoll().poll();
            if (null !=driveHistory ) {
                long l = System.currentTimeMillis();
                try {
                    synDriveDataService.updateByEnd(driveHistory.getVin(), driveHistory.getTime(), driveHistory.getCarId(), driveHistory.getCarInfoObj());
                    logger.info("结束保存历史轨迹的时间为: "+(System.currentTimeMillis() - l)+"ms");
                }catch (Exception  e){
                    redisTool.set("DriveData"+driveHistory.getVin(),"1.0");
                    logger.error("熄火数据修改失败,vin:" + driveHistory.getVin() + "失败原因: " + e.getMessage());
                }
            }

        }
    }
}

DriveHistory是一个实体bean 用于传递kafka的消息具体就不贴了

 然后在kafka消费者里面进行添加队列的操作

HistoryHolder.getInstance().addSchedule(vin,time,carId,carInfoObj);
 这样的话就是异步处理了,每次在kafka推送到后端以后,我只加入队列,然后其他线程异步处理。


本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

记一次Kafka消费能力低,重新分配节点问题优化 的相关文章

  • HTTP 状态 404 - 请求的资源不可用

    在使用 MyEclipse IDE 中的 Tomcat 服务器和 Struts 2 框架时 我遇到了反复出现的问题 我将我的程序作为服务器应用程序运行 当它运行时 默认的index jsp 文件将成功打开 但应用程序的其他过去都不起作用 当
  • Spring控制器是线程安全的吗

    我遇到了这个控制器示例 想知道它是否是线程安全的 我特别想知道 gson 实例变量 import org springframework stereotype Controller import org springframework we
  • 任务“:app:dexDebug”执行失败

    我目前正在处理我的项目 我决定将我的 Android Studio 更新到新版本 但在我导入项目后 它显示如下错误 Information Gradle tasks app assembleDebug app preBuild UP TO
  • java中的csv到pdf文件

    我正在尝试获得一个csv文件解析为pdf 到目前为止我所拥有的内容附在下面 我的问题是这段代码最终出现在 pdf 中的文件在 csv 文件的第一行被截断 我不明白为什么 附示例 本质上我想要一个没有任何操作的 csv 文件的 pdf 版本
  • MP3:一种以毫秒为单位获取任何给定字节位置的位置的方法?

    我创建了一个 servlet 它返回从客户端请求的任何给定字节位置开始的流 来自 MP3 文件 这允许客户端在任何给定字节位置立即开始播放 而无需进行任何本地查找 现在 我有一个滑块可以直观地显示进度 我正在使用当前字节位置来更新滑块 但是
  • 如何打印整个字符串池?

    我想打印包含文字的整个字符串池String使用添加的对象intern 就在垃圾收集之前 JDK有没有隐式的方法来进行这样的操作 我们如何检查字符串池 EDIT The comment suggests that there may be a
  • 无法加载 jar 文件的主类

    我使用 Eclipse IDE 开发了一个应用程序 创建应用程序后 我以 jar 格式导出项目 当我尝试运行此 jar 文件时 出现错误 无法加载主类 请帮忙 当您将项目导出为 jar 时 请参阅此所以问题 https stackoverf
  • 有没有好的方法来解析用户代理字符串?

    我有一个Java接收模块User Agent来自最终用户浏览器的字符串的行为需要略有不同 具体取决于浏览器类型 浏览器版本甚至操作系统 例如 FireFox 7 0 Win7 Safari 3 2 iOS9 我明白了User Agent由于
  • 使用 Spring 时实例化对象,用于测试与生产

    使用 Spring 时 应该使用 Spring 配置 xml 来实例化生产对象 并在测试时直接实例化对象 这样的理解是否正确 Eg MyMain java package org world hello import org springf
  • JavaFX - setVisible 隐藏元素但不重新排列相邻节点

    在 JavaFX 中 如果我有一个场景有 2VBox元素和每个VBox有多个Label in it 如果我设置顶部VBox to 无形的 为什么底部VBox 不向上移动顶部的场景VBox was The VBox is 无形的但我希望其他物
  • Java 变量的作用域

    我不明白为什么这段代码的输出是10 package uno public class A int x 10 A int x 12 new B public static void main String args int x 11 new
  • 场景生成器删除 fxml 文件中的导入

    我使用场景构建器 Gluon Scene Builder JavaFX Scene Builder 8 1 1 来创建应用程序的 UI 并使用 Eclipse 开发 JavaFX 现在 每次我在场景生成器中保存某些内容时 它都会从 fxml
  • 为什么我在 Mac 上看到“java.lang.reflect.InaccessibleObjectException: Unable to make private java.nio.DirectByteBuffer(long,int)accessibl

    我已经在工作中愉快地构建代码好几天了 但突然我的一个项目 不是全部 失败并出现此错误消息 看看下面的答案吧 我是如何修复它的 起初我用谷歌搜索 看到很多有这个问题的人正在使用 Java 16 但我认为 错误 我正在使用 Java 11 因为
  • Java:VM 如何在 32 位处理器上处理 64 位“long”

    JVM 如何在 32 位处理器上处理 64 位的原始 long 在多核 32 位机器上可以并行利用多个核心吗 64 位操作在 32 位机器上慢了多少 它可能使用多个核心来运行不同的线程 但不会并行使用它们进行 64 位计算 64 位长基本上
  • 覆盖 MATLAB 默认静态 javaclasspath 的最佳方法

    MATLAB 配置为在搜索用户可修改的动态路径之前搜索其静态 java 类路径 不幸的是 静态路径包含相当多非常旧的公共库 因此如果您尝试使用新版本 您可能最终会加载错误的实现并出现错误 例如 静态路径包含 google collectio
  • tomcat 过滤所有 web 应用程序

    问题 我想对所有网络应用程序进行过滤 我创建了一个过滤器来监视对 apache tomcat 服务器的请求 举例来说 它称为 MyFilter 我在 netbeans 中创建了它 它创建了 2 个独立的目录 webpages contain
  • 我想要一个 Java 阿拉伯语词干分析器

    我正在寻找阿拉伯语的 Java 词干分析器 我找到了一个名为 AraMorph 的库 但它的输出是无法控制的 并且它会形成不需要的单词 还有其他阿拉伯语词干分析器吗 这是新的阿拉伯语词干分析器 Assem 的阿拉伯语轻词干分析器 http
  • 如何移动图像(动画)?

    我正在尝试在 x 轴上移动船 还没有键盘 我如何将运动 动画与boat png而不是任何其他图像 public class Mama extends Applet implements Runnable int width height i
  • 重写Object类的finalize()方法有什么用?

    据我所知 在java中如果我们想手动调用垃圾收集器 我们可以执行System gc 1 我们在重写的finalize 方法中做了哪些操作 2 如果我们想手动调用JVM垃圾收集器 是否需要重写finalize 方法 我们在重写的 Finali
  • 使用 eclipse IDE 配置 angularjs

    我想开始使用 AngularJs 和 Java Spring 进行开发 我使用 Eclipse 作为 IDE 我想配置我的 Eclipse 以使这些框架无缝工作 我知道我可能要求太多 但相信我 我已经做了很多研究 你们是我最后的选择 任何帮

随机推荐

  • 基于UE4/Unity绘制地图 - 确定展示区域

    前言 基于UE4 Unity绘制地图基础元素 线 基于UE4 Unity绘制地图基础元素 面和体 基础知识 在研究清楚如何绘制地图的线面体之后 接下来需要确定需要展示的地图区域了 地图可以看成是一个巨型的开放世界游戏场景 因此为了便于数据存
  • mysql mvcc

    mysql MVCC MVVC 实现 排他锁 undolog 版本事务链 一致性read view视图 版本事务链匹配规则 一致性非锁定读 在 InnoDB 存储引擎中 多版本控制 multi versioning open in new
  • switch语句判断范围_Linux C语言: switch语句的范围判断!

    在C语言中 除了循环结构 还有的就是分支结构 分支结构中有if分支与switch分支 一般地 需要判断的条件情况少时 就使用if分支 当判断的情况复杂时 就会使用switch语句 假设有一道题目 要求用户输入一个整数 如果该整数为100 则
  • (C++)将引用用作函数参数——讲解+程序例子

    引用经常被用作函数参数 使得函数中的变量名成为调用程序中的变量的别名 通俗易懂的讲 就是将形参变成实参的别名 这种传递参数的方法称为按引用传递 C语言只能按值传递 按值传递导致被调用函数使用程序的值的拷贝 因此C语言采用按指针传递的方式 避
  • Mac在命令行中打开Finder

    在当前目录下 使用如下代码 open 即可打开当前Finder 并定位当当前目录
  • PowerDesigner V16.5 安装文件 及 破解文件

    之前在网上找个假的 只能看 不能创建自己的DB 或者 不能破解的 比较伤脑筋 偶在这里提供一个 可长期使用的版本 PowerDesigner165 破解文件 rar http pan baidu com s 1hqEDUCG 636KB P
  • 微信小程序涉嫌通过中断用户体验、限制用户操作的方式,收集与服务无关的用户个人信息,包括但不限于,手机号、

    微信小程序涉嫌通过中断用户体验 限制用户操作的方式 收集与服务无关的用户个人信息 包括但不限于 手机号 身份证号 生日 住址等 违反 微信小程序平台运营规则 及相关规则 建议尽快整改 具体运营规范 xxxxxxx 解决方法 增加同意协议 如
  • HBase RowKey设计和实现

    HBase由于存储特性和读写性能 在OLAP即时分析中发挥重要作用 Rowkey的设计好坏关乎到HBase的使用情况 我们知道HBase中定位一条数据需要四个维度的限制 RowKey Column Family Column Qualifi
  • Vue 3.3 新特性-defineModel

    Vue 3 3 新特性 defineModel使用 定义子组件 将原来的defineProps改为defineModel 当子组件变化是 父级组件也会变化 修改对应值
  • Assembler messages error gcc and clang build

    Assembler 错误 Assembler messages 除了自己写的汇编程序会报 Assembler Error外 编译GCC Clang等公开的计算机语言 也会报此错误 自己的汇编 修改自己程序代码 编译GCC Clang等 查看
  • sql: Oracle 11g create procedure

    CREATE OR REPLACE PROCEDURE proc Insert BookKindList temTypeName nvarchar2 temParent int AS ncount number begin SELECT C
  • 关于Navicat连接MySQL出现2059错误解决方法

    一 进入Navicat连接MySQL出现下面的2059界面 原因 mysql8之前版本中加密规则为mysql native password mysql8以后的加密规则为caching sha2 password 将mysql用户登录加密规
  • AI鸟类识别实现自然生态环境数字化监测

    判断一个城市是否生态 是否环保 鸟类往往比人类更加客观公正 更有 发言权 在东营人眼里 鸟类生存繁衍就是生态文明典范城市建设的 晴雨表 和 试金石 现在 很多鸟儿来到保护区都不想走 刘静说 在工作人员细致入微的保护下 很多候鸟都变成了 留鸟
  • 验证码的制作(Canvas)

    目录 画布的知识点 1 绘图 创建画布 2 获取画步 3 获取画笔 4 绘制直线 5 设置描边颜色 6 设置闭合路径 7 填充路径 8 设置填充颜色 9 画弧线 arc x y r 开始角 结束角 方向 10 绘制矩形 rect x y w
  • Windows下获取视频设备的一种改进实现

    之前在https blog csdn net fengbingchun article details 102806822中介绍过在Windows下获取视频设备列表的方法 其实那种实现方法是有缺陷的 当PC机上连接多个视频设备 并且其中有设
  • vite+react搭建人力管理系统项目(2)

    一 登录页面以及相关信息存储 api gt path gt user ts登陆人要请求的api import Get Post from server interface FcResponse
  • 从0开发一个Django博客系统

    目录 一 项目准备 1 1 项目分析 1 2 工程创建和配置 二 注册 2 2 定义用户模型类 2 3 图形验证码 2 4 短信验证码 2 5 用户注册实现 2 6 展示首页 2 7 状态保持 三 登录 3 1 手机登录 3 2 首页用户名
  • Linux下面跑.NET程序

    mono环境的部署 介绍再多也不过是纸上谈兵 实战才是硬道理 工欲善其事必先利其器 要实操 还必须先要有这个环境 下面进入本回合的重点环节 Mono环境的配置 网上也有大量关于mono环境搭建的帖子 基本上都是搭配以下这两种类型的mono环
  • ue4 解决界面不能自动对焦问题

    在UE4界面的制作中遇到了一个小问题 在ue4选中视口的模式中的按钮点击是正常的但是在独立进程游戏模式下或者打包好的项目中点击就出现问题了 界面中的按钮需要点击两次才能够触发响应事件 这问题虽然不是大问题但是很别扭 按钮需要点击两次才行 首
  • 记一次Kafka消费能力低,重新分配节点问题优化

    目前在做一个车联网APP项目 项目中历史轨迹的处理模式为kafka推送给我车辆报文 然后我自行判断车辆熄火点火来进行历史轨迹行程的保存 项目开始车辆较少 每次kafka推送到我的后端 然后我进行处理轨迹开始的插入和轨迹结束的保存就行了 但是