Python 多处理和处理工人中的异常

2024-04-10

我使用 python 多处理库来实现一种算法,其中有许多工作人员处理某些数据并将结果返回给父进程。我使用 multiprocessing.Queue 将作业传递给工作人员,然后收集结果。

一切都运行得很好,直到工作人员无法处理某些数据块。在下面的简化示例中,每个工作人员都有两个阶段:

  • 初始化 - 可能会失败,在这种情况下,worker 应该被销毁
  • 数据处理 - 处理一块数据可能会失败,在这种情况下,工作人员应该跳过该块并继续处理下一个数据。

当这两个阶段中的任何一个失败时,我都会在脚本完成后陷入僵局。这段代码模拟了我的问题:

import multiprocessing as mp
import random

workers_count = 5
# Probability of failure, change to simulate failures
fail_init_p = 0.2
fail_job_p = 0.3


#========= Worker =========
def do_work(job_state, arg):
    if random.random() < fail_job_p:
        raise Exception("Job failed")
    return "job %d processed %d" % (job_state, arg)

def init(args):
    if random.random() < fail_init_p:
        raise Exception("Worker init failed")
    return args

def worker_function(args, jobs_queue, result_queue):
    # INIT
    # What to do when init() fails?
    try:
        state = init(args)
    except:
        print "!Worker %d init fail" % args
        return
    # DO WORK
    # Process data in the jobs queue
    for job in iter(jobs_queue.get, None):
        try:
            # Can throw an exception!
            result = do_work(state, job)
            result_queue.put(result)
        except:
            print "!Job %d failed, skip..." % job
        finally:
            jobs_queue.task_done()
    # Telling that we are done with processing stop token
    jobs_queue.task_done()



#========= Parent =========
jobs = mp.JoinableQueue()
results = mp.Queue()
for i in range(workers_count):
    mp.Process(target=worker_function, args=(i, jobs, results)).start()

# Populate jobs queue
results_to_expect = 0
for j in range(30):
    jobs.put(j)
    results_to_expect += 1

# Collecting the results
# What if some workers failed to process the job and we have
# less results than expected
for r in range(results_to_expect):
    result = results.get()
    print result

#Signal all workers to finish
for i in range(workers_count):
    jobs.put(None)

#Wait for them to finish
jobs.join()

我对这段代码有两个问题:

  1. When init()失败,如何检测该工作人员无效而不是等待它完成?
  2. When do_work()失败,如何通知父进程结果队列中应该有更少的结果?

谢谢你的帮助!


我稍微更改了您的代码以使其正常工作(请参阅下面的说明)。

import multiprocessing as mp
import random

workers_count = 5
# Probability of failure, change to simulate failures
fail_init_p = 0.5
fail_job_p = 0.4


#========= Worker =========
def do_work(job_state, arg):
    if random.random() < fail_job_p:
        raise Exception("Job failed")
    return "job %d processed %d" % (job_state, arg)

def init(args):
    if random.random() < fail_init_p:
        raise Exception("Worker init failed")
    return args

def worker_function(args, jobs_queue, result_queue):
    # INIT
    # What to do when init() fails?
    try:
        state = init(args)
    except:
        print "!Worker %d init fail" % args
        result_queue.put('init failed')
        return
    # DO WORK
    # Process data in the jobs queue
    for job in iter(jobs_queue.get, None):
        try:
            # Can throw an exception!
            result = do_work(state, job)
            result_queue.put(result)
        except:
            print "!Job %d failed, skip..." % job
            result_queue.put('job failed')


#========= Parent =========
jobs = mp.Queue()
results = mp.Queue()
for i in range(workers_count):
    mp.Process(target=worker_function, args=(i, jobs, results)).start()

# Populate jobs queue
results_to_expect = 0
for j in range(30):
    jobs.put(j)
    results_to_expect += 1

init_failures = 0
job_failures = 0
successes = 0
while job_failures + successes < 30 and init_failures < workers_count:
    result = results.get()
    init_failures += int(result == 'init failed')
    job_failures += int(result == 'job failed')
    successes += int(result != 'init failed' and result != 'job failed')
    #print init_failures, job_failures, successes

for ii in range(workers_count):
    jobs.put(None)

我的改变:

  1. Changed jobs成为一个正常人Queue(代替JoinableQueue).
  2. 现在,工作人员会返回特殊结果字符串“init failed”和“job failed”。
  3. 只要特定条件有效,主进程就会监视所述特殊结果。
  4. 最后,提出“停止”请求(即None工作),无论你有多少工人,无论如何。请注意,并非所有这些都可以从队列中提取(以防工作线程初始化失败)。

顺便说一句,您的原始代码很好并且易于使用。随机概率位非常酷。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Python 多处理和处理工人中的异常 的相关文章

