Java,在多线程环境中通过哈希统一划分传入的工作

2024-01-10

我已经实现了一个java代码来执行传入的任务(如Runnable) 具有基于 hashCode 模块的 n 个线程nThreads。理想情况下,工作应该在这些线程之间均匀地分布。 具体来说,我们有一个dispatchId作为每个任务的字符串。

这是这个java代码片段:

int nThreads = Runtime.getRuntime().availableProcessors(); // Number of threads
Worker[] workers = new Worker[nThreads]; // Those threads, Worker is just a thread class that can run incoming tasks
...
Worker getWorker(String dispatchId) { // Get a thread for this Task
    return workers[(dispatchId.hashCode() & Integer.MAX_VALUE) % nThreads];
}

重要的:在大多数情况下,dispatchId 是:

String dispatchId = 'SomePrefix' + counter.next()

但是,我担心 nThreads 的模除不是一个好的选择,因为 nThreads 应该是素数,以便更均匀地分布 dispatId 键。

对于如何更好地开展工作还有其他选择吗?

更新1:

每个Worker都有一个队列:Queue<RunnableWrapper> tasks = new ConcurrentLinkedQueue();

工作人员从中获取任务并执行它们。可以从其他线程将任务添加到此队列。

更新2:

任务相同dispatchId可以多次出现,因此我们需要通过以下方式找到他们的线程dispatchId.

最重要的是,每个工作线程必须顺序处理其传入的任务。因此,上面的更新1中就有了数据结构Queue。

更新3:此外,某些线程可能很忙,而另一些线​​程则空闲。因此,我们需要以某种方式将队列与线程解耦,但保持相同的 FIFO 顺序dispatchId用于任务执行。

解决方案:我已经实现了 Ben Manes 的想法(他的答案如下),代码可以找到here https://github.com/vibneiro/dispatching.


听起来您需要每个调度 ID 进行 FIFO 排序,因此理想的情况是将调度队列作为抽象。这可以解释您对散列的担忧,因为它不提供均匀分布,因为某些调度队列可能比其他队列更活跃,并且在工作人员之间不公平地平衡。通过将队列与工作线程分离,您可以保留 FIFO 语义并均匀地分散工作。

提供这种抽象的非活动库是轴调度 https://github.com/fusesource/hawtdispatch。它与 Java 6 兼容。

一个非常简单的 Java 8 方法是使用完整的未来 https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html作为排队机制,并发哈希映射 https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentHashMap.html注册和执行人(例如分叉连接池 https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html)用于计算。看事件调度器 https://github.com/ben-manes/caffeine/blob/master/jcache/src/main/java/com/github/benmanes/caffeine/jcache/event/EventDispatcher.java为了实现这个想法,注册是明确的。如果您的调度员更加动态,那么您可能需要定期修剪地图。基本思路如下。

ConcurrentMap<String, CompletableFuture<Void>> dispatchQueues = ...

public CompletableFuture<Void> dispatch(String queueName, Runnable task) {
  return dispatchQueues.compute(queueName, (k, queue) -> {
    return (queue == null)
        ? CompletableFuture.runAsync(task)
        : queue.thenRunAsync(task);
  });
}

更新(JDK7)

上述想法的反向移植将用 Guava 翻译成类似的东西:

ListeningExecutorService executor = ...
Striped<Lock> locks = Striped.lock(256);
ConcurrentMap<String, ListenableFuture<?>> dispatchQueues = ...

