Problem
Every time the system receives a message from Pub/Sub and applies a sliding window, the message is duplicated.
The code
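import apache_beam as beam
from apache_beam import window
from apache_beam.transforms.trigger import AccumulationMode
| 'Parse dictionary' >> beam.Map(lambda elem: (elem['Serial'], int(elem['Value'])))  # (serial, value) pairs
| 'window' >> beam.WindowInto(window.SlidingWindows(30, 15), accumulation_mode=AccumulationMode.DISCARDING)  # 30 s windows, a new one every 15 s
| 'Count' >> beam.CombinePerKey(beam.combiners.MeanCombineFn())  # per-key mean, despite the 'Count' label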
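A minimal pure-Python sketch of how a 30 s sliding window with a 15 s period assigns one element to windows. It re-implements the assignment rule (a window starts every `period` seconds, aligned to `offset`, and lasts `size` seconds) rather than calling Beam, so `assign_sliding_windows` and `hms` are hypothetical names, and the event time 08:21:36 UTC is a hypothetical value chosen to fall inside both windows shown in the output.

```python
from datetime import datetime, timezone


def assign_sliding_windows(ts, size=30, period=15, offset=0):
    """Return the [start, end) windows (epoch seconds) that contain ts.

    Illustrative re-implementation, not Beam's code: windows start every
    `period` seconds (aligned to `offset`) and each lasts `size` seconds,
    so ts belongs to every window whose start s satisfies s <= ts < s + size.
    """
    # Most recent window start at or before ts
    start = ts - ((ts - offset) % period)
    windows = []
    s = start
    # Walk back one period at a time while the window still covers ts
    while s > ts - size:
        windows.append((s, s + size))
        s -= period
    return windows


def hms(epoch_seconds):
    """Format epoch seconds as HH:MM:SS in UTC."""
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc).strftime("%H:%M:%S")


# Hypothetical event time for the single Pub/Sub message
event_ts = datetime(2018, 11, 16, 8, 21, 36, tzinfo=timezone.utc).timestamp()
for start, end in assign_sliding_windows(event_ts):
    print(hms(start), "->", hms(end))
```

Because size / period = 30 / 15 = 2, every timestamp is covered by two overlapping windows under this rule, which matches the two windows printed in the output.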
Output
If I send only one message from Pub/Sub and, once the sliding window has closed, print what I have with this code:
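import apache_beam as beam

class print_row2(beam.DoFn):
    def process(self, row=beam.DoFn.ElementParam, window=beam.DoFn.WindowParam, timestamp=beam.DoFn.TimestampParam):
        # Print the element, its window's start and end, and the element timestamp (timestamp2str is the author's own helper, not shown)
        print(row, timestamp2str(float(window.start)), timestamp2str(float(window.end)), timestamp2str(float(timestamp)))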
Result
('77777', 120.0) 2018-11-16 08:21:15.000 2018-11-16 08:21:45.000 2018-11-16 08:21:45.000
('77777', 120.0) 2018-11-16 08:21:30.000 2018-11-16 08:22:00.000 2018-11-16 08:22:00.000
If I print the message before the 'window' >> beam.WindowInto(window.SlidingWindows(30, 15)) step, I get it only once.
The process in graphical form:
time: ----t+00---t+15---t+30----t+45----t+60------>
          :      :      :       :       :
w1:       |=X===========|       :       :
w2:              |==============|       :
...
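Message X was sent only once, at the start of the sliding window, so it should be received only once, but it was received twice.
I have tried different AccumulationMode values and trigger=AfterWatermark, but I cannot solve the problem.
What could be wrong?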
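AccumulationMode and triggers control when, and how often, each window emits a pane; they do not change how many windows an element is assigned to. The following plain-Python simulation of sliding windowing followed by a per-key mean illustrates this: a single message yields one combined row per window it falls into. `windowed_mean_per_key` is a hypothetical name, the 36 s timestamp is hypothetical, and this is a sketch of the semantics, not Beam's implementation.

```python
from collections import defaultdict


def windowed_mean_per_key(events, size=30, period=15):
    """Simulate sliding windowing (size, period) followed by a per-key mean.

    events: iterable of (key, value, event_ts) tuples, ts in seconds.
    Returns {(key, (window_start, window_end)): mean_value}.
    """
    groups = defaultdict(list)
    for key, value, ts in events:
        # The element is copied into every window that covers its timestamp
        s = ts - (ts % period)
        while s > ts - size:
            groups[(key, (s, s + size))].append(value)
            s -= period
    # One combined result per (key, window): this is where the "duplicate" comes from
    return {k: sum(vs) / len(vs) for k, vs in groups.items()}


# One message, as in the question (the 36 s timestamp is hypothetical)
result = windowed_mean_per_key([("77777", 120, 36)])
for (key, window), avg in sorted(result.items(), key=lambda kv: kv[0][1]):
    print((key, avg), window)
```

With `period` equal to `size` the same function produces a single row per message, which is the fixed-window behaviour.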
Extra
For fixed windows, this is the code that works correctly for my purpose:
| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 30))
| 'Speed Average' >> beam.GroupByKey()
| "Calculating average" >> beam.CombineValues(beam.combiners.MeanCombineFn())
or
| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 30))
| "Calculating average" >> beam.CombinePerKey(beam.combiners.MeanCombineFn())
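A minimal sketch of why the fixed-window version emits each message once: fixed windows tile the time axis without overlap, so the assignment rule returns exactly one window per timestamp. `assign_fixed_window` is a hypothetical name and the event times are illustrative; this is a plain-Python analogue, not Beam's code.

```python
def assign_fixed_window(ts, size=30, offset=0):
    """Return the single [start, end) window of `size` seconds that contains ts.

    Fixed windows do not overlap, so every element lands in exactly one
    window (unlike a 30 s / 15 s sliding window, which gives two).
    """
    start = ts - ((ts - offset) % size)
    return (start, start + size)


# Hypothetical event times in seconds: each maps to one 30 s window
for ts in (5, 29, 30, 36, 59, 60):
    print(ts, assign_fixed_window(ts))
```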