从 Spark 作业中调用 JDBC 到 impala/hive 并创建表

2023-11-23

我正在尝试在 scala 中编写一个 Spark 作业,该作业将打开与 Impala 的 jdbc 连接,并让我创建一个表并执行其他操作。

我该怎么做呢?任何例子都会有很大的帮助。 谢谢你!


val JDBCDriver = "com.cloudera.impala.jdbc41.Driver"
val ConnectionURL = "jdbc:impala://url.server.net:21050/default;auth=noSasl"

Class.forName(JDBCDriver).newInstance
val con = DriverManager.getConnection(ConnectionURL)
val stmt = con.createStatement()
val rs = stmt.executeQuery(query)

val resultSetList = Iterator.continually((rs.next(), rs)).takeWhile(_._1).map(r => {
    getRowFromResultSet(r._2) // (ResultSet) => (spark.sql.Row)
}).toList

sc.parallelize(resultSetList)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

从 Spark 作业中调用 JDBC 到 impala/hive 并创建表 的相关文章

  • JDBC set_approle

    我正在尝试使用prepareCall 通过 JDBC 连接设置应用程序角色 它似乎工作正常 即语法方面 但 SQL Server 2008 返回此错误 Application roles can only be activated at t
  • SparkSession 初始化需要很长时间

    SparkSession 初始化需要很长时间才能成功 这是我的代码 import findspark findspark init import pyspark from pyspark sql import SparkSession sp
  • 乔达时间:将 UTC 转换为本地时间

    我想将 Joda Time UTC DateTime 对象转换为本地时间 这是一种看似有效的费力方法 但一定有更好的方法 这是没有周围声明的代码 在 Scala 中 val dtUTC new DateTime 2010 10 28T04
  • 如何将列表 插入数据库

    我是 Java 新手 我已经创建了产品类型的通用列表 如何将其添加到数据库中 该列表包含Products的对象 数据库中的列是Products类的字段 即使我通过 listvariable get 0 等分隔列表 我也会得到对象 而不是该对
  • 在 Scala 中生成数字的质因数

    如何在 Scala 中生成整数的因子 这是我的看法1 def factorize x Int List Int def foo x Int a Int List Int if a gt Math pow x 0 5 return List
  • 我在 Scala 中将资源放在哪里?

    在学习使用 Scala 和 JavaFX 时 我在 a 中遇到了以下代码ProScalaFX 示例 https github com jpsacha ProScalaFX val resource getClass getResource
  • 副作用是纯函数中找不到的一切吗?

    可以肯定地说 以下二分法成立 每个给定的函数是 要么纯粹 或有副作用 如果是这样 函数的 副作用就是纯函数中找不到的任何东西 这很大程度上取决于您选择的定义 可以公平地说 函数是pure or impure 纯函数始终返回相同的结果并且不会
  • 有什么方法可以*不*在 Postgresql 中使用服务器端准备好的语句吗?

    在 比如说 Python 中 我可以发出 psycopg2 connect cursor execute select from account where id 00100000006ONCrAAO 在服务器上会产生以下日志条目 2011
  • Scalaz 7 Iteratee 处理大型 zip 文件(OutOfMemoryError)

    我正在尝试使用 scalaz iteratee 包在恒定空间中处理大型 zip 文件 我需要对 zip 文件中的每个文件执行一个长时间运行的进程 这些进程可以 并且应该 并行运行 我创建了一个EnumeratorT使每个膨胀ZipEntry
  • Spark 2.1无法在CSV上写入Vector字段

    当我将代码从 Spark 2 0 迁移到 2 1 时 我偶然发现了与 Dataframe 保存相关的问题 这是代码 import org apache spark sql types import org apache spark ml l
  • Scala 重载构造函数和 super

    我无法理解如何在 Java 上开发类似于以下的 Scala 代码 public abstract class A protected A protected A int a public abstract class B protected
  • 如何在 Akka Stream 中记录流量?

    我有一个带有单个流 图的 Akka Stream 应用程序 我想测量源头的流量并每 5 秒记录一次 例如 在过去 5 秒内收到 3 条消息 我尝试过 someOtherFlow groupedWithin Integer MAX VALUE
  • 为 Spark Thrift 服务器提供仓库目录的路径

    我已经设置了 Spark 集群 并且成功通过 Spark SQL 连接器连接 Tableau 我从 Spark shell 创建了表 并使用 saveAsTable 如何访问从 Tableau 保存的表 启动spark thrift服务器时
  • 将列表拆分为多个具有固定元素数量的列表

    如何将元素列表拆分为最多包含 N 个项目的列表 例如 给定一个包含 7 个元素的列表 创建 4 个组 最后一组可能包含较少的元素 split List 1 2 3 4 5 6 seven 4 gt List List 1 2 3 4 Lis
  • Spark:用列的平均值替换数据框中的空值

    如何创建 UDF 以编程方式将每列中 Spark 数据框中的空值替换为列平均值 例如 在示例中 数据 col1 空值的值为 2 4 6 8 5 5 5 示例数据 col1 col2 col3 2 null 3 4 3 3 6 5 null
  • 使用 Spark pandas_udf 创建列,具有动态数量的输入列

    我有这个 df df spark createDataFrame row a 5 0 0 0 11 0 row b 3394 0 0 0 4543 0 row c 136111 0 0 0 219255 0 row d 0 0 0 0 0
  • Scala 组合器解析器 - 区分数字字符串和变量字符串

    我正在做 Cay Horstmann 的组合器解析器练习 我想知道区分代表数字的字符串和代表匹配语句中变量的字符串的最佳方法 def factor Parser ExprTree wholeNumber expr ident case a
  • IntelliJ IDEA Scala 插件问题

    我对新的 Intellij IDEA 10 和 Scala 插件有疑问 当我在 Scala 源文件中输入任何内容时 编辑器会永久冻结 在其他文件 java 和其他 编辑器中效果很好 结构视图 scala 检查和显示成员功能已关闭 堆大小增加
  • Scala:在运行时获取 mixin 接口

    我需要在运行时从给定的类获取所有接口 全部加载在类加载器中 例如 如果一个类是这样声明的 trait B trait C trait D class A extends B with C with D 我想在运行时获取这些信息 A 取决于
  • Sparklyr - 在 Apache Spark Join 中包含空值

    问题在 Apache Spark Join 中包含空值 https stackoverflow com questions 41728762 including null values in an apache spark join有 Sc

随机推荐