问题 :
我想使用 JDBC 连接来使用 Spark 发出自定义请求。
此查询的目标是优化工作人员的内存分配,因为我无法使用:
ss.read
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load()
现在 :
我目前正在尝试运行:
ss = SparkSession
.builder()
.appName(appName)
.master("local")
.config(conf)
.getOrCreate()
ss.sql("some custom query")
配置 :
url=jdbc:mysql://127.0.0.1/database_name
driver=com.mysql.jdbc.Driver
user=user_name
password=xxxxxxxxxx
Error :
[info] Exception encountered when attempting to run a suite with class name: db.TestUserProvider *** ABORTED ***
[info] org.apache.spark.sql.AnalysisException: Table or view not found: users; line 1 pos 14
[info] at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
[info] at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveRelations$$lookupTableFromCatalog(Analyzer.scala:459)
[info] at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$8.applyOrElse(Analyzer.scala:478)
[info] at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$8.applyOrElse(Analyzer.scala:463)
[info] at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
[info] at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
[info] at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
[info] at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:60)
[info] at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:58)
[info] at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:58)
假设 :
我猜是配置错误,但我找不到哪里。
火花罐read and write使用 JDBC 数据源将数据传入或传出关系数据库(就像您在第一个代码示例中所做的那样)。
此外(并且完全独立),spark 允许使用 SQL 进行查询views它们是根据已从某个源加载到 DataFrame 中的数据创建的。例如:
val df = Seq(1,2,3).toDF("a") // could be any DF, loaded from file/JDBC/memory...
df.createOrReplaceTempView("my_spark_table")
spark.sql("select a from my_spark_table").show()
只有以这种方式创建的“表”(称为视图,从 Spark 2.0.0 开始)可以使用以下命令进行查询SparkSession.sql
.
如果您的数据存储在关系数据库中,Spark 必须首先从那里读取数据,然后才能在加载的副本上执行任何分布式计算。底线 - 我们可以使用从表中加载数据read
,创建一个临时视图,然后查询它:
ss.read
.format("jdbc")
.option("url", "jdbc:mysql://127.0.0.1/database_name")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load()
.createOrReplaceTempView("my_spark_table")
// and then you can query the view:
val df = ss.sql("select * from my_spark_table where ... ")
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)