Dagster 循环实体的输出和并发处理

2024-04-07

我有一个由两个固体组成的 Dagster 管道(下面是可重现的示例)。首先 (return_some_list)输出一些对象的列表。第二个固体(print_num) 接受第一个列表(不是完整列表)中的元素,并对该元素进行一些处理。

我该如何为第一个实体返回的列表中的每个元素调用第二个实体?还请解释任何最佳实践。

不确定这是否是最好的方法(让我知道),但我想生成一个不同的实体实例print_num对于第一个实体输出的每个元素。这将帮助我将来并行化实体并更好地处理长/计算密集型实体。

from dagster import execute_pipeline, pipeline, solid

@solid
def return_some_list(context):
    return [1,2,3,4,5]

@solid
def print_num(context, some_num: int):
    print(some_num)
    return some_num


@pipeline
def some_pipeline():
    output_list = return_some_list()
    for some_num in output_list:
        print_num(some_num)

if __name__ == "__main__":
    result = execute_pipeline(some_pipeline)

事实证明,有一个实验性功能(希望将成为正式的),允许基于可迭代输出的元素创建任务。工作代码如下:

from dagster import execute_pipeline, pipeline, solid, Output, OutputDefinition
from dagster.experimental import DynamicOutput, DynamicOutputDefinition
from typing import List


@solid
def return_some_list(context):
    return [1, 2, 3, 4, 5]


@solid(output_defs=[DynamicOutputDefinition(int)])
def generate_subtasks(context, nums: List[int]):
    context.log.info(str(nums))
    for num in nums:
        yield DynamicOutput(num, mapping_key=f'subtask_{num}')


@solid
def print_num(context, some_num: int):
    context.log.info(str(some_num))
    return some_num


@pipeline
def some_pipeline():
    output_list = return_some_list()
    generate_subtasks(output_list).map(print_num)


if __name__ == "__main__":
    result = execute_pipeline(some_pipeline)

Here, return_some_list返回一个可迭代对象。我们希望为这个可迭代的每个元素运行一个实体。我们在固体中这样做generate_subtasks,这会产生一个DynamicOutput包含元素以及为其生成的子任务的名称。这DynamicOutput的类型信息在DynamicOutputDefinition in the solid规格。

为了连接这些实体,我们首先通过以下方式获取列表return_some_list。然后打电话generate_subtasks,这是一个生成器,并且map到它的每个输出print_num功能。

运行整个管道应该为生成的每个子任务打印大量信息generate_subtasks,看起来像这样(仅显示部分输出):

2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_4] - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_4] - HANDLED_OUTPUT - Handled output "result" using output manager "io_manager"
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_4] - STEP_SUCCESS - Finished execution of step "print_num[subtask_4]" in 2.1ms.
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - STEP_START - Started execution of step "print_num[subtask_5]".
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - LOADED_INPUT - Loaded input "some_num" using input manager "io_manager", from output "result" of step "test"
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - STEP_INPUT - Got input "some_num" of type "Int". (Type check passed).
2021-03-13 21:27:53 - dagster - INFO - system - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - print_num[subtask_5] - 5
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - HANDLED_OUTPUT - Handled output "result" using output manager "io_manager"
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - STEP_SUCCESS - Finished execution of step "print_num[subtask_5]" in 1.98ms.
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - ENGINE_EVENT - Finished steps in process (pid: 33738) in 44ms
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - PIPELINE_SUCCESS - Finished execution of pipeline "some_pipeline".

哦,还有一件很酷的事情:Dagster 执行类型检查,如果你给它输入错误的参数,它会很快失败。所以,如果我们要供应print_str,比如说,到map功能,它甚至会拒绝运行。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Dagster 循环实体的输出和并发处理 的相关文章

