我开发的 Web 应用程序是一个基于多租户云的应用程序(很多客户端,每个客户端都有自己独立的“环境”,但都在共享的硬件集上),我们正在引入用户批量处理的功能为后期处理工作。批处理工作的类型实际上并不重要,只是数量足够,没有工作队列就不太实际。我们选择 RabbitMQ 作为我们的底层队列框架。
因为我们是一个多租户应用程序,所以我们不一定希望客户端能够导致另一个客户端的队列处理时间过长,因此我们提出的一个想法是在每个客户端的基础上创建一个队列,并让一个共享工作池指向我们所有的客户端队列。问题是,据我所知,工作人员直接绑定到特定队列,而不是交换器。在我们的理想世界中,我们的客户端队列仍将在共享工作池中进行处理,而不会出现一个客户端阻塞另一个客户端的情况,我们可以根据需要通过启动更多工作线程或关闭空闲工作线程来扩大或缩小该工作线程池。从实际意义上讲,将工作人员绑定到特定队列可以防止我们这样做,因为我们经常有很多工作人员在队列中闲置而没有任何活动。
有没有一个相对直接的方法可以实现这一点?我对 RabbitMQ 相当陌生,还没有真正能够完成我们所追求的目标。我们也不想编写非常复杂的多线程消费者应用程序,这会浪费我们可能无法承受的开发和测试时间。如果确实如此,我们的堆栈是基于 Windows/.Net/C# 的,但我认为这不会对当前的问题产生重大影响。
您可以查看优先级队列实现(最初提出此问题时尚未实现):https://www.rabbitmq.com/priority.html https://www.rabbitmq.com/priority.html
如果这对您不起作用,您可以尝试其他一些技巧来实现您想要的效果(应该适用于旧版本的 RabbitMQ):
您可以将 100 个队列绑定到主题交换,并将路由键设置为用户 ID % 100 的哈希值,即每个任务将具有 1 到 100 之间的键,并且同一用户的任务将具有相同的键。每个队列都与 1 到 100 之间的唯一模式绑定。现在,您拥有一组工作人员,它们以随机队列号开始,然后在每个作业后递增该队列号,再次 % 100 在队列 100 后循环回到队列 1。
现在,您的工作人员队列可以并行处理最多 100 个唯一用户,或者如果没有其他工作要做,所有工作人员都可以专注于单个用户。如果工作人员需要在每个作业之间循环遍历所有 100 个队列,那么在只有单个用户在单个队列上有大量作业的情况下,每个作业之间自然会产生一些开销。减少队列数量是解决此问题的一种方法。您还可以让每个工作线程保持与每个队列的连接,并使用每个队列最多一条未确认的消息。如果未确认的消息超时设置得足够高,工作人员就可以更快地循环浏览内存中的待处理消息。
或者,您可以创建两个交换器,每个交换器都有一个绑定队列。所有工作都进入第一个交换器和队列,由工作池消耗。如果一个工作单元花费的时间太长,工作人员可以取消它并将其推送到第二个队列。当第一个队列上没有任何内容时,工作人员仅处理第二个队列。您可能还需要几个具有相反队列优先级的工作人员,以确保当有永无休止的短任务流到达时,长时间运行的任务仍然得到处理,以便最终始终处理用户批次。这不会真正将您的工作人员队列分布在所有任务中,但它会阻止一个用户的长时间运行任务,从而阻止您的工作人员为同一用户或另一个用户执行短期运行任务。它还假设您可以取消作业并稍后重新运行它,不会出现任何问题。这还意味着超时且需要以低优先级重新运行的任务将会浪费资源。除非你能提前识别快任务和慢任务
如果单个用户有 100 个缓慢任务,然后另一个用户发布一批任务,则第一个建议的 100 个队列也可能存在问题。在其中一项缓慢任务完成之前,不会查看这些任务。如果事实证明这是一个合理的问题,您可以将这两种解决方案结合起来。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)