使用 pyspark 连接 PostgreSQL

2024-05-07

我正在尝试使用 pyspark 连接到数据库,并且使用以下代码:

sqlctx = SQLContext(sc)
df = sqlctx.load(
    url = "jdbc:postgresql://[hostname]/[database]",
    dbtable = "(SELECT * FROM talent LIMIT 1000) as blah",
    password = "MichaelJordan",
    user =  "ScottyPippen",
    source = "jdbc",
    driver = "org.postgresql.Driver"
)

我收到以下错误:

知道为什么会发生这种情况吗?

Edit:我正在尝试在我的计算机本地运行代码。


从以下位置下载 PostgreSQL JDBC 驱动程序https://jdbc.postgresql.org/download/ https://jdbc.postgresql.org/download/

然后将数据库配置值替换为您的数据库配置值。

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.jars", "/path_to_postgresDriver/postgresql-42.2.5.jar") \
    .getOrCreate()

df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/databasename") \
    .option("dbtable", "tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .option("driver", "org.postgresql.Driver") \
    .load()

df.printSchema()

更多信息:https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 pyspark 连接 PostgreSQL 的相关文章

  • PostgreSql“运行安装后步骤...数据库集群初始化失败”

    我是一名 Windows 用户 我花了几个小时不断地安装和卸载 然后才使其正常工作 前 10 次左右才看到标题中的错误消息 我将其作为一个自我回答的问题放在这里 以防止其他人在安装时可能遇到同样的问题 并为像我这样第一次使用 Postgre
  • 纱线上的火花,连接到资源管理器 /0.0.0.0:8032

    我正在我的开发机器 Mac 上编写 Spark 程序 hadoop的版本是2 6 spark的版本是1 6 2 hadoop集群有3个节点 当然都在linux机器上 我在idea IDE中以spark独立模式运行spark程序 它运行成功
  • Spark.sql.shuffle.partitions 的最佳值应该是多少,或者在使用 Spark SQL 时如何增加分区?

    我实际上正在使用 Spark SQLhiveContext sql 它使用 group by 查询 我遇到了 OOM 问题 所以考虑增加价值spark sql shuffle partitions从默认的 200 到 1000 但这没有帮助
  • 最近邻居的 Postgis SQL

    我正在尝试计算最近的邻居 为此 我需要传递一个参数来限制与邻居的最大距离 例如 半径1000米内最近的邻居是哪些 我做了以下事情 我用数据创建了表 id name latitude longitude 之后 我执行了以下查询 SELECT
  • Spark scala 模拟 Spark.implicits 用于单元测试

    当尝试使用 Spark 和 Scala 简化单元测试时 我使用 scala test 和mockito scala 以及mockito Sugar 这只是让你做这样的事情 val sparkSessionMock mock SparkSes
  • 如何创建不返回任何内容的函数

    我想写一个函数pl pgsql 我在用着Postgres 企业管理器 v3并使用 shell 来创建一个函数 但在 shell 中我必须定义返回类型 如果我不定义返回类型 我将无法创建函数 如何创建一个不返回结果的函数 即创建一个新表的函数
  • Google App Engine Flexi 上 Django 的 Postgres 设置

    我正在尝试在应用程序引擎灵活环境中使用 postgres 设置 django 我按照这里的说明进行操作 https cloud google com appengine docs flexible python using cloud sq
  • 从 PySpark RDD 中的每个组中取出前 N 个元素(不使用 groupByKey)

    我有一个如下所示的 RDD dataSource sc parallelize user1 3 blue user1 4 black user2 5 white user2 3 black user2 6 red user1 1 red 我
  • Postgres JSON 数据类型 Rails 查询

    我正在使用 Postgres 的 json 数据类型 但想要使用嵌套在 json 中的数据进行查询 排序 我想在 json 数据类型上使用 where 进行订购或查询 例如 我想查询关注者数量 gt 500 的用户 或者我想按关注者或关注数
  • postgreSQL 在 WAMP 上的集成

    我刚刚在 Windows 7 上安装了 postgreSQL 我正在尝试将 postgreSQL 与 WAMP 服务器集成 为此 我在 httpd conf 和 php ini 文件中进行了以下更改 1个加载模块c path to libp
  • 在spark-kafka中使用schema将ConsumerRecord值转换为Dataframe

    我正在使用 Spark 2 0 2 和 Kafka 0 11 0 并且 我正在尝试在火花流中使用来自卡夫卡的消息 以下是代码 val topics notes val kafkaParams Map String Object bootst
  • 部分唯一索引不适用于冲突子句 PostgreSQL

    表结构 create table example a id integer b id integer c id integer flag integer 部分索引 create unique index u idx on example a
  • 使用 NLog .NET Core 将日志记录到 PostgreSQL DB

    我尝试将日志记录集成到 NET Core 中的数据库 我能够设置 NLog 并将消息记录到 SQL Server 这很容易 但是当我尝试将 DB 切换到 PostgreSQL 时 似乎没有记录任何内容 以下是startup cs中的代码 p
  • 如何加速spark df.write jdbc到postgres数据库?

    我是 Spark 新手 正在尝试使用 df write 加速将数据帧的内容 可以有 200k 到 2M 行 附加到 postgres 数据库 df write format jdbc options url psql url spark d
  • 使用连接池后如何处理过多的并发连接?

    Scenario 假设您有一个拥有大量流量的网站或应用程序 即使使用数据库连接池 性能也会受到真正的打击 站点 应用程序甚至可能崩溃 因为并发连接太多 Question 人们有什么选择来处理这个问题 我的想法 我在想有这个问题的人可以创建多
  • 在Python中检索PostgreSQL数据库的新记录

    在数据库表中 第二列和第三列有数字 将会不断添加新行 每次 每当数据库表中添加新行时 python 都需要不断检查它们 当 sql 表中收到的新行数低于 105 时 python 应打印一条通知消息 警告 数量已降至 105 以下 另一方面
  • pyspark 将 twitter json 流式传输到 DF

    我正在从事集成工作spark streaming with twitter using pythonAPI 我看到的大多数示例或代码片段和博客是他们从Twitter JSON文件进行最终处理 但根据我的用例 我需要所有字段twitter J
  • 了解 Spark 中的 DAG

    问题是我有以下 DAG 我认为当需要洗牌时 火花将工作划分为不同的阶段 考虑阶段 0 和阶段 1 有些操作不需要洗牌 那么为什么 Spark 将它们分成不同的阶段呢 我认为跨分区的实际数据移动应该发生在第 2 阶段 因为这里我们需要cogr
  • 如何使用 Scala 从 Spark 更新 ORC Hive 表

    我想更新 orc 格式的 hive 表 我可以从 ambari hive 视图进行更新 但无法从 sacla spark shell 运行相同的更新语句 objHiveContext sql select from table name 能
  • Django 独特的不工作

    我在从查询中过滤掉重复项时遇到问题 我正在使用 Django 1 4 和 Postgres 8 4 13 我在我的模型对象上使用这个查询 它是一个 jquery 自动完成 term request GET get term field re

随机推荐