The to_gbq
功能允许您从a上传数据Pandas到 BigQuery 表中。
在本教程中,您将学习如何从 Pandas 导出数据数据框使用 BigQueryto_gbq
功能。
安装所需的库
首先,您需要确保您拥有pandas-gbq
库已安装。
该库提供了基本功能,例如to_gbq
我们将使用的。
!pip install pandas-gbq
Output:
Collecting pandas-gbq
...
Successfully installed pandas-gbq-x.x.x
设置 Google Cloud SDK
如果您尚未安装 Google Cloud SDK,请按照说明操作:
!curl https://sdk.cloud.google.com | bash
重新启动 shell 或终端以添加gcloud
到你的道路。
安装 SDK 后,您可以使用以下方式进行身份验证:
!gcloud auth login
系统会提示您使用 Google 凭据登录。成功登录后,您的凭据将存储在本地并用于将来的请求。
通过编程访问pandas-gbq
,您需要设置应用程序默认凭据。跑步:
!gcloud auth application-default login
您将被引导完成身份验证过程。完成此步骤后,您的 Python 脚本将使用pandas-gbq
可以与 BigQuery 交互。
to_gbq 语法和参数
The to_gbq
函数语法如下:
DataFrame.to_gbq(destination_table, project_id=None,
chunk_size=None, reauth=False, if_exists='fail',
auth_local_webserver=False, table_schema=None,
location=None, progress_bar=True, credentials=None)
参数:
-
目的地表(str):您想要以“dataset.tablename”格式写入数据的表的名称。
-
项目ID(str,可选):您的 Google Cloud 项目 ID。如果没有提供,则会从环境中推断。
-
chunk_size(整数,可选):要从 DataFrame 插入到每个块中的行数。默认情况下,它会一次插入所有行。
-
reauth(布尔值,默认 False):强制 Google 用户重新进行身份验证,这在使用多个帐户时非常有用。
-
if_exists (str, 默认‘失败’):表已存在时的行为。选项包括“失败”、“替换”和“追加”。
-
auth_local_webserver(布尔值,默认 False):使用本地网络服务器流进行身份验证。
-
table_schema(字典列表或 pandas.DataFrame.schema,可选):如果需要,定义 BigQuery 表的架构。
-
位置(str,可选):表的地理位置。默认为您的 GCP 帐户中设置的位置。
-
Progress_bar(布尔值,默认 True):显示上传进度条。
-
凭据(google.auth.credentials.Credentials,可选):用于通过 GCP 进行身份验证的凭据。默认情况下,它使用应用程序默认凭据。
指定数据集和表destination_table
The destination_table
字符串应遵循“dataset.tablename”格式。举个例子:
如果您想将数据上传到“sales_data”数据集中的“monthly_sales”表,您的destination_table
字符串将是:
destination_table = 'sales_data.monthly_sales'
使用时to_gbq
,它看起来像这样:
df.to_gbq(destination_table='sales_data.monthly_sales', project_id='your_project_id')
Note:如果指定的表不存在,to_gbq
将为您创建它(基于if_exists
范围)。
使用if_exists
范围
The if_exists
中的参数to_gbq
方法控制当您尝试将 DataFrame 上传到已存在的 BigQuery 表时的行为。
默认行为if_exists='fail'
这是默认行为。如果 BigQuery 中已存在该表,则上传操作将失败,并且不会进行任何更改。
Example:
df.to_gbq(destination_table='dataset_name.table_name', project_id='your_project_id', if_exists='fail')
Output:
如果表已经存在:
TableCreationError: Table dataset_name:table_name already exists.
替换现有数据if_exists='replace'
如果该表已存在于 BigQuery 中,它将被新数据覆盖。本质上,现有表将被删除,并创建一个新表。
Example:
df.to_gbq(destination_table='dataset_name.table_name', project_id='your_project_id', if_exists='replace')
Output:
表是否存在:
1 out of 1 chunks uploaded.
将数据追加到现有表if_exists='append'
如果 BigQuery 中已存在该表,则 DataFrame 中的数据将附加到该表中。
如果该表不存在,则会创建该表。
Example:
df.to_gbq(destination_table='dataset_name.table_name', project_id='your_project_id', if_exists='append')
Output:
表是否存在:
1 out of 1 chunks uploaded.
手动表架构定义
该架构被定义为字典列表,其中每个字典代表一个列、其数据类型和可选属性。
每本词典一般都有:
-
name:列的名称。
-
type:BigQuery 格式的列的数据类型。
-
模式(可选):定义列是否可以有 NULL 值或者是否是重复字段。默认为“NULLABLE”。
假设您有以下 DataFrame:
import pandas as pd
data = {
'Product': ['A', 'B', 'C'],
'Sales': [100, 150, 200],
'Date': ['2023-01-01', '2023-01-02', '2023-01-03']
}
df = pd.DataFrame(data)
定义架构
对于我们的 DataFrame,手动模式定义如下所示:
table_schema = [
{'name': 'Product', 'type': 'STRING'},
{'name': 'Sales', 'type': 'INT64'},
{'name': 'Date', 'type': 'DATE'}
]
Use to_gbq
具有指定模式
您可以使用table_schema
指定模式的参数:
df.to_gbq(destination_table='sales_data.product_sales', project_id='your_project_id', table_schema=table_schema, if_exists='replace')
Output:
1 out of 1 chunks uploaded.
处理嵌套和重复的列
您可以使用table_schema
参数来指定嵌套列或重复列的架构。
具有嵌套数据的示例数据框:
import pandas as pd
data = {
'Product': ['A', 'B'],
'Details': [{'Color': 'Red', 'Size': 'Large'}, {'Color': 'Blue', 'Size': 'Medium'}]
}
df = pd.DataFrame(data)
模式定义:
对于嵌套列,您的模式字典中需要一个“fields”键:
table_schema = [
{'name': 'Product', 'type': 'STRING'},
{'name': 'Details', 'type': 'RECORD', 'fields': [
{'name': 'Color', 'type': 'STRING'},
{'name': 'Size', 'type': 'STRING'}
]}
]
具有重复数据的 DataFrame:
重复列允许您拥有特定字段的数据数组。它对于一对多关系很有用。
data = {
'Product': ['A', 'B'],
'Tags': [['Outdoor', 'Summer'], ['Indoor', 'Winter']]
}
df = pd.DataFrame(data)
模式定义
对于重复列,请使用 ‘mode’: ‘REPEATED’:
table_schema = [
{'name': 'Product', 'type': 'STRING'},
{'name': 'Tags', 'type': 'STRING', 'mode': 'REPEATED'}
]
现在你使用to_gbq
像这样:
df.to_gbq(destination_table='sales_data.product_info', project_id='your_project_id', table_schema=table_schema, if_exists='replace')
对大型数据帧进行分块
The chunksize
中的参数to_gbq
函数指定每个块的行数。使用方法如下:
import pandas as pd
import pandas_gbq
# Sample large DataFrame
data = {'column1': range(1, 100001), 'column2': range(100001, 1, -1)}
large_df = pd.DataFrame(data)
# Using chunksize with to_gbq
pandas_gbq.to_gbq(large_df, 'your_dataset.your_table', project_id='your_project_id', if_exists='replace', chunksize=5000)
在此示例中,大型 DataFramelarge_df
100,000 行被分成每块 5,000 行的块,从而导致 20 次单独上传到 BigQuery。
使用进度栏跟踪上传进度
The progress_bar
参数输入to_gbq
功能允许您跟踪上传进度。
df.to_gbq(destination_table='dataset_name.table_name', project_id='your_project_id', progress_bar=True)
Output:
控制台或 Jupyter Notebook 中的动态进度条会随着数据块的上传而更新:
Uploading: 100%|█████████████████████████| 5/5 [00:05<00:00, 1.00s/rows]
Logging
您可以设置日志记录以获取有关上传过程的详细信息,这对于调试特别有用。
首先,配置Python的日志模块:
import logging
logging.basicConfig(level=logging.INFO)
有了这个设置,pandas_gbq
将提供有关上传过程的详细日志。
Output:
INFO:pandas_gbq.gbq:Uploading 1000000 rows to table dataset_name.table_name
现实世界的例子
想象一下,您运行一个电子商务网站,其中包含保存交易记录的 PostgreSQL 数据库。
您希望每天午夜将此事务数据同步到 BigQuery,以运行复杂的分析、生成报告,或许还可以将其与 Data Studio 等其他 GCP 工具一起使用。
从 PostgreSQL 提取数据
首先,您需要从本地 PostgreSQL 实例中提取相关数据:
import psycopg2
import pandas as pd
conn = psycopg2.connect(
host="localhost",
database="your_database",
user="your_user",
password="your_password"
)
query = "SELECT * FROM transactions WHERE transaction_date >= current_date - interval '1 day';"
df = pd.read_sql_query(query, conn)
conn.close()
将数据加载到 BigQuery
现在你已经有了 pandas DataFrame 中的数据,请使用to_gbq
:
import pandas_gbq
destination = 'your_dataset.transactions'
df.to_gbq(destination_table=destination, project_id='your_project_id', if_exists='append')
自动化
每日同步:
使用任务调度程序:
-
- 在 Linux 上,使用
cron
.
- 在 Windows 上,使用任务计划程序。
- 如果您正在利用 GCP,请考虑使用 Cloud Functions 或 Cloud Scheduler。
对于这个工作:
Linux 计划任务示例:
0 0 * * * /usr/bin/python3 /path_to_script/your_script.py
这将在每天午夜运行该脚本。
Resource
https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_gbq.html