我正在使用 pyspark/SQL。我有一个包含三列的表(MAIN_TABLE):
DATABASE_NAME
TABLE_NAME
SOURCE_TYPE
我想从 DATABASE_NAME 和 TABLE_NAME 列中的主表下找到的实际数据库和表中获取所有数据。但是,我只想从具有 SOURCE_TYPE = 'STANDARD' 的表中抓取数据,其他任何内容都不应该抓取。
我基本上需要在 MAIN_TABLE 下找到的所有表的数据的并集,其中 SOURCE_TYPE = 'STANDARD' 并且它们满足某些条件。我尝试运行,但它没有抓取 MAIN_TABLE 下具有 SOURCE_TYPE = 'STANDARD' 的所有表下找到的数据。我看起来好像缺少什么吗?
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
# Create a SparkSession
spark = SparkSession.builder.appName("InsertData").getOrCreate()
# Filter the tables where SOURCE_TYPE = 'STANDARD'
config_df = spark.table("MAIN_TABLE").filter("SOURCE_TYPE = 'STANDARD'")
# Initialize an empty DataFrame to store the result
result_df = None
# Loop through the filtered tables
for row in config_df.collect():
database_name = row["database_name"]
table_name = row["table_name"]
# Generate a dynamic SQL query to select data from the source table
sql_query = f"""
SELECT
header.profile,
attributes.id,
header.location,
'SOURCE_TYPE' as source_type,
header.actionname as actionname,
transform.date,
header.ip,
header.country,
'{database_name}' as source_database_name,
'{table_name}' as source_event_name
FROM {database_name}.{table_name}
"""
# Execute the SQL query and create a DataFrame
source_data_df = spark.sql(sql_query)
# Union the source_data_df with the result_df
if result_df is None:
result_df = source_data_df
else:
result_df = result_df.unionAll(source_data_df)
# Insert the combined data into MAIN.NEW_RESULTED_TABLE
result_df.write.mode("append").saveAsTable("MAIN.NEW_RESULTED_TABLE")
# Stop the SparkSession
spark.stop()
为了从 SOURCE_TYPE = 'ACTUAL' 的所有表中获取所有数据,我是否有什么做得不对的地方?