public ListenableFuture<?> dispatch(String queueName, final Runnable task) {
  Lock lock = locks.get(queueName);
  lock.lock();
  try {
    ListenableFuture<?> future = dispatchQueues.get(queueName);
    if (future == null) {
      future = executor.submit(task);
    } else {
      final SettableFuture<Void> next = SettableFuture.create();
      future.addListener(new Runnable() {
        try {
          task.run();
        } finally {
          next.set(null);
        }
      }, executor);
      future = next;
    }
    dispatchQueues.put(queueName, future);
  } finally {
    lock.unlock();
  }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Java,在多线程环境中通过哈希统一划分传入的工作 的相关文章

  • Base36 编码字符串?

    我一直在网上查找 但找不到解决此问题的方法 在 Python Ruby 或 Java 中 如何对以下字符串进行 Base 36 编码 nOrG9Eh0uyeilM8Nnu5pTywj3935kW 5 Ruby 以 36 为基数 s unpa
  • (Java) App Engine 中的静态文件无法访问

    The 示例文档 http code google com appengine docs java gettingstarted staticfiles html表示您只需将文件放在 war 或子目录 中 并且应该可以从主机访问它们 只要它
  • tomcat 7.0.50 java websocket 实现给出 404 错误

    我正在尝试使用 Java Websocket API 1 0 JSR 356 中指定的带注释端点在 tomcat 7 0 50 上实现 websocket 以下是我如何对其进行编码的简要步骤 1 使用 ServerEndpoint注解编写w
  • FileNotFoundException - Struts2 文件上传

    Strange FileNotFoundException使用Struts2上传文件时 这是 JSP 的一部分
  • 如何在java Spring Boot中实现通用服务类?

    我有许多具有重复代码的服务 我想知道如何实现通用服务 以便我的所有服务都可以扩展它 服务接口示例 重复代码 Service public interface IUserService List
  • Spring数据中的本机查询连接

    我有课 Entity public class User Id Long id String name ManyToMany List
  • 如何检测图像是否像素化

    之前有人在 SO 上提出过这样的问题 在Python中检测像素化图像 https stackoverflow com questions 12942365 detecting a pixelated image in python还有关于q
  • Android蓝牙java.io.IOException:bt套接字已关闭,读取返回:-1

    我正在尝试编写一个代码 仅连接到运行 Android 5 0 KitKat 的设备上的 目前 唯一配对的设备 无论我尝试了多少方法 我仍然会收到此错误 这是我尝试过的最后一个代码 它似乎完成了我看到人们报告为成功的所有事情 有人能指出我做错
  • 在 MongoDB 和 Apache Solr 之间同步数据的简单方法

    我最近开始使用 MongoDB 和 Apache Solr 我使用 MongoDB 作为数据存储 并且希望 Apache Solr 为我的数据创建索引 以实现应用程序中的搜索功能 经过一些研究 我发现 基本上有两种方法可以在 MongoDB
  • Java:从集合中获取第一项

    如果我有一个集合 例如Collection
  • 从直方图计算平均值和百分位数?

    我编写了一个计时器 可以测量任何多线程应用程序中特定代码的性能 在下面的计时器中 它还会在地图中填充花费了 x 毫秒的调用次数 我将使用这张图作为我的直方图的一部分来进行进一步的分析 例如调用花费了这么多毫秒的百分比等等 public st
  • 当 minifyEnabled 为 true 时 Android 应用程序崩溃

    我正在使用多模块应用程序 并且该应用程序崩溃时minifyEnabled true in the installed模块的build gradle 以下是从游戏控制台检索到的反混淆堆栈跟踪 FATAL EXCEPTION Controlle
  • Spring Data JPA:查询如何返回非实体对象或对象列表?

    我在我的项目中使用 Spring Data JPA 我正在演奏数百万张唱片 我有一个要求 我必须获取各种表的数据并构建一个对象 然后将其绘制在 UI 上 现在如何实现我的 Spring 数据存储库 我读到它可以通过命名本机查询来实现 如果指
  • JAVA中遍历JSON数据

    我是 JSON 新手 我使用 HTTPUrlConnections 并在 JAVA 程序中获得一些响应 响应数据将类似于 data id 1 userId 1 name ABC modified 2014 12 04 created 201
  • 什么是竞争条件?

    编写多线程应用程序时 最常见的问题之一是竞争条件 我向社区提出的问题是 竞赛条件是什么 你如何检测它们 你如何处理它们 最后 如何防止它们发生 当两个或多个线程可以访问共享数据并且它们试图同时更改它时 就会出现竞争条件 由于线程调度算法可以
  • 我可以限制分布式应用程序发出的请求吗?

    我的应用程序发出 Web 服务请求 提供商处理的请求有最大速率 因此我需要限制它们 当应用程序在单个服务器上运行时 我曾经在应用程序级别执行此操作 一个对象跟踪到目前为止已发出的请求数量 并在当前请求超出允许的最大负载时等待 现在 我们正在
  • 如何处理 StaleElementReferenceException

    我正在为鼠标悬停工作 我想通过使用 for 循环单击每个链接来测试所有链接的工作条件 在我的程序中 迭代进行一次 而对于下一次迭代 它不起作用并显示 StaleElementReferenceException 如果需要 请修改代码 pub
  • 源值 1.5 的错误已过时,将在未来版本中删除

    我使用 scala maven plugin 来编译包含 scala 和 java 代码的项目 我已经将源和目标设置为1 7 但不知道为什么maven仍然使用1 5 这是我在 pom xml 中的插件
  • 如何高效计算连续数的数字积?

    我正在尝试计算数字序列中每个数字的数字乘积 例如 21 22 23 98 99 将会 2 4 6 72 81 为了降低复杂性 我只会考虑 连续的数字 http simple wikipedia org wiki Consecutive in
  • HttpClient请求设置属性问题

    我使用这个 HttpClient 库玩了一段时间 几周 我想以某种方式将属性设置为请求 不是参数而是属性 在我的 servlet 中 我想使用 Integer inte Integer request getAttribute obj 我不

随机推荐

  • PHP 是否优化尾递归?

    我写了一小段代码 我相信如果尾递归被优化的话应该会成功 但是它炸毁了堆栈 我应该得出 PHP 没有优化尾递归的结论吗 function sumrand n sum if n 0 return sum else return sumrand
  • C#,后台工作者

    我有一个示例 WinForms 应用程序 它使用BackgroundWorker成分 它工作正常 但是当我点击Cancel按钮取消后台线程 但它不会取消线程 当我击中Cancel呼叫按钮 CancelAsync 方法 然后在RunWorke
  • Facebook 示例拼图:河内塔

    这是 Facebook 招聘样本测试中的一个问题 有 K 个钉子 当从桩的底部到顶部观察时 每个桩可以按半径递减的顺序容纳圆盘 有N个圆盘 半径为1到N 给定钉子的初始配置和钉子的最终配置 输出从初始配置转换到最终配置所需的移动 您需要以最
  • pandas 混合位置和标签索引,无需链接

    Since ix已经自 Pandas 0 20 起已弃用 http pandas docs github io pandas docs travis whatsnew html deprecate ix 我想知道在 Pandas 中混合基于
  • 是否可以调整嵌入的 .mov 的大小?

    我嵌入的 mov 剪辑有时比显示它的位置大 所以我想调整剪辑的大小 曾尝试过width and height但这只会改变显示它的区域 它不会调整实际电影的大小 可以调整影片大小吗 如果是 怎么办 是的 您需要scale属性 有 QuickT
  • 如何从aspx页面中抓取图像?

    我在尝试着scrape来自 aspx 页面的图像 我有这段代码 可以从普通网页中抓取图像 但无法抓取 aspx 页面 因为我需要将 http post 请求发送到 aspx 页面 即使在阅读了几个线程之后 我也不知道如何做到这一点是原来的代
  • 使用蓝牙 LE 在 iOS 和 Android 之间进行通信

    我有一个工作应用程序 使用 CoreBluetooth 在 iPad 中央 和 iPhone 外围 之间进行通信 我有一项服务有两个特点 我的 Nexus 7 运行最新的 Android 4 3 支持 BTLE Android 加入 BTL
  • 给定中序和后序遍历,如何输出树的前序遍历?

    给出当我在整数数组中具有先序和中序遍历时输出树的后序遍历的代码 如何使用给定的中序和后序数组来类似地获取前序 void postorder int preorder int prestart int inorder int inostart
  • 手势字符识别

    I use Dio dictionary function gesture on screen get text to search Library that in Android can make to be the function o
  • 如何使用 SI 符号格式化 noUiSlider 的刻度标签?

    这是我的 noUiSlider 的样子 我想将刻度标签格式设置为 1000 gt 1K 900000 gt 900K 5000000 gt 5M 等 即用适当的 SI 符号缩写数字 library shiny library shinyWi
  • function '<-'/2 undefined 接收块中出现错误 Elixir

    这是我的 Elixir 代码 defmodule ErlProcess do def receiver do receive do sayHello msg gt sender lt ok ok end end end 但它给出了这个错误
  • NVM无法在ubuntu 18.04上安装nodejs

    我尝试在 ubuntu 18 04 上使用 nvm 安装nodejs 但每次它都会抛出以下错误堆栈 pasindu pasindu HP EliteBook 850 G7 Notebook PC nvm install 0 10 35 Do
  • 关于端点接口的 jax-ws

    使用注释定义了 Java 接口 网络服务编译了代码 一切顺利 Example WebService public interface HelloWorldIfc 现在我尝试将端点接口定义为 WebService endpointInterf
  • 是否有一个简单的 Bugzilla/Trac 客户端供非软件人员使用?

    我知道这不完全是一个编程问题 但它直接影响我们的开发人员和我们分配编写的代码 如果有另一个类似的论坛可以更好地发布这个问题 请告诉我 我会从这里记下这个问题并将其发布到那里 我们的工作环境是几个开发人员为工厂生产车间创建 20 30 和维护
  • JavaFX 项目结构

    JavaFX 使用 FXML 的 MVC 模型听起来很棒 但我在找出如何组织我的项目包时遇到了困难 我发现的关于 JavaFX 的每一个教程都太简单且无组织 他们只是创建一个包并在那里创建所有内容 每个控制器 每个 fxml 每个 css
  • 如何在属性文件的数值中包含 _ ?

    我怎样才能拥有 下划线 在我的数值属性中 同时注入 ValueSpring中的注解 如果我包括 按照我的价值观 Spring 会抛出TypeMismatchException properties 文件 min score 20 000 j
  • 编译器:理解小程序生成的汇编代码

    我正在自学编译器是如何工作的 我正在通过阅读反汇编来学习GCC从小型 64 位 Linux 程序生成代码 我写了这个C程序 include
  • Rails 4 - 仅当当前密码正确时才允许更改密码

    在我的应用程序中 用户可以编辑他们的个人资料信息 在编辑个人资料表单上 用户可以更改所有字段 姓名 职务等 在同一个表单上有三个字段 current password password and password confirmation 我
  • C# 中的值类型类定义?

    是否可以创建一个不是结构而是值类型的类 或者类似于值类型 因为它在传递时进行复制而不是通过引用传递 edit 抱歉 问题在提出后必须进行编辑 另外 请参阅此问题以获取更多信息 在不存在的struct布局中循环 https stackover
  • Java,在多线程环境中通过哈希统一划分传入的工作

    我已经实现了一个java代码来执行传入的任务 如Runnable 具有基于 hashCode 模块的 n 个线程nThreads 理想情况下 工作应该在这些线程之间均匀地分布 具体来说 我们有一个dispatchId作为每个任务的字符串 这