我使用Kafka 0.10,我有一个主题logs
我的物联网设备将日志发布到其中,我的消息的关键是device-id
,所以同一设备的所有日志都在同一个分区。
我有一个 API/devices/{id}/tail-logs
需要显示呼叫时某台设备的最后 N 条日志。
目前,我以一种非常低效的方式(但有效)实现它,因为我从包含设备日志的分区的开头(即最旧的日志)开始,直到达到当前时间戳。
一种更有效的方法是,如果我可以获得当前的最新偏移量,然后向后消费消息(我需要过滤掉一些消息以仅保留我正在寻找的设备的消息)
可以用kafka来做吗?如果不是,如何解决这个问题? (我看到的一个更重的解决方案是将kafka-connect链接到弹性搜索,然后查询elasticsearch,但为此再增加2个组件似乎有点矫枉过正......)
由于您使用的是 0.10.2,我建议编写一个 Kafka Streams 应用程序。应用程序将是有状态的,并且状态将保存每个的最后 N 条记录/日志device-id
-- 如果新数据写入输入主题,Kafka Streams 应用程序将仅更新其状态(无需重新读取整个主题)。
此外,该应用程序还可以满足您的请求(“api/devices/{id}/tail-logs
" using 交互式查询 http://docs.confluent.io/current/streams/developer-guide.html#interactive-queries特征。
因此,我不会构建一个必须重新计算每个请求的答案的无状态应用程序,而是构建一个有状态应用程序,它为所有可能的请求(即,对于所有请求)急切地计算结果(并始终自动更新结果)device-id
s) 并在请求到来时返回已经计算的结果。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)