我使用 Kafka 和 Kafka Streams 作为 Spring Cloud Stream 的一部分。我的 Kafka Streams 应用程序中流动的数据正在按特定时间窗口进行聚合和具体化:
Materialized<String, ErrorScore, WindowStore<Bytes, byte[]>> oneHour = Materialized.as("one-hour-store");
oneHour.withLoggingEnabled(topicConfig);
events
.map(getStringSensorMeasurementKeyValueKeyValueMapper())
.groupByKey()
.windowedBy(TimeWindows.of(oneHourStore.getTimeUnit()))
.reduce((aggValue, newValue) -> getMaxErrorScore(aggValue, newValue),
(oneHour));
按照设计,正在具体化的信息也由变更日志主题支持。
我们的应用程序还有一个休息端点,它将像这样查询状态存储:
ReadOnlyWindowStore<String, Double> windowStore = queryableStoreRegistry.getQueryableStoreType("one-hour-store", QueryableStoreTypes.windowStore());
WindowStoreIterator<ErrorScore> iter = windowStore.fetch(key, from, to);
查看创建的变更日志主题的设置,内容如下:
min.insync.replicas 1
cleanup.policy delete
retention.ms 5259600000
retention.bytes -1
我假设当地国营商店至少会将信息保留 61 天(约 2 个月)。然而,商店中似乎只保留了最后一天的数据。
是什么导致数据这么快就被删除了?
更新解决方案Kafka Streams 2.0.1版本不包含Materialized.withRetention方法。对于这个特定版本,我可以使用以下代码设置状态存储的保留时间,这解决了我的问题:
TimeWindows timeWindows = TimeWindows.of(windowSizeMs);
timeWindows.until(retentionMs);
使我的代码写成:
...
.groupByKey()
.windowedBy(timeWindows)
.reduce((aggValue, newValue) -> getMaxErrorScore(aggValue, newValue),
(oneHour));
...