如何从 PySpark 中某个表中找到的多个表中获取所有数据?

2024-05-07

我正在使用 pyspark/SQL。我有一个包含三列的表(MAIN_TABLE):

DATABASE_NAME
TABLE_NAME
SOURCE_TYPE

我想从 DATABASE_NAME 和 TABLE_NAME 列中的主表下找到的实际数据库和表中获取所有数据。但是,我只想从具有 SOURCE_TYPE = 'STANDARD' 的表中抓取数据,其他任何内容都不应该抓取。

我基本上需要在 MAIN_TABLE 下找到的所有表的数据的并集,其中 SOURCE_TYPE = 'STANDARD' 并且它们满足某些条件。我尝试运行,但它没有抓取 MAIN_TABLE 下具有 SOURCE_TYPE = 'STANDARD' 的所有表下找到的数据。我看起来好像缺少什么吗?

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

# Create a SparkSession
spark = SparkSession.builder.appName("InsertData").getOrCreate()

# Filter the tables where SOURCE_TYPE = 'STANDARD'
config_df = spark.table("MAIN_TABLE").filter("SOURCE_TYPE = 'STANDARD'")

# Initialize an empty DataFrame to store the result
result_df = None

# Loop through the filtered tables
for row in config_df.collect():
    database_name = row["database_name"]
    table_name = row["table_name"]

    # Generate a dynamic SQL query to select data from the source table
    sql_query = f"""
        SELECT
            header.profile,
            attributes.id,
            header.location,
            'SOURCE_TYPE' as source_type,
            header.actionname as actionname,
            transform.date,
            header.ip,
            header.country,
            '{database_name}' as source_database_name,
            '{table_name}' as source_event_name
        FROM {database_name}.{table_name}
        """

    # Execute the SQL query and create a DataFrame
    source_data_df = spark.sql(sql_query)

    # Union the source_data_df with the result_df
    if result_df is None:
        result_df = source_data_df
    else:
        result_df = result_df.unionAll(source_data_df)

# Insert the combined data into MAIN.NEW_RESULTED_TABLE
result_df.write.mode("append").saveAsTable("MAIN.NEW_RESULTED_TABLE")

# Stop the SparkSession
spark.stop()

为了从 SOURCE_TYPE = 'ACTUAL' 的所有表中获取所有数据,我是否有什么做得不对的地方?


