本篇介绍在window运行环境下,使用spark消费kafka数据遇到的几个坑。。
调试环境IDEA
//依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>2.4.7</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>2.4.7</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>2.4.7</version>
</dependency>
1.设置checkpoint
在本地运行sparkstream程序的时候,需要设置checkpoint,在设置好本地路径的checkpoint后运行程序,出现如下报错:
原因:
虽然checkpoint到本地目录,但是在spark的底层调用里面还是用到了hadoop的api,由于hadoop的api里面用到了hadoop.home.dir环境变量,在本地没有设置,所以报错。
解决方法:
- 在本地下载hadoop,设置环境变量
- 在程序开始添加代码
System.setProperty("hadoop.home.dir", "C:\\Users\\17903\\Downloads\\hadoop-common-2.2.0-bin-master")
后面的路径是下载hadoop.home.dir后解压的位置
2、使用jdk11,scala2.12时,SparkStreaming消费Kafka数据会报错java序列化错误
很不巧,我刚好对应了这个错误,搜索几小时,找到了那么简单朴实的一句解决方案
3、sparkstreaming消费kafka出现序列化错误
在程序运行后,正常打印出现时间戳,但当kafka中topic新增生产者数据时,程序出现异常
原因:
Kafka10的ConsumerRecord 异常时未序列化。
解决方法:
将ConsumerRecord类注册为使用Kyro序列化
在程序开始的conf配置项中添加:
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
测试代码:
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import java.lang
object test_kafka {
def main(args: Array[String]): Unit = {
System.setProperty("hadoop.home.dir", "C:\\Users\\17903\\Downloads\\hadoop-common-2.2.0-bin-master")
val conf = new SparkConf().setMaster("local[2]").setAppName("myapp").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val ssc = new StreamingContext(conf, Seconds(5))
ssc.sparkContext.setLogLevel("WARN")
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "master:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: lang.Boolean),
"group.id" -> "spark")
val KafkaStream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](List("test1"), kafkaParams)
)
KafkaStream.map(_.value()).flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey((x,y)=>(x+y)).print()
ssc.checkpoint("C:/Users/17903/Desktop/checkpoint")
ssc.start()
ssc.awaitTermination()
}
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)