Spark 广播变量 Map 给出 null 值

2023-12-27

我正在使用 java8 和 Spark v2.4.1。

我正在尝试使用广播变量Map查找使用如下所示:

输入数据:

+-----+-----+-----+
|code1|code2|code3|
+-----+-----+-----+
|1    |7    |  5  |
|2    |7    |  4  |
|3    |7    |  3  |
|4    |7    |  2  |
|5    |7    |  1  |
+-----+-----+-----+

预期输出:

+-----+-----+-----+
|code1|code2|code3|
+-----+-----+-----+
|1    |7    |51   |
|2    |7    |41   |
|3    |7    |31   |
|4    |7    |21   |
|5    |7    |11   |
+-----+-----+-----+

我当前的代码具有我尝试过的不同解决方案:

Map<Integer,Integer> lookup_map= new HashMap<>();
lookup_map.put(1,11);
lookup_map.put(2,21);
lookup_map.put(3,31);
lookup_map.put(4,41);
lookup_map.put(5,51);

JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
Broadcast<Map<Integer,Integer>> lookup_mapBcVar = javaSparkContext.broadcast(lookup_map);

Dataset<Row> resultDs= dataDs
  .withColumn("floor_code3", floor(col("code3")))
  .withColumn("floor_code3_int", floor(col("code3")).cast(DataTypes.IntegerType))
  .withColumn("map_code3", lit(((Map<Integer, Integer>)lookup_mapBcVar.getValue()).get(col("floor_code3_int"))))
  .withColumn("five", lit(((Map<Integer, Integer>)lookup_mapBcVar.getValue()).get(5)))
  .withColumn("five_lit", lit(((Map<Integer, Integer>)lookup_mapBcVar.getValue()).get(lit(5).cast(DataTypes.IntegerType))));

当前代码的输出使用:

resultDs.printSchema();                       
resultDs.show();
            
root
 |-- code1: integer (nullable = true)
 |-- code2: integer (nullable = true)
 |-- code3: double (nullable = true)
 |-- floor_code3: long (nullable = true)
 |-- floor_code3_int: integer (nullable = true)
 |-- map_code3: null (nullable = true)
 |-- five: integer (nullable = false)
 |-- five_lit: null (nullable = true)

+-----+-----+-----+-----------+---------------+---------+----+--------+
|code1|code2|code3|floor_code3|floor_code3_int|map_code3|five|five_lit|
+-----+-----+-----+-----------+---------------+---------+----+--------+
|    1|    7|  5.0|          5|              5|     null|  51|    null|
|    2|    7|  4.0|          4|              4|     null|  51|    null|
|    3|    7|  3.0|          3|              3|     null|  51|    null|
|    4|    7|  2.0|          2|              2|     null|  51|    null|
|    5|    7|  1.0|          1|              1|     null|  51|    null|
+-----+-----+-----+-----------+---------------+---------+----+--------+

重新创建输入数据:

List<String[]> stringAsList = new ArrayList<>();
stringAsList.add(new String[] { "1","7","5" });
stringAsList.add(new String[] { "2","7","4" });
stringAsList.add(new String[] { "3","7","3" });
stringAsList.add(new String[] { "4","7","2" });
stringAsList.add(new String[] { "5","7","1" });
    
JavaSparkContext sparkContext = new JavaSparkContext(sparkSession.sparkContext());
JavaRDD<Row> rowRDD = sparkContext.parallelize(stringAsList).map((String[] row) -> RowFactory.create(row));

   
StructType schema = DataTypes
  .createStructType(new StructField[] {
    DataTypes.createStructField("code1", DataTypes.StringType, false),
    DataTypes.createStructField("code2", DataTypes.StringType, false),
    DataTypes.createStructField("code3", DataTypes.StringType, false)
  });

Dataset<Row> dataDf= sparkSession.sqlContext().createDataFrame(rowRDD, schema).toDF();

    
Dataset<Row> dataDs = dataDf
  .withColumn("code1", col("code1").cast(DataTypes.IntegerType))
  .withColumn("code2", col("code2").cast(DataTypes.IntegerType))
  .withColumn("code3", col("code3").cast(DataTypes.IntegerType));

我在这里做错了什么?

此处的 Scala Notebook 相同

