问题背景
公司内部多个系统共用一套用户体系库,对外(钉钉)我们是两个客户身份(这里是根据系统来的),例如当第三方服务向我们发起用户同步请求:是一个更新用户操作,它会同时发送一个 delete 和 insert 请求,这两个请求几乎是并发进来的,实际上应该是先发起的delete 再 insert, 实际情况可能和网络延迟也有关系,此时在我们系统中就无法保证这两个请求的顺序执行,即先 delete 处理完之后 再进行 insert 的数据处理(正常流程),又或者直接把一定时间内同一个用户的 delete 和 insert 操作合并为一个update操作(本质就是更新操作)。
还有一种情况是:第三方系统中添加或者 删除一个用户时,会以两个客户的身份去发送两个相同的用户同步请求,但同一个用户在我们系统内用户数据只有一份,对应的接口肯定也都是同一个,即相同的添加接口会在一瞬间被调用两次,删除即使执行两次的话也没什么问题,问题是添加 即使在添加前判断了用户账号是否存在 并发过来的情况下还是避免不了一些脏数据的产生,加锁的话对整体影响又特别大。
处理思路
根据userId
(账号)为每个请求分配一个房间(单独的线程),如果是第一次进来那么就new
一个房间(也就是类,里边会有一个单独的线程处理这个用户的行为),后边一定时间内相同的 userId 进来会找到对应已存在的房间,当设置的时间窗口到了之后,判断当前userId的同步行为有哪些,如果有 insert 和 delete,那么直接转为 update 操作。如果是两个insert行为,那么最后就只调用一次insert服务,如果是两个delete行为,那么就只调用一个delete服务。
注意事项
时间窗口的设定,如果时间设置过短,属于同一个操作的请求因为网络波动 请求到接口的时间会有一定间隔,如果你设置的时间间隔小于等待的时间,还是会把本就属于同一批次的操作 多次处理
测试过程:刚开始时间设置的1500ms,也就是当第一个userId进来后,等待1.5秒后根据这段时间内收集到的用户行为再去真正的处理,后来在测试中发现有些本就属于同一批次的请求还是会被处理多次,也就是时间调小了,改成2000ms,测试还是发现同样的问题。最后:采取的是根据最近一个的userId请求的时间 等待1500ms,即相同的userId的请求进来后 在当前时间再重新计算等待1500ms,时间到了之后没有发现新的用户行为即算是一个批次结束
ps:可以创建一个单独的服务来负责对请求进行合理的处理分发,处理之后再去调用对应的业务系统服务
代码实现
定义操作行为枚举
public enum OperationEnum {
INSERT("insert"),
DELETE("delete"),
;
private final String value;
OperationEnum(String operation) {
this.value = operation;
}
public String getValue() {
return value;
}
}
定义每个用户所属的房间,房间内存储用户的多个行为(insert、delete):
public class ActionRoomBean {
private Map<String, JSONObject> actionDataMap = new HashMap<>();
private String userId;
private DispatchTask dispatchTask;
public void addAction(String action, JSONObject data) {
if(dispatchTask != null){
dispatchTask.getIncrementAndGet();
}
if (actionDataMap.containsKey(action)) {
return;
}
actionDataMap.put(action, data);
}
public ActionRoomBean() {
}
public ActionRoomBean(String userId, Map<String, JSONObject> actionDataMap) {
this.actionDataMap = actionDataMap;
this.userId = userId;
}
public void startManager() {
dispatchTask = new DispatchTask(userId, actionDataMap);
new Thread(dispatchTask).start();
}
}
房间内真正的执行者(子线程):
public class DispatchTask implements Runnable {
private static long sleepTime = 1500;
private final AtomicInteger count = new AtomicInteger(0);
Map<String, JSONObject> actionDataMap;
String userId;
public DispatchTask(String userId, Map<String, JSONObject> dataLib) {
this.userId = userId;
this.actionDataMap = dataLib;
}
@Override
public void run() {
try {
int afterCount = 0;
while (afterCount == 0 || afterCount != count.get()){
afterCount = count.incrementAndGet();
Thread.sleep(sleepTime);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
try {
String url = RestTemplateUtil.DD_READING_API_URL;
JSONObject param = null;
if (actionDataMap.containsKey(OperationEnum.INSERT.getValue()) && actionDataMap.size() == 1) {
url += "/nc/eduUserInsert";
param = actionDataMap.get(OperationEnum.INSERT.getValue());
if (param != null) {
RestTemplateUtil.postForObject(url, param.toJSONString());
}
} else if (actionDataMap.containsKey(OperationEnum.DELETE.getValue()) && actionDataMap.size() == 1) {
url += "/nc/eduUserDelete";
param = actionDataMap.get(OperationEnum.DELETE.getValue());
if (param != null) {
RestTemplateUtil.postForObject(url, param.toJSONString());
}
} else if (actionDataMap.containsKey(OperationEnum.INSERT.getValue()) && actionDataMap.containsKey(OperationEnum.DELETE.getValue())) {
url += "/nc/eduUserUpdate";
param = actionDataMap.get(OperationEnum.INSERT.getValue());
if (param != null) {
RestTemplateUtil.postForObject(url, param.toJSONString());
}
}
} finally {
DispatchController.closeRoom(userId);
}
}
public Integer getIncrementAndGet() {
return count.incrementAndGet();
}
}
控制器:
@RestController
@RequestMapping(value = "/api/dd")
public class DispatchController {
private final static Map<String, ActionRoomBean> allMap = new ConcurrentHashMap<>();
@Value("${url.secret}")
private String secret;
@PostMapping(value = "/dispatch")
public Result dispatch(@RequestBody JSONObject jsonObject,
@RequestHeader(value = "secret") String secret){
if(!Objects.equals(secret, this.secret)){
return Result.generateError("secret eroor");
}
String userId = jsonObject.getString("userId");
String operation = jsonObject.getString("operation");
if(EmptyUtil.isNotEmpty(userId) && EmptyUtil.isNotEmpty(operation)){
unboltRoom(userId, operation, jsonObject);
}
return Result.generateSuccess();
}
private void unboltRoom(String userId, String operation, JSONObject jsonObject) {
synchronized (this) {
ActionRoomBean room = allMap.get(userId);
if (room == null) {
Map<String, JSONObject> actionMap = new HashMap<>(4);
actionMap.put(operation, jsonObject);
room = new ActionRoomBean(userId, actionMap);
room.startManager();
allMap.put(userId, room);
}
room.addAction(operation, jsonObject);
}
}
public static void closeRoom(String userId) {
allMap.remove(userId);
}
}
整体核心代码就是上边这些,以上还可以通过线程池去优化一下。
如果涉及到批量导入,同时有大量用户同步数据过来,就需要在测试环境进行反复测试 看是否会丢数据(因为每个用户都是一个独立的子线程),对线程的数量进行优化。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)