sparkStreaming中用sparksql时遇到的问题总结

2023-11-06

问题一

今天准备用sparkStreaming接入kafka再写入hive,准备在流里面执行sparksql,按照官网的写法,一开始的代码是这样的:

SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("test");
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
SparkSession ss = SparkSession.builder()
                .config(sparkConf)
                .enableHiveSupport()
                .getOrCreate();
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(20));

然后执行sql

ss.sql("show databases");

咋一看没啥毛病,执行后发现报错了。。。。报错如下:
在这里插入图片描述
百度后告诉我,因为创建了两个sparkcontext
请教同事,告诉我把SparkSession 和JavaStreamingContext 的代码位置换一下。果然执行后不报错了。
然而。。。。
发现个新问题
不管我在hive中创建多少个database,执行后都只有一个default库。懵逼。。。
继续找问题,发现。。。。
SparkSession.builder()会创建一个sparkContext() ;
new JavaStreamingContext(sparkConf, Durations.seconds(20)) 也创建一个sparkContext;
spark 只能使用一个全局的SparkContext;
所以,SparkSession创建时的sparkConf就不是初始化的sparkConf。
后来代码改成如下:

SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("test");
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
SparkSession ss = SparkSession.builder()
		.config(sparkConf)
		.enableHiveSupport()
		.getOrCreate();
JavaStreamingContext jsc = new JavaStreamingContext(new JavaSparkContext(ss.sparkContext()), Durations.seconds(20));
ss.sql("show databases");

用JavaStreamingContext的另一个构造方法,从 SparkSession 中获取到JavaSparkContext。

问题二

问题:org.apache.hadoop.hive.ql.metadata.Hive.loadDynamicPartitions(org.apache.hadoop.fs.Path, java.lang.String, java.util.Map, boolean, int, boolean, boolean, long, boolean, org.apache.hadoop.hive.ql.io.AcidUtils$Operation)
原因:找到loadDynamicPartitions这个方法,发现是来自下面的依赖

<groupId>org.spark-project.hive</groupId>
<artifactId>hive-metastore</artifactId>
<version>1.2.1.spark2</version>

<groupId>org.spark-project.hive</groupId>
<artifactId>hive-metastore</artifactId>
<version>1.2.1.spark2</version>

这个依赖是从下面这个依赖继承来的

<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.4.3</version>

报错信息来看,loadDynamicPartitions这个方法需要10个参数,所以把这个自带的依赖排掉,换个依赖,保证loadDynamicPartitions这个方法的参数是10个参数。
最终换到2.3.4版本,用的是org.apache.hive,不是之前的org.spark-project.hive

<dependency>
	<groupId>org.apache.hive</groupId>
	<artifactId>hive-exec</artifactId>
	<version>2.3.4</version>
</dependency>
<dependency>
	<groupId>org.apache.hive</groupId>
	<artifactId>hive-metastore</artifactId>
	<version>2.3.4</version>
</dependency>

问题解决!

问题三

问题:Caused by: java.io.NotSerializableException: Object of org.apache.spark.streaming.kafka010.DirectKafkaInputDStream is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects.
原因:
采用的是spring-sparkstreaming-kafka10,如果是采用官方文档里上述方式手动提交offset,需要把stream对象的属性标记为static或者transient避免序列化,不然可能在任务提交的时候报DirectKafkaInputDStream 无法序列化导致Task not serializable错误。
解决:
定义一个stream private static(transient) JavaInputDStream<ConsumerRecord<String, String>> stream;
然后在初始化他 stream = KafkaParamsConfig.buildKafkaSourceDStream(topic, jsc);

问题四

sparkstreaming的checkpoint的间隔要是批次间隔的倍数

注:后续其他问题再补充。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

