Dataproc:使用 PySpark 从 BigQuery 读取和写入数据时出现错误

2024-04-28

我正在尝试读取一些 BigQuery 数据(ID:my-project.mydatabase.mytable[原始名称受保护])来自用户管理的 Jupyter Notebook 实例,内部Dataproc https://cloud.google.com/dataproc?hl=es工作台。我正在尝试的灵感来自于this https://cloud.google.com/dataproc-serverless/docs/guides/bigquery-connector-spark-example?hl=en#submit_a_pyspark_wordcount_batch_workload,更具体地说,代码是(请阅读关于代码本身的一些附加注释):

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType, ArrayType, StringType
from google.cloud import bigquery

# UPDATE (2022-08-10): BQ conector added
spark = SparkSession.builder.appName('SpacyOverPySpark') \
                    .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.24.2') \
                    .getOrCreate()

# ------------------ IMPORTING DATA FROM BIG QUERY --------------------------

# UPDATE (2022-08-10): This line now runs...
df = spark.read.format('bigquery').option('table', 'my-project.mydatabase.mytable').load()

# But imports the whole table, which could become expensive and not optimal
print("DataFrame shape: ", (df.count(), len(df.columns)) # 109M records & 9 columns; just need 1M records and one column: "posting"

# I tried the following, BUT with NO success:
# sql = """
# SELECT `posting`
# FROM `mentor-pilot-project.indeed.indeed-data-clean`
# LIMIT 1000000
# """
# df = spark.read.format("bigquery").load(sql)
# print("DataFrame shape: ", (df.count(), len(df.columns)))

# ------- CONTINGENCY PLAN: IMPORTING DATA FROM CLOUD STORAGE ---------------

# This section WORKS (just to enable the following sections)
# HINT: This dataframe contains 1M rows of text, under a single column: "posting"
df = spark.read.csv("gs://hidden_bucket/1M_samples.csv", header=True)

# ---------------------- EXAMPLE CUSTOM PROCESSING --------------------------

# Example Python UDF Python
def split_text(text:str) -> list:
    return text.split()

# Turning Python UDF into Spark UDF
textsplitUDF = udf(lambda z: split_text(z), ArrayType(StringType()))

# "Applying" a UDF on a Spark Dataframe (THIS WORKS OK)
df.withColumn("posting_split", textsplitUDF(col("posting")))

# ------------------ EXPORTING DATA TO BIG QUERY ----------------------------

# UPDATE (2022-08-10) The code causing the error:

# df.write.format('bigquery') \
#   .option('table', 'wordcount_dataset.wordcount_output') \
#   .save()

# has been replace by a code that successfully stores data in BQ:

df.write \
  .format('bigquery') \
  .option("temporaryGcsBucket", "my_temp_bucket_name") \
  .mode("overwrite") \
  .save("my-project.mynewdatabase.mytable")

使用 SQL 查询从 BigQuery 读取数据时,触发的错误为:

Py4JJavaError: An error occurred while calling o195.load.
: com.google.cloud.spark.bigquery.repackaged.com.google.inject.ProvisionException: Unable to provision, see the following errors:

1) Error in custom provider, java.lang.IllegalArgumentException: 'dataset' not parsed or provided.
  at com.google.cloud.spark.bigquery.SparkBigQueryConnectorModule.provideSparkBigQueryConfig(SparkBigQueryConnectorModule.java:65)
  while locating com.google.cloud.spark.bigquery.SparkBigQueryConfig

1 error
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.InternalProvisionException.toProvisionException(InternalProvisionException.java:226)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.InjectorImpl$1.get(InjectorImpl.java:1097)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.InjectorImpl.getInstance(InjectorImpl.java:1131)
    at com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelationInternal(BigQueryRelationProvider.scala:75)
    at com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelation(BigQueryRelationProvider.scala:46)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:332)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:242)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:230)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:197)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalArgumentException: 'dataset' not parsed or provided.
    at com.google.cloud.bigquery.connector.common.BigQueryUtil.lambda$parseTableId$2(BigQueryUtil.java:153)
    at java.util.Optional.orElseThrow(Optional.java:290)
    at com.google.cloud.bigquery.connector.common.BigQueryUtil.parseTableId(BigQueryUtil.java:153)
    at com.google.cloud.spark.bigquery.SparkBigQueryConfig.from(SparkBigQueryConfig.java:237)
    at com.google.cloud.spark.bigquery.SparkBigQueryConnectorModule.provideSparkBigQueryConfig(SparkBigQueryConnectorModule.java:67)
    at com.google.cloud.spark.bigquery.SparkBigQueryConnectorModule$$FastClassByGuice$$db983008.invoke(<generated>)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.ProviderMethod$FastClassProviderMethod.doProvision(ProviderMethod.java:264)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.ProviderMethod.doProvision(ProviderMethod.java:173)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.InternalProviderInstanceBindingImpl$CyclicFactory.provision(InternalProviderInstanceBindingImpl.java:185)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.InternalProviderInstanceBindingImpl$CyclicFactory.get(InternalProviderInstanceBindingImpl.java:162)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.ProviderToInternalFactoryAdapter.get(ProviderToInternalFactoryAdapter.java:40)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.SingletonScope$1.get(SingletonScope.java:168)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.InternalFactoryToProviderAdapter.get(InternalFactoryToProviderAdapter.java:39)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.InjectorImpl$1.get(InjectorImpl.java:1094)
    ... 18 more