https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1165111237342523/3062033079132966/7035720262824085/latest.html https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1165111237342523/3062033079132966/7035720262824085/latest.html


lit()返回Column类型,但是map.get需要int类型 你可以这样做

    val df: DataFrame = spark.sparkContext.parallelize(Range(0, 10000), 4).toDF("sentiment")
    val map = new util.HashMap[Int, Int]()
    map.put(1, 1)
    map.put(2, 2)
    map.put(3, 3)
    val bf: Broadcast[util.HashMap[Int, Int]] = spark.sparkContext.broadcast(map)
    df.rdd.map(x => {
      val num = x.getInt(0)
      (num, bf.value.get(num))
    }).toDF("key", "add_key").show(false)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark 广播变量 Map 给出 null 值 的相关文章

  • java.lang.Class: 在 java 程序中初始化 log4j 属性文件时出错

    我正在尝试使用 log4j 运行独立的 java 程序 但在调试时收到以下消息 控制台上没有 log4j 相关日志 log Logger 1343 java lang Class ERROR in 18b4aac2 有人可以建议这里出了什么
  • 如何从另一个xml文件动态更新xml文件?

    我想从另一个 xml 文件更新 xml 文件 我使用了一个 xml 文件 如下所示 one xml
  • 通过 InjectMocks Spy 注入对象

    我需要对一个类运行一系列单元测试 该类具有 Autowired Logger 实现 实现的基本思想是 Mock Logger logger InjectMocks TestedClass tested 但我想保存日志输出功能 Mockito
  • 使用 Spring 时实例化对象,用于测试与生产

    使用 Spring 时 应该使用 Spring 配置 xml 来实例化生产对象 并在测试时直接实例化对象 这样的理解是否正确 Eg MyMain java package org world hello import org springf
  • Java 泛型/类型调度问题

    考虑以下程序 import java util List import java util ArrayList public class TypeTest public static class TypeTestA extends Type
  • Java 重写 hashCode() 得到 StackOverflowError

    所以我不太熟悉重写 hashCode 并且我似乎在 hashCode 方法中以某种方式进行了一些无限递归 这是我的场景 我有一个 DuplicateCache 类 它是一个缓存对象 用于检查系统中的重复对象 我有一个静态内部类 Duplic
  • 场景生成器删除 fxml 文件中的导入

    我使用场景构建器 Gluon Scene Builder JavaFX Scene Builder 8 1 1 来创建应用程序的 UI 并使用 Eclipse 开发 JavaFX 现在 每次我在场景生成器中保存某些内容时 它都会从 fxml
  • 想要开发像 Facebook 这样的网站 - 处理数百万个请求 - 高性能 [关闭]

    很难说出这里问的是什么 这个问题是含糊的 模糊的 不完整的 过于宽泛的或修辞性的 无法以目前的形式得到合理的回答 如需帮助澄清此问题以便重新打开 访问帮助中心 help reopen questions 我想用 Java 开发一个像 Fac
  • 如何在java中使jpeg无损?

    有没有人可以告诉我如何使用编写 jpeg 文件losslessjava中的压缩 我使用下面的代码读取字节来编辑字节 WritableRaster raster image getRaster DataBufferByte buffer Da
  • 具有多种值类型的 Java 枚举

    基本上我所做的是为国家编写一个枚举 我希望不仅能够像国家一样访问它们 而且还能够访问它们的缩写以及它们是否是原始殖民地 public enum States MASSACHUSETTS Massachusetts MA true MICHI
  • 打印包含 JBIG2 图像的 PDF

    请推荐一些库 帮助我打印包含 JBIG2 编码图像的 PDF 文件 PDFRenderer PDFBox别帮我 这些库可以打印简单的 PDF 但不能打印包含 JBIG2 图像的 PDF PDFRenderer尝试修复它 根据 PDFRedn
  • 为什么 ConcurrentHashMap::putIfAbsent 比 ConcurrentHashMap::computeIfAbsent 更快?

    使用 ConcurrentHashMap 我发现computeIfAbsent 比putIfAbsent 慢两倍 这是简单的测试 import java util ArrayList import java util List import
  • tomcat 过滤所有 web 应用程序

    问题 我想对所有网络应用程序进行过滤 我创建了一个过滤器来监视对 apache tomcat 服务器的请求 举例来说 它称为 MyFilter 我在 netbeans 中创建了它 它创建了 2 个独立的目录 webpages contain
  • 在 AKKA 中,对主管调用 shutdown 是否会停止其监督的所有参与者?

    假设我有一位主管连接了 2 位演员 当我的应用程序关闭时 我想优雅地关闭这些参与者 调用supervisor shutdown 是否会停止所有参与者 还是我仍然需要手动停止我的参与者 gracias 阻止主管 https github co
  • 使用 Java 从 S3 上的文件在 S3 上创建 zip 文件

    我在 S3 上有很多文件 需要对其进行压缩 然后通过 S3 提供压缩文件 目前 我将它们从流压缩到本地文件 然后再次上传该文件 这会占用大量磁盘空间 因为每个文件大约有 3 10MB 而且我必须压缩多达 100 000 个文件 所以一个 z
  • 如何在android sdk上使用PowerMock

    我想为我的 android 项目编写一些单元测试和仪器测试 然而 我遇到了一个困扰我一段时间的问题 我需要模拟静态方法并伪造返回值来测试项目 经过一些论坛的调查 唯一的方法是使用PowerMock来模拟静态方法 这是我的 gradle 的一
  • struts 教程或示例

    我正在尝试在 Struts 中制作一个登录页面 这个想法是验证用户是否存在等 然后如果有错误 则返回到登录页面 错误显示为红色 典型的登录或任何表单页面验证 我想知道是否有人知道 Struts 中的错误管理教程 我正在专门寻找有关的教程 或
  • 重写Object类的finalize()方法有什么用?

    据我所知 在java中如果我们想手动调用垃圾收集器 我们可以执行System gc 1 我们在重写的finalize 方法中做了哪些操作 2 如果我们想手动调用JVM垃圾收集器 是否需要重写finalize 方法 我们在重写的 Finali
  • 为什么 BufferedWriter 不写入文件?

    我有这个代码 String strings Hi You He They Tetrabenzene Caaorine Calorine File file new File G words txt FileWriter fWriter Bu
  • MongoDB Java 驱动程序:MongoCore 驱动程序与 MongoDB 驱动程序与 MongoDB 异步驱动程序

    MongoDB Java 驱动程序有三种不同的驱动程序选项 核心驱动 MongoDB 驱动程序 MongoDB 异步驱动程序 The 驱动程序描述页面 https docs mongodb org ecosystem drivers jav

