将 SparkSession.sql() 与 JDBC 结合使用

2024-03-22

问题 :

我想使用 JDBC 连接来使用 Spark 发出自定义请求。

此查询的目标是优化工作人员的内存分配,因为我无法使用:

ss.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()

现在 :

我目前正在尝试运行:

ss = SparkSession
  .builder()
  .appName(appName)
  .master("local")
  .config(conf)
  .getOrCreate()

ss.sql("some custom query")

配置 :

url=jdbc:mysql://127.0.0.1/database_name
driver=com.mysql.jdbc.Driver
user=user_name
password=xxxxxxxxxx

Error :

[info] Exception encountered when attempting to run a suite with class name: db.TestUserProvider *** ABORTED ***
[info]   org.apache.spark.sql.AnalysisException: Table or view not found: users; line 1 pos 14
[info]   at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
[info]   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveRelations$$lookupTableFromCatalog(Analyzer.scala:459)
[info]   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$8.applyOrElse(Analyzer.scala:478)
[info]   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$8.applyOrElse(Analyzer.scala:463)
[info]   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
[info]   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
[info]   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
[info]   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:60)
[info]   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:58)
[info]   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:58)

假设 :

我猜是配置错误,但我找不到哪里。


火花罐read and write使用 JDBC 数据源将数据传入或传出关系数据库(就像您在第一个代码示例中所做的那样)。

此外(并且完全独立),spark 允许使用 SQL 进行查询views它们是根据已从某个源加载到 DataFrame 中的数据创建的。例如:

val df = Seq(1,2,3).toDF("a") // could be any DF, loaded from file/JDBC/memory...
df.createOrReplaceTempView("my_spark_table")
spark.sql("select a from my_spark_table").show()

只有以这种方式创建的“表”(称为视图,从 Spark 2.0.0 开始)可以使用以下命令进行查询SparkSession.sql.

如果您的数据存储在关系数据库中,Spark 必须首先从那里读取数据,然后才能在加载的副本上执行任何分布式计算。底线 - 我们可以使用从表中加载数据read,创建一个临时视图,然后查询它:

ss.read
  .format("jdbc")
  .option("url", "jdbc:mysql://127.0.0.1/database_name")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()
  .createOrReplaceTempView("my_spark_table")

// and then you can query the view:
val df = ss.sql("select * from my_spark_table where ... ")
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

