TL;DR
How do I get a count-based trigger window to fire correctly with the Beam Python SDK?
Problem
I am trying to build a pipeline that transforms and indexes a Wikipedia dump.
The goals are:
- Read from the compressed file with a single process, in a streaming fashion, since the file does not fit in RAM.
- Process each element in parallel (ParDo).
- Group those elements into count windows in a single process (a streaming GroupBy on a single key -> batches) to save them to a database.
Development
To do this, I created a simple source class that returns tuples of the form (index, data, count):
class CountingSource(beam.io.filebasedsource.FileBasedSource):
    def read_records(self, file_name, offset_range_tracker):
        # timestamp = datetime.now()
        k = 0
        with gzip.open(file_name, "rt", encoding="utf-8", errors="strict") as f:
            # Lines alternate: index, page, index, page, ...
            line = f.readline()
            while line:
                # Emit (index_line, page_line, running_count); the extra
                # readline() consumes the page that follows the index.
                yield line, f.readline(), k
                k += 1
                line = f.readline()
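As a sanity check outside Beam, the read loop can be exercised against a toy file with the same alternating layout (the file path and contents below are made up for illustration):

import gzip

# Toy dump with the same alternating "index, page" line layout.
with gzip.open("/tmp/toy_dump.gz", "wt", encoding="utf-8") as f:
    f.write("12\n<page>A</page>\n57\n<page>B</page>\n")

# Same loop as in read_records, yielding (index, page, count).
with gzip.open("/tmp/toy_dump.gz", "rt", encoding="utf-8") as f:
    k = 0
    line = f.readline()
    while line:
        print((line.strip(), f.readline().strip(), k))
        # -> ('12', '<page>A</page>', 0), then ('57', '<page>B</page>', 1)
        k += 1
        line = f.readline()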
I then built the pipeline:
_beam_pipeline_args = [
    "--runner=DirectRunner",
    "--streaming",
    # "--direct_num_workers=5",
    # "--direct_running_mode=multi_processing",
]

with beam.Pipeline(options=PipelineOptions(_beam_pipeline_args)) as pipeline:
    pipeline = (
        pipeline
        | "Read dump" >> beam.io.Read(CountingSource(dump_path))
        | "With timestamps" >> beam.Map(lambda data: beam.window.TimestampedValue(data, data[-1]))
        | "Drop timestamp" >> beam.Map(lambda data: (data[0], data[1]))
        | "Process element" >> beam.ParDo(ProcessPage())
        | "Filter nones" >> beam.Filter(lambda data: data != [])
        # * not working: stays stuck at the group step, the trigger never fires
        | "window" >> beam.WindowInto(
            beam.window.GlobalWindows(),
            trigger=beam.transforms.trigger.Repeatedly(beam.transforms.trigger.AfterCount(10)),
            accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING,
        )
        | "Map to tuple" >> beam.Map(lambda data: (None, data))
        # | "Print" >> beam.Map(lambda data: print(data))
        | "Group all per window" >> beam.GroupByKey()
        | "Discard key" >> beam.Values()
        | "Index data" >> beam.Map(index_data)
    )
If I remove the window and go directly from "Filter nones" to "Index data", the pipeline works, but it indexes the elements one by one. Also, if I uncomment the "Print" step, I can see that I still have data after the "Map to tuple" step, but the pipeline hangs at "Group all per window" without any logs. I also tried a time-based trigger, changing the window to
>> beam.WindowInto(beam.window.FixedWindows(10))
but that changed nothing (it should be equivalent, since the "timestamps" I create during extraction are just the running count).
Am I misunderstanding how windowing works? The goal is to index the data in batches.
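For reference, the window-free variant mentioned above, which does run but indexes elements one by one, is simply:

with beam.Pipeline(options=PipelineOptions(_beam_pipeline_args)) as pipeline:
    pipeline = (
        pipeline
        | "Read dump" >> beam.io.Read(CountingSource(dump_path))
        | "Drop timestamp" >> beam.Map(lambda data: (data[0], data[1]))
        | "Process element" >> beam.ParDo(ProcessPage())
        | "Filter nones" >> beam.Filter(lambda data: data != [])
        | "Index data" >> beam.Map(index_data)
    )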
Workaround
I can "hack" the last step with a custom DoFn, for example:
class BatchIndexing(beam.DoFn):
    def __init__(self, connection_string, batch_size=50000):
        self._connection_string = connection_string
        self._batch_size = batch_size
        self._total = 0

    def setup(self):
        from sqlalchemy import create_engine
        from sqlalchemy.orm import sessionmaker
        from scripts.wikipedia.wikipedia_articles.beam_module.documents import Base

        engine = create_engine(self._connection_string, echo=False)
        self.session = sessionmaker(bind=engine)(autocommit=False, autoflush=False)
        Base.metadata.create_all(engine)

    def start_bundle(self):
        # Buffer of processed lines for the current bundle.
        self._lines = []

    def process(self, element):
        # The input element is one processed pair.
        self._lines.append(element)
        if len(self._lines) >= self._batch_size:
            self._flush_batch()

    def finish_bundle(self):
        # Flush whatever is left in the buffer before the bundle ends.
        if self._lines:
            self._flush_batch()

    def _flush_batch(self):
        self._total += len(self._lines)
        self.index_data(self._lines)
        # Clear the buffer.
        self._lines = []

    def index_data(self, entries_to_index):
        """Index a batch of data."""
        self.session.add_all(entries_to_index)
        self.session.commit()
        print(f"Indexed {self._total} entries")
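One caveat I am aware of with this approach: bundle boundaries are decided by the runner, so finish_bundle can flush far fewer than batch_size elements per call, especially on the DirectRunner.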
and change the pipeline to:
with beam.Pipeline(options=PipelineOptions(_beam_pipeline_args)) as pipeline:
    pipeline = (
        pipeline
        | "Read dump" >> beam.io.Read(CountingSource(dump_path))
        | "Drop timestamp" >> beam.Map(lambda data: (data[0], data[1]))
        | "Process element" >> beam.ParDo(ProcessPage())
        | "Filter nones" >> beam.Filter(lambda data: data != [])
        | "Unroll" >> beam.FlatMap(lambda data: data)
        | "Index data" >> beam.ParDo(BatchIndexing(connection_string, batch_size=10000))
    )
This "works", but it runs the last step in parallel (thus overwhelming the database, or causing locked-database issues with SQLite), and I would like a single sink talking to the database.
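For completeness, what I am ultimately after could presumably also be written with beam.GroupIntoBatches on a single shared key, so that batching (and indexing) happens in one place; this is only a sketch, and I have not verified whether it avoids the same stall as the GroupByKey version above:

with beam.Pipeline(options=PipelineOptions(_beam_pipeline_args)) as pipeline:
    pipeline = (
        pipeline
        | "Read dump" >> beam.io.Read(CountingSource(dump_path))
        | "Drop timestamp" >> beam.Map(lambda data: (data[0], data[1]))
        | "Process element" >> beam.ParDo(ProcessPage())
        | "Filter nones" >> beam.Filter(lambda data: data != [])
        | "Unroll" >> beam.FlatMap(lambda data: data)
        # Single shared key so all elements are batched together.
        | "Map to tuple" >> beam.Map(lambda data: (None, data))
        | "Batch" >> beam.GroupIntoBatches(10000)
        | "Discard key" >> beam.Values()
        | "Index batch" >> beam.Map(index_data)
    )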