如何使用相同的 APPLICATION_ID_CONFIG 运行两个或多个拓扑?

2024-06-19

我想在同一个实例上运行 2 个拓扑。 1个拓扑涉及状态存储,其他涉及全局存储。我如何成功地做到这一点?

我创建了 1 个具有 3 个分区的主题,然后在 1 个拓扑中添加了状态存储,在第二个拓扑中添加了全局存储。

拓扑1:

    public void createTopology() {
    Topology topology = new Topology();

    topology.addSource("source", new KeyDeserializer(), new ValueDeserializer(), "topic1");
    topology.addProcessor("processor1", new CustomProcessorSupplier1(), "source");

    final KeyValueStoreBuilder<Bytes, byte[]> rStoreBuilder = new KeyValueStoreBuilder<>(new RocksDbKeyValueBytesStoreSupplier("rstore"), Serdes.Bytes(), Serdes.ByteArray(), Time.SYSTEM);
    rStoreBuilder.withLoggingEnabled(new HashMap<>());

    topology.addStateStore(rStoreBuilder, "processor1");

    Properties p = new Properties();
    p.put(APPLICATION_ID_CONFIG, "stream1");
    p.put(BOOTSTRAP_SERVERS_CONFIG, KafkaUtil.getBootStrapServers());
    p.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, KeySerde.class);
    p.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, ValueSerde.class);
    streams = new KafkaStreams(topology, p);
    streams.start();
}

拓扑2:

public void createTopology() {
    Topology topology = new Topology();

    final KeyValueStoreBuilder<Bytes, byte[]> rStoreBuilder = new KeyValueStoreBuilder<>(new RocksDbKeyValueBytesStoreSupplier("rstoreg"), Serdes.Bytes(), Serdes.ByteArray(), Time.SYSTEM);
    rStoreBuilder.withLoggingDisabled();

    topology.addGlobalStore(rStoreBuilder, "globalprocessname", Serdes.Bytes().deserializer(), Serdes.ByteArray().deserializer(), "topic1", "processor2", new CustomProcessorSupplier1());

    Properties p = new Properties();
    p.put(APPLICATION_ID_CONFIG, "stream1");
    p.put(BOOTSTRAP_SERVERS_CONFIG, KafkaUtil.getBootStrapServers());
    p.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, KeySerde.class);
    p.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, ValueSerde.class);
    p.put(STATE_DIR_CONFIG, "/tmp/" + System.getProperty("server.port"));
    streams = new KafkaStreams(topology, p);
    streams.start();
}
}

运行单实例时:-

预期的: state-store 和 global-store 都必须包含所有键(来自 topic1 的所有输入分区的数据)

实际的: 状态存储包含来自 2 个分区的数据 全局存储包含来自 1 个分区的数据

当运行此代码的 2 个实例时:-

预期:两个全局存储都必须包含所有数据。 3 个分区分为 2 个状态存储并包含部分数据

实际:(S表示statestore,G表示全局存储,P表示输入数据的分区) S1 - P1 G1-P2 S2-P3 G2 - P1、P2、P3


问题在于StreamsConfig.APPLICATION_ID_CONFIG。您可以将相同的内容用于两种不同类型的应用程序。

的价值StreamsConfig.APPLICATION_ID_CONFIG用作group.id. group.id用于缩放应用程序。如果您有同一应用程序的两个实例(具有相同的group.id),他们开始处理来自分区子集的消息。

在您的情况下,您有两个不同的应用程序,但它们使用相同的StreamsConfig.APPLICATION_ID_CONFIG。为每个分区分配子集(App1:2 个分区,App2:1 个分区),并且它们仅处理整个消息的子集。它是消费者群体机制。

有关消费者组的更多信息,您可以找到:

  • https://www.confluence.io/blog/apache-kafka-data-access-semantics-consumers-and-membership https://www.confluent.io/blog/apache-kafka-data-access-semantics-consumers-and-membership
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使用相同的 APPLICATION_ID_CONFIG 运行两个或多个拓扑? 的相关文章

随机推荐