如何更新 Spark Streaming 中的广播变量?

2023-11-27

我相信 Spark Streaming 有一个相对常见的用例:

我有一个对象流,我想根据一些参考数据来过滤它们

最初,我认为使用广播变量:

public void startSparkEngine {
    Broadcast<ReferenceData> refdataBroadcast
      = sparkContext.broadcast(getRefData());

    final JavaDStream<MyObject> filteredStream = objectStream.filter(obj -> {
        final ReferenceData refData = refdataBroadcast.getValue();
        return obj.getField().equals(refData.getField());
    }

    filteredStream.foreachRDD(rdd -> {
        rdd.foreach(obj -> {
            // Final processing of filtered objects
        });
        return null;
    });
}

然而,尽管很少见,我的参考数据会定期更改

我的印象是我可以修改并重播我在驱动程序上的变量,它将传播到每个工作人员,但是Broadcast对象不是Serializable并且需要final.

我还有哪些选择?我能想到的三个解决方案是:

  1. 将参考数据查找移动到forEachPartition or forEachRdd这样它就完全落在工人身上了。然而,参考数据存在于 REST API 中,因此我还需要以某种方式存储计时器/计数器,以停止对流中的每个元素进行远程访问。

  2. 每次 refdata 更改时,使用新的广播变量重新启动 Spark 上下文。

  3. 将参考数据转换为RDD, then join以我现在正在流式传输的方式进行流式传输Pair<MyObject, RefData>,尽管这会将参考数据与每个对象一起发送。


扩展答案@Rohan Aletti。下面是 BroadcastWrapper 的示例代码,它根据某些 ttl 刷新广播变量

public class BroadcastWrapper {

    private Broadcast<ReferenceData> broadcastVar;
    private Date lastUpdatedAt = Calendar.getInstance().getTime();

    private static BroadcastWrapper obj = new BroadcastWrapper();

    private BroadcastWrapper(){}

    public static BroadcastWrapper getInstance() {
        return obj;
    }

    public JavaSparkContext getSparkContext(SparkContext sc) {
       JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc);
       return jsc;
    }

    public Broadcast<ReferenceData> updateAndGet(SparkContext sparkContext){
        Date currentDate = Calendar.getInstance().getTime();
        long diff = currentDate.getTime()-lastUpdatedAt.getTime();
        if (var == null || diff > 60000) { //Lets say we want to refresh every 1 min = 60000 ms
            if (var != null)
               var.unpersist();
            lastUpdatedAt = new Date(System.currentTimeMillis());

            //Your logic to refresh
            ReferenceData data = getRefData();

            var = getSparkContext(sparkContext).broadcast(data);
       }
       return var;
   }
}

你的代码看起来像:

public void startSparkEngine() {

    final JavaDStream<MyObject> filteredStream = objectStream.transform(stream -> {
        Broadcast<ReferenceData> refdataBroadcast = BroadcastWrapper.getInstance().updateAndGet(stream.context());

        stream.filter(obj -> obj.getField().equals(refdataBroadcast.getValue().getField()));
    });

    filteredStream.foreachRDD(rdd -> {
        rdd.foreach(obj -> {
        // Final processing of filtered objects
        });
        return null;
    });
}

这在多集群上也对我有用。 希望这可以帮助

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何更新 Spark Streaming 中的广播变量? 的相关文章

随机推荐