随机推荐

  • 为具有 # 次浏览次数的单页网站启用 Google 分析

    我读过类似的问题 但我的问题略有不同 我正在使用 Kendo UI 为网站实现单页注册处理页面 该网站有 4 个页面 当用户单击菜单选项卡时动态生成 例如 当用户单击菜单上的 tab1 时 则tab 1将被注入app container容器
  • 包含原始数据的头文件的用途是什么?

    决定检查一些除了我自己的代码之外的代码 Quake I https github com id Software Quake就是选择 我点击的第一个文件只包含原始数据 唯一的注释是 GP L 我猜它是一个包含法向量的数组 不管它的目的是什么
  • java jaxb简单解析需要@XmlAccessorType(XmlAccessType.FIELD)注释

    我正在尝试将 xml 解析为 java 对象 我已阅读并实现了以下教程 http www vogella com articles JAXB article html http www vogella com articles JAXB a
  • PHP/MySQL:从数据库中选择靠近给定位置的位置

    在 PHP 中 我有以下代码用于计算两个位置之间的距离
  • 如何在 Firefox WebExtensions 附加组件中使用 chrome.storage 和 runtime.connect

    我正在尝试创建一个 Firefox 插件来执行以下操作 单击页面主体时向后台脚本发送消息 发送的消息由后台脚本存储 后台脚本检索存储的消息 单击加载项浏览器按钮时 存储的消息将从后台脚本发送到内容脚本 内容脚本显示收到的消息 我正在使用适用
  • 使用 xml2js 在 Angular 6 中未定义 parseString

    到目前为止 我无法使用 Angular 6 让 xml2js 代码正常工作 我有一个 service ts 返回 xml 但是当调用 xml2js parseString 转换为 json 时 我始终收到未定义的错误 core js 167
  • 这个排序算法的名称是什么?

    for int i 0 i
  • 如何阅读linkedin收件箱消息[关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 我对 linkedin api 进行了大量研究 以了解如何通过Api https developer linkedin com res
  • RecyclerView 正在切断最后一项

    我有一个带有工具栏和里面有 recyclerView 的片段 我正在用虚拟数据填充 recyclerView 然后尝试显示它们 由于某种原因 recyclerView 的最后一个元素被截断 这是该片段的 XML
  • Python 中的鲍勃计数器

    使用 Python 2 7 我试图计算短语 bobbbobobboobobookobobbobbboj 中 bob 出现的次数 为此 我编写了以下代码 b 0 string bobbbobobboobobookobobbobbboj str
  • 解构对象作为函数参数

    不太明白这个参数const Posts以下 我对 Node React 还很陌生 它是解构的参数对象吗 或者它只是作为参数传递的对象 getPosts 和 post 显示为未定义 但我不明白参数对象是从哪里传递到函数中的 完整代码在这里 h
  • Visual Studio 2010:向 2008 生成的 wsdl 添加服务引用

    不生成 app config 在我的团队中有一个人拥有 Visual Studio 2008 他创建了一个 Web 服务 然后我将这个 Web 服务添加到控制台项目中 添加服务引用没有问题 但不会生成有效的 app config 只不过是空
  • 在 R 中对非常小的值使用舍入函数返回零

    有时我必须处理非常低的 p 值并以表格格式呈现它们 R 返回的值可以具有很长的有效数字 即小数点后的数字 现在 由于 p 值无论如何都很低 我倾向于在将它们写入 xls 或 tsv 文件之前缩短它们 只是为了使表格看起来漂亮 我在用R ve
  • docker python 自定义模块未找到

    我是 docker 新手 正在尝试将一个简单的应用程序移至 docker 我可以使用 pip install 导入Python标准模块 但是 我有一些想要使用的自定义 python 实用程序文件 这些文件位于单独的包 utils 中 在我的
  • 无限循环使 TimeManager 失效

    我在 WPF 应用程序中遇到了一个非常棘手的缺陷 需要追踪 错误信息是 无限循环似乎是由重复导致的 在布局 渲染过程中使 TimeManager 无效 堆栈跟踪 就其价值而言 是 在 System Windows Media MediaCo
  • TortoiseSVN无法验证

    在我之前的问题之后 TortoiseSVN无法连接 https stackoverflow com questions 11820835 tortoisesvn cant connect解决了 我又遇到了新的问题 在托管我的 svn 存储库
  • 当我尝试在 Windows PC 上安装 PySide2 时,为什么总是出现错误?

    我一直在尝试安装PySide2在我的电脑上 Windows 10 64bits with Python 3 8已安装 但每次都出现错误 我使用了命令pip install PySide2 它对我不起作用 任何帮助将不胜感激 Error ER
  • Linux 如何知道何时为调用堆栈分配更多页面?

    鉴于以下程序 segfault 顾名思义 将通过访问堆栈下方的 256k 来使程序出现段错误 nofault 然而 逐渐将堆栈推到下方 1m 但永远不会出现段错误 此外 运行segfault after nofault 也不会导致错误 如果
  • Sphinx 文档中使用全局变量标准化链接

    我正在使用 Sphinx 来记录工作项目 我想在整个文档的多个页面上使用指向下载的相同链接 例如 home rst Hi I want you to download my project download blah com downloa
  • Dagster 循环实体的输出和并发处理

    我有一个由两个固体组成的 Dagster 管道 下面是可重现的示例 首先 return some list 输出一些对象的列表 第二个固体 print num 接受第一个列表 不是完整列表 中的元素 并对该元素进行一些处理 我该如何为第一个