我想在同一个实例上运行 2 个拓扑。 1个拓扑涉及状态存储,其他涉及全局存储。我如何成功地做到这一点?
我创建了 1 个具有 3 个分区的主题,然后在 1 个拓扑中添加了状态存储,在第二个拓扑中添加了全局存储。
拓扑1:
public void createTopology() {
Topology topology = new Topology();
topology.addSource("source", new KeyDeserializer(), new ValueDeserializer(), "topic1");
topology.addProcessor("processor1", new CustomProcessorSupplier1(), "source");
final KeyValueStoreBuilder<Bytes, byte[]> rStoreBuilder = new KeyValueStoreBuilder<>(new RocksDbKeyValueBytesStoreSupplier("rstore"), Serdes.Bytes(), Serdes.ByteArray(), Time.SYSTEM);
rStoreBuilder.withLoggingEnabled(new HashMap<>());
topology.addStateStore(rStoreBuilder, "processor1");
Properties p = new Properties();
p.put(APPLICATION_ID_CONFIG, "stream1");
p.put(BOOTSTRAP_SERVERS_CONFIG, KafkaUtil.getBootStrapServers());
p.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, KeySerde.class);
p.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, ValueSerde.class);
streams = new KafkaStreams(topology, p);
streams.start();
}
拓扑2:
public void createTopology() {
Topology topology = new Topology();
final KeyValueStoreBuilder<Bytes, byte[]> rStoreBuilder = new KeyValueStoreBuilder<>(new RocksDbKeyValueBytesStoreSupplier("rstoreg"), Serdes.Bytes(), Serdes.ByteArray(), Time.SYSTEM);
rStoreBuilder.withLoggingDisabled();
topology.addGlobalStore(rStoreBuilder, "globalprocessname", Serdes.Bytes().deserializer(), Serdes.ByteArray().deserializer(), "topic1", "processor2", new CustomProcessorSupplier1());
Properties p = new Properties();
p.put(APPLICATION_ID_CONFIG, "stream1");
p.put(BOOTSTRAP_SERVERS_CONFIG, KafkaUtil.getBootStrapServers());
p.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, KeySerde.class);
p.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, ValueSerde.class);
p.put(STATE_DIR_CONFIG, "/tmp/" + System.getProperty("server.port"));
streams = new KafkaStreams(topology, p);
streams.start();
}
}
运行单实例时:-
预期的:
state-store 和 global-store 都必须包含所有键(来自 topic1 的所有输入分区的数据)
实际的:
状态存储包含来自 2 个分区的数据
全局存储包含来自 1 个分区的数据
当运行此代码的 2 个实例时:-
预期:两个全局存储都必须包含所有数据。 3 个分区分为 2 个状态存储并包含部分数据
实际:(S表示statestore,G表示全局存储,P表示输入数据的分区)
S1 - P1
G1-P2
S2-P3
G2 - P1、P2、P3