Context:我不一定指的是基于 KCL 的应用程序,只是纯粹的 Kinesis API 调用。
是否使用TRIM_HORIZON
分片迭代器类型立即为您提供流中最早发布的记录(即 Kinesis 内置 24 小时窗口内最早可用的记录),或者只是长达 24 小时前的某个时间段的迭代器/游标,然后您必须使用它来沿着溪流前进,直到达到最早发布的记录?
换句话说,以防不太清楚......
当使用分片迭代器类型时TRIM_HORIZON
,是预期的行为,它将返回 24 小时前可用的记录,但如果 24 小时前恰好发布了零条记录,而不是仅 3 小时前,则您的应用程序将需要迭代轮询之前的 21 条记录距离达到 3 小时前发布的记录还有几个小时?
时间线示例:
- 9 月 29 日上午 5:00 - 创建一个包含 1 个分片的流“foo”
- 9 月 29 日上午 5:02 - 将单个记录“Item=A”发布到“foo”流
- 9 月 29 日 5:03 am - 问题 a
GetShardIterator
打电话给TRIM_HORIZON
作为您的分片迭代器类型,然后发出GetRecords
使用该分片迭代器调用并接收记录“Item=A”
- 9 月 30 日上午 7:02 - 将第二条记录“Item=B”发布到“foo”流
- 9 月 30 日上午 7:03 - 问题 a
GetShardIterator
打电话给TRIM_HORIZON
作为您的分片迭代器类型,然后发出GetRecords
使用该分片迭代器进行调用。这次通话的结果应该是什么? (注意:我们不记得/重复使用步骤 3 中的分片迭代器)
对于上面的步骤 5,距离“Item=A”消息在流上发布已超过 24 小时,而距离“Item=B”发布仅一分钟。一个新的分片迭代器将与TRIM_HORIZON
立即为您提供最早的可用记录,或者您是否需要继续迭代,直到到达某个已发布内容的时间段?
我一直在尝试 Kinesis,昨天或两天前一切都工作正常(即我发布和使用时没有任何问题)。我对代码进行了一些额外的修改,并于今天再次开始发布。当我启动我的消费者时,即使让它运行几分钟也没有任何结果。我尝试同时发布和消费,但仍然一无所获。手动播放后AFTER_SEQUENCE_NUMBER
迭代器类型,并使用几天前消费者日志中的一些序列号,我能够访问我最近发布的消息。但如果我回去使用TRIM_HORIZON
输入,我根本没有看到任何消息。
我看过docs http://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-consumers.html,但我发现的大多数文档都假设您正在使用 KCL(我实际上最初使用的是 KCL,但当它开始失败时,我转向原始 API 调用)并提到您必须有一个应用程序名称,并且 DynamoDB 表用于跟踪状态。如果您使用纯 Kinesis API 调用或 Kinesis CLI(我最终尝试过这两种方法),我可以说这是不正确的。我终于写了一个纯API脚本来开始TRIM_HORIZON
并无限轮询,最终创下新记录(大约进行了 600 次迭代;开始时间晚于“现在”14 小时,发现记录晚于“现在”约 5 小时)。如果这是预期的行为,那么似乎文档中的措辞 http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#Kinesis-GetShardIterator-request-ShardIteratorType只是有点令人困惑/误导:
TRIM_HORIZON - 从分片中最后一个未修剪的记录开始读取
在系统中,这是分片中最旧的数据记录。
我假设(现在看来是错误的)术语“最旧的数据记录”意味着我已发布到流中的记录,而不仅仅是流中的一个时间段。
如果有人可以帮助确认/解释我所看到的行为,那就太好了。
Thanks!