java中防止线程重复处理

2024-01-08

问题陈述

我有一个JMS侦听器作为侦听主题的线程运行。一旦有消息进来,我就会生成一个新的Thread处理入界消息。因此,对于每条传入的消息,我都会生成一个新的Thread.
我有一个场景,当按顺序立即注入重复消息时,也会处理重复消息。我需要阻止它被处理。我尝试使用ConcurrentHashMap保持我在条目中添加的过程时间Thread已生成并尽快将其从地图中删除Thread完成其执行。但是,当我尝试以并发方式依次传递相同的场景时,它并没有帮助。

在您深入实际的代码库之前先概述一下我的问题

onMessage(){
    processIncomingMessage(){
        ExecutorService executorService = Executors.newFixedThreadPool(1000);
            //Map is used to make an entry before i spawn a new thread to process incoming message
            //Map contains "Key as the incoming message" and "value as boolean"
            //check map for duplicate check
            //The below check is failing and allowing duplicate messages to be processed in parallel
        if(entryisPresentInMap){ 
                //return doing nothing
        }else{
                //spawn a new thread for each incoming message
                //also ensure a duplicate message being processed when it in process by an active thread
        executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        //actuall business logic
                    }finally{
                        //remove entry from the map so after processing is done with the message
                    }

                }
        }
    }

模拟场景的独立示例

public class DuplicateCheck {

private static Map<String,Boolean> duplicateCheckMap =
        new ConcurrentHashMap<String,Boolean>(1000);

private static String name=null;
private static String[] nameArray = new String[20];
public static void processMessage(String message){
    System.out.println("Processed message =" +message);

}

public static void main(String args[]){
    nameArray[0] = "Peter";
    nameArray[1] = "Peter";
    nameArray[2] = "Adam";
    for(int i=0;i<=nameArray.length;i++){
    name=nameArray[i];
    if(duplicateCheckMap.get(name)!=null  && duplicateCheckMap.get(name)){
        System.out.println("Thread detected for processing your name ="+name);
        return;
    }
    addNameIntoMap(name);
    new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                processMessage(name);
            } catch (Exception e) {
                System.out.println(e.getMessage());
            } finally {
                freeNameFromMap(name);
            }
        }
    }).start();
    }
}

private static synchronized void addNameIntoMap(String name) {
    if (name != null) {
        duplicateCheckMap.put(name, true);
        System.out.println("Thread processing the "+name+" is added to the status map");
    }
}

private static synchronized void freeNameFromMap(String name) {
    if (name != null) {
        duplicateCheckMap.remove(name);
        System.out.println("Thread processing the "+name+" is released from the status map");
    }
}

代码片段如下

public void processControlMessage(final Message message) {
    RDPWorkflowControlMessage rdpWorkflowControlMessage=    unmarshallControlMessage(message);
    final String workflowName = rdpWorkflowControlMessage.getWorkflowName();
    final String controlMessageEvent=rdpWorkflowControlMessage.getControlMessage().value();
    if(controlMessageStateMap.get(workflowName)!=null && controlMessageStateMap.get(workflowName)){
        log.info("Cache cleanup for the workflow :"+workflowName+" is already in progress");
        return;
    }else {
        log.info("doing nothing");
    }
    Semaphore controlMessageLock = new Semaphore(1); 
    try{
    controlMessageLock.acquire();
    synchronized(this){
        new Thread(new Runnable(){
            @Override
            public void run() {
                try {
                    lock.lock();
                    log.info("Processing Workflow Control Message for the workflow :"+workflowName);
                    if (message instanceof TextMessage) {
                    if ("REFRESH".equalsIgnoreCase(controlMessageEvent)) {
                        clearControlMessageBuffer();
                        enableControlMessageStatus(workflowName);
                        List<String> matchingValues=new ArrayList<String>();
                        matchingValues.add(workflowName);
                        ConcreteSetDAO tasksSetDAO=taskEventListener.getConcreteSetDAO();
                        ConcreteSetDAO workflowSetDAO=workflowEventListener.getConcreteSetDAO();
                        tasksSetDAO.deleteMatchingRecords(matchingValues);
                        workflowSetDAO.deleteMatchingRecords(matchingValues);
                        fetchNewWorkflowItems();
                        addShutdownHook(workflowName);
                        }
                    }
                } catch (Exception e) {
                    log.error("Error extracting item of type RDPWorkflowControlMessage from message "
                            + message);
                } finally {
                    disableControlMessageStatus(workflowName);
                    lock.unlock();
                }
            }
        }).start();
    }
    } catch (InterruptedException ie) {
        log.info("Interrupted Exception during control message lock acquisition"+ie);
    }finally{
        controlMessageLock.release();
    }
}

private void addShutdownHook(final String workflowName) {
    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            disableControlMessageStatus(workflowName);
        }
    });
    log.info("Shut Down Hook Attached for the thread processing the workflow :"+workflowName);
}

