apache_beam.transforms.util.Reshuffle() 不适用于 GCP 数据流

2024-02-18

我已通过以下方式升级到最新的 apache_beam[gcp] 包pip install --upgrade apache_beam[gcp]。然而,我注意到改组() https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L516没有出现在[gcp]分配。这是否意味着我将无法使用Reshuffle()在任何数据流管道中?有没有办法解决?或者 pip 包是否可能不是最新的,如果 Reshuffle() 在 github 上的 master 中,那么它将在数据流上可用?

根据对此的回应question https://stackoverflow.com/questions/46778848/google-cloud-dataflow-randomize-writetobigquery我正在尝试从 BigQuery 读取数据,然后将数据随机化,然后再将其写入 GCP 存储桶中的 CSV。我注意到我用来训练 GCMLE 模型的分片 .csv 并不是真正随机的。在张量流中,我可以随机化批次,但这只会随机化队列中构建的每个文件中的行,我的问题是当前生成的文件在某种程度上存在偏差。如果有任何关于在数据流中写入 CSV 之前进行洗牌的其他方法的建议,我们将不胜感激。


一种方法是自己重新创建随机播放。

import random

shuffled_data = (unshuffled_pcoll
        | 'AddRandomKeys' >> Map(lambda t: (random.getrandbits(32), t))
        | 'GroupByKey' >> GroupByKey()
        | 'RemoveRandomKeys' >> FlatMap(lambda t: t[1]))

我剩下的问题是我是否需要担心窗口或ExpandIterable部分来自code https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L497

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

apache_beam.transforms.util.Reshuffle() 不适用于 GCP 数据流 的相关文章

随机推荐