目前在做一个车联网APP项目。 项目中历史轨迹的处理模式为kafka推送给我车辆报文,然后我自行判断车辆熄火点火来进行历史轨迹行程的保存。
项目开始车辆较少,每次kafka推送到我的后端,然后我进行处理轨迹开始的插入和轨迹结束的保存就行了,但是最近发现生产环境的kafka老是出现这样的错误:
2018-07-12 00:06:59.910 [messageListenerContainer_realtime-kafka-consumer-1] INFO o.a.k.c.consumer.internals.AbstractCoordinator: Marking the coordinator 2147483645 dead.
2018-07-12 00:07:00.014 [messageListenerContainer_realtime-kafka-consumer-1] INFO o.a.k.c.consumer.internals.AbstractCoordinator: Attempt to join group broker4 failed due to unknown member id, resetting and retrying.
2018-07-12 00:07:01.204 [messageListenerContainer_open-kafka-consumer-1] INFO o.a.k.c.consumer.internals.AbstractCoordinator: Attempt to heart beat failed since the group is rebalancing, try to re-join group.
大概意思就是:在kafka超时时间内,有些消息没有处理完成,consumer coordinator
会由于没有接受到心跳而挂掉 然后自动提交offset失败,然后重新分配partition给客户端 。接下来导致的问题是:
1.由于自动提交offset失败,导致重新分配了partition的客户端又重新消费之前的一批数据
2.接着consumer重新消费,又出现了消费超时,无限循环下去。
然后我修改了kafka的配置(spring-kafka)
1.enable.auto.commit=false; (关闭自动提交)
2.session.timeout.ms=100000(增大session超时时间)
3.request.timeout.ms=110000(socket握手超时时间,默认是3000 但是kafka配置要求大于session.timeout.ms时间)
修改了以上配置以后,我认为不能从根本上解决消费能力低的问题,因为我这边后端处理涉及到 mongoDB查询和redis的频繁交互,而且已知生产环境mongoDB的内存较低,经过分析以后,认为消费能力低的原因在于mongoDB查询。
所以我对代码做了一些修改,在收到kafka的推送以后,将消息加入队列,这样的话kafka服务端会认为此消息已经被消费,然后我再开启一个线程从队列连拿数据进行异步处理。
代码如下:
import com.alibaba.fastjson.JSONObject;
import com.jmev.driveData.service.SynDriveDataService;
import com.jmev.web.util.SpringUtils;
import com.jmev.web.view.DriveHistory;
import java.util.Date;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* Created by FM-Qws on 2018/7/12.
*/
public class HistoryHolder {
private static final ConcurrentLinkedQueue<DriveHistory> queue = new ConcurrentLinkedQueue<>();
private static class HolderClass{
//静态内部类用到的时候再加载
private final static HistoryHolder instance = new HistoryHolder();
}
public static HistoryHolder getInstance() {
return HolderClass.instance;
}
private HistoryHolder() {
//开启线程(我注入的spring bean)
SynDriveDataService synDriveDataService = SpringUtils.getBean("synDriveDataService");
new Thread(new HistoryThread(synDriveDataService)).start();
}
public void addSchedule(String vin, Date time, Integer carId, JSONObject carInfoObj) {
DriveHistory driveHistory = new DriveHistory(vin,time,carId,carInfoObj);
queue.offer(driveHistory);
}
ConcurrentLinkedQueue<DriveHistory> getQueue(){
return queue;
}
}
import com.jmev.common.redis.RedisTool;
import com.jmev.driveData.service.SynDriveDataService;
import com.jmev.web.util.SpringUtils;
import com.jmev.web.view.DriveHistory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Created by FM-Qws on 2018/7/12.
*/
public class HistoryThread implements Runnable {
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
private SynDriveDataService synDriveDataService;
HistoryThread(SynDriveDataService synDriveDataService){
this.synDriveDataService=synDriveDataService;
}
@Override
public void run() {
RedisTool redisTool = SpringUtils.getBean("redisTool");
while (true) {
DriveHistory driveHistory = HistoryHolder.getInstance().getPoll().poll();
if (null !=driveHistory ) {
long l = System.currentTimeMillis();
try {
synDriveDataService.updateByEnd(driveHistory.getVin(), driveHistory.getTime(), driveHistory.getCarId(), driveHistory.getCarInfoObj());
logger.info("结束保存历史轨迹的时间为: "+(System.currentTimeMillis() - l)+"ms");
}catch (Exception e){
redisTool.set("DriveData"+driveHistory.getVin(),"1.0");
logger.error("熄火数据修改失败,vin:" + driveHistory.getVin() + "失败原因: " + e.getMessage());
}
}
}
}
}
DriveHistory是一个实体bean 用于传递kafka的消息具体就不贴了
然后在kafka消费者里面进行添加队列的操作
HistoryHolder.getInstance().addSchedule(vin,time,carId,carInfoObj);
这样的话就是异步处理了,每次在kafka推送到后端以后,我只加入队列,然后其他线程异步处理。