Spark Streaming:接收器故障后如何不重新启动接收器

2024-01-25

我们正在使用自定义 Spark 接收器,它从提供的 http 链接读取流数据。如果提供的http链接不正确,则接收失败。问题是spark会不断重启接收器,并且应用程序永远不会终止。问题是如果接收器失败,如何告诉 Spark 终止应用程序。

这是我们定制接收器的摘录:

 def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Receiver") {
      override def run() { receive() }
    }.start()
  }

  private def receive(): Unit = {
    ....
    val response: CloseableHttpResponse = httpclient.execute(req)
    try {
      val sl = response.getStatusLine()
      if (sl.getStatusCode != 200){
        val errorMsg = "Error: " + sl.getStatusCode 
        val thrw = new RuntimeException(errorMsg)
        stop(errorMsg, thrw)
      } else {
      ...
        store(doc)
      }

我们有一个使用此接收器的 Spark 流应用程序:

val ssc = new StreamingContext(sparkConf, duration)
val changes = ssc.receiverStream(new CustomReceiver(...
...
ssc.start()
ssc.awaitTermination()

如果接收器没有错误,一切都会按预期进行。如果接收器失败(例如,http 链接错误),spark 将不断重新启动它,并且应用程序将永远不会终止。

16/05/31 17:03:38 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
16/05/31 17:03:38 ERROR ReceiverTracker: Receiver has been stopped. Try to restart it.

我们只想在接收器失败时终止整个应用程序。


有一种方法可以控制基于自定义接收器的 Spark-Streaming 应用程序的生命周期。为您的应用程序定义作业进度侦听器并跟踪正在发生的情况。

class CustomReceiverListener extends StreamingJobProgressListener {
    private boolean receiverStopped = false;

    public CustomReceiverListener(StreamingContext ssc) { super(ssc);}

    public boolean isReceiverStopped() {
        return receiverStopped;
    }
    @Override
    public void onReceiverStopped(StreamingListenerReceiverStopped receiverStopped) {
        LOG.info("Update the flag field");
        this.receiverStopped = true;
    }
}

在你的驱动程序中,初始化一个线程来监视receiverStopped旗帜。当该线程完成时,驱动程序将停止流应用程序。 (更好的方法是定义由驱动程序定义的回调方法,这将停止流应用程序)。

CustomReceiverListener listener = new CustomReceiverListener(ssc);
ssc.addStreamingListener(listener);
ssc.start();
Thread thread = new Thread(() -> {
    while (!listener.isReceiverStopped()) {
        LOG.info("Sleepy head...");
        try {
            Thread.sleep(2 * 1000); /*check after 2 seconds*/
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});
thread.start();
thread.join();
LOG.info("Listener asked to die! Going to commit suicide :(");
ssc.stop(true, false);

Note:如果您的接收器有多个实例,请更改以下实现CustomReceiverListener以确保所有接收器实例都已停止。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark Streaming:接收器故障后如何不重新启动接收器 的相关文章

  • PHP 错误:php_network_getaddresses:getaddrinfo 失败:(从其他站点获取信息时。)

    尝试从外部源获取信息时 我收到以下错误 Warning php network getaddresses getaddrinfo 失败 第 行名称解析暂时失败 昨天一切都很好 那么这个脚本发生了什么 它不起作用并给我上面的错误 有什么解决方
  • 将 sudo 与 Python 脚本结合使用

    我正在尝试编写一个小脚本来在每次执行脚本时安装 VirtualBox 共享文件夹 我想用Python 来做这件事 因为我正在尝试学习它来编写脚本 问题是我需要特权才能启动挂载命令 我可以将脚本作为 sudo 运行 但我更喜欢它自己创建 su
  • 给 NSWindow 一个背景图片

    好的 我已经在 Photoshop 中创建了一个图像 该图像将与我的应用程序上的按钮对齐 现在我想将其作为窗口的背景图像 以便图像上的字符将对应于我的应用程序上的按键应用程序 我一直在开发的一个小型计算器演示应用程序 基本上 我没有给按钮提
  • 多线程:只有在执行完其他方法后才调用执行方法

    我正在尝试根据要求异步处理方法 一旦第一个方法完成 只有第二个方法应该开始执行 问题是第一个方法本身具有在后台线程上运行的代码 我尝试了dispatch semaphore wait 但这也不起作用 dispatch queue t que
  • 链接“let”语句时使用“and”还是“in”更好?

    我意识到这可能是一个愚蠢的问题 但是 如果我把一堆let不需要需要了解彼此价值观的语句 使用是否更好and or in 例如 以下哪一个更可取 如果有 let a foo and b bar and c baz in etc or let
  • JBoss AS 7 部署顺序和时间安排

    我对一般部署顺序和具体时间安排有疑问 我有一个 Ear 1 它通过 bean 和一些队列提供一些功能 队列在standalone xml 中配置 另一只耳朵 2 使用 Ear1 的此服务 所以依赖关系看起来像 ear1 因此 我将ear 2
  • 将一段文本保存到mysql

    我正在使用 php 和 mysql 做一个项目 我对此很陌生 现在我必须将一段文本存储到我的数据库中 在表中 对于列 I tried varchar 5000 创建表时但它不允许我 所以请给我一个解决方案 你的 mysql 字段类型应该 T
  • .NET Web API - 添加日志记录

    我正在寻找有关处理 API 日志记录的最佳方法的帮助 我想将所有请求和响应记录到 sql 或文本文件 如果这是最好的方法 目前我已经在 SQL Server 的日志表中插入一行 我使用名为 LogAction 的静态方法来执行此操作 并在
  • 无法比较类型“ndarray(dtype=int64)”和“str”

    Example of data that I want to replace 数据具有以下属性 购买 V 高 高 中 低 维持 V 高 高 中 低 门 2 3 4 5 更多 2 4人以上 lug boot 小 中 大 安全性低 中高 这就是
  • F# 中类型约束的顺序

    这适用于 F 4 0 type Something lt a b when b gt seq lt b gt gt 这不会 type Something lt b when b gt seq lt b gt a gt 类型名称中出现意外的符
  • 将项目添加到自定义组件的布局

    我有一个习惯Footer Component我想在 QML 应用程序的不同位置重用它 Rectangle color gold height 50 anchors bottom parent bottom left parent left
  • for循环内递归函数的时间复杂度

    如果我们有一个函数 int x 0 int fun int n if n 0 return 1 for int i 0 i
  • 在 C# 中调用并排显示窗口

    愚蠢的问题是否有一种简单的方法可以清除桌面 然后打开两个资源管理器窗口并调用 并排显示窗口 任务栏调用 只是想知道 MS 库中是否有 api 可以做到这一点 您可以使用TileWindowsWinAPI 函数通过 p invoke 将所需窗
  • jQuery UI .buttonset() 太慢

    我的 HTML 页面上有几千个按钮 运行需要10多秒 buttonset buttonset 文件准备好 有没有更快的方法来做到这一点 或者是我以某种方式限制按钮数量的唯一解决方案 创建buttonset在第一次显示之前按需进行 我刚刚测试
  • 在oracle sql中创建日期差异的自定义函数,排除周末和节假日

    我需要计算两个日期之间的天数decimal 不包括周末和节假日 by 使用自定义函数在 Oracle SQL 中 网站上也有类似的问题 然而 正如我所看到的 它们都没有要求使用自定义函数将输出作为十进制 我需要小数的原因是为了之后能够使用
  • 如何在位置中使用 Nginx Regexp

    Web 项目将静态内容放入 some content img 文件夹中 url规则为 img some md5 但文件夹中的位置 content img 前两位数字 Example url example com img fe5afe048
  • 实现用户定义的算术函数

    如何添加函数 例如汉明权重 并在右侧出现的表达式中使用它是一些 is 2 goal 像 goal expansion 或 term expansion 这样的东西可以帮助这里吗 我承认这不是一个大功能 但它可以提高我的一些 Prolog 程
  • 如何从 dll 导出 C++ 类? [复制]

    这个问题在这里已经有答案了 我有一个有两个重载函数的类 如何从 dll 导出它以及如何由其他 C 类使用它 我的班级是这样的 define DECLDIREXP declspec dllexport define DECLDIRIMP de
  • 如何在 Azure 中调试测试发送?

    I am trying to setup a successfull push notifications between my Net backend and ios client in azure I followed this htt
  • MonoDevelop - 代码窗口颜色方案

    MonoDevelop 中有没有办法自定义代码窗口配色方案 我想要类似的东西黑曜石之子 http studiostyl es schemes son of obsidian如果可能的话 是的 转到 MonoDevelop 语法突出显示选项面

随机推荐

  • 在目录中查找部分字符串匹配的文件

    我有一个包含以下文件的目录 apple1 json gz apple2 json gz banana1 json gz melon1 json gz melon2 json gz 我希望找到所有的apple banana and melon
  • Python解析包含函数、列表和字典的字符串

    我正在尝试找到方法来解析可以包含用 python 语法编写的变量 函数 列表或字典的字符串 并用 分隔 空格应该在任何地方都可以使用 因此当它不在 或 内时 请用 分隔 示例字符串 variable function1 1 3 functi
  • 从分支获取最新 Git 提交哈希的命令

    如何使用命令行检查特定 Git 分支的最新提交哈希值 git log n 1 branch name branch name 可能是远程或本地分支 是可选的 没有branch name 它将显示当前分支上的最新提交 例如 git log n
  • Clang - 将 C 标头编译为 LLVM IR/位码

    假设我有以下简单的 C 头文件 foo1 h typedef int foo typedef struct foo a char const b bar bar baz foo bar 我的目标是获取这个文件 并生成一个看起来像这样的 LL
  • 树莓派-gpio (sysfs) 上的 poll() 树莓派

    正如标题所述 我在将一些用户空间中断代码从另一个 armv7 嵌入式 Linux 平台移植到 Raspberry Pi 2 Model B 时遇到问题 我知道 WiringPi 库 并让它以这种方式工作 但出于评估原因 我希望在两个平台上运
  • 无法同时处理点击和触摸事件

    我正在尝试处理按钮上的触摸事件和单击事件 我执行以下操作 button setOnClickListener clickListener button setOnTouchListener touchListener 当注册任何一个侦听器时
  • Windows 每种语言版本的默认代码页

    在哪里可以找到有关 Windows 每种语言版本的默认代码页的信息 即 ANSI 每种语言版本的代码页 我找到了Windows 支持的代码页 http msdn microsoft com en us goglobal bb964654 a
  • 构建 AOSP 并添加具有运行时权限的系统应用程序

    我在 Android 6 AOSP 上工作 我能够构建添加应用程序作为系统应用程序 但现在我想在这个系统应用程序上默认添加运行时权限 这样应用程序就可以启动而无需要求用户验证权限 你知道我该怎么做吗 谢谢你的帮助 如果您的应用程序具有特权
  • 为什么 getSelectedItem 返回 null?

    我刚刚学习 OOP 对这篇基本文章感到抱歉 我不知道为什么当我尝试获取所选项目的值时它返回 nullJComboBox public class AddEmployee extends javax swing JInternalFrame
  • 所见即所得与所见即所得

    在基于 Web 的应用程序中 哪一个更好 更理想 Edit 实际上我正在开发一个社区网站 所以用户的水平可能会有所不同 听说 WYSIWYG 编辑器存在 XSS 安全问题 我也不熟悉所见即所得编辑器及其功能 据我所知WYSIWYM 编辑器中
  • python-从json对象中选择唯一的键值

    我有一个json回复 data id 1 name Tom age 24 id 2 name Nick age 45 id 3 name Harry age 18 id 1 name Tom age 29 count 4 而且我要outpu
  • Heroku“psql:致命:剩余的连接插槽保留用于非复制超级用户连接”

    我正在 Heroku 上开发一个带有 Postgresql 后端的应用程序 当我尝试从 CLI 和在服务器上加载页面访问数据库时 我会定期收到此错误消息 psql FATAL remaining connection slots are r
  • 结构体作为 Go 映射中的键

    我正在研究使用结构作为 golang 映射中的键 该结构中的字段也应该是映射 这似乎与提供的文档相悖here http blog golang org go maps in action这表示只有具有可比较字段的结构 and 可以位于用作映
  • 什么是 weblogic.socket.Muxer?

    你们中有人了解 weblogic socket Muxer 在 WebLogic 8 1 中的用途吗 我经常在线程转储中看到类似于以下的堆栈跟踪 ExecuteThread 0 for queue weblogic socket Muxer
  • HttpContext.Current.User 始终为 null

    我有一个 WCF 服务 它有一个方法可以返回正在使用该服务的 Silverlight 客户端的 Windows 用户名 WCF 服务使用 basicHttpBinding 并将 TransportCredentialOnly 模式设置为 W
  • 与适用于 Mac 的三星智能电视 sdk 4.1 配合使用

    我有一个愚蠢的问题 我安装 2013 Samsung TV SDK 4 1 MacOS pkg 并下载 2013 Smart TV Emulator 4 1 VB zip 和虚拟盒的安装 ova 文件 所以我想终于一切都完成了 但不是 当我
  • 如何让 JTIdy 使 HTML 文档格式良好?

    我正在使用 JTidy v r938 我正在使用这段代码来尝试清理页面 final Tidy tidy new Tidy tidy setQuiet false tidy setShowWarnings true tidy setShowE
  • Rails 在集成测试中设计经过身份验证的路由

    我想测试应用程序中的每条路线 并了解到我应该在集成测试中执行此操作 在 ruby on Rails 中测试路线的位置 https stackoverflow com questions 5290245 where to test route
  • Python:如何在给定时间调用字典包含的可调用对象?

    我正在使用 python 中的字典对象 其中包含许多键 它们的一些关联值类型是可调用类型 就像是 dico key1 1 key2 cars key3
  • Spark Streaming:接收器故障后如何不重新启动接收器

    我们正在使用自定义 Spark 接收器 它从提供的 http 链接读取流数据 如果提供的http链接不正确 则接收失败 问题是spark会不断重启接收器 并且应用程序永远不会终止 问题是如果接收器失败 如何告诉 Spark 终止应用程序 这