如何使用 Spark-Submit 和 pyspark 运行 luigi 任务

2024-01-19

我有一个luigipython 任务,其中包括一些 pyspark 库。现在我想用spark-submit在mesos上提交这个任务。我应该做什么来运行它?下面是我的代码框架:

from pyspark.sql import functions as F
from pyspark import SparkContext

class myClass(SparkSubmitTask):
# date = luigi.DateParameter()

  def __init__(self, date):
    self.date = date # date is datetime.date.today().isoformat()

  def output(self):

  def input(self):

  def run(self):
    # Some functions are using pyspark libs

if __name__ == "__main__":
  luigi.run()

如果没有 luigi,我将通过以下命令行提交此任务:

/opt/spark/bin/spark-submit --master mesos://host:port --deploy-mode cluster --total-executor-cores 1 --driver-cores 1 --executor-memory 1G --driver-memory 1G my_module.py

现在的问题是我如何 Spark 提交包含 luigi 命令行的 luigi 任务,例如:

luigi --module my_module myClass --local-scheduler --date 2016-01

还有一个问题是,如果 my_module.py 有一项需要首先完成的任务,我是否需要为它做更多的事情,或者只是设置与当前命令行相同的值?

我真的很感谢对此的任何提示或建议。非常感谢。


Luigi 有一些模板任务。其中之一称为 PySparkTask。 您可以从此类继承并覆盖属性:

https://github.com/spotify/luigi/blob/master/luigi/contrib/spark.py https://github.com/spotify/luigi/blob/master/luigi/contrib/spark.py.

我还没有测试过,但根据我对 luigi 的经验,我会尝试这个:

import my_module


class MyPySparkTask(PySparkTask):
    date = luigi.DateParameter()

    @property
    def name(self):
        return self.__class__.__name__

    @property
    def master(self):
        return 'mesos://host:port'

    @property
    def deploy_mode(self):
        return 'cluster'

    @property
    def total_executor_cores(self):
        return 1

    @property
    def driver_cores(self):
        return 1

    @property
    def executor-memory(self):
        return 1G

    @property
    def driver-memory(self):
        return 1G

    def main(self, sc, *args):
        my_module.run(sc)

    def self.app_options():
        return [date]

然后你可以使用以下命令运行它: luigi --module task_module MyPySparkTask --local-scheduler --date 2016-01

还有一个选项可以在 client.cfg 文件中设置属性,以使它们成为其他 PySparkTasks 的默认值:

