kafka
数据源是一个external https://github.com/apache/spark/tree/master/external/kafka-0-10-sql模块,默认情况下不可用于 Spark 应用程序。
您必须将其定义为您的依赖项pom.xml
(正如您所做的那样),但这只是将其添加到 Spark 应用程序中的第一步。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>2.2.0</version>
</dependency>
有了这种依赖性,你必须决定是否要创建一个所谓的uber-jar这会将所有依赖项捆绑在一起(这会产生相当大的 jar 文件并使提交时间更长)或使用--packages
(或不太灵活--jars
) 选项添加依赖项spark-submit
time.
(还有其他选项,例如将所需的 jar 存储在 Hadoop HDFS 上或使用 Hadoop 发行版特定的方式来定义 Spark 应用程序的依赖项,但让我们保持简单)
我建议使用--packages
首先且仅当它有效时才考虑其他选项。
Use spark-submit --packages
包括Spark-SQL-Kafka-0-10模块如下。
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0
根据需要包含其他命令行选项。
Uber-Jar 方法
包括所谓的所有依赖项uber-jar可能并不总是有效,因为如何META-INF
目录被处理。
For kafka
要工作的数据源(以及一般的其他数据源),您必须确保META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
所有数据源中的merged (not replace
or first
或您使用的任何策略)。
kafka
数据源使用自己的META-INF/services/org.apache.spark.sql.sources.DataSourceRegister https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister注册的org.apache.spark.sql.kafka010.KafkaSourceProvider https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L49作为数据源提供者kafka
format.