PySpark 2.x:以编程方式将 Maven JAR 坐标添加到 Spark

2024-02-17

以下是我的 PySpark 启动片段,非常可靠(我已经使用它很长时间了)。今天我添加了两个 Maven 坐标,如图所示spark.jars.packages选项(有效地“插入”Kafka 支持)。现在通常会触发依赖项下载(由 Spark 自动执行):

import sys, os, multiprocessing
from pyspark.sql import DataFrame, DataFrameStatFunctions, DataFrameNaFunctions
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as sFn
from pyspark.sql.types import *
from pyspark.sql.types import Row
  # ------------------------------------------
  # Note: Row() in .../pyspark/sql/types.py
  # isn't included in '__all__' list(), so
  # we must import it by name here.
  # ------------------------------------------
 
num_cpus = multiprocessing.cpu_count()        # Number of CPUs for SPARK Local mode.
os.environ.pop('SPARK_MASTER_HOST', None)     # Since we're using pip/pySpark these three ENVs
os.environ.pop('SPARK_MASTER_POST', None)     # aren't needed; and we ensure pySpark doesn't
os.environ.pop('SPARK_HOME',        None)     # get confused by them, should they be set.
os.environ.pop('PYTHONSTARTUP',     None)     # Just in case pySpark 2.x attempts to read this.
os.environ['PYSPARK_PYTHON'] = sys.executable # Make SPARK Workers use same Python as Master.
os.environ['JAVA_HOME'] = '/usr/lib/jvm/jre'  # Oracle JAVA for our pip/python3/pySpark 2.4 (CDH's JRE won't work).
JARS_IVY_REPO = '/home/jdoe/SPARK.JARS.REPO.d/'

# ======================================================================
# Maven Coordinates for JARs (and their dependencies) needed to plug
# extra functionality into Spark 2.x (e.g. Kafka SQL and Streaming)
# A one-time internet connection is necessary for Spark to autimatically
# download JARs specified by the coordinates (and dependencies).
# ======================================================================
spark_jars_packages = ','.join(['org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.0',
                                'org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0',])
# ======================================================================
spark_conf = SparkConf()
spark_conf.setAll([('spark.master', 'local[{}]'.format(num_cpus)),
                   ('spark.app.name', 'myApp'),
                   ('spark.submit.deployMode', 'client'),
                   ('spark.ui.showConsoleProgress', 'true'),
                   ('spark.eventLog.enabled', 'false'),
                   ('spark.logConf', 'false'),
                   ('spark.jars.repositories', 'file:/' + JARS_IVY_REPO),
                   ('spark.jars.ivy', JARS_IVY_REPO),
                   ('spark.jars.packages', spark_jars_packages), ])

spark_sesn            = SparkSession.builder.config(conf = spark_conf).getOrCreate()
spark_ctxt            = spark_sesn.sparkContext
spark_reader          = spark_sesn.read
spark_streamReader    = spark_sesn.readStream
spark_ctxt.setLogLevel("WARN")

但是,当我运行代码片段时,插件不会下载和/或加载(例如./python -i init_spark.py),正如他们应该的那样。

这种机制曾经有效,但后来就停止了。我缺少什么?

先感谢您!


在这种帖子中,问题比答案更有价值,因为上面的代码可以工作,但在 Spark 2.x 文档或示例中找不到。

以上是我如何通过 Maven 坐标以编程方式向 Spark 2.x 添加功能。我让这个工作正常,但后来它停止工作了。为什么?

当我在 a 中运行上面的代码时jupyter notebook,笔记本在幕后已经通过我的方式运行了相同的代码片段PYTHONSTARTUP脚本。那PYTHONSTARTUP脚本与上面的代码相同,但省略了maven坐标(有意)。

那么,这个微妙的问题是如何出现的:

spark_sesn = SparkSession.builder.config(conf = spark_conf).getOrCreate()

因为 Spark 会话已经存在,所以上面的语句只是重用了现有会话 (.getOrCreate()),而该会话没有加载 jar/库(同样,因为我的 PYTHONSTARTUP 脚本故意省略了它们)。这就是为什么最好将 print 语句放在 PYTHONSTARTUP 脚本中(否则这些脚本是静默的)。

最后,我只是忘了这样做:$ unset PYTHONSTARTUP在开始之前JupyterLab / Notebook daemon.

我希望这个问题对其他人有帮助,因为这就是如何以编程方式向 Spark 2.x(在本例中为 Kafka)添加功能。请注意,您需要互联网连接才能从 Maven Central 一次性下载指定的 jar 和递归依赖项。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

