我们正在使用自定义 Spark 接收器,它从提供的 http 链接读取流数据。如果提供的http链接不正确,则接收失败。问题是spark会不断重启接收器,并且应用程序永远不会终止。问题是如果接收器失败,如何告诉 Spark 终止应用程序。
这是我们定制接收器的摘录:
def onStart() {
// Start the thread that receives data over a connection
new Thread("Receiver") {
override def run() { receive() }
}.start()
}
private def receive(): Unit = {
....
val response: CloseableHttpResponse = httpclient.execute(req)
try {
val sl = response.getStatusLine()
if (sl.getStatusCode != 200){
val errorMsg = "Error: " + sl.getStatusCode
val thrw = new RuntimeException(errorMsg)
stop(errorMsg, thrw)
} else {
...
store(doc)
}
我们有一个使用此接收器的 Spark 流应用程序:
val ssc = new StreamingContext(sparkConf, duration)
val changes = ssc.receiverStream(new CustomReceiver(...
...
ssc.start()
ssc.awaitTermination()
如果接收器没有错误,一切都会按预期进行。如果接收器失败(例如,http 链接错误),spark 将不断重新启动它,并且应用程序将永远不会终止。
16/05/31 17:03:38 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
16/05/31 17:03:38 ERROR ReceiverTracker: Receiver has been stopped. Try to restart it.
我们只想在接收器失败时终止整个应用程序。
有一种方法可以控制基于自定义接收器的 Spark-Streaming 应用程序的生命周期。为您的应用程序定义作业进度侦听器并跟踪正在发生的情况。
class CustomReceiverListener extends StreamingJobProgressListener {
private boolean receiverStopped = false;
public CustomReceiverListener(StreamingContext ssc) { super(ssc);}
public boolean isReceiverStopped() {
return receiverStopped;
}
@Override
public void onReceiverStopped(StreamingListenerReceiverStopped receiverStopped) {
LOG.info("Update the flag field");
this.receiverStopped = true;
}
}
在你的驱动程序中,初始化一个线程来监视receiverStopped
旗帜。当该线程完成时,驱动程序将停止流应用程序。 (更好的方法是定义由驱动程序定义的回调方法,这将停止流应用程序)。
CustomReceiverListener listener = new CustomReceiverListener(ssc);
ssc.addStreamingListener(listener);
ssc.start();
Thread thread = new Thread(() -> {
while (!listener.isReceiverStopped()) {
LOG.info("Sleepy head...");
try {
Thread.sleep(2 * 1000); /*check after 2 seconds*/
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread.start();
thread.join();
LOG.info("Listener asked to die! Going to commit suicide :(");
ssc.stop(true, false);
Note:如果您的接收器有多个实例,请更改以下实现CustomReceiverListener
以确保所有接收器实例都已停止。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)