如何使用部署在 Kubernetes 上的 Flink 运行 Beam Python 管道?

2023-12-08

有谁知道当 Flink 在 Kubernetes 中作为 Pod 运行时如何使用 Flink 运行 Beam Python 管道?

我已经成功地使用 Portable runner 和指向在 Docker 容器中运行的本地 Flink 服务器的作业服务来运行 Beam Python 管道。

我能够在我的 Flink 容器中安装 Docker 套接字,并将 Flink 作为根进程运行,因此 DockerEnvironmentFactory 类可以创建 Pythonharness 容器。

不幸的是,当 Flink 在 Kubernetes 中运行时,我无法使用相同的解决方案。此外,我不想使用 Pod 中的 Docker 命令创建 Python Harness 容器。

Bean runner 似乎会自动选择 Docker 来执行 Python 管道。但是,我注意到有一个名为ExternalEnvironmentFactory的实现,但我不知道如何使用它。

有没有办法部署侧容器并使用不同的工厂来运行Pythonharness进程?正确的做法是什么?

这是 DockerEnvironmentFactory 的补丁:

diff -pr beam-release-2.15.0/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java beam-release-2.15.0-1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java
*** beam-release-2.15.0/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java   2019-08-14 22:33:41.000000000 +0100
--- beam-release-2.15.0-1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java 2019-09-09 16:02:07.000000000 +0100
*************** package org.apache.beam.runners.fnexecut
*** 19,24 ****
--- 19,26 ----

  import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull;

+ import java.net.InetAddress;
+ import java.net.UnknownHostException;
  import java.nio.file.Files;
  import java.nio.file.Paths;
  import java.time.Duration;
*************** public class DockerEnvironmentFactory im
*** 127,133 ****
          ImmutableList.<String>builder()
              .addAll(gcsCredentialArgs())
              // NOTE: Host networking does not work on Mac, but the command line flag is accepted.
!             .add("--network=host")
              // We need to pass on the information about Docker-on-Mac environment (due to missing
              // host networking on Mac)
              .add("--env=DOCKER_MAC_CONTAINER=" + System.getenv("DOCKER_MAC_CONTAINER"));
--- 129,135 ----
          ImmutableList.<String>builder()
              .addAll(gcsCredentialArgs())
              // NOTE: Host networking does not work on Mac, but the command line flag is accepted.
!             .add("--network=flink")
              // We need to pass on the information about Docker-on-Mac environment (due to missing
              // host networking on Mac)
              .add("--env=DOCKER_MAC_CONTAINER=" + System.getenv("DOCKER_MAC_CONTAINER"));
