运行梁管道时,“PBegin”对象没有属性“windowing”

2023-12-03

我在运行数据流作业时发现“PBegin”对象没有属性“windowing”。 我在 pardo 函数中调用 connectclass 类。

我正在尝试从 Beam python SDK 连接 NOSQL 数据库并运行 sql 从表中提取数据。然后使用另一个 pardo 将输出写入单独的文件。

class Connector(beam.DoFn):
    def __init__(self,username,seeds,keyspace,password,datacenter=None):
    self.username = username
    self.password = password
    self.seeds = seeds
    self.keyspace = keyspace
    self.datacenter = datacenter
    super(self.__class__, self).__init__()

    def process(self, element):

    if datacenter:
        load_balancing_policy = DCAwareRoundRobinPolicy(local_dc=self.datacenter)
    auth_provider = PlainTextAuthProvider(username=self.username, password=self.password)
    cluster = Cluster(contact_points=self.seeds,
                      load_balancing_policy=load_balancing_policy,
                      auth_provider=auth_provider)
    session=cluster.connect(self.seeds,self.keyspace,self.username, self.password, self.datacenter)
    rows = session.execute(SQL Query)
    yield rows

刚刚偶然发现了同样的问题。尝试连接到 RDBMS 源,但我想就实现设计而言,NoSQL 和 SQL 数据库之间没有区别。

除了 Jayadeep Jayaraman 的建议之外,恕我直言,这可以通过使用 ParDo 来实现。实际上,使用 ParDo 进行连接是什么梁文档推荐如果这样做的限制对于您的用例来说是可以接受的:

对于有界(批量)源,当前有两种创建 Beam 源的选项:

使用 ParDo 和 GroupByKey。

使用 Source 接口并扩展 BoundedSource 抽象子类。

ParDo 是推荐的选项,因为实现 Source 可能很棘手。请参阅何时使用>源接口,获取您可能想要使用源>>的一些用例列表(例如动态工作重新平衡)。

您没有展示您如何使用您的 DoFn。对我来说,记住 DoFn 作用于现有 PCollection 的元素是有帮助的。它本身无法从头开始创建 DoFn。所以为了解决你提到的问题,您可能希望从内存创建一个 PCollection,其中包含用于从源检索数据的查询的一个元素。然后应用从源读取的 ParDo 到此 PCollection。

顺便说一句:我为每个分区设计了一个元素,我想从 Pcollection 中的 RDBMS 中读取数据 - 这样就可以从 SQL 数据库中并行读取数据。

解决方案可能如下所示:

p | beam.Create(["Your Query / source object qualifier goes here"]) 
  | "Read from Database" >> beam.ParDo(YourConnector())

我也提一下使用 DoFn 的 start_bundle 和 finish_bundle 方法来设置/断开连接可能是个好主意.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

运行梁管道时,“PBegin”对象没有属性“windowing” 的相关文章

