Spark 序列化错误:当我将 Spark Stream 数据插入 HBase 时

2024-02-18

我对 Spark 如何在数据格式方面与 HBase 交互感到困惑。例如,当我在下面的代码片段中省略“ERROR”行时,它运行良好......但是添加该行后,我发现了与序列化问题相关的“任务不可序列化”的错误。

如何更改代码? 发生错误的原因是什么?

我的代码如下:

// HBase 
    Configuration hconfig = HBaseConfiguration.create();
    hconfig.set("hbase.zookeeper.property.clientPort", "2222");
    hconfig.set("hbase.zookeeper.quorum", "127.0.0.1");  
    hconn = HConnectionManager.createConnection(hconfig);  
    HTable htable = new HTable(hconf, Bytes.toBytes(tableName));       

// KAFKA configuration 
    Set<String> topics = Collections.singleton(topic); 

    Map<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("metadata.broker.list", "localhost:9092");
    kafkaParams.put("zookeeper.connect", "localhost:2222");
    kafkaParams.put("group.id", "tag_topic_id");  

//Spark Stream  
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                ssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics );  

    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {   

        @Override
        public String call(Tuple2<String, String> tuple2)  { 
            return tuple2._2();
        }
    });  

    JavaDStream<String> records = lines.flatMap(new FlatMapFunction<String, String>() {  

        @Override
        public Iterator<String> call(String x) throws IOException {   

////////////// Put into HBase : ERROR ///////////////////// 
            String[] data = x.split(","); 

            if (null != data && data.length > 2 ){ 
                SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");   
                String ts = sdf.format(new Date());

                Put put = new Put(Bytes.toBytes(ts)); 

                put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("LINEID"), Bytes.toBytes(data[0]));
                put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("TAGID"), Bytes.toBytes(data[1]));
                put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("VAL"), Bytes.toBytes(data[2]));

                htable.put(put); // ***** ERROR ******** 
                htable.close();  
            }
            return Arrays.asList(COLDELIM.split(x)).iterator(); 
        } 

    }); 

    records.print();

    ssc.start();
    ssc.awaitTermination();

当我启动我的应用程序时,遇到以下错误:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
at org.apache.spark.streaming.dstream.DStream$$anonfun$flatMap$1.apply(DStream.scala:554)
at org.apache.spark.streaming.dstream.DStream$$anonfun$flatMap$1.apply(DStream.scala:554)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.SparkContext.withScope(SparkContext.scala:682)
at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:264)
at org.apache.spark.streaming.dstream.DStream.flatMap(DStream.scala:553)
at org.apache.spark.streaming.api.java.JavaDStreamLike$class.flatMap(JavaDStreamLike.scala:172)
at org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.flatMap(JavaDStreamLike.scala:42) 

Caused by: java.io.NotSerializableException: org.apache.hadoop.hbase.client.HTable
Serialization stack:
- object not serializable (class: org.apache.hadoop.hbase.client.HTable, value: MCSENSOR;hconnection-0x6839203b)

序列化调试器在这里给你一个提示

Caused by: java.io.NotSerializableException: org.apache.hadoop.hbase.client.HTable
Serialization stack:
- object not serializable (class: org.apache.hadoop.hbase.client.HTable, value: MCSENSOR;hconnection-0x6839203b)

将下面的部分放入里面FlatMapFunction在调用方法(闭包)之前使用它,这应该可以解决问题