*************** public class DockerEnvironmentFactory im
*** 222,228 ****

      private static ServerFactory getServerFactory() {
        ServerFactory.UrlFactory dockerUrlFactory =
!           (host, port) -> HostAndPort.fromParts(DOCKER_FOR_MAC_HOST, port).toString();
        if (RUNNING_INSIDE_DOCKER_ON_MAC) {
          // If we're already running in a container, we need to use a fixed port range due to
          // non-existing host networking in Docker-for-Mac. The port range needs to be published
--- 224,230 ----

      private static ServerFactory getServerFactory() {
        ServerFactory.UrlFactory dockerUrlFactory =
!               (host, port) -> HostAndPort.fromParts(getCanonicalHostName(), port).toString();
        if (RUNNING_INSIDE_DOCKER_ON_MAC) {
          // If we're already running in a container, we need to use a fixed port range due to
          // non-existing host networking in Docker-for-Mac. The port range needs to be published
*************** public class DockerEnvironmentFactory im
*** 237,242 ****
--- 239,252 ----
      }
    }

+   private static String getCanonicalHostName() throws RuntimeException {
+     try {
+       return InetAddress.getLocalHost().getCanonicalHostName();
+     } catch (UnknownHostException e) {
+       throw new RuntimeException(e);
+     }
+   }
+
    /** Provider for DockerEnvironmentFactory. */
    public static class Provider implements EnvironmentFactory.Provider {
      private final boolean retainDockerContainer;
*************** public class DockerEnvironmentFactory im
*** 269,275 ****
      public ServerFactory getServerFactory() {
        switch (getPlatform()) {
          case LINUX:
!           return ServerFactory.createDefault();
          case MAC:
            return DockerOnMac.getServerFactory();
          default:
--- 279,286 ----
      public ServerFactory getServerFactory() {
        switch (getPlatform()) {
          case LINUX:
!           return DockerOnMac.getServerFactory();
! //          return ServerFactory.createDefault();
          case MAC:
            return DockerOnMac.getServerFactory();
          default:

这是我用来运行 Flink 的 Docker compose 文件:

version: '3.4'
services:
  jobmanager:
    image: tenx/flink:1.8.1
    command: 'jobmanager'
    environment:
      JOB_MANAGER_RPC_ADDRESS: 'jobmanager'
      DOCKER_MAC_CONTAINER: 1
      FLINK_JM_HEAP: 128
    volumes:
      - jobmanager-data:/data
      - /var/run/docker.sock:/var/run/docker.sock
    ports:
      - target: 8081
        published: 8081
        protocol: tcp
        mode: ingress
    networks:
      - flink
  taskmanager:
    image: tenx/flink:1.8.1
    command: 'taskmanager'
    environment:
      JOB_MANAGER_RPC_ADDRESS: 'jobmanager'
      DOCKER_MAC_CONTAINER: 1
      FLINK_TM_HEAP: 1024
      TASK_MANAGER_NUMBER_OF_TASK_SLOTS: 2
    networks:
      - flink
    volumes:
      - taskmanager-data:/data
      - /var/run/docker.sock:/var/run/docker.sock
      - /var/folders:/var/folders
volumes:
    jobmanager-data:
    taskmanager-data:
networks:
  flink:
    external: true

这是我的 Python 管道:

import apache_beam as beam
import logging

class LogElements(beam.PTransform):

    class _LoggingFn(beam.DoFn):

        def __init__(self, prefix=''):
            super(LogElements._LoggingFn, self).__init__()
            self.prefix = prefix

        def process(self, element, **kwargs):
            logging.info(self.prefix + str(element))
            yield element

    def __init__(self, label=None, prefix=''):
        super(LogElements, self).__init__(label)
        self.prefix = prefix

    def expand(self, input):
        input | beam.ParDo(self._LoggingFn(self.prefix))


from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(["--runner=PortableRunner", "--job_endpoint=localhost:8099"])

p = beam.Pipeline(options=options)

(p | beam.Create([1, 2, 3, 4, 5]) | LogElements())

p.run()

这就是我运行作业服务的方式:

gradle :runners:flink:1.8:job-server:runShadow -PflinkMasterUrl=localhost:8081

自动选择 Docker 来执行 Python 工具。

我可以更改用于运行 Python 容器的映像:

选项 = PipelineOptions(["--runner=PortableRunner", "--job_endpoint=localhost:8099", "--environment_type=DOCKER", "--environment_config=beam/python:latest"])

我可以禁用 Docker 并启用ExternalEnvironmentFactory:

选项 = PipelineOptions(["--runner=PortableRunner", "--job_endpoint=localhost:8099", "--environment_type=EXTERNAL", "--environment_config=server"])

但我必须实现一些回调应答http://服务器:80.

有可用的实现吗?


要回答上面的问题,基本上您需要在同一个 pod 中添加 beam_worker_pool 容器以及 flink 任务管理器容器。因此,在用于部署 flink 任务管理器的 yaml 文件中,添加一个新容器:

  - name: beam-worker-pool
    image: apache/beam_python3.7_sdk:2.22.0
    args: ["--worker_pool"]
    ports:
    - containerPort: 50000
      name: pool
    livenessProbe:
      tcpSocket:
        port: 50000
      initialDelaySeconds: 30
      periodSeconds: 60
    volumeMounts:
    - name: flink-config-volume
      mountPath: /opt/flink/conf/
    securityContext:
      runAsUser: 9999
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使用部署在 Kubernetes 上的 Flink 运行 Beam Python 管道? 的相关文章

随机推荐

  • 修剪 R 中的数据,去掉“*”

    我有一个数据集 如下所示 gt data lt c IGHV1 2 02 F or IGHV1 2 03 F IGHV3 23 01 F or gt IGHV3 23 04 F IGHV2 70 01 F IGHV7 4 1 01 例如 我
  • 如何将 uint8_t 与 I/O 流一起使用,同时避免 char 行为?

    考虑这个简单的 C 程序 include
  • Python pyqt 多线程脉冲进度条

    请耐心解答我的问题 因为我是初学者 我在 pyqt 中实现进度条时遇到问题 我看到的所有示例都没有真正解释如何正确实现它 并由此而来example和这个example我在某种程度上使其工作正常 但它仍然挂起 我有这个代码 class Win
  • Javascript 使用变量通过按钮创建 url

    我的网站上有一个按钮 单击该按钮会生成一个单词 然后在 url 调用中使用该单词来下载特定文件
  • 如何获取卷 GUID

    我正在使用 win32 apiC 我想知道如何使用 设备路径 获取卷 GUID 我的设备如下所示 usb vid 04f2 pid 0111 5 39fe81e 0 2 a5dcbf10 6530 11d2 901f 00c04fb951e
  • JavaScript cookie 删除

    如果我用 Javascript 创建一个 cookiedocument cookie unseen 当我离开此页面时如何删除它 这是我在页面上创建的唯一 cookie 运行这个 document cookie unseen expires
  • PHP:Imagick:合并透明图像

    我想将透明PNG合并到另一个图像中 但是PNG的边框不会按照需要更改为透明
  • 跟踪 GA 中的主题标签和查询字符串

    我有一些贴纸 其中包含我网站的 URL 二维码 如下所示 我会把它贴在街上 正如你所看到的 如果有人阅读了这个二维码 他就会转发到http issocial net qr page 现在我想跟踪通过此二维码贴纸访问我的网站的人 遗憾的是 G
  • 在 MATLAB 中向现有矩阵添加新列?

    我有一个包含两列的矩阵 其中一列是日期 另一列是我必须执行一些操作的数量 我想在现有矩阵中添加第三列 我打算通过将第三列表示为列向量 然后将其添加到我现有的矩阵中来解决此问题 尽管我不确定如何将另一列添加到矩阵中 任何帮助将不胜感激 对于第
  • 无法激活 IDL 中定义的投影类型

    我试图在 IDL 中定义 Windows 运行时类型 并使用其投影类型 从默认生成的空白应用程序UWP 项目 称为 空白应用程序 我补充说 我的控件 idl namespace BlankApp default interface runt
  • Android推送服务,实现gcm服务器端

    我对 Android 推送世界还是个新手 几天来我一直在挣扎 我毫无问题地创建并实现了它的 GCM 客户端 我还创建了我的谷歌云项目 启用了android推送notif s并得到了我的Project Number Project ID an
  • FluentNHibernate 字典映射

    使用 Fluent NHibernate 映射简单 Dictionary 属性的最佳方法是什么 public class PersistedData public virtual IDictionary
  • 如何使用 TFS 2010 SDK 获取分支的所有未合并变更集?

    目前我有 2 个分支 开发和发布 是否可以获得从开发到发布的所有未合并变更集 目前我们使用默认的合并向导 然而它有一个很大的限制 它不能按用户过滤 因此 我正在考虑构建一个应用程序 它将所有未合并的变更集从开发拉到发布 并允许我按用户过滤这
  • Erlang VM (BEAM) 是如何构建列表的?

    当我在 Erlang 中创建列表时 例如在 Erlang shell 中 1 gt 1 2 据我了解 在虚拟机中 该列表将表示为单链表 Erlang 运行时如何创建这个结构 例如 它的构造是这样的 在内存中创建一个结构来保存终止列表的列表
  • 具有相同 GroupId 的多个 Kafka 监听器都接收消息

    我在 Spring Boot 应用程序中配置了一个 kafka 监听器 如下所示 KafkaListener topicPartitions TopicPartition topic data all partitions 0 1 2 gr
  • 如何从多个线程安全地写入套接字?

    我正在使用 asio 非升压 创建一个 TCP 服务器 虽然我的代码可以工作 但它没有正确完成 因为我正在调用asio async write来自多个线程 我think我应该使用股线 但我读得越多 我就越迷失 include
  • 如何将包含科学记数法的字符串转换为正确的 Javascript 数字格式

    我有一个字符串e g 4 874915326E7 将其转换为 JavaScript 数字格式的最佳方法是什么 整数或浮点数 如果我尝试 parseInt E最后被忽略 Edit 这个答案似乎引起了一些混乱 最初的问题是问如何将字符串形式的科
  • 判断Oracle null == null

    我希望在可为空的列上搜索数据库表 有时我要搜索的值本身就是 NULL 因为 Null 等于什么都没有 甚至是 NULL 所以说 where MYCOLUMN SEARCHVALUE 将失败 现在我必须求助于 where MYCOLUMN S
  • 如何创建充当链接的 TextView

    我有一个Textview地点 例如 加利福尼亚州山景城 我想要实现的是创建这个文本以充当链接 颜色 下划线 可聚焦性等 此链接不需要指向任何地方 周围视图已附加 onClick 侦听器 该侦听器会触发谷歌地图意图 像这样的东西应该有效 Te
  • 如何使用部署在 Kubernetes 上的 Flink 运行 Beam Python 管道?

    有谁知道当 Flink 在 Kubernetes 中作为 Pod 运行时如何使用 Flink 运行 Beam Python 管道 我已经成功地使用 Portable runner 和指向在 Docker 容器中运行的本地 Flink 服务器