如何广播 DataFrame?

2023-12-24

我使用的是spark-sql-2.4.1版本。 创建一个广播变量,如下所示

Broadcast<Map<String,Dataset>> bcVariable = javaSparkContext.broadcast(//read dataset);

我将 bcVariable 传递给函数

Service.calculateFunction(sparkSession, bcVariable.getValue());


public   static class Service {
        public static calculateFunction(
          SparkSession sparkSession,
          Map<String, Dataset> dataSet ) {

        System.out.println("---> size : " + dataSet.size());  //printing size 1


        for( Entry<String, Dataset> aEntry : dataSet.entrySet() ) {
           System.out.println( aEntry.getKey());   //  printing key 
            aEntry.getValue().show()   // throw null pointer exception
           }
    }

这里出了什么问题?如何在函数中传递数据集/数据帧?

Try 2 :

Broadcast<Dataset> bcVariable = javaSparkContext.broadcast(//read dataset);

我将 bcVariable 传递给函数

 Service.calculateFunction(sparkSession, bcVariable.getValue());

公共静态类服务{ 公共静态计算函数( SparkSession SparkSession, 数据集 数​​据集 ) {

    System.out.println("---> size : " + dataSet.size());  // throwing null pointer exception.



}

这里出了什么问题?如何在函数中传递数据集/数据帧?

Try 3 :

Dataset metaData = //read dataset from oracle table i.e. meta-data.

我将元数据传递给函数

Service.calculateFunction(sparkSession, 元数据 );

public   static class Service {
        public static calculateFunction(
          SparkSession sparkSession,
          Dataset metaData ) {

        System.out.println("---> size : " + metaData.size());  // throwing null pointer exception.



    }

这里出了什么问题?如何在函数中传递数据集/数据帧?


要广播的值必须是任何 Scala 对象,但不能是DataFrame.

Service.calculateFunction(sparkSession, metaData)在执行器上执行,因此元数据是null(因为它没有被序列化并通过线路从驱动程序发送到执行程序)。

广播[T](值:T):广播[T]

向集群广播一个只读变量,返回一个org.apache.spark.broadcast.Broadcast http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.broadcast.Broadcast用于在分布式函数中读取它的对象。该变量将仅发送到每个集群一次。

考虑到DataFrame数据抽象来表示用类似 SQL 的语言(数据集 API 或 SQL)描述的分布式计算。除了可以提交计算以执行的驱动程序(作为执行程序上的任务)之外,将其放在任何地方都没有任何意义。

您只需“转换”此计算代表的数据(在DataFrame术语)使用DataFrame.collect.

收集数据后,您可以广播它并使用.value method.


代码如下所示:

val dataset = // reading data
Broadcast<Map<String,Dataset>> bcVariable = 
  javaSparkContext.broadcast(dataset.collect);
Service.calculateFunction(sparkSession, bcVariable.getValue());

与您的代码相比唯一的变化是collect.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何广播 DataFrame? 的相关文章

随机推荐