随机推荐

  • django-rest-framework :设置每个用户权限

    我使用 DRF 3 0 1 创建了 REST api 如果我使用权限类rest framework permissions IsAuthenticated 任何经过身份验证的用户都可以对任何用户执行 GET POST 等操作 只要他拥有有效
  • 查找集合中的重复条目

    是否有一个工具或库可以根据可以实现的特定标准查找集合中的重复条目 为了让自己清楚 我想根据特定标准对条目进行相互比较 所以我认为一个Predicate刚刚返回true or false还不够 我不能使用equals 这取决于标准的语义 如果
  • 带进度条的 R Shiny 异步

    Shiny 中的异步处理应该采用长时间运行的函数并将控制权交还给用户 然而 让用户知道计算正在后台运行仍然是件好事 我无法弄清楚如何构建异步进程以在后台运行并仍然显示进度指示器 下面是我一直在摆弄的示例代码 我认为进度指示器是一个问题 而且
  • 国产提取器和案例类提取器的区别

    根据scala规范 由case类构建的提取器如下 scala规范 5 3 2 def unapply tps x c tps if x eq null scala None else scala Some x xs11 x xs1k 出于实
  • 在 html 页面加载和渲染期间显示 throbber

    我有一个渲染缓慢的页面 网络旅行很快 页面的初始加载速度很快 您实际上可以看到 如果您的机器足够慢 html 组件的初始布局 然后运行一些 javascript 的东西 使其中一些组件全部支持 ajax 最后CSS被应用 我对 javasc
  • 联系 使用权限请求 iPhone

    我的应用程序被苹果审核团队拒绝了 据他们说 原因是 17 1 未经用户事先许可并为用户提供访问权限 应用程序不得传输有关用户的数据 有关如何以及在何处使用数据的信息 具体来说 您的应用程序无需请求许可即可访问用户联系人 第一的 但是 我用过
  • 制作旋转动画:开始和结束缓慢,但中间快速

    我想要将旋转动画应用于一个元素 旋转应该缓慢开始 然后变得越来越快 然后它将到达一个点 从该点开始它将继续非常快 然后非常缓慢地越来越慢 直到停止 该图看起来像这样 Speed gt Time 我如何将此路径应用于 jQueryanimat
  • 根据闭包上下文的要求,返回类型“Future Function()”不是“Future

    我已将项目 sdk 升级为 gt 2 12所以我正在进行更改 但我陷入了这个特定的错误 无法找到任何解决方案 我对该方法进行了更改 如下所示 在调用它时我不知道如何修复它 这个问题是在我升级并尝试解决所有问题后引起的non nullable
  • 当我将数字指定为 .000021 时,为什么 PHP 会以科学计数法打印我的数字?

    在 PHP 中我有以下代码 输出是2 1E 5 为什么 它应该打印 000021 Use 数字格式 http php net function number format得到你想要的东西 print number format var 5
  • 哪个 Clang 警告相当于 GCC 中的 Wzero-as-null-pointer-constant?

    我们的项目使用C 11 14 我们想使用nullptr代替0 or NULL用指针 即使当0 作为整数文字 是允许的 我有以下代码 int main int ptr1 nullptr 1 int ptr2 0 2 如果我使用 GCC 5 3
  • 将文本内容包裹在剪辑路径多边形(三角形)形状内,并将图像剪辑在另一半上

    Im trying to achieve something like this 我需要将一些文本和图像分成两半 如上图所示 尝试使用 Clip path 但文本内容未换行并且也存在对齐问题 My code clipped text wid
  • 一个 git 仓库中有多个项目?

    我目前是 SVN 用户 正在考虑将我的项目转移到 git 我使用一个 SVN 存储库来保存我创建的所有项目 我的结构是这样的 group1 subgroup1 project1 branches tags trunk project2 su
  • 激活 graph_tool 的 cairo 相关功能

    我一直在使用图形工具库 https graph tool skewed de 到目前为止 我已经有一段时间没有真正使用过它的大部分绘图功能了 今天尝试使用时graph tool draw graph draw https graph too
  • 使用 ActivityManager.getRunningAppProcesses() 获取(真实)前台进程

    我正在尝试确定用户当前可见的应用程序 为此 我使用activityManager getRunningAppProcesses method 我知道 Android 5 1 1 不支持该方法 这没关系 一开始它就像魅力一样 我正在迭代列表
  • 在C中不使用条件语句和三元运算符查找三个数字中的最大值

    我必须找到用户提供的最多三个数字 但有一些限制 不允许使用任何条件语句 我尝试使用三元运算符 如下所示 max a gt b a b gt c a gt b a b c 但同样限制使用三元运算符 现在我不知道该怎么做 利用布尔表达式中的短路
  • 将jquery mobile中的所有点击事件替换为tap以加快速度

    我使用phonegap 和jquery mobile 开发移动应用程序 我使用数据角色等创建了布局 在这个应用程序中 我有很多如下所示的按钮可以转到不同的页面 我没有专门将点击事件绑定到这些按钮 它们只是使用 href 来实现魔法 a hr
  • 如何创建 IPC(进程间通信)C 程序来创建两个子进程

    我想创建一个 IPC c 程序来创建一个父进程和两个子进程 我的代码是 include
  • Spark代码组织和最佳实践[关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 因此 在面向对象的世界中度过了很多年 始终考虑到代码重用 设计模式和最佳实践 我发现自己在 Spark 世界中的代码组织和代码重用方面有些挣扎
  • NSTableView 行中的垂直对齐文本

    我对 NSTableView 有一个小问题 当我增加表中一行的高度时 其中的文本在行顶部对齐 但我想将其垂直居中对齐 谁能建议我有什么方法可以做到这一点 Thanks Miraaj 这是一个简单的代码解决方案 显示了可用于居中对齐 Text
  • Spark 广播变量 Map 给出 null 值

    我正在使用 java8 和 Spark v2 4 1 我正在尝试使用广播变量Map查找使用如下所示 输入数据 code1 code2 code3 1 7 5 2 7 4 3 7 3 4 7 2 5 7 1 预期输出 code1 code2