private RDPWorkflowControlMessage unmarshallControlMessage(Message message) {
    RDPWorkflowControlMessage rdpWorkflowControlMessage = null;
    try {
        TextMessage textMessage = (TextMessage) message;
        rdpWorkflowControlMessage = marshaller.unmarshalItem(textMessage.getText(), RDPWorkflowControlMessage.class);
    } catch (Exception e) {
        log.error("Error extracting item of type RDPWorkflowTask from message "
                + message);
    }
    return rdpWorkflowControlMessage;
}

private void fetchNewWorkflowItems() {
    initSSL();
    List<RDPWorkflowTask> allTasks=initAllTasks();
    taskEventListener.addRDPWorkflowTasks(allTasks);
    workflowEventListener.updateWorkflowStatus(allTasks);
}

private void clearControlMessageBuffer() {
    taskEventListener.getRecordsForUpdate().clear();
    workflowEventListener.getRecordsForUpdate().clear();
}

private synchronized void enableControlMessageStatus(String workflowName) {
    if (workflowName != null) {
        controlMessageStateMap.put(workflowName, true);
        log.info("Thread processing the "+workflowName+" is added to the status map");
    }
}

private synchronized void disableControlMessageStatus(String workflowName) {
    if (workflowName != null) {
        controlMessageStateMap.remove(workflowName);
        log.info("Thread processing the "+workflowName+" is released from the status map");
    }
}

我已经修改了我的代码以纳入下面提供的建议,但它仍然不起作用

public void processControlMessage(final Message message) {
    ExecutorService executorService = Executors.newFixedThreadPool(1000);
    try{
        lock.lock();
        RDPWorkflowControlMessage rdpWorkflowControlMessage=    unmarshallControlMessage(message);
        final String workflowName = rdpWorkflowControlMessage.getWorkflowName();
        final String controlMessageEvent=rdpWorkflowControlMessage.getControlMessage().value();
        if(controlMessageStateMap.get(workflowName)!=null && controlMessageStateMap.get(workflowName)){
            log.info("Cache cleanup for the workflow :"+workflowName+" is already in progress");
            return;
        }else {
            log.info("doing nothing");
        }
        enableControlMessageStatus(workflowName);
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    //actual code
                        fetchNewWorkflowItems();
                        addShutdownHook(workflowName);
                        }
                    }
                } catch (Exception e) {
                    log.error("Error extracting item of type RDPWorkflowControlMessage from message "
                            + message);
                } finally {
                    disableControlMessageStatus(workflowName);
                }
            }
        });
    } finally {
        executorService.shutdown();
        lock.unlock();
    }
}

private void addShutdownHook(final String workflowName) {
    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            disableControlMessageStatus(workflowName);
        }
    });
    log.info("Shut Down Hook Attached for the thread processing the workflow :"+workflowName);
}

private synchronized void enableControlMessageStatus(String workflowName) {
    if (workflowName != null) {
        controlMessageStateMap.put(workflowName, true);
        log.info("Thread processing the "+workflowName+" is added to the status map");
    }
}

private synchronized void disableControlMessageStatus(String workflowName) {
    if (workflowName != null) {
        controlMessageStateMap.remove(workflowName);
        log.info("Thread processing the "+workflowName+" is released from the status map");
    }
}

这就是向地图添加值的方式。这种双重检查可确保在任何特定时刻只有一个线程向映射添加值,并且您可以在之后控制访问。之后删除所有锁定逻辑。它是如此简单

public void processControlMessage(final  String workflowName) {
    if(!tryAddingMessageInProcessingMap(workflowName)){
           Thread.sleep(1000); // sleep 1 sec and try again
            processControlMessage(workflowName);
        return ;
    }
     System.out.println(workflowName);
     try{
         // your code goes here
     } finally{
         controlMessageStateMap.remove(workflowName);
     }
}

private boolean tryAddingMessageInProcessingMap(final String workflowName) {
    if(controlMessageStateMap .get(workflowName)==null){
        synchronized (this) {
             if(controlMessageStateMap .get(workflowName)==null){
                 controlMessageStateMap.put(workflowName, true);
                 return true;
             }
        }
    }
    return false;
}

在这里阅读更多内容https://en.wikipedia.org/wiki/Double-checked_locking https://en.wikipedia.org/wiki/Double-checked_locking

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

java中防止线程重复处理 的相关文章

