从代码中取消 Apache Flink 作业

2023-12-28

我现在的情况是想从代码中停止/取消 flink 作业。这是在我的集成测试中,我正在向我的 flink 作业提交任务并检查结果。当作业异步运行时,即使测试失败/通过,它也不会停止。我想在考试结束后在车站工作。

我尝试了一些事情,我在下面列出:

  1. 获取工作经理演员
  2. 获取正在运行的作业
  3. 对于每个正在运行的作业,向作业管理器发送取消请求

当然,这不会运行,但我不确定 jobmanager actorref 是否错误或缺少其他内容。

我得到的错误是: [flink-akka.actor.default-dispatcher-5] [akka://flink/user/jobmanager_1] Message [org.apache.flink.runtime.messages.JobManagerMessages$RequestRunningJobsStatus$] from Actor[ akka://flink/temp/$a] 到 Actor[akka://flink/user/jobmanager_1] 未交付。 [1]遇到的死信。可以使用配置设置“akka.log-dead-letters”和“akka.log-dead-letters-during-shutdown”关闭或调整此日志记录

这意味着作业管理器参与者引用错误或发送给它的消息不正确。

代码如下所示:

val system = ActorSystem("flink", ConfigFactory.load.getConfig("akka")) //I debugged to get this path
 val jobManager = system.actorSelection("/user/jobmanager_1") //also got this akka path by debugging and getting the jobmanager akka url
val responseRunningJobs = Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus, new FiniteDuration(10000, TimeUnit.MILLISECONDS))
    try {
      val result = Await.result(responseRunningJobs, new FiniteDuration(5000, TimeUnit.MILLISECONDS))
      if(result.isInstanceOf[RunningJobsStatus]){
        val runningJobs = result.asInstanceOf[RunningJobsStatus].getStatusMessages()
        val itr = runningJobs.iterator()
        while(itr.hasNext){
          val jobId = itr.next().getJobId
          val killResponse = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(new FiniteDuration(2000, TimeUnit.MILLISECONDS)));
          try {
            Await.result(killResponse, new FiniteDuration(2000, TimeUnit.MILLISECONDS))
          }
          catch {
            case e : Exception =>"Canceling the job with ID " + jobId + " failed." + e
          }

        }
      }
    }
    catch{
      case e : Exception => "Could not retrieve running jobs from the JobManager." + e
    }

  }

有人可以检查这是否是正确的方法?

编辑 : 要完全停止作业,需要先停止Task Manager,再停止JobManager,顺序是先Task Manager,再停止JobManager。


你正在创建一个新的ActorSystem然后尝试找到一个有这个名字的演员/user/jobmanager_1在同一个演员系统中。这是行不通的,因为实际的作业管理器将在不同的环境中运行ActorSystem.

如果您想获得ActorRef对于真正的工作经理,您要么必须使用相同的ActorSystem进行选择(然后您可以使用本地地址),或者您已经找到作业管理器参与者的远程地址。远程地址的格式为akka.tcp://flink@[address_of_actor_system]/user/jobmanager_[instance_number]。如果您有权访问FlinkMiniCluster那么你可以使用leaderGateway承诺获得现任领导的ActorGateway.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

从代码中取消 Apache Flink 作业 的相关文章