None

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何从 PySpark 中某个表中找到的多个表中获取所有数据? 的相关文章

  • Flutter sqflite 插入列表

    我正在尝试将列表插入到 flutter 中的 sql 数据库中 但我不知道该怎么做 有人可以帮助我吗 当我初始化 mi 数据库时 我有这个 Directory documentsDirectory await getApplicationD
  • “已经有一个与此命令关联的打开的 DataReader,必须先将其关闭。”

    我正在开发需要连接到另一个数据库以获取一些数据的应用程序 为此 我决定使用 SqlConnection reader 等 我需要执行一些查询 例如首先我需要获取某个用户的卡 ID 之后我需要通过该卡 ID 获取一些数据 这是我的代码 reg
  • 参数的性能不如硬编码值

    我有一个执行得很糟糕的存储过程 当我声明一个变量时 设置它的值 然后在 where 子句中使用它 该语句需要一个多小时才能运行 当我对 where 子句中的变量进行硬编码时 它的运行时间不到一秒 我开始通过执行计划来查找问题所在 看起来当我
  • 在 Android 版 ORMLite 中加入类会引发 SQL 异常:找不到外部类,反之亦然

    我正在尝试使用 QueryBuilder 为两个不同的类创建一个联接查询 一个Product类和一个Coupon类 引用 Product 属性 storeId public class Coupon DatabaseField column
  • 编写 PHP SQL 更新语句的最佳方法

    我有这个 PHP SQL 语句 updateCategory UPDATE category SET name name description description parent parent active active WHERE i
  • 为什么 Sql Server 2000 上的 TSQL 对小数点的舍入不一致?

    我正在尝试计算美元金额的折扣百分比 在 50 的情况下 有时你会得到半分钱 我需要将其四舍五入到最接近的一分钱 在Sql中 我的计算如下 round retail 0 5 2 0 如果我采用以下值 我会得到不同的结果 4 39 2 49 不
  • ORACLE 在立即执行中批处理 DDL 语句

    我正在尝试在一个 Execute Immediate 语句中运行多个 ddl 语句 我认为这会很简单 但看来我错了 想法是这样的 declare v cnt number begin select count into v cnt from
  • 帮助将二进制图像数据从 SQL Server 读取到 PHP 中

    我似乎无法找到将二进制数据从 SQL 服务器读取到 PHP 的方法 我正在开发一个项目 需要能够将图像直接存储在 SQL 表中 而不是文件系统上 目前 我一直在使用这样的查询 插入 myTable 文档 选择 从 OPENROWSET BU
  • REPLACE MYSql 中的新行字符不起作用

    我执行了以下查询 由于某种原因它没有替换数据库中的换行符 它说 Rows matches 1 但没有变化 有什么问题吗 mysql gt UPDATE aboutme SET abouttext REPLACE abouttext n WH
  • 分组依据检索 3 个值

    我有以下查询 SELECT Cod MIN Id AS id Min MAX Id AS id Max retrieve value in the middle COUNT AS Tot FROM Table a NOLOCK GROUP
  • 如何防止用户生成的 Sql 查询上的 Sql 注入

    我有一个项目 私有的 ASP net 网站 受 https 密码保护 其中要求之一是用户能够输入直接查询数据库的 Sql 查询 我需要能够允许这些查询 同时防止它们对数据库本身造成损坏 以及访问或更新它们不应该访问 更新的数据 我制定了以下
  • 在旧版本的 MySQL (<5.5.0) 中模拟 TO_SECONDS()

    出于性能和简单性的原因 我想以秒的形式获取 MySQL 3 x 服务器中 DATETIME 列的内容 或者实际上任何数字类型 我只是想在使用 UNIX TIMESTAMP 时避免所有明显的时区问题 the我表中的日期确实来自不同的区域设置
  • 如何使用 with open 在 pySpark 中打开存储在 HDFS 中的文件

    如何打开存储在 HDFS 中的文件 这里输入文件来自 HDFS 如果我按如下方式提供文件 我将无法打开 它将显示为找不到文件 from pyspark import SparkConf SparkContext conf SparkConf
  • 按年份进行透视并获取 2020 年以来的金额总和

    我有这样的数据 我想按年份旋转并仅显示 2020 年以来的总数 我该如何实现这一目标 您可以使用以下方法实现此目的PIVOT https spark apache org docs 3 2 1 api python reference ap
  • EF4 和 SQL Server 2000

    我使用 EF4 和 SQL Server 2005 开发了我的网站 但当转移到临时站点时 发现他们使用 SQL Server 2000 现在我收到此错误 我认为该错误与 SQL Server 2000 有关 Incorrect syntax
  • MySQL 中有“connect by”替代方案吗?

    如果我使用 Oracle 有connect by可用于创建分层查询的关键字 目前我正在一个项目中使用MySQL 我想知道是否有替代方案connect by在 MySQL 中 我尝试过谷歌 但到目前为止还没有结果 我想要实现的是通过一个查询从
  • Postgres 上的 C 语言环境和 Posix 语言环境有什么区别?

    我知道 Postgres 上的数据库区域设置负责国家字符的正确顺序 正确的小写 大写等 但为什么有两种语言中立的语言环境 posix and c 它们之间有什么区别 还是只是一个中立的语言环境有两个不同的名称 UPDATE正如 Magnus
  • MSSQL:如何使用代码编写存储过程创建脚本?

    我正在尝试使用一个数据库中存在但另一个数据库中不存在的 information schema routines 查询存储过程定义列表 SELECT t1 Routine Definition FROM server1 MyDatabase
  • 如何比较 Postgresql 中日期时间字段中的日期?

    在比较 postgresql Windows 中的版本 9 2 4 中的日期时 我遇到了一个奇怪的情况 我的表中有一列说update date与类型timestamp without timezone 客户可以仅使用日期搜索此字段 例如 2
  • 如何使用 ALTER TABLE 添加新列并使其唯一?

    我该如何使用ALTER TABLE添加新列并使其独一无二 取决于 DBMS 但我认为以下内容相当可移植 ALTER TABLE table name ADD column name datatype ALTER TABLE table na