[spark]
master: mesos://host:port
deploy_mode: cluster
total_executor_cores: 1
driver_cores: 1
executor-memory: 1G
driver-memory: 1G
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使用 Spark-Submit 和 pyspark 运行 luigi 任务 的相关文章

  • 在 Pandas 中按日期获取有效合约

    我在检测 pandas DataFrame 中的活动合约方面遇到了一些困难 假设每一行都是一个协商 对于每一行 我有两列 initial date 和 end date 我想知道的是按日期划分的活跃合约数量 到目前为止我做了一个非常低效的方
  • Django:如何测试“HttpResponsePermanentRedirect”

    我正在为我的 django 应用程序编写一些测试 在我看来 它使用 HttpResponseRedirect 重定向到其他一些网址 那么我该如何测试呢 姜戈TestCase类有一个方法assertRedirects https docs d
  • 在Python3.6中调用C#代码

    由于完全不了解 C 编码 我希望在我的 python 代码中调用 C 函数 我知道有很多关于同一问题的问答 但由于一些奇怪的原因 我无法从示例 python 模块导入简单的 c 类库 以下是我所做的事情 C 类库设置 我使用的是 VS 20
  • 在 Numpy 中切片后确定结果数组的形状

    我很难理解在 numpy 中切片后如何确定结果数组的形状 例如 我使用以下简单代码 import numpy as np array np arange 27 reshape 3 3 3 slice1 array 1 2 1 slice2
  • 如何在Python中循环并存储自变量中的值

    我对 python 很陌生 所以这听起来可能很愚蠢 我进行了搜索 但没有找到解决方案 我在 python 中有一个名为 ExcRng 的函数 我可以对该函数执行什么样的 for 循环 以便将值存储在独立变量中 我不想将它们存储在列表中 而是
  • 如何使用Python将WebP图像转换为Gif?

    我已经尝试过这个 from PIL import Image im Image open this webp im save that gif gif save all True 这给了我这个错误 类型错误 不支持的操作数类型 tuple
  • Python BeautifulSoup XML 解析

    我编写了一个简单的脚本来使用 BeautifulSoup 模块解析 XML 聊天日志 标准 soup prettify 工作正常 只是聊天日志中有很多绒毛 您可以在下面看到我正在使用的脚本代码和一些 XML 输入文件 Code import
  • PIL Image.size 返回相反的宽度/高度

    使用PIL确定图像的宽度和高度 在特定图像上 幸运的是只有这一个 但这很麻烦 从 image size 返回的宽度 高度是相反的 图片 http storage googleapis com cookila 533ebf752b9d1f7c
  • 如何使用 python urllib 在 HTTP/1.1 中保持活力

    现在我正在这样做 Python3 urllib url someurl headers HOST somehost Connection keep alive Accept Encoding gzip deflate opener urll
  • “char”/“character”类型的类型提示

    char 或 character 没有内置的原始类型 因此显然必须使用长度为 1 的字符串 但是为了暗示这一点并暗示它应该被视为一个字符 如何通过类型提示来实现这一点 grade chr A 一种方法可能是使用内置的 chr 函数来表示这一
  • 无法打开 Python。错误 0xc000007b

    我最近一直在学习 Python 3 我在我的上网本 32 位 Windows 7 上创建简单的小程序没有任何问题 当我将它安装在我的上网本上时 我没有遇到任何问题 但现在我已经开始使用它了 我想将它安装在我的台式机上 并且我有一个 我的桌面
  • 如何通过双击在浏览器中打开 ipynb 文件

    以前 我安装了 Canopy 当时 我只需双击 ipynb 文件并在浏览器中打开它们即可 但是 后来我需要Anaconda 一旦我安装了它 这个功能就没有了 现在我只希望能够简单地双击 ipynb 文件 然后该文件就会在 Firefox 中
  • 如何在python中检索aws批处理参数值?

    流程 Dynamo DB gt Lambda gt 批处理 如果将角色 arn 插入动态数据库 它是从 lambda 事件中检索的 然后使用submit job角色 arn 的 API 被传递为 parameters role arn ar
  • spacy 如何使用词嵌入进行命名实体识别 (NER)?

    我正在尝试使用以下方法训练 NER 模型spaCy识别位置 人 名和组织 我试图理解如何spaCy识别文本中的实体 但我无法找到答案 从这个问题 https github com explosion spaCy issues 491在 Gi
  • 更改 pandas 中多个日期时间列的时区信息

    有没有一种简单的方法可以将数据帧中的所有时间戳列转换为本地 任何时区 不是逐列进行吗 您可以有选择地将转换应用于所有日期时间列 首先 选择它们select dtypes https pandas pydata org pandas docs
  • 如何在Python和Selenium中通过标签名称或id获取元素[重复]

    这个问题在这里已经有答案了 我正在尝试使用 Python 和 Selenium 获取输入 但它向我显示错误 我该如何解决这个错误 inputElement send keys getStock getStocklFunc 0 Error i
  • Django 在选择列表更改时创建毫无意义的迁移

    我正在尝试使用可调用创建一个带有选择字段的模型 以便 Django 在选择列表更改时不会创建迁移 如中所述this https stackoverflow com questions 31788450 stop django from cr
  • 无法截取宽度为 0 的屏幕截图

    我正在尝试截取 Bootstrap 模态内元素的屏幕截图 经过一番努力 我终于想出了这段代码 driver get https enlinea sunedu gob pe driver find element by xpath div c
  • 如何通过字符串匹配加速 pandas 行过滤?

    我经常需要过滤 pandas 数据框df by df df col name string value 并且我想加快行选择操作 有没有快速的方法可以做到这一点 例如 In 1 df mul df 3000 2000 3 reset inde
  • 为什么我们应该在 def __init__(self, n) -> None: 中使用 -> ?

    我们为什么要使用 gt in def init self n gt None 我读了以下摘录来自 PEP 484 https www python org dev peps pep 0484 the meaning of annotatio

