SlidingWindows Python Apache Beam 复制数据

2024-06-24

问题

每次系统从带有滑动窗口的 pubsub 收到一条消息时,它都会被复制


The code

 | 'Parse dictionary' >> beam.Map(lambda elem: (elem['Serial'], int(elem['Value'])))    
 | 'window' >> beam.WindowInto(window.SlidingWindows(30, 15),accumulation_mode=AccumulationMode.DISCARDING)
 | 'Count' >> beam.CombinePerKey(beam.combiners.MeanCombineFn())

输出

如果我只从 pub/sub 发送一条消息,并尝试在滑动窗口完成后使用代码打印我所拥有的内容:

class print_row2(beam.DoFn):
    def process(self, row=beam.DoFn.ElementParam, window=beam.DoFn.WindowParam,timestamp=beam.DoFn.TimestampParam):
        print row, timestamp2str(float(window.start)), timestamp2str(float(window.end)),timestamp2str(float(timestamp))

结果

('77777', 120.0) 2018-11-16 08:21:15.000 2018-11-16 08:21:45.000 2018-11-16 08:21:45.000
('77777', 120.0) 2018-11-16 08:21:30.000 2018-11-16 08:22:00.000 2018-11-16 08:22:00.000

如果我之前打印消息'window' >> beam.WindowInto(window.SlidingWindows(30, 15))我只得到一次


过程在“图形模式下:

  time: ----t+00---t+15---t+30----t+45----t+60------>
             :      :      :       :       :
  w1:        |=X===========|       :       :
  w2:               |==============|       :
  ...

消息 X 在滑动窗口开始时只发送了一次,它应该只接收一次,但已经接收了两次

我尝试过使用 AccumulationMode 值和触发器=AftyerWatermark,但我无法解决问题。

可能出什么问题了?


Extra

对于固定 Windows,这是适合我的目的的正确代码:

| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 30))
| 'Speed Average' >> beam.GroupByKey()
| "Calculating average" >> beam.CombineValues(beam.combiners.MeanCombineFn())

or

| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 30))
| "Calculating average" >> beam.CombinePerKey(beam.combiners.MeanCombineFn())

属于该窗口的所有元素都会被发出。如果一个元素属于多个窗口,它将在每个窗口中发出。

仅当您计划处理延迟数据/多次触发器触发时,累积模式才有意义。在这种情况下,当触发器再次触发时,丢弃模式只会在窗口中提供新元素,即仅发出自上次触发器触发以来到达同一窗口的元素,已发出的元素不会再次发出并被丢弃。在累积模式下,每次触发触发都会发出整个窗口,它将包括上次已发出的旧元素以及此后到达的新元素。

如果我理解你的例子,你有滑动窗口,它们的长度为 30 秒,每 15 秒启动一次。所以它们重叠了 15 秒:

  time: ----t+00---t+15---t+30----t+45----t+60------>
             :      :      :       :       :
  w1:        |=============|       :       :
  w2:               |==============|       :
  w3:                      |===============|
  ...

因此,您的情况下的任何元素都将属于至少两个窗口(第一个和最后一个窗口除外)。

例如。在您的示例中,如果您的消息是在 17:07:15 和 17:07:30 之间发送的,它将出现在两个窗口中。

固定窗口不重叠,因此元素只能属于一个窗口:

  time: ----t+00---t+15---t+30----t+45----t+60------>
             :             :               :
  w1:        |=============|               :
  w2:                      |===============|
  w3:                                      |====...
  ...

有关 Windows 的更多信息请参见此处:https://beam.apache.org/documentation/programming-guide/#windowing https://beam.apache.org/documentation/programming-guide/#windowing

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