随机推荐

  • 如何在Python中模糊匹配数组列中的项目?

    我有一系列来自 NCAA 的球队名称 以及与其相关的统计数据 学校名称通常会被缩短或完全省略 但名称的所有变体通常都有一个共同元素 例如阿拉巴马州赤潮与赤潮 这些名称全部包含在一个数组中 没有特定的顺序 我希望能够通过模糊匹配来获取团队名称
  • 在“for”循环中读取带有空格、制表符和多个输入文件的完整行

    我看过以下文章 for 环形 它会在出现空格 制表符或换行符等空白时进行分割 为了解决这个问题 我有以下额外的命令 IFS n 但是当我尝试根据以下细节解决上述场景时 我有两个文件 输入1 txt and 输入 txt 在我当前的目录中 b
  • 在 ruamel.yaml 迭代期间获取评论

    当我迭代 YAML 对象时如何获取注释 yaml YAML with open path r as f yaml data yaml load f for obj in yaml data how to get the comments h
  • Wordpress 的 nginx 配置位于同一服务器的 Rails 应用程序的子目录中

    当我尝试访问 example com blog 时 我不断收到 文件未找到 错误 并且在 var log nginx error log FastCGI sent in stderr Primary script unknown while
  • 返回一个随机偶数

    我有以下几种方法 rnd 方法返回两个边界之间的单个随机整数 Create next batch of 55 random numbers void advance random int j1 double new random for j
  • 文件上传控制和 GWT 外观

    我正在编写我的第一个 GWT 项目 我有一个带有标签 文件上传控件和上传按钮的表单 但它们在各种网络浏览器中看起来很糟糕并且差异很大 另外 文件上传控件中的按钮与 GWT 按钮控件的样式不同 另一个问题是 在 Chrome 中 文件选择按钮
  • Android Widget 在更新期间显示奇怪的图像

    我这里有一个奇怪的 我有一台配备 Nextel 的 Motorola i1 运行 Android 1 5 我有一个安卓小部件 当我开始运行更新此小部件的服务时 我看到来自另一个小部件的图像 启动 DC Contact 它在我的小部件的位置显
  • 使用 Newton-Raphson 方法在 C 中求平方根

    在下面的代码中 我想将终止条件替换为 如果猜测平方与x的比率接近1 则while循环应该终止 我尝试了各种表达式 但没有一个能够正确运行代码 有什么建议吗 include
  • R:带有 geom_map 的 ggplot2 返回“x 和单位必须具有长度 > 0”错误,尽管值已转换为因子

    我正在开发一个原始的闪亮应用程序 它将映射来自苏格兰开放数据项目 我制定了 SPARQL 查询 用于生成类似于下面提供的摘录的数据框 dz label overall quantiles S010001 8 S010002 9 我有强化的形
  • 从 XCode 调用本地 HTML 文件时出现问题

    这是我的 WebView 代码 webView loadRequest NSURLRequest requestWithURL NSURL fileURLWithPath NSBundle mainBundle pathForResourc
  • Android SpeechRecognizer 只能在应用程序的主线程中使用

    我正在尝试将一些 Android 语音 API 集成到我的基于 AndEngine 的游戏中 我将代码放在 BaseGame 活动中 但是在运行时出现此错误 05 06 23 51 28 955 错误 AndroidRuntime 553
  • 4D 到 3D 透视投影

    我正在尝试计算 3D 世界中 4D 点的位置 我从 2D 开始 尝试将其扩展到 3D 然后再扩展到 4D 首先 我发现计算直线上二维点的投影位置很容易 Whoops there should be in the first equation
  • 如何在 Dart 中替换 unicode 转义字符

    我需要清理具有转义字符的字符串 但无法这样做 这是我的测试代码 test Replace unicode escape character String originalText Jeremiah 52 1 u201334 String r
  • 为初学者保护 PHP 表单?资源?

    我成功构建了第一个 html PHP 表单 该表单使用 POST 全局变量在多个页面之间传递变量 然后使用 mail 函数将结果通过电子邮件发送给我 我确信这种形式非常不安全 因为它现在很容易受到各种攻击 我想知道如何修补这些漏洞 但我几乎
  • Java错误路径中没有lwjgl64?

    我正在尝试制作一个游戏 它在 eclipse 中运行良好 但是当我将其导出并作为 jar 文件运行时 出现此错误 Exception in thread main java lang UnsatisfiedLinkError no lwjg
  • 这里建议API返回PARSING_ERROR

    从昨天开始 我们用于获取地点建议的 Here API 系统地返回 PARSING ERROR 状态页面在这里https status here com status说一切都很好 那可能是什么问题呢 我们的代码几周以来一直在运行 if fro
  • 无法使用字符串在 PowerShell 中设置别名

    我尝试通过运行在 PowerShell 中设置别名Set Alias Name artisan Value php aritsan 虽然命令运行成功 但是当我调用别名时 出现以下错误 artisan The term php aritsan
  • 使用 iPhone 和 iPad 的 Safari 中的传单地图问题

    我在 iPhone 和 iPad 中使用 Safari 浏览传单地图时遇到一些奇怪的问题 我正在使用 AJAX GET 请求来获取地图上的标记并在获取时绑定弹出内容 在弹出窗口中 我有一个按钮 当用户单击它以查看详细信息时 该按钮会打开引导
  • 如何使用 VoiceOver 逐段阅读 UITextView?

    我正在为盲人编写一个应用程序 并想在其主页上放置一些说明 该指令是多段静态文本 我把它放在 UITextView 中 我希望盲人用户能够使用 VoiceOver 逐段阅读说明 但是 当用户将 VoiceOver 焦点移到 UITextVie
  • 运行梁管道时,“PBegin”对象没有属性“windowing”

    我在运行数据流作业时发现 PBegin 对象没有属性 windowing 我在 pardo 函数中调用 connectclass 类 我正在尝试从 Beam python SDK 连接 NOSQL 数据库并运行 sql 从表中提取数据 然后