如果这听起来很愚蠢,请道歉!我们正在使用 flink 进行异步 IO 调用。很多时候,IO 调用会重复(相同的参数集),并且我们调用的大约 80% 的 API 对相同的参数返回相同的响应。因此,我们希望避免再次拨打电话。我们认为我们可以使用状态来存储以前的响应并再次使用它们。问题是,虽然我们的响应可以再次使用,但此类响应的数量很大,因此需要大量内存。有没有办法将其持久化以在需要时驱动和查询?
根本不是一个愚蠢的问题!
一些事实揭示了为什么这并不简单:
- Flink 状态对于单个算子来说是严格本地化的。您无法访问另一个运算符中的状态。
- Flink 提供了一种可以溢出到磁盘的状态后端,即 RocksDB。只有键控状态存储在 RocksDB 中——非键控状态始终存在于堆上。
- 异步 i/o 运算符不能在键控流上使用——它只能在非键控上下文中工作。
- 将迭代(作业图中的循环连接)与 DataStream API 结合使用是一个非常糟糕的主意(因为它会破坏检查点)。
当然,缓存不一定需要处于Flink的托管状态。
一些选项:
- 不要对缓存使用键控状态。您可以使用诸如单独的 RocksDB 实例之类的东西作为缓存,并直接在异步 i/o 运算符中实现缓存。如果缓存适合内存,我建议使用 Guava。
- 不要使用异步 I/O。按照 @YuvalItzchakov 的建议,在 ProcessFunction 中自行进行获取和缓存。
- 你可以使用有状态函数 https://statefun.io反而。这是一个新的库和 API,位于 Flink 之上,克服了上面列出的一些限制。
- 您可以构建如下图所示的东西。这里,缓存在 CoProcessFunction 中以键控状态保存。如果缓存未命中,则使用下游异步 I/O 运算符来获取丢失的数据。然后必须使用外部队列(例如 Kafka、Kinesis 或 Pulsar)将其循环回缓存。
+---------------------+ +------+
| +--results from cache+---------------^--> SINK |
+--requests+------> | CoProcessFunction | | +------+
| | |
+--cache misses+--> | cache in RocksDB | +-----------+ |
| +--side output: | fetch via +---+-> loop back
SOURCES +---------------------+ cache misses+---> | async i/o | as 2nd input
+-----------+ to fill cache
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)