sparkStreaming中用sparksql时遇到的问题总结 的相关文章

  • spark性能优化调优指导性文件

    1 让我们看一下前面的核心参数设置 num executors 10 20 executor cores 1 2 executor memory 10 20 driver memory 20 spark default parallelis
  • Spark广播变量与累加器

    在之前的文章中 我介绍了flink广播状态 从而了解了flink广播状态实际上就是将一个流广播到下游所有算子之中 在本文中我将介绍spark中类似的概念 为了方便理解 先放张spark应用程序架构图 1 普通spark变量 实际上 如果我们
  • 分类算法之朴素贝叶斯

    1 朴素贝叶斯分类算法 朴素贝叶斯 Naive Bayes NB 算法是基于贝叶斯定理与特征条件独立假设的分类方法 该算法是有监督的学习算法 解决的是分类问题 是将一个未知样本分到几个预先已知类别的过程 朴素贝叶斯的思想就是根据某些个先验概
  • SparkStreaming知识总结

    一 流式计算的概述 1 1 什么是流式计算 1 数据流与静态数据的区别 数据流指的就是不断产生的数据 是源源不断 不会停止 静态数据指的就是存储在磁盘中的固定的数据 2 流式计算的概念 就是对数据流进行计算 由于数据是炼苗不断的产生的 所以
  • spark集群搭建与mysql元数据管理

    找个spark集群搭建是针对于上一篇hadoop的基础上搭建的 所以spark的版本也是要按照着hadoop版本进行下载 1 解压spark 修改spark的 etc profile的home目录 2 安装SCALA 并配置SCALA HO
  • 【Spark NLP】第 3 章:Apache Spark 上的 NLP

    大家好 我是Sonhhxg 柒 希望你看完之后 能对你有所帮助 不足请指正 共同学习交流 个人主页 Sonhhxg 柒的博客 CSDN博客 欢迎各位 点赞 收藏 留言 系列专栏 机器学习 ML 自然语言处理 NLP 深度学习 DL fore
  • Spark课程设计——电影推荐系统

    题目所需数据集及相应信息描述 数据集 1 用户评分数据集ratings dat 包含了大量用户的历史评分数据 2 样本评分数据集personalRatings dat 包含了少数几个用户的个性化评分数据 这些数据反映了某个用户的个性化观影喜
  • Spark中的基本概念

    Spark中的基本概念 1 基本概念 1 1 RDD 弹性分布式数据集 1 2 DAG 有向无环图 1 3 Partition 数据分区 1 4 NarrowDependency 窄依赖 1 5 ShuffleDependency 宽依赖
  • scala和spark的下载与安装

    简易安装scala和spark 一 安装scala 1 安装scala scala下载注意和jdk的版本号 下载地址 https www scala lang org download 2 上传到linux虚拟机里 可通过rz方式上传 上传
  • Compressed Sparse Column format(CSC)

    CSR Compressed Sparse Row format 和CSC Compressed Spare Column format 都是一种稀疏矩阵的存储格式 这里分别给出实例 假设有如下矩阵 1360
  • 浅谈Hadoop体系和MPP体系

    浅谈Hadoop体系和MPP体系 引言 如题 在大数据发展至今 为了应对日益繁多的数据分析处理 和解决客户各种奇思妙 怪 想需求 形形色色的大数据处理的框架和对应的数据存储手段层出不穷 有老当益壮的Hadoop体系 依靠Hadoop巨大的社
  • Spark 源码阅读一-启动脚本

    Spark Complile Help Links Because spark 1 5 need maven version 3 3 3 so i track the branch 1 4 git branch a git checkout
  • 【Spark NLP】第 7 章:分类和回归

    大家好 我是Sonhhxg 柒 希望你看完之后 能对你有所帮助 不足请指正 共同学习交流 个人主页 Sonhhxg 柒的博客 CSDN博客 欢迎各位 点赞 收藏 留言 系列专栏 机器学习 ML 自然语言处理 NLP 深度学习 DL fore
  • 大数据开发必备面试题Spark篇合集

    1 Hadoop 和 Spark 的相同点和不同点 Hadoop 底层使用 MapReduce 计算架构 只有 map 和 reduce 两种操作 表达能力比较欠缺 而且在 MR 过程中会重复的读写 hdfs 造成大量的磁盘 io 读写操作
  • Hudi和Kudu的比较

    与Kudu相比 Kudu是一个支持OLTP workload的数据存储系统 而Hudi的设计目标是基于Hadoop兼容的文件系统 如HDFS S3等 重度依赖Spark的数据处理能力来实现增量处理和丰富的查询能力 Hudi支持Increme
  • spark_hadoop集群搭建自动化脚本

    bin bash 脚本使用说明 1 使用脚本前需要弄好服务器的基础环境 2 在hadoop的每个节点需要手动创建如下目录 data hdfs tmp 3 修改下面的配置参数 4 脚本执行完备后需要收到格式化namenode
  • 使用Flink1.16.0的SQLGateway迁移Hive SQL任务

    使用Flink的SQL Gateway迁移Hive SQL任务 前言 我们有数万个离线任务 主要还是默认的DataPhin调度CDP集群的Hive On Tez这种低成本任务 当然也有PySpark 打Jar包的Spark和打Jar包的Fl
  • Spark的常用概念总结

    提示 文章写完后 目录可以自动生成 如何生成可参考右边的帮助文档 文章目录 前言 一 基本概念 1 RDD的生成 2 RDD的存储 3 Dependency 4 Transformation和Action 4 1 Transformatio
  • Spark 配置

    文章目录 1 Spark 配置 1 1 Spark 属性 1 1 1 动态加载Spark属性 1 1 2 查看Spark属性 1 2 环境变量 2 重新指定配置文件目录 3 继承Hadoop集群配置 4 定制的Hadoop Hive配置 1
  • spark相关

    提示 文章写完后 目录可以自动生成 如何生成可参考右边的帮助文档 文章目录 前言 一 pandas是什么 二 使用步骤 1 引入库 2 读入数据 总结 前言 提示 这里可以添加本文要记录的大概内容 例如 随着人工智能的不断发展 机器学习这门

