Kafka流处理器线程安全吗?

2023-12-11

我知道这个问题之前在这里被问过:卡夫卡流并发?

但这对我来说很奇怪。根据文档(或者也许我遗漏了一些东西),每个分区都有一个任务,意味着不同的处理器实例,并且每个任务都由不同的线程执行。但是当我测试它时,我发现不同的线程可以获得不同的处理器实例。因此,如果您想在处理器中保留任何内存状态(老式方式),则必须锁定?

示例代码:

public class SomeProcessor extends AbstractProcessor<String, JsonObject> {

   private final String ID = UUID.randomUUID().toString();

   @Override
   public void process(String key, JsonObject value) {
     System.out.println("Thread id: " + Thread.currentThread().getId() +" ID: " + ID);

OUTPUT:

线程 ID:88 ID:26b11094-a094-404b-b610-88b38cc9d1ef

线程 ID:88 ID:c667e669-9023-494b-9345-236777e9dfda

线程 ID:88 ID:c667e669-9023-494b-9345-236777e9dfda

线程 ID:90 ID:0a43ecb0-26f2-440d-88e2-87e0c9cc4927

线程 ID:90 ID:c667e669-9023-494b-9345-236777e9dfda

线程 ID:90 ID:c667e669-9023-494b-9345-236777e9dfda

有没有办法强制每个实例线程?


每个实例的线程数是一个配置参数(num.stream.threads默认值为1)。因此,如果您开始单个KafkaStreams你得到的实例num.stream.threads线程。

任务将工作拆分为并行单元(基于您的输入主题分区),并将分配给线程。因此,如果您有多个任务和一个线程,则所有任务都将分配给该线程。如果你有两个线程(总和KafkaStreams实例)每个线程执行大约 50% 的任务。

注意:由于 Kafka Streams 应用程序本质上是分布式的,因此如果运行单个应用程序没有区别KafkaStreams具有多个线程或多个的实例KafkaStreams每个实例都有一个线程。任务将分布在应用程序的所有可用线程上。

如果您想在任务之间共享任何数据结构,并且您有多个线程,则您有责任同步对此数据结构的访问。请注意,任务到线程的分配可能会在运行时发生变化,因此所有访问都必须同步。但是,不建议使用此模式,因为它限制了可扩展性。您应该设计没有共享数据结构的程序!这样做的主要原因是,您的程序通常分布在多台机器上,因此,不同的KafkaStreams实例无论如何都无法访问共享数据结构。共享数据结构只能在单个 JVM 中工作,但使用单个 JVM 会阻止应用程序的水平扩展。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka流处理器线程安全吗? 的相关文章

随机推荐