将 SparkSession.sql() 与 JDBC 结合使用 的相关文章

  • 使用 slick 3.0.0-RC1 无法在 TableQuery 上找到方法结果

    我正在尝试 Slick3 0 0 RC1我遇到了一个奇怪的问题 这是我的代码 import slick driver SQLiteDriver api import scala concurrent ExecutionContext Imp
  • PDOException SQLSTATE[HY000] [2002] 我的本地计算机上的连接超时

    最近我从服务器导入了代码 但本地代码无法连接到远程 mysql 数据库 所以我有两个问题 我可以访问我的远程数据库吗 如果是 为什么我的代码不起作用 如果没有 有没有办法绕过这个问题 我不想复制在本地计算机上运行的 mysql 数据库 我的
  • 从 pandas udf 记录

    我正在尝试从 python 转换中调用的 pandas udf 进行日志记录 因为在执行器上调用的代码不会显示在驱动程序的日志中 我一直在寻找一些选项 但到目前为止最接近的选项是这个one https stackoverflow com q
  • 删除所有值比第二高值低 5 倍的记录

    我有一个表 价格 有两个字段 代码 字符 和价格 小数 我需要查找具有相同代码 价格比两个最高价格低 5 倍或更少的所有记录 例如 在这种情况下 我希望删除 id 1 id code price 1 1001 10 2 1001 101 3
  • 通过日期选择器过滤查询后检索具有特定值的行数[重复]

    这个问题在这里已经有答案了 目前 我正在使用 CodeIgniter 来检索特定时间范围内的数据 所有这些条目都有一个状态 我想将具有相同状态的所有条目分组并将其显示在各自的标题中 目前 这是我的模型类 其中我有以下条目来返回特定日期范围内
  • 无法在 mysql 表中的值中使用破折号(-)[重复]

    这个问题在这里已经有答案了 我一直在尝试从 python 将数据插入 MYSQL 表 我的sql表中的字段是id token start time end time和no of trans 我想存储使用生成的令牌uuid4在令牌栏中 但由于
  • 在 kubernetes 上安装 PySpark 软件包时出现 Spark-Submit:ivy-cache 文件未找到错误

    我一整天都在与它斗争 我能够安装并使用带有 Spark shell 或连接的 Jupiter 笔记本的包 graphframes 但我想使用 Spark Submit 将其移动到基于 kubernetes 的 Spark 环境 我的火花版
  • 如何使用 scala 宏打印变量名称和值?

    我确信有一种更优雅的方式来编写以下宏来打印变量的名称和值 def mprintx c Context linecode c Expr Any c Expr Unit import c universe val namez c enclosi
  • MySQL中Join同表临时表

    我喜欢在 MySQL 中加入一个失败的临时表 这个想法很简单 CREATE TEMPORARY TABLE temp table LIKE any other table srsly it does not matter which tab
  • 用于真实 Web 项目的 Scala-JS [已关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 有人用过吗Scala JS在真实的网络项目中 但不仅仅适用于普通的JavaScript在隔离环境中替换 我想尽可能多地使用 Scala 我希望可
  • PHP strtotime返回Mysql UNIX_TIMESTAMP的不同值

    我在 stackoverflow 上搜索过帖子 发现了一些类似的帖子 但我认为这是一篇不同的帖子 我的 PHP 和 Mysql 服务器的时区全部设置为 UTC 在表中我使用时间戳字段 值为 2010 11 08 02 54 15 我使用这样
  • Scala:尝试 .getOrElse 与 if/else

    我是一名相当新的 Scala 开发人员 我是一名经验丰富的 Java 开发人员 到目前为止 我一直很喜欢 Scala 的简单性 我真的很喜欢函数式结构 而且它们常常迫使你编写更简洁的代码 然而最近我注意到 由于舒适性和简单性 我最终使用了在
  • SQL查询:按字符长度排序?

    是否可以按字符总数对sql数据行进行排序 e g SELECT FROM database ORDER BY data length 我想你想用这个 http dev mysql com doc refman 5 0 en string f
  • Mysql:my.cnf中的修改不生效

    我已经更新了my cnf我的数据库文件包含以下行 max connections 200 之后我停止并启动 mysql 服务以使更改生效 但由于某种原因 此更改不会影响数据库 因为如果我运行 mysql gt select max conn
  • scala 中 'Array[Int]' 隐式转换为 'Int => Int' 的地方在哪里?

    这是一个问题this https stackoverflow com questions 70000384 why val arr int int array1 2 3 is allowed in scala 现在我们已经证明了Array
  • Twitter Future 与 Scala Future 相比有何优势?

    我知道 Scala Future 变得更好的很多原因 有什么理由改用 Twitter Future 吗 除了 Finagle 使用它这一事实之外 免责声明 我在 Twitter 负责 Future 的实施 一点背景知识 在 Scala 有一
  • 如何在具有动态列的表中插入值 Jdbc/Mysql

    我想在具有动态列的表中添加值 我设法创建一个包含动态列的表 但我不知道如何插入数据 Create Table sql CREATE TABLE MyDB myTable level INTEGER 255 int columnNumber
  • Spark:替换嵌套列中的空值

    我想更换所有n a以下数据框中的值unknown 它可以是scalar or complex nested column 如果它是一个StructField column我可以循环遍历列并替换n a using WithColumn 但我希
  • 寻找多列索引的最佳顺序

    假设我有一个包含两个索引的表 一个位于 a 列 一个位于 a b 和 c 列 我注意到 根据索引定义中列的顺序 MySQL 可能最终使用单列索引而不是多列索引 即使多列索引中的所有三列都在 ON 中引用JOIN 的一部分 这有点引出了一个问
  • 什么样的函数被认为是“可组合的”?

    维基百科文章函数组合 计算机科学 https en wikipedia org wiki Function composition computer science says 就像数学中通常的函数组合一样 每个函数的结果作为下一个函数的参数

随机推荐