SlidingWindows Python Apache Beam 复制数据 的相关文章

  • 将列表转换为字符串并返回

    我有一个虚拟机 它从嵌套在列表中的元组读取指令 如下所示 0 4738 0 36 0 6376 0 0 存储这种机器代码程序时 文本文件是最简单的 必须写成字符串 这显然很难转换回来 是否有任何模块可以将字符串读入列表 以可读的方式存储列表
  • async for 的语义等价物

    从相对于的文档async for根据 Python 3 5 中的语法 我发现引入它是为了迭代可等待的迭代器 不过 在描述之后的语义等价物中 我没有得到一些东西 iter ITER iter type iter aiter iter runn
  • Django 3.1:带有异步生成器的 StreamingHttpResponse

    Django 3 1 文档 https docs djangoproject com en 3 1 topics async async views 7E text The 20main 20benefits 20are 20the 20a
  • Python 元组列表到 int 列表

    所以我有x 12 1 3 元组列表 我想要x 12 1 3 整数列表 以最好的方式可能 你能帮忙吗 你没有说 最好 是什么意思 但大概你的意思是 最Pythonic 或 最易读 或类似的东西 F3AR3DLEGEND 给出的列表理解可能是最
  • Python 中的双 SSH 隧道

    今天 我在命令行中使用 ssh 将端口从远程服务器转发 使用中间服务器到我的本地计算机 这是我在 shell 中使用的命令 ssh user remote server L 2443 localhost 433 此 ssh 会话使用 ssh
  • Python 文件打开并立即关闭

    尝试首先通过 powershell 然后通过 cmd 运行此代码 甚至简单地单击它 我正在输入 start python myfile py 来运行它 在每种情况下 文件都会在屏幕上闪烁并立即关闭 我查看它的唯一方法是将文件直接拖到cmd中
  • 在 scipy 中按稀疏矩阵分组并返回一个矩阵

    关于使用 SO 处理有几个问题groupby与稀疏矩阵 然而输出似乎是列表 字典 https stackoverflow com questions 35410839 group by on scipy sparse matrix 数据框
  • 使用 Matplotlib 和 TeX 实现均匀间距

    我正在为数学课绘制一些图表 但我无法在绘图图例中正确地获得和平定义的间距 我目前正在使用 对于 TeX 中的单个空间 但会遇到一种情况 其中一个空间比另一个空间稍远 这可能是由于左边的方程占用了多少空间 这是我的代码 import matp
  • 如何根据父字段的值限制子字段的选择?

    我有几个相当复杂的表单 它们依赖于模型层次结构来分配ForeignKey价值观 我需要根据其父字段的值限制子字段可用的选择 问题是父字段的值可能来自初始数据 GET 请求 or来自绑定数据 POST 请求 因此 我最终多次复制了这种模式 c
  • 调整 MLPRegressor 超参数

    我一直在尝试调整 MLP 模型的超参数来解决回归问题 但总是收到收敛警告 这是我的代码 def mlp model X Y estimator MLPRegressor param grid hidden layer sizes 50 50
  • Python字典键(类对象)与多个比较器的比较

    我使用自定义对象作为 python 字典中的键 这些对象有一些默认值hash and eq定义的方法用于默认比较 但在某些功能中我需要使用不同的方式来比较这些对象 那么有什么方法可以覆盖或传递一个新的比较器来仅针对该特定函数进行这些关键比较
  • 如何在条形图上添加值标签

    我正在创建一个条形图 但我不知道如何在条形图上添加值标签 在条形图的中心或正上方 我相信解决方案是使用 文本 或 注释 但我 a 不知道该使用哪一个 一般来说 还没有弄清楚何时使用哪一个 b 无法看到任何一个来呈现值标签 这是我的代码 im
  • 如何在 pywebview 中使无框窗口可拖动?

    我最近一直在使用 pywebview 和 Flask 来开发桌面应用程序 我想使用无框窗口功能并创建自己的标题栏 但问题是我不知道如何使该无框窗口可拖动 文档说它可以通过拖动任何点来移动 但对我来说情况并非如此 有任何想法吗 拖动区域 ht
  • 命令错误,退出状态 1: python setup.py Egg_info 检查日志以获取完整的命令输出 - 通过 pip 安装 auto-py-to-exe 时

    我正在尝试在与我通常通过 pip 使用不同的 Windows 设备上下载 auto py to exe 但是 当运行时 我收到错误 抱歉 它太长了 ERROR Command errored out with exit status 1 c
  • RuntimeError:期望后端 CUDA 对象,但获得后端 CPU 作为参数:ret = torch.addmm(torch.jit._unwrap_Optional(bias), input, Weight.t())

    当 的时候forward我的神经网络的功能 训练阶段完成后 正在执行 我正在经历RuntimeError Expected object of backend CUDA but got backend CPU for argument 4
  • py.test 日志控制

    我们最近切换到 py test 进行 python 测试 顺便说一句 这非常棒 但是 我试图弄清楚如何控制日志输出 即内置的 python 日志记录模块 我们安装了 pytest capturelog 并且它按预期工作 当我们想查看日志时
  • pandas.algos._return_false 在 CentOS 上使用 dill.dump_session 导致 PicklingError

    我有一个代码框架 其中涉及使用 dill 转储会话 这曾经工作得很好 直到我开始使用 pandas 以下代码在 CentOS 6 5 版本上引发 PicklingError import pandas import dill dill du
  • 为什么我不能将 addstr() 添加到 pythoncurses 窗口中的最后一行/列?

    使用Python 我尝试使用addstr 将光标位置写入curses 窗口的右下角 但出现错误 ScreenH 2工作正常 但打印在窗口底部的第二行 ScreenH 1根本不起作用 我究竟做错了什么 import curses Screen
  • Scrapy在使用crawlerprocess运行时抛出错误

    我用 python 编写了一个脚本 使用 scrapy 来收集网站上不同帖子的名称及其链接 当我从命令行执行脚本时 它可以完美地工作 现在 我的意图是使用运行脚本CrawlerProcess 我在不同的地方寻找类似的问题 但我找不到任何直接
  • 从 pexpect 中提取 stderr

    我的问题很简单 我可以吗 expect 使用 pexpect 查看 stderr 上的某些输出 它似乎pexpect spawn 只能用于期望 stdout 上的输出 乌托邦的例子 import pexpect child pexpect