PySpark 2.x:以编程方式将 Maven JAR 坐标添加到 Spark 的相关文章

  • 需要根据数据框中的行号应用不同的公式

    我正在努力在数据框中找到某种移动平均值 该公式将根据正在计算的行数而变化 实际场景是我需要计算Z列 Edit 2 以下是我正在使用的实际数据 Date Open High Low Close 0 01 01 2018 1763 95 176
  • 按每个元素中出现的数字对字符串列表进行排序[重复]

    这个问题在这里已经有答案了 我有一个脚本 其目的是对不断下载到服务器上的空间数据集文件进行排序和处理 我的列表目前大致如下 list file t00Z wrff02 grib2 file t00Z wrff03 grib2 file t0
  • 学习Python中的解析器

    我记得我读过有关解析器的内容 您只需提供一些示例行 它就知道如何解析某些文本 它只是确定两条线之间的差异 以了解可变部分是什么 我以为它是用 python 写的 但我不确定 有谁知道那是什么图书馆吗 可能你的意思是模板制作器 http co
  • 使用ideone时如何传入命令行参数?

    我正在使用 ideone 在线解释器 http ideone com http ideone com 来测试一些 C 和 Python 程序 如何指定命令行参数而不是使用 STDIN 输入 看起来你不能 但是快速破解应该做的伎俩 stati
  • pandas read_csv 之前预处理数据文件

    我使用 SAP 的数据输出 但它既不是 CSV 因为它不引用包含其分隔符的字符串 也不是固定宽度 因为它具有多字节字符 它是一种 固定宽度 字符 为了将其放入 pandas 我当前读取文件 获取分隔符位置 对分隔符周围的每一行进行切片 然后
  • 我可以同时打开两个 Tkinter Windows 吗?

    可以同时打开2个窗口吗 import tkinter as Tk import random import math root Tk Tk canvas Tk Canvas root background image Tk PhotoIma
  • 如何在plotly(python)中的刻度标签和图形之间添加空格?

    如果我使用绘图创建水平条形图 则每个条形的标签都与图表相对应 我想在标签和图表之间添加一些空间 填充 边距 我怎样才能做到这一点 Example import plotly offline as py import plotly graph
  • 获取字符串模板中所有标识符列表的函数(Python)

    对于标准库string template在Python中 有没有一个函数可以获取所有标识符的列表 例如 使用以下 xml 文件
  • pandas 数据框的最大大小

    我正在尝试使用读取一个有点大的数据集pandas read csv or read stata功能 但我不断遇到Memory Errors 数据帧的最大大小是多少 我的理解是 只要数据适合内存 数据帧就应该没问题 这对我来说不应该是问题 还
  • 无法使用Python请求会话模块登录网站

    我刚刚开始进行网络抓取 对于我的第一个项目 我尝试使用 requests Session 登录 artofproblemsolving com 并访问另一个用户的帐户 这是我的代码 import requests LOGIN URL htt
  • 将列表值转换为 pandas 中的行

    我有数据帧 其中一列具有相同长度的 numpy ndarray 值 df list 0 Out 92 array 0 0 0 0 29273096 0 30691767 0 27531403 我想将这些列表值转换为数据框并从 df iloc
  • matplotlib:渲染到缓冲区/访问像素数据

    我想使用 matplotlib 生成的图作为 OpenGL 中的纹理 到目前为止 我遇到的 matplotlib 的 OpenGL 后端要么不成熟 要么已经停止使用 所以我想避免使用它们 我当前的方法是将图形保存到临时 png 文件中 并从
  • 当元组列表中相同项目的值是字符串时,对它们的值求和

    如果我有这样的元组列表 my list books 5 books 10 ink 20 paper 15 paper 20 paper 15 我怎样才能把列表变成这样 books 15 ink 20 paper 50 即添加同一项目的费用
  • Pandas 中每列的曲线拟合 + 外推值

    我有一个包含大约 300 列的数据集 每一列都与深度相关 Pandas DataFrame 的简化版本看起来像这样 import matplotlib pyplot as plt import numpy as np import pand
  • Django 1.7:如何使用 html/css 文件作为模板发送电子邮件

    从 Django 1 7 开始 可以send email 使用新参数 html message 不幸的是 没有关于如何使用它的全面指南 新手友好 或者至少我找不到它 我需要使发送的电子邮件变得漂亮 因此 我试图弄清楚如何将我的消息包含到 h
  • 在字符串内打印单引号

    我想输出 XYZ s ABC 我在Python IDLE中尝试了以下3条语句 第一条和第二条语句输出 a before 带打印功能的第三条语句不输出 before 作为 Python 新手 我想了解为什么 之前输出 在第 1 条和第 2 条
  • 为什么我会在 Python 字符串格式中使用除 %r 之外的其他内容?

    我偶尔会使用 Python 字符串格式 这可以像这样完成 print int i Float f String s 54 34 434 some text 但是 这也可以这样做 print int r Float r String r 54
  • 如何将Python包从旧版本安装到新版本?

    我正在使用 python 3 7 最近在 Linux 中安装了 python 3 8 是否有任何 bash 命令或脚本可以获取 3 7 的所有软件包列表并在 3 8 版本中一一安装 我想避免每个包裹都手工完成 注意 我将它们安装在我的系统中
  • 如何创建简单的梯度下降算法

    我正在研究简单的机器学习算法 从简单的梯度下降开始 但在尝试用 python 实现它时遇到了一些麻烦 这是我试图重现的示例 我获得了有关房屋的数据 居住面积 以英尺为单位 和卧室数量 以及最终的价格 居住面积 英尺2 2104 卧室 3 价
  • 如何仅读取 CSV 文件每行的第一列 [重复]

    这个问题在这里已经有答案了 如何在Python中读取CSV文件每行的第一列 我的数据是这样的 1 abc 2 bcd 3 cde 我只需要循环第一列的值 另外 当我在 calc 中打开 csv 文件时 每行中的数据都在同一个单元格中 这正常

随机推荐