随机推荐

  • 多对多 Spring Data JPA 关系中的额外列,变化最小

    我需要更改项目的模型 现在 我们有两个具有双向多对多关系的类 这意味着在关系表中 现在需要向关系添加额外的信息 我的问题是 唯一的方法是为关系创建一个类 例如 使用与已存在的关系表相同的名称创建一个类 我这么问是因为如果我们需要改变项目中的
  • 有没有办法在 Visual Studio 中自动更新已安装的 NuGet 包?

    正如标题所示 我想知道是否有一种方法可以在包源中出现新版本时自动更新已安装的 NuGet 包 该用例是一个将某些公司策略 代码分析 签名等 应用于我们的项目的包 一旦该包更新 我希望能够为此包配置自动更新 我确实知道 NuGet 有一个包恢
  • Python 列表是否保证其元素保持插入的顺序?

    如果我有以下Python代码 gt gt gt x gt gt gt x x 1 gt gt gt x x 2 gt gt gt x x 3 gt gt gt x 1 2 3 Will x保证永远是 1 2 3 或者临时元素的其他顺序是否可
  • Xpath选择多个标签

    我想要使 用 PHP DOMXPath 查询的多个标签 td 和 th 我该怎么做 您可以使用 联盟 运营商 这是一个例子 doc new DOMDocument doc gt loadHTML table tr th table head
  • 使用自动滚动向面板添加控件 (c#)

    我有一个带有属性的面板AutoScroll true 通过动态地将其他控件添加到面板而不滚动 一切正常 void addControl int top 13 this Controls Count cmdSet Height ucComma
  • 如何定义 R 函数的参数类型?

    我正在编写一个 R 函数 并且我想确保我的 R 函数的参数属于某个类 例如 矩阵 做这个的最好方式是什么 假设我有一个函数 foo 它计算矩阵的逆 foo lt function x I want to make sure x is of
  • 名称冲突的类的构造函数

    我正在使用 clang 使用 c 14 方言编译我的代码 举个例子 class x int i public x int i this gt i i void x void f class x my x Do something here
  • jboss 7.1 xalan 问题?

    我正在尝试在 JBoss7 上创建基于 Apache Jena 的应用程序 Apache Jena 使用 Xalan 2 11 0 JBoss 7 附带 2 7 1 当我尝试调用该应用程序时 出现异常 其根源是 org apache xer
  • 记录函数闭包

    例如 假设我的包中有一个函数闭包 f function x x x g function y x lt lt y h function x list g g h h l f 5 l g 10 l h 什么是正确的 在官方CRAN http
  • JFactory导入失败

    我正在尝试为 Android 应用程序制作一个登录系统 该系统可与我的 2 5 Joomla 网站一起使用 我试图通过制作一个 Joomla 插件来做到这一点 Android 应用程序将发布数据发送到 php 文件 然后该文件对用户进行身份
  • 减少 Swing 应用程序中耦合的设计模式

    大家好 我目前正在开发 Java Swing 应用程序 并且正在寻找一些指导 该应用程序相当小 但我注意到 随着代码库变得越来越大 我的对象图中存在大量耦合 我对 Swing 比较陌生 但我已经编程了足够长的时间 知道它的发展方向 我遇到的
  • Django 中间件并获取视图名称?

    我正在尝试用 Django 编写我的第一个中间件 class RefreshBalance def process view self request view func view args view kwargs pass 我想检测视图是
  • volatile int 比 AtomicInteger 快吗

    我目前正在做一个示例练习 我发现一个奇怪的观察结果 如果我用易失性程序替换 AutomicInteger 则运行速度会更快 注意 我只进行读操作 code import java util ArrayList import java uti
  • 如何访问 Backbone 视图中的父元素?

    在 Backbone 模型视图中 似乎 this el parent 不起作用 从视图中选择父元素的最佳方法是什么 我正在使用设置 eltagName li 为了景观 默认情况下 Backbone 分配一个空的div到你的视图中 你无法访问
  • 如何使用opencv python解决theta迷宫?

    I have to find shortest path from the center of the maze to the outermost circle I have to solve this problem using open
  • 检查 WHERE 子句中参数是否为 NULL

    我在执行一个存储过程时遇到了麻烦 该过程需要永远执行 它相当大 我可以理解我需要一些时间 但这个持续了将近 20 分钟 经过一些调试和研究后 我注意到替换这部分WHERE clause p DrumNo IS NULL OR T ORDER
  • 获取不同项目及其数量的列表

    我有一个对象 它有很多属性 但唯一需要担心的两个是 myobject ID这是一个int myobject Names这是一个HashSet 然后我有一个List这些对象看起来与此类似 List
  • 如何从 Jupyter Notebook 中的 .py 文件调用函数?

    我不想在每个 Jupyter Notebook 文件中编写相同的函数 如果我只需要编辑一次函数而不需要在每个 ipynb 文件中进行编辑 那就更容易了 问题是 如果我编辑 py 文件 我必须重新启动内核 这将重新启动一切 有什么方法可以简单
  • 在 XML 中保留原始换行符类型(\r 与 \r\n)

    我有一个应用程序 我想在其中使用 XML 文件来存储 1 文档的原始文本 以及 2 使用字符偏移量 指向 原始文本的多个实体 例如
  • 从代码中取消 Apache Flink 作业

    我现在的情况是想从代码中停止 取消 flink 作业 这是在我的集成测试中 我正在向我的 flink 作业提交任务并检查结果 当作业异步运行时 即使测试失败 通过 它也不会停止 我想在考试结束后在车站工作 我尝试了一些事情 我在下面列出 获