随机推荐

  • 如何在 C# 中使用 Int64

    问题很简单 在 C 中如何表示 64 位 int 64位int很长
  • Keras 预测给出的误差与评估不同,损失与指标不同

    我有以下问题 我在 Keras 中有一个自动编码器 并对其进行了几个时期的训练 训练概览显示验证 MAE 为 0 0422 MSE 为 0 0024 但是 如果我随后调用 network predict 并手动计算验证错误 我会得到 0 0
  • 对象不支持此操作 IE9 与 CustomEvent 初始化

    我在 IE9 中遇到以下错误 对象不支持此操作 关于此有各种各样的问题 但我的问题专门针对以下代码 var myEvent new CustomEvent additem 据我了解 CustomEvent在 IE9 中支持作为 DOM 操作
  • 如何重命名对象 boto3 S3?

    我在 S3 中有大约 1000 个对象 它们以 abcyearmonthday1 abcyearmonthday2 abcyearmonthday3 想要将它们重命名为 abc year month day 1 abc year month
  • 在删除属性之前,必须删除或重新定义所有包含外键的内容 - EF Core

    使用实体框架核心添加外键后尝试添加迁移时出现上述错误 我正在添加FK public class ApplicantDetail Key public int Id get set ForeignKey GrantProgramFK publ
  • 如何在 JSF 中创建现有组件的组合?

    我想知道是否可以编写我自己的组件 或称其为 Widget Object 我的意思是 而不是 例如 使用h panelGroup and a h outputLabel在里面 做我自己的h panelMarkzzz 作为 panelGroup
  • Firefox 中隐藏滚动条

    我想在页面中隐藏滚动条 但我可以像它有滚动条一样滚动 所以我不能使用溢出 隐藏因为我希望我可以像正常一样滚动 但看不到滚动条 所以我使用这个css代码 类not scroll body是一类body标签 not scroll body we
  • 错误 '_' 未定义 no-undef

    我使用 eslint 检查我的代码 并发生错误 错误 未定义 no undef 我这样写代码 new webpack ProvidePlugin jquery jquery jQuery jquery window jQuery jquer
  • MySQL 获取 ORDER BY 中的行位置

    具有以下 MySQL 表 id INT UNSIGNED name VARCHAR 100 我怎样才能选择一个single行及其在表中其他行中的位置 排序时 name ASC 因此 如果表数据按名称排序时如下所示 id name 5 Alp
  • 如何用相同的方法“包装”两个类?

    我必须使用相同的方法处理两个类 但它们不实现相同的接口 也不扩展相同的超类 我无法 不允许更改此类 并且我不构造此类的实例 我只获取此类的对象 避免大量代码重复的最佳方法是什么 班级之一 package faa public class S
  • 如何解释此故障转储

    我们在 iis 中托管的特定网站上遇到了问题 我无法从事件日志中获取太多信息 我对这些低水平的 原始 诊断工具有点不适应 如果我找错了树 在这种情况下请告诉我 例如 IIS 刚刚损坏 或者我是否 我按照正确的路径尝试定位问题 为应用程序池
  • NativeScript + Vue.js + FontAwesome

    我正在尝试使用 FontAwesome 图标集通过 NativeScript 和 Vue js 构建应用程序 但我无法找出问题 因为我什至没有错误提示消息 我忠实地关注文档 但没有任何反应 我到处寻找 但什么也没有 如果你能帮我解决这个问题
  • java应用程序中与oracle的连接

    我已经下载了 oracle express 11g 版本并安装了它 现在我想从 java 应用程序连接它 这是我的连接代码 Class forName oracle jdbc driver OracleDriver newInstance
  • 在 Android @drawable 中查找图像的主色

    如果您使用 Windows 7 您就会明白为什么我要尝试查找图像中的主色 当您将鼠标悬停在任务栏中的某个程序上时 该特定程序的背景会根据图标中的主色发生变化 我注意到其他程序中也使用了这种技术 但我一时想不起来 我可以看到这对我用来开发应用
  • 通过 TCP 查看 h264 流

    我有一个用于无人机的小型基于 wifi 的 FPV 相机 我已经成功地使用 python 下载并保存 h264 文件 TCP IP 193 168 0 1 TCP PORT 6200 BUFFER SIZE 2056 f open stre
  • 访问基模板函数中派生类的成员函数

    我有一堂课叫DBDriver处理与数据库中给定表的通信 它的公共入口点是一个名为的函数模板execute query 它执行 SELECT 查询 调用此函数后 将执行一些数据库逻辑 然后用结果填充提供的容器 模板类型 这看起来像下面这样 c
  • 如何使用终端打开“-”虚线文件名?

    我尝试了 gedit nano vi leafpad 和其他文本编辑器 它无法打开 我尝试了 cat 和其他文件查找命令 我向你保证这是一个文件而不是目录 这种方法有很多误解 因为使用 作为参数指的是标准输入 标准输出 i e 开发 标准输
  • Angular CORS 简单请求通过 POST 中的授权标头触发预检

    根据文档 对于简单的请求 预检不应该发生 https developer mozilla org en docs Web HTTP Access control CORS https developer mozilla org en doc
  • PHPWord - 获取页数?

    我使用 PHPWord 的模板解析器来制作文档 然后使用命令行自动打印该文档 该文件必须在单页上 因为它是证书 并且要打印在特殊纸张上 我添加了人们的名字 这样名字较长的人就可以将一行放到两行上 然后将所有内容推到第二页上 PHPWord
  • SlidingWindows Python Apache Beam 复制数据

    问题 每次系统从带有滑动窗口的 pubsub 收到一条消息时 它都会被复制 The code Parse dictionary gt gt beam Map lambda elem elem Serial int elem Value wi