向BigQuery写入数据时,出现错误:

Py4JJavaError: An error occurred while calling o167.save.
: java.lang.ClassNotFoundException: Failed to find data source: bigquery. Please find packages at http://spark.apache.org/third-party-projects.html

UPDATE:(2022-09-10) 向BigQuery写入数据时出错的问题已经解决,请参考上面的代码以及下面的评论部分。

我究竟做错了什么?


讨论中发现的要点:

  1. 通过以下方式将 BigQuery 连接器添加为依赖项spark.jars=<gcs-uri> or spark.jars.packages=com.google.cloud.spark:spark-bigquery-with-dependencies_<scala-version>:<version>.

  2. 指定正确的表名<project>.<dataset>.<table> format.

  3. 数据帧写入器的默认模式是errorifexists。当写入不存在的表时,数据集必须存在,该表将自动创建。写入现有表时,模式需要设置为"append" or "overwrite" in df.write.mode(<mode>)...save().

  4. 写入 BQ 表时,执行以下任一操作

    a) 直接写入(自支持)0.26.0 https://mvnrepository.com/artifact/com.google.cloud.spark/spark-bigquery-with-dependencies_2.12/0.26.0)

    df.write \
      .format("bigquery") \
      .option("writeMethod", "direct") \
      .save("dataset.table")
    

    b) 或间接写

    df.write \
      .format("bigquery") \
      .option("temporaryGcsBucket","some-bucket") \
      .save("dataset.table")
    

    看到这个doc https://github.com/GoogleCloudDataproc/spark-bigquery-connector#writing-data-to-bigquery.

  5. 通过 SQL 查询从 BigQuery 读取数据时,添加强制属性viewsEnabled=true and materializationDataset=<dataset>:

    spark.conf.set("viewsEnabled","true")
    spark.conf.set("materializationDataset","<dataset>")
    
    sql = """
      SELECT tag, COUNT(*) c
      FROM (
        SELECT SPLIT(tags, '|') tags
        FROM `bigquery-public-data.stackoverflow.posts_questions` a
        WHERE EXTRACT(YEAR FROM creation_date)>=2014
      ), UNNEST(tags) tag
      GROUP BY 1
      ORDER BY 2 DESC
      LIMIT 10
      """
    df = spark.read.format("bigquery").load(sql)
    df.show()
    

    看到这个doc https://github.com/GoogleCloudDataproc/spark-bigquery-connector#reading-data-from-a-bigquery-query.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Dataproc:使用 PySpark 从 BigQuery 读取和写入数据时出现错误 的相关文章

  • 如何获取右侧数据框中不在左侧数据框中的数据

    我有两个数据帧 我正在尝试输出其中一个数据帧中的数据 而不是另一个数据帧中的数据 我可以使用第一个数据帧中的数据 但不能使用第二个数据帧中的数据 only new old merge new outer on Employee ID Ben
  • 将嵌套循环计算转换为 Numpy 以加速

    我的Python程序的一部分包含以下代码段 其中一个新的网格 是根据旧网格中找到的数据计算的 网格是二维浮点数列表 该代码使用了三个 for 循环 for t in xrange 0 t step for h in xrange 1 hei
  • 如何使用 QWebView 显示 html。 Python?

    如何在控制台中显示 HTML 格式的网页 import sys from PyQt4 QtGui import QApplication from PyQt4 QtCore import QUrl from PyQt4 QtWebKit i
  • 可以memmap pandas系列。数据框怎么样?

    看来我可以通过创建 mmap d ndarray 并使用它来初始化系列来对 python 系列的底层数据进行内存映射 def assert readonly iloc try iloc 0 999 Should be non editabl
  • 从内存中发送图像

    我正在尝试为 Discord 机器人实现一个系统 该系统可以动态修改图像并将其发送给机器人用户 为此 我决定使用 Pillow PIL 库 因为它对于我的目的来说似乎简单明了 这是我的工作代码的示例 它加载一个示例图像 作为测试修改 在其上
  • argparse 更改参数的定义

    我按如下方式设置参数解析器 parser argparse ArgumentParser parser add argument point help enter a point e g 2 3 4 parser parse args po
  • pybind11:如何将 c++ 和 python 代码打包到一个包中?

    我正在尝试使用 CMake 和 pybind 11 将现有的 Python 代码和新的 C 11 代码打包在一起 我认为我缺少一些可以添加到 CMake 脚本中的简单内容 但在任何地方都找不到它 pybind11 示例只有 C 代码和没有P
  • 在我的 Mac 上以 root 身份运行 pip 时出现“权限被拒绝”

    我开始使用我的 Mac 来安装 Python 包 就像我在工作中使用 Windows PC 一样 然而在我的 Mac 上我经常遇到没有权限写入日志文件或站点包时出错 于是我想到了跑步pip install
  • Selenium 上的切换窗口

    我在 Python 中使用 Selenium 和 PhantomJS 我需要打开一个新窗口并控制它 出于测试目的 我这样做 from selenium import webdriver driver webdriver PhantomJS
  • 带有redirect_uri的social-auth-app-django Facebook后端状态

    我知道我的问题听起来像是重复的 但我到处寻找但没有找到任何解决方案 我正在努力为我的 django web 应用程序实现社交登录 到目前为止 谷歌 推特和雅虎登录均按预期工作 但facebook总是给出以下错误 URL 被阻止 此重定向失败
  • 在Python中将用户昵称转换为正式名字

    我正在尝试根据 Python 中的用户名字和姓氏映射来自不同系统的用户 一个问题是 名字在很多情况下都是 昵称 例如 对于用户来说 他的名字在一个系统中是 Dave 而在另一个系统中是 David python 中有没有简单的方法可以将这些
  • 将 Selenium 与 PyCharm CE 结合使用

    我正在尝试将 Selenium 与 PyCharm CE 一起使用 我已经使用 pip install Selenium 安装了 Selenium 并且可以通过终端使用它 但是当我尝试将它与 PyCharm 一起使用时 出现导入错误 Imp
  • Pandas如何将多个函数应用于数据框

    有没有办法像 DataFrameGroupBy agg 函数那样将函数列表应用于 DataFrame 中的每一列 我发现了一个丑陋的方法来做到这一点 df pd DataFrame dict one np random uniform 0
  • 如何在Python中获取套接字的外部IP?

    当我打电话时socket getsockname 在套接字对象上 它返回我的机器的内部 IP 和端口的元组 但是 我想找回我的外部IP 最便宜 最有效的方式是什么 如果没有外部服务器的配合 这是不可能的 因为您和另一台计算机之间可能存在任意
  • Spark:出现心跳错误后丢失数据

    我有一个在 Spark 集群上运行的 Python 程序 有四个工作线程 它处理一个包含大约 1500 万条记录的巨大 Oracle 表 检查结果后发现大约有600万条记录没有插入 我的写入功能如下 df write format jdbc
  • 检测反射 DLL 注入

    在过去的几年中 恶意软件 以及一些渗透测试工具 如 Metasploit 的 meterpreter 负载 已经开始使用反射 DLL 注入 PDF http www harmonysecurity com files HS P005 Ref
  • Django admin.py 未知命令:'collectstatic'

    我已经从 django 1 2 7 升级到 django 1 5 1我正在使用 python 2 6 6当我尝试跑步时python manage py collectstatic i get 未知命令 collectstatic 从我的设置
  • django 南迁移,不设置默认值

    我使用 South 来迁移我的 Django 模型 然而 南方有一个令人讨厌的错误 它不会在 Postgres 数据库中设置默认值 例子 created at models DateTimeField default datetime no
  • 访问 Scrapy 内的 django 模型

    是否可以在 Scrapy 管道内访问我的 django 模型 以便我可以将抓取的数据直接保存到我的模型中 我见过this https scrapy readthedocs org en latest topics djangoitem ht
  • matplotlib imshow() 和像素强度

    我试图了解矩阵的值是如何输入到 matplotlib 的imshow 函数确定灰度模式下像素的强度 考虑示例代码 import random import matplotlib pyplot as plt import matplotlib

随机推荐