随机推荐

  • 抛出新的异常最佳实践[关闭]

    很难说出这里问的是什么 这个问题是含糊的 模糊的 不完整的 过于宽泛的或修辞性的 无法以目前的形式得到合理的回答 如需帮助澄清此问题以便重新打开 访问帮助中心 help reopen questions 以下是好的做法吗 如果不是 应该做什
  • 如何使用 .NET / GDI+ 禁用子采样?

    我正在尝试使用 Bitmap 类保存 JPEG 图像 我注意到无论我指定的质量级别如何 锐利的边缘总是模糊的 我发现这是由于一个或多个通道的二次采样造成的 如何在保存图像时禁用子采样 我目前正在使用这段代码 EncoderParameter
  • 在“世界风”中单击时禁用地球仪移动

    我正在尝试在 世界风 中单击鼠标禁用地球仪的移动 我期望能够做到 void disableGlobeDrag WorldWindowGLCanvas ww ww addMouseMotionListener new MyMouseMotio
  • 获取 S&P 500 股票代码列表

    所以我用这个在 Python for Finance 上 它总是给我错误 1 line 22 in
  • 在 Windows 上的 Emacs 内从 py-shell 运行 Matplotlib 或 enthought.mayavi.mlab

    我可以从常规 cmd shell 正常运行以下代码 import matplotlib pyplot as plt fig plt figure plt show 它启动一个 Matplotlib 窗口 然而 使用 shell 或 py s
  • 在 web.config 的 appSettings 中使用“&”

    我正在尝试将网站地址存储在 web config 文件的 appSettings 部分中 URL 末尾有两个查询字符串参数 因此我需要使用 符号 当我在代码文件中硬编码 URL 时 如果我替换它就可以工作 amp 在配置文件中 这些字母是红
  • django 查询集上的复杂注释

    我遇到一个问题 无法从复杂的环境中获取所需的所有信息 annotate 调用我的 django 视图 这是我的模型 RECORD STATUS 0 Hidden 1 Unhidden 2 Deleted class Activity mod
  • 模拟网络连接不良

    我正在开发一个可以通过 LAN 访问互联网的嵌入式设备 我现在正处于测试阶段 我想测试当互联网连接较差时设备的性能 目前 该设备通过集线器连接到路由器 我用集线器通过 Wireshark 监控数据包 降低设备的互联网速度以模拟可能发生的场景
  • R计算数据帧中相似行的出现次数

    我有以下格式的数据 称为 DF 这只是一个简化的示例 eval num eval count fitness fitness mean green h 0 green v 0 offset 0 random 1 1 1500 1500 10
  • Winform 更新长时间运行的任务

    当应用程序执行长时间运行的任务时 您是否有一个好的解决方案来保持 请稍候 winform 绘制 我尝试在每个步骤中使用 form refresh 但是会发生一些长时间运行的查询 这意味着这不够频繁 基本上是这个那么问题 https stac
  • 简单英语中的乔姆斯基层次结构

    我试图找到乔姆斯基提出的 4 个级别的正式语法 无限制 上下文相关 上下文无关 常规 的简单 即非形式 解释 我已经很久没有学习正式语法了 各种定义现在让我难以想象 明确地说 我是not寻找随处可见的正式定义 例如here http en
  • 如何在管理面板中使用动态数据?

    class Message models Model date models DateField user models ForeignKey UserEx groups models ManyToManyField Group main
  • 如何通过仅编写一次来在客户端和服务器之间共享域类和业务规则,就像在 RIA 服务中一样

    在 ASP NET WCF 应用程序中 我希望在客户端和服务器之间共享域类和业务规则 而不需要重写它们 就像在 Silverlight RIA 服务中一样 将这些收集在一个程序集中并从客户端和服务器引用它可以解决问题 但是如何解决 通过向客
  • 反应性可观察订阅处置

    如果我有权访问一个我知道只会返回一项的 IObservable 这是否有效 它是最佳使用模式吗 IDisposable disposable null disposable myObservable Subscribe x gt DoThi
  • 禁用 Heroku 路由器日志

    我写了一个 Heroku 应用程序 这是一个非常简单的 API 一些GET向其发出的请求包含其参数中的敏感信息 最好不要让 Heroku 记录这些敏感信息 有什么办法可以拥有 Herokunot记录请求或者更好的是 截断路径以使其不包含参数
  • c++ 存储函数和参数列表以供以后使用

    所以我想通过用 C 编写一个小型线程池来挑战自己 并且我想尝试模仿 std thread 的易用方式 您可以创建一个线程并作为参数发送一个函数和参数与 pthreads 之类的函数相比 pthreads 强制您将 void 作为该函数的唯一
  • 刺激控制器在 Rails 7 应用程序中根本无法运行

    我真的很难让 Stimulus 控制器在我正在开发的 Rails 7 应用程序中运行 并且非常感谢任何人可能提供的帮助 我一直在旋转我的轮子 我的应用程序 js Configure your import map in config imp
  • (注意)child pid XXXX 退出信号分段错误(11),/etc/apache2 中可能存在 coredump

    我的 Apache 日志中不断收到以下错误 Wed Sep 18 17 59 20 2013 notice Apache 2 2 22 Ubuntu PHP 5 3 10 1ubuntu3 8 with Suhosin Patch conf
  • 如何为内存中的 HSQL 编写自定义函数

    我想在 HSQL 中编写一个简单的函数 以便它向后兼容 DB2 函数 理论上 我应该能够用 java 编写自定义函数并将其挂接到 HSQL 中 有这方面的任何指导 文件吗 用户定义的函数记录在 HSQLDB 指南中 有关用 Java 编写的
  • java中防止线程重复处理

    问题陈述 我有一个JMS侦听器作为侦听主题的线程运行 一旦有消息进来 我就会生成一个新的Thread处理入界消息 因此 对于每条传入的消息 我都会生成一个新的Thread 我有一个场景 当按顺序立即注入重复消息时 也会处理重复消息 我需要阻