我在运行数据流作业时发现“PBegin”对象没有属性“windowing”。
我在 pardo 函数中调用 connectclass 类。
我正在尝试从 Beam python SDK 连接 NOSQL 数据库并运行 sql 从表中提取数据。然后使用另一个 pardo 将输出写入单独的文件。
class Connector(beam.DoFn):
def __init__(self,username,seeds,keyspace,password,datacenter=None):
self.username = username
self.password = password
self.seeds = seeds
self.keyspace = keyspace
self.datacenter = datacenter
super(self.__class__, self).__init__()
def process(self, element):
if datacenter:
load_balancing_policy = DCAwareRoundRobinPolicy(local_dc=self.datacenter)
auth_provider = PlainTextAuthProvider(username=self.username, password=self.password)
cluster = Cluster(contact_points=self.seeds,
load_balancing_policy=load_balancing_policy,
auth_provider=auth_provider)
session=cluster.connect(self.seeds,self.keyspace,self.username, self.password, self.datacenter)
rows = session.execute(SQL Query)
yield rows
刚刚偶然发现了同样的问题。尝试连接到 RDBMS 源,但我想就实现设计而言,NoSQL 和 SQL 数据库之间没有区别。
除了 Jayadeep Jayaraman 的建议之外,恕我直言,这可以通过使用 ParDo 来实现。实际上,使用 ParDo 进行连接是什么梁文档推荐如果这样做的限制对于您的用例来说是可以接受的:
对于有界(批量)源,当前有两种创建 Beam 源的选项:
使用 ParDo 和 GroupByKey。
使用 Source 接口并扩展 BoundedSource 抽象子类。
ParDo 是推荐的选项,因为实现 Source 可能很棘手。请参阅何时使用>源接口,获取您可能想要使用源>>的一些用例列表(例如动态工作重新平衡)。
您没有展示您如何使用您的 DoFn。对我来说,记住 DoFn 作用于现有 PCollection 的元素是有帮助的。它本身无法从头开始创建 DoFn。所以为了解决你提到的问题,您可能希望从内存创建一个 PCollection,其中包含用于从源检索数据的查询的一个元素。然后应用从源读取的 ParDo 到此 PCollection。
顺便说一句:我为每个分区设计了一个元素,我想从 Pcollection 中的 RDBMS 中读取数据 - 这样就可以从 SQL 数据库中并行读取数据。
解决方案可能如下所示:
p | beam.Create(["Your Query / source object qualifier goes here"])
| "Read from Database" >> beam.ParDo(YourConnector())
我也提一下使用 DoFn 的 start_bundle 和 finish_bundle 方法来设置/断开连接可能是个好主意.
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)