随机推荐

  • 如何替换 Pandas 数据框字符串列中的文本?

    我的数据框中有一列 如下所示 range 2 30 50 290 400 1000 我想更换 逗号与 短跑 我目前正在使用这种方法 但没有任何改变 org info exc range replace inplace True 有人可以帮忙
  • 如何从存储中干净地删除证书

    您可以使用 certmgr msc 中的向导将证书安装到证书存储中 右键单击安装 有谁知道如何使用向导 代码 首选 脚本 干净地 删除所有证书 我希望能够从 LocalMachine 和 或 CurrentUser 存储中删除所有内容 我之
  • 应用程序传输安全阻止 HTTPS

    我的 ATS 有问题 我使用的是 XCode 9 1 我的开发目标是 11 0 我正在使用react native 0 49进行开发 我的程序正在获取https https www xxxx com https www xxxx com 具
  • 使用Windows powershell从实时(更新)日志文件中过滤字符串

    我有一台计算机正在通过串行端口记录来自设备的事件 Putty 正在将它们记录到文件中 我曾经在 Linux 机器上运行过这个 基本上会tail f event log gt gt script sh 这是 nix 脚本 bin bash o
  • 这个 jQuery 代码片段可以缩短吗?

    我刚刚开始使用 jQuery 虽然下面的代码可以完成工作 但我感觉它可以缩短 var accountAddress document createElement input addClass readOnly attr contentEdi
  • Grails 2.3 和 GGTS 3.4 停止按钮不起作用

    如何使用 grails 2 3 停止 GGTS 3 4 中的 grails 进程 GGTS 的停止按钮没有停止服务器运行 并且 java exe 进程仍然显示在任务管理器中 评论一下grails project fork BuildConf
  • 如何在每次实体框架数据库迁移后访问上下文

    当我添加迁移时 我使用 Up Down 方法获得适当的 DbMigration 类 我可以在其中进行架构更改 并且 使用 Sql 方法 也可以进行数据 内容更改 我希望能够使用数据库上下文在每次迁移时进行内容更改 我知道我可以在 Confi
  • if 语句后的变量声明

    另一个论坛上出现了一个问题 我知道如何解决它 但它揭示了我特有的编译器功能 该人收到错误 嵌入式语句不能是声明或标记语句 因为他们在 if 语句后面有一个变量声明 没有括号 这不是他们的意图 但他们注释掉了紧跟在 if 语句之后的代码行 这
  • 相当于 Java 中的 C# 匿名方法吗?

    在 C 中 您可以匿名定义委托 即使它们只不过是语法糖 例如 我可以这样做 public string DoSomething Func
  • TensorFlow tf.group 忽略依赖关系?

    继从先前的问题 https stackoverflow com questions 44244275 tensorflow fifoqueue not fifo 它似乎tf group确实忽略了依赖关系 这是一个简单的独立示例 我已在 Py
  • 为什么 Chrome 在使用 HTTP/2 时会对请求进行排队?

    我有一个使用 HTTP 2 的网站 该网站加载图像的速度很慢 看看 Chrome 的 Devtools 大部分时间都花在 排队 图像的网络请求上 我的理解是 使用 HTTP 2 可以通过同一个 TCP 连接同时发出多个请求 但我看到 Chr
  • 超链接右侧带有图标的 JQuery UI

    我试图在超链接的右侧放置一个图标 使用 JQuery UI 主题 然而 我得到的最令人满意的结果是页面最右侧的图标 而不是紧接在实际文本之后 最简单的选择是有一个 img 标签位于文本后面 但图标需要根据当前主题设置样式 这就是我所拥有的
  • Zookeeper管理服务器端口

    在Windows上安装了zookeeper 3 5 6 bin 出现错误 无法启动AdminServer 异常退出 org apache zookeeper server admin AdminServer AdminServerExcep
  • 将变量从 Github Action 传递到 Docker 镜像构建

    我一直致力于设置 Github Actions 工作流程来构建 docker 映像 我需要将环境变量传递到图像中 以便我的 Django 项目能够正确运行 不幸的是 当我构建图像时 它没有收到变量的值 我的工作流程文件的相关部分 name
  • 如何通过索引列表过滤 numpy 数组?

    我有一个 numpy 数组 filtered rows 由 LAS 数据组成 x y z intensity classification 我创建了一个cKDTree点并找到最近的邻居 query ball point 这是该点及其邻居的索
  • webview_flutter 和 flutter_webview_plugin 哪个更好

    我已经在flutter中开发了web view 我不清楚哪个更好 webview flutter 与 flutter webview plugin In webview flutter Flutter 小部件可以在 Web 视图上显示 so
  • 如何制作Python模块或函数并在编写其他程序时使用它?

    在很多情况下 我必须在多个程序中一遍又一遍地编写大行代码 所以我想知道是否可以只编写一个程序 保存它 然后在不同的程序 例如函数或模块 中调用它 一个基本的例子 我编写一个程序来检查一个数字是否是回文 然后我想编写一个程序来检查一个数字是否
  • 强调 WordPress 主题 - 添加第二个侧边栏

    使用下划线主题 s 启动 WordPress 网站 我已经有一个侧边栏可以工作 但想在同一页面上制作第二个侧边栏 包含不同的小部件 我已将新的侧边栏添加到functions php 中 它出现在Wordpress 登录屏幕中 我可以将小部件
  • 存储对特定文件的更改

    我有一个大型 git 项目 我愚蠢地将其导入到 eclipse 并运行自动格式 现在 项目中的每个文件都显示为已修改 我宁愿还原所有仅格式化且未进行其他更改的文件 而不是提交格式化的文件 例如 git status On branch ma
  • Python 多处理和处理工人中的异常

    我使用 python 多处理库来实现一种算法 其中有许多工作人员处理某些数据并将结果返回给父进程 我使用 multiprocessing Queue 将作业传递给工作人员 然后收集结果 一切都运行得很好 直到工作人员无法处理某些数据块 在下