随机推荐

  • 将 int 数组发布到 MVC 控制器 - 正确的方法签名是什么?

    下面是我发送到 ASP NET MVC2 控制器的 POST 请求的屏幕截图 使用 Firebug Net 面板 这是接收 POST 请求的控制器 操作方法 public ActionResult Search int skill int
  • 从 numba jitted 函数调用非 jitted 函数

    我的代码如下所示 jit nopython True def sum fn arg1 arg2 argn for i in xrange len arg2 For each bin l p fn1 arg1 arg2 argn res re
  • 如何动态地将项目添加到纸张下拉菜单中?

    我尝试使用 dropdownMenu appendChild menuItem 添加它 但正如我预期的那样 这不起作用 我在 Polymer 指南上找不到有关此问题的信息 也在这里找不到其他类似的问题 那可能吗 如果是这样 怎么办 纸张下拉
  • 人们如何处理 RESTful api 的身份验证(与技术无关)

    我正在考虑构建一些移动应用程序 因此 这些应用程序将通过 JSON 和 REST 例如 put post 等 与我的服务器 对话 如果我想确保客户端手机应用程序正在尝试执行需要某些 许可 的操作 人们该如何处理 例如 我们的网站出售东西 g
  • 如何在运行时通过脚本检查 PowerShell 中是否存在 cmdlet

    我有一个需要在多个主机 PowerGUI PowerShell ISE https technet microsoft com en us library dd315244 aspx等 但我遇到一个问题 有时其中一台主机下不存在 cmdle
  • 在 Maven 中,为什么运行“mvn clean”?

    我想知道跑步之间的主要区别是什么mvn compile and mvn clean compile是 在实践中 我明白真正的区别是什么mvn clean compile删除所有生成的文件并从头开始 但我们为什么要这样做呢 我可以假设mvn
  • WSDL.exe - 生成接口以及具体类,以便以后轻松进行伪造/模拟

    当 WSDL exe 生成 Web 服务的代理时 是否可以让 WSDL exe 生成接口以及具体类 或者代替具体类 我们正在使用来自 ASP Net 应用程序的第 3 方 Web 服务 并使用 WSDL exe 生成了我们的代理类 一切都很
  • 核心数据多级父-子上下文

    在我的应用程序中我有UITableViewController显示事件列表 该控制器使用 ManagedObjectContext 说ParentContext 现在 如果选择任何事件 则会显示详细的视图控制器 用户可以在其中编辑事件的详细
  • 无法使用 PhantomJS 加载页面资源

    我正在使用 PhantomJS 获取给定 URL 的页面内容 问题是在某些页面上 PhantomJS 无法加载某些资源 js css 我收到的错误是 错误代码 5 操作取消 我可以重现此问题的网页是www lifehacker com ht
  • Django Rest Framework 中 CreateAPIView 的权限

    我查看了 CreateAPIView 的代码 发现创建过程中没有任何地方检查权限 所以我决定检查它们perform create挂钩如下 class CourseList generics CreateAPIView Create a co
  • 浮动元素被前一个元素按下

    http jsfiddle net 4gw8wank http jsfiddle net 4gw8wank 我有 3 个容器作为响应式页面的一部分 在移动视图中 它们必须按顺序显示 蓝色 红色 绿色 像这样堆叠在一起 div class b
  • 如何删除警告:“-pie 被忽略。它仅在链接主可执行文件时使用”

    自从更新到 Xcode10 和 Swift 4 2 以来 我总是在我的 pod 中收到此构建时间警告 我需要做什么才能删除这些警告 我尝试删除派生数据 清理并构建项目 但它不起作用 我发现有人在这里问同样的问题https github co
  • 保持对管道运算符之间的变量的访问

    我一直在尝试在节点应用程序中使用 Rxjs fileList 是从返回fs readdirsync 字符串数组 首先map 有一个称为文件名的参数 flatMap readFileAsObservable uses bindNodeCall
  • Azure 数据工厂:如何在另一个管道成功完成后触发管道

    在 Azure 数据工厂中 如何在其他管道成功完成后触发管道 详细地 我试图在其他管道成功完成后触发 SSIS 包 我已经知道我可以将 SSIS 包保存为管道 并像其他管道一样使用触发器运行它 但是如何确保 SSIS 包管道仅在其他管道完成
  • 从服务器端查找 HttpRequest 中的时区

    我努力了var dateHeaders HttpContext Current Request Headers Date 但它包含 null 显然没有这样的键 谁能告诉我还能在哪里找到当前客户的时区 参考 http en wikipedia
  • 让 CSS 插入框阴影出现在内部背景之上

    我希望 CSS 插入框阴影出现在带有框阴影的容器内元素的顶部 特别是子元素的背景颜色 Demo http jsfiddle net Q8n77 http jsfiddle net Q8n77 div class parent foo div
  • C# 面板列表

    我想创建包含有关某个项目的详细信息的面板 包括左侧的缩略图 然后将它们添加到可滚动列表中 与 iPhone 上的 iTunes 显示可用应用程序列表的方式非常相似 我已经进行了一些搜索 但迄今为止尚未找到任何帮助 有谁有任何想法或样本链接想
  • Bing 拼写检查 API 是否对 mode = 拼写有额外的长度限制?

    The Bing 拼写检查 API https msdn microsoft com en us library mt711411 aspx对于请求想要进行拼写检查 mode spell 或校对 mode proof default 的文本
  • 后备机制 - 最佳方法?

    我有三种不同类型的服务器连接 这些可以在属性文件中配置 假设有三台服务器 Server1 Server2 Server3 In Properties文件 我的配置如下 ServerPref1 Server1 ServerPref2 Serv
  • 如何使用 Spark-Submit 和 pyspark 运行 luigi 任务

    我有一个luigipython 任务 其中包括一些 pyspark 库 现在我想用spark submit在mesos上提交这个任务 我应该做什么来运行它 下面是我的代码框架 from pyspark sql import function