我可以利用另一个运行 Kafka 应用程序/代码库的团队来使用相同的数据/将其加载到我们的新暂存表中,而不是他们的。他们在“Messages”文件夹中有许多不同的 kafka 侦听器适配器 .java 文件,每个文件消耗不同类型的数据。
每个 kafka 侦听器都有许多导入、一个公共类、实际插入数据的 try 语句等。我需要为每个 kafka 侦听器 adapter.java 文件编辑什么,才能将阶段/目标表从其他团队更改为我的?我发现我只需要更改“groupid”和“topic”变量+(查找具有此格式的文件来配置/添加新的groupid/id: spring.kafka.consumer.group-id=new-groupid spring.kafka.listener .id=新id)。这是两个变量在 .java 文件中的使用方式:
@KafkaListener(topics = "#{new java.util.HashMap(${dataflow.consumer.props.assets.crypto}).keySet()}", groupId = "${dataflow.consumer.assets.crypto.group}",containerFactory = "cryptoListenerContainerFactory", id = "${dataflow.consumer.assets.crypto.group}")
每个 kafka 侦听器适配器都有一个存储库变量,所以我不确定。看起来数据实际上被插入到引用“Repository”的代码块中:
try {
List<FundEntity> cryptoFilteredList = cryptoConsumerRecordList.stream()
.filter(cryptoConsumerRecord -> recordIsNotFilterable(cryptoConsumerRecord, FundEntity.class))
.map(ConsumerRecord::value)
.collect(Collectors.toList());
if (!cryptoFilteredList.isEmpty()) {
this.Repository.storecryptoCollection(cryptoFilteredList, topic, firstOffset, partition, cryptoGroupId);
kafkaDetailsSummary.setStatus(KafkaMessageSummary.KafkaStatus.PROCESSED);
log.info(LogHelper.generateCryptoReconMessage(cryptoFilteredList), keyValue(KAFKA_DETAILS, kafkaDetailsSummary));
}