随机推荐

  • 防止被 0 除的 Typescript 类型

    我正在使用打字稿创建一个用于培训目的的计算系统 但在除法过程中出现打字错误 您知道如何解决吗 type Variable value number resolve gt number type NoZeroVariable value Om
  • 从其他模块调用的数组扩展

    其他模块 例如 XCTest 项目 无法使用数组扩展方法 为了简单起见 下面的代码什么也不做 但可以用来重现错误 import Foundation extension Array mutating func myMethod toInde
  • 将选择标准添加到 read.table

    让我们采用以下我导入的数据集的简化版本read table a lt as data frame c M M F F F b lt as data frame c 25 22 33 17 18 df lt cbind a b colname
  • 在Python中获取目录基名的优雅方法?

    我有几个脚本将目录名称作为输入 并且我的程序在这些目录中创建文件 有时我想获取给程序的目录的基本名称 并用它在目录中创建各种文件 例如 directory name given by user via command line output
  • 如何在S3中存储数据并允许用户使用rails API / iOS客户端以安全的方式访问?

    我是编写 Rails 和 API 的新手 我需要一些有关 S3 存储解决方案的帮助 这是我的问题 我正在为 iOS 应用程序编写一个 API 用户在 iOS 上使用 Facebook API 登录 服务器根据 Facebook 向 iOS
  • 调用泛型类型的方法?

    为什么下面的代码在 Delphi XE 中会产生错误 unit UTest interface type TTest class public procedure Foo
  • 设置角度组件的完整高度

    我无法让我的列表成为全高 我的代码由于嵌套组件而更加复杂 但我仍然可以使用此代码来复制它 这是一个笨蛋 http plnkr co edit R0QgLz8cjyRHYOLf4uJW http plnkr co edit R0QgLz8cj
  • 在散景中隐藏轴

    如何在散景图中隐藏 x 轴和 y 轴 我已经根据此进行了检查和尝试 p1 figure visible None p1 select type Axis visible 0 xaxis Axis plot p1 visible 0 和喜欢h
  • 您可以使用 Openpyxl 将全名拆分为名字和姓氏吗?

    我有一个 Excel 文件 我一直在尝试使用 openpyxl 将列 全名 拆分为两个单独的名字和姓氏列 例如 我有 from openpyxl import Workbook load workbook wb load workboo p
  • 在 Swift 中使用 enumeratorAtUrl 从 NSFileManager 返回目录枚举器时出现问题

    我试图从 NSFileManager 方法 enumeratorAtUrl 返回 NSDirectoryEnumerator 对象 这导致编译器错误 Cannot convert the expressions type NSDirecto
  • 就地修改 XML 文件?

    假设我有以下 XML 文件
  • 如何使用 gcloud 凭据对 Dialogflow API 进行身份验证

    我有一个 Node JS 应用程序 可以向 Dialogflow 代理发出请求 我实际上使用基于临时令牌的请求 但是我如何更改它以通过谷歌服务凭证来做到这一点 https cloud google com docs authenticati
  • 使用 Azure AD B2C 登录 Xamarin Android 应用

    经过一周的研究可与 Azure AD B2C 一起使用 Xamarin 以 Android 平台 而不是 Xamarin Forms 为目标的身份验证原理后 我终于寻求一些建议 我有一个带有 登录 按钮的活动 我想通过按钮的触摸事件登录到
  • 蓝牙 LE:地址类型

    我正在研究 iBeacon 技术 但我找不到有关地址类型的特定问题的任何答案 我找到了解释地址类型的文档 蓝牙规范 但我似乎找不到如何在两种类型 公共和随机 之间进行选择 这是我发现它的一个例子 它是由 Raspberry PI 上的 iB
  • React Native:如何在组件中添加脚本标签

    我正在尝试在 React Native 应用程序的组件内添加标签 下面是我的代码 它似乎不起作用 谁能告诉我如何解决这个问题 import React Component from react import PropTypes from p
  • Tensorflow无法分配设备进行操作

    我正在尝试跑步NVidia 脸部生成器演示 https github com tkarras progressive growing of gans在我的电脑上 我使用的是 Windows 10 我已经下载了源代码 并尝试按照页面下方的步骤
  • WPF DataGrid DataBindingComplete 事件在哪里?

    数据绑定完成后 我需要采取一些操作 例如 根据其他一些单元格使某些单元格只读 在WinForm DataGridView中 我曾经在DataBindingComplete事件中执行此操作 但是 我在 WPF DataGrid 中找不到这样的
  • CouchDB 视图中的链接文档

    我很难理解 CouchDB链接文档 http wiki apache org couchdb Introduction to CouchDB views Linked documents特征 我有两个types存储在单个 CouchDB 数
  • asp.net mvc 3 中模糊的远程属性验证

    asp net mvc 3 中的内置远程属性会执行 onchange 验证 我希望它在模糊时验证 有没有办法自定义它 或者还有其他东西可以这样做 我确信这是一个非常普遍的需求 你可以设置默认值 http docs jquery com Pl
  • 如何从 PySpark 中某个表中找到的多个表中获取所有数据?

    我正在使用 pyspark SQL 我有一个包含三列的表 MAIN TABLE DATABASE NAME TABLE NAME SOURCE TYPE 我想从 DATABASE NAME 和 TABLE NAME 列中的主表下找到的实际数