随机推荐

  • Flink任务日志写到kafka【最新1.12,1.13】

    这篇文章如果对你有帮助 记得点赞哦 有问题也可以给我评论 一 背景 公司的日志希望能够同一到一个Kibana去做一个同一的展示 那就需要将任务的日志写到kafka Flink1 12开始默认的日志框架就是log4j2 那么配置的方式跟之前l
  • StableLM(stablelm-tuned-alpha-7b)中文能力测试

    模型地址 https huggingface co stabilityai stablelm tuned alpha 7b 基模型 GPT NeoX 环境 2块3090 24G 运行代码 from transformers import A
  • Qt 2D绘图(2):绘制椭圆、弧、弦、扇形、圆角矩形

    Qt 2D绘图 2 绘制椭圆 弧 弦 扇形 圆角矩形 本文为原创文章 转载请注明出处 或注明转载自 黄邦勇帅 原名 黄勇 本文出自本人原创著作 Qt5 10 GUI完全参考手册 网盘地址 https pan baidu com s 1iqa
  • 初学SQL

    SQL是做什么的 SQL Structured Query Language 结构化查询语言 SQL 是用于访问和处理数据库的标准的计算机语言 一般的后台开发 只要是需要操作数据库 就需要用到SQL SQL与MySQL SQL Server
  • XCTF攻防世界web新手练习_ 10_simple_php

    XCTF攻防世界web新手练习 simple php 题目 题目为simple php 根据题目信息 判断是关于php代码审计的 打开题目 得到一串php代码 代码很简单 就是以GET方式获得两个参数a和b 如果a和b满足一定条件 则打印f
  • #css# 超出高度,可上下滚动

    div class test div class item p class title 这是一条数据 p div div 在最大的div里面加入以下代码 test width 200px max height 300px 设置最大的高度 o
  • 老毛桃 重装系统 Windos

    1 准备好系统启动U盘 2 电脑开机时按 F12 看电脑品牌 不同的电脑有不同的U盘启动的按钮 3 选择USB进入 4 回车进入WinPE桌面 5 打开分区工具 6 选择硬盘 使用快速分区 选择分成几个盘 确定 7 打开一键装机 选择映像文
  • 十分钟搞定 C/C++ 项目自动化构建 —— Xmake 入门指南

    上面是一个开发中的经典问题场景 发生的原因通常是 开发者工作机上的某些依赖项没有安装在测试或者客户环境中 导致程序无法运行或者报错 如何有效地避免这个尴尬的场景呢 那就需要一个持续的 可复用的自动构建流程 这样会促使团队保证他们的软件在 I
  • 缘份居在线起名,姓名打分API接口

    接口数据api 接口平台 https api yuanfenju com 开发文档 https doc yuanfenju com 支持格式 JSON 请求方式 HTTP POST 密钥 api secret wD XhOUW pvr 请求
  • 原生js层叠轮播

    轮播图 基本上是每个网页必备的一个模块 那么下面就给大家分享一个层叠样式轮播图 先给大家看一下样式图 首先满足自动轮播 鼠标放上就会停止轮播 移除就会继续 要想写好一个轮播图 首先肯定是要把样式 布局写出来 废话不多说 直接放样式布局 cs
  • ST-Llink与STM32最小系统开发板的线路连接

    ST Llink与STM32最小系统开发板的线路连接 1 ST Llink与STM32最小系统开发板的线路连接 ST Llink SWO STM32 SWDIO ST Llink SWCLK STM32 SWCLK ST Llink GND
  • AndroidTreeView中的列表树形展示 点击弹出树形结构中的某个列表并关闭已经打开的列表

    想看详情 链接如下 https download csdn net download sinat 28238111 10545762 public class MyTestFragment extends Fragment private
  • 二进制的简单应用

    include
  • SQL Server 存储过程返回结果集的几种方式

    SQL Server 返回结果集的几种方式 2017年12月18日 21 52 24 xxc1605629895 阅读数 7033更多 分类专栏 sqlserver 版权声明 本文为博主原创文章 遵循 CC 4 0 BY SA 版权协议 转
  • adc0832工作原理详解_ADC0832芯片介绍

    芯片的应用 模 数 AD 和数 模 DA 转换是模拟 电路和数字电路进行沟通的渠道 从前面的课程我们知道 数字电路里 电平只有 高和低两种状态 比如 5V 和 0V 对应着 1 和 0 模拟电路里 电平则理论上有无数 个状态 比如 0V 0
  • 高德地图报错 Uncaught TypeError: AMap.Geocoder is not a constructor

    原本我是这样写的 但是在谷歌或者其他浏览器的时候就报错了 而且谷歌等浏览器 pc端是只能定位到附近的基站 出现这个报错可以添加 AMap plugin AMap Geocoder function
  • chroot的作用及详解

    什么是 chroot chroot 即 change root directory 更改 root 目录 在 linux 系统中 系统默认的目录结构都是以 即是以根 root 开始的 而在使用 chroot 之后 系统的目录结构将以指定的位
  • 在线机械键盘测试软件,键盘检测软件(DAS G2 104/105 Keyboard Test Tool)

    DAS G2 104 105 Keyboard Test Tool是一款主要针对新老键盘测试的工具 可以测试键盘的连键盘 坏键 按键响应时间 软件相关 根据测试人员从近百次的测试结果 50篇发文反馈来看 我们的硬核测试逐步得到了一些读者的关
  • java 文件pem,在Java中,仅用PEM文件创建SSLContext的最简单方法是什么?

    I used LetsEncrypt s CertBot to generate PEM files for free In other languages it is easy to start an HTTPS server using
  • sparkStreaming中用sparksql时遇到的问题总结

    问题一 今天准备用sparkStreaming接入kafka再写入hive 准备在流里面执行sparksql 按照官网的写法 一开始的代码是这样的 SparkConf sparkConf new SparkConf setMaster lo