Configuration hconfig = HBaseConfiguration.create();
    hconfig.set("hbase.zookeeper.property.clientPort", "2222");
    hconfig.set("hbase.zookeeper.quorum", "127.0.0.1");  
    hconn = HConnectionManager.createConnection(hconfig);  
    HTable htable = new HTable(hconf, Bytes.toBytes(tableName));  
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark 序列化错误:当我将 Spark Stream 数据插入 HBase 时 的相关文章

  • 如何使用 JAVA 代码以编程方式捕获线程转储?

    我想通过 java 代码生成线程转储 我尝试使用 ThreadMXBean 为此 但我没有以正确的格式获得线程转储 因为我们正在使用jstack命令 请任何人提供一些帮助 他们是否有其他方式获取线程转储 使用任何其他 API 我想要的线程转
  • 如何将jscrollpane添加到jframe?

    我有以下源代码 有人可以给我建议如何将 jscrollpane 添加到 jframe 上吗 我尝试了几次将其添加到 jframe 但没有任何进展 它甚至没有显示 public class Form3 JFrame jframe new JF
  • 埃拉托色尼筛法 - 实现返回一些非质数值?

    我用 Java 实现了埃拉托斯特尼筛法 通过伪代码 public static void sieveofEratosthenes int n boolean numArray numArray new boolean n for int i
  • tomcat 7.0.50 java websocket 实现给出 404 错误

    我正在尝试使用 Java Websocket API 1 0 JSR 356 中指定的带注释端点在 tomcat 7 0 50 上实现 websocket 以下是我如何对其进行编码的简要步骤 1 使用 ServerEndpoint注解编写w
  • 为自定义驱动程序创建 GraphicsDevice

    我正在开发一个在嵌入式系统中使用 Java 的项目 我有用于屏幕和触摸输入的驱动程序 以及用于文本输入的虚拟键盘 我的屏幕驱动程序有一个Graphics2D您可以绘制的对象和repaint Rectangle 更新方法 类似地 触摸驱动器能
  • 为什么 MOVE CURSOR 在 OS X Mountain Lion 上不显示?

    我正在做一个项目 想看看 Swing 提供的每个光标是什么样子的 public class Test public static void main String args JFrame frame new JFrame frame set
  • 如何检测图像是否像素化

    之前有人在 SO 上提出过这样的问题 在Python中检测像素化图像 https stackoverflow com questions 12942365 detecting a pixelated image in python还有关于q
  • Spring Data JPA:查询如何返回非实体对象或对象列表?

    我在我的项目中使用 Spring Data JPA 我正在演奏数百万张唱片 我有一个要求 我必须获取各种表的数据并构建一个对象 然后将其绘制在 UI 上 现在如何实现我的 Spring 数据存储库 我读到它可以通过命名本机查询来实现 如果指
  • 如何停止执行的 Jar 文件

    这感觉像是一个愚蠢的问题 但我似乎无法弄清楚 当我在 Windows 上运行 jar 文件时 它不会出现在任务管理器进程中 我怎样才能终止它 我已经尝试过 TASKKILL 但它对我也不起作用 On Linux ps ef grep jav
  • JAVA中遍历JSON数据

    我是 JSON 新手 我使用 HTTPUrlConnections 并在 JAVA 程序中获得一些响应 响应数据将类似于 data id 1 userId 1 name ABC modified 2014 12 04 created 201
  • collect_list() 是否保持行的相对顺序?

    想象一下我有以下 DataFrame df id featureName featureValue id1 a 3 id1 b 4 id2 a 2 id2 c 5 id3 d 9 想象一下我运行 df groupBy id agg coll
  • 使用Java绘制维恩图

    我正在尝试根据给定的布尔方程绘制维恩图 例如 a AND b AND c我想在 Android 手机上执行此操作 因此我需要找到一种使用 Java 来执行此操作的方法 我找到了一个完美的小部件 它可以完成我在这方面寻找的一切布尔代数计算器
  • 如何从 Ant 启动聚合 jetty-server JAR?

    背景 免责声明 I have veryJava 经验很少 我们之前在 Ant 构建期间使用了 Jetty 6 的包装版本来处理按需静态内容 JS CSS 图像 HTML 因此我们可以使用 PhantomJS 针对 HTTP 托管环境运行单元
  • 我可以限制分布式应用程序发出的请求吗?

    我的应用程序发出 Web 服务请求 提供商处理的请求有最大速率 因此我需要限制它们 当应用程序在单个服务器上运行时 我曾经在应用程序级别执行此操作 一个对象跟踪到目前为止已发出的请求数量 并在当前请求超出允许的最大负载时等待 现在 我们正在
  • 使用 Java https 上传到 Imgur v3 错误

    我目前正在尝试使用他们当前的 API v3 上传到 imgur 但是我不断收到错误 错误 javax net ssl SSLException 证书中的主机名不匹配 api imgur com imgur com OR imgur com
  • 记录类名、方法名和行号的性能影响

    我正在我的 java 应用程序中实现日志记录 以便我可以调试应用程序投入生产后可能出现的潜在问题 考虑到在这种情况下 人们不会奢侈地使用 IDE 开发工具 以调试模式运行事物或单步执行完整代码 因此在每条消息中记录类名 方法名和行号将非常有
  • 使用 JFreeChart 为两个系列设置不同的 y 轴

    我正在使用 JFreeChart 使用折线图绘制两个数据系列 XYSeries 复杂的因素是 其中一个数据系列的 y 值通常远高于第二个数据系列的 y 值 假设第一个系列的 y 值约为数百万数量级 而第二个数据系列的 y 值约为数百万数量级
  • try-with-resources 中出现死代码警告,但翻译后的 try-catch-finally 中没有出现死代码警告

    以下代码使用try 有资源 https docs oracle com javase specs jls se7 html jls 14 html jls 14 20 3Java 8 中引入的构造 偶尔抛出 方法被声明为抛出一个偶尔的异常
  • 基于 Spring Boot 的测试中的上下文层次结构

    我的 Spring Boot 应用程序是这样启动的 new SpringApplicationBuilder sources ParentCtxConfig class child ChildFirstCtxConfig class sib
  • 即使调整大小,如何获得屏幕的精确中间位置

    好的 这个问题有两部分 当我做一个JFrame 并在其上画一些东西 即使我将宽度设置为 400 并使其在一个项目击中它时 当然 允许项目宽度 它会反弹回来 但由于某种原因 它总是偏离屏幕约 10 个像素 有没有办法解决这个问题 或者我只需要

随机推荐