如何在AWS Lambda中模拟multiprocessing.Pool.map()?

2024-04-22

AWS Lambda 上的 Python 不支持multiprocessing.Pool.map(),如记录在这另一个问题 https://stackoverflow.com/questions/34005930/multiprocessing-semlock-is-not-implemented-when-running-on-aws-lambda。请注意,另一个问题是问为什么它不起作用。这个问题是不同的,我问的是在缺乏底层支持的情况下如何模拟功能。

另一个问题的答案之一给了我们这段代码:

# Python 3.6
from multiprocessing import Pipe, Process

def myWorkFunc(data, connection):
    result = None

    # Do some work and store it in result

    if result:
        connection.send([result])
    else:
        connection.send([None])


def myPipedMultiProcessFunc():

    # Get number of available logical cores
    plimit = multiprocessing.cpu_count()

    # Setup management variables
    results = []
    parent_conns = []
    processes = []
    pcount = 0
    pactive = []
    i = 0

    for data in iterable:
        # Create the pipe for parent-child process communication
        parent_conn, child_conn = Pipe()
        # create the process, pass data to be operated on and connection
        process = Process(target=myWorkFunc, args=(data, child_conn,))
        parent_conns.append(parent_conn)
        process.start()
        pcount += 1

        if pcount == plimit: # There is not currently room for another process
            # Wait until there are results in the Pipes
            finishedConns = multiprocessing.connection.wait(parent_conns)
            # Collect the results and remove the connection as processing
            # the connection again will lead to errors
            for conn in finishedConns:
                results.append(conn.recv()[0])
                parent_conns.remove(conn)
                # Decrement pcount so we can add a new process
                pcount -= 1

    # Ensure all remaining active processes have their results collected
    for conn in parent_conns:
        results.append(conn.recv()[0])
        conn.close()

    # Process results as needed

可以修改此示例代码以支持multiprocessing.Pool.map()?

到目前为止我尝试过什么

我分析了上面的代码,没有看到要执行的函数的参数或数据,所以我推断它执行的功能与multiprocessing.Pool.map()。除了演示可以组装成解决方案的构建块之外,尚不清楚代码的作用。

这是一个“为我编写代码”的问题吗?

是的,在某种程度上,确实如此。这个问题影响了成千上万的Python开发者,如果我们所有人共享相同的代码,而不是强迫每个遇到这个问题的SO用户去开发,那么对世界经济来说会更加有效,减少温室气体排放等他们自己的解决方法。我希望我已经尽了自己的一份力量,将其提炼成一个明确的问题,并准备好假定的构建块。


我能够让这个在我自己的测试中工作。 我的代码基于此链接:https://aws.amazon.com/blogs/compute/parallel-processing-in-python-with-aws-lambda/ https://aws.amazon.com/blogs/compute/parallel-processing-in-python-with-aws-lambda/

注1:你必须增加 lambda 函数的内存分配。使用默认的最小数量,多处理不会提高性能。我的帐户可以分配的最大数量(3008MB)达到了以下数字。

NB2:我在这里完全忽略了最大并行进程。我的用法没有太多需要处理的元素。

使用下面的代码,用法是:

work = funcmap(yourfunction,listofstufftoworkon)
yourresults = work.run()

从我的笔记本电脑运行:

jumper@jumperdebian[3333] ~/scripts/tmp  2019-09-04 11:52:30
└─ $ ∙ python3 -c "import tst; tst.lambda_handler(None,None)"
results : [(35, 9227465), (35, 9227465), (35, 9227465), (35, 9227465)]
SP runtime : 9.574460506439209
results : [(35, 9227465), (35, 9227465), (35, 9227465), (35, 9227465)]
MP runtime : 6.422513484954834

从AWS运行:

Function Logs:
START RequestId: 075a92c0-7c4f-4f48-9820-f394ee899a97 Version: $LATEST
results : [(35, 9227465), (35, 9227465), (35, 9227465), (35, 9227465)]
SP runtime : 12.135798215866089
results : [(35, 9227465), (35, 9227465), (35, 9227465), (35, 9227465)]
MP runtime : 7.293526887893677
END RequestId: 075a92c0-7c4f-4f48-9820-f394ee899a97

这是测试代码:

import time
from multiprocessing import Process, Pipe
import boto3

class funcmap(object):

    fmfunction=None
    fmlist=None

    def __init__(self,pfunction,plist):
        self.fmfunction=pfunction
        self.fmlist=plist

    def calculation(self, pfunction, pload, conn):
        panswer=pfunction(pload)
        conn.send([pload,panswer])
        conn.close()

    def run(self):
        datalist = self.fmlist
        processes = []
        parent_connections = []
        for datum in datalist:
            parent_conn, child_conn = Pipe()
            parent_connections.append(parent_conn)
            process = Process(target=self.calculation, args=(self.fmfunction, datum, child_conn,))
            processes.append(process)

        pstart=time.time()
        for process in processes:
            process.start()
            #print("starting at t+ {} s".format(time.time()-pstart))
        for process in processes:
            process.join()
            #print("joining at t+ {} s".format(time.time()-pstart))

        results = []
        for parent_connection in parent_connections:
            resp=parent_connection.recv()
            results.append((resp[0],resp[1]))
        return results


def fibo(n):
    if n <= 2 : return 1
    return fibo(n-1)+fibo(n-2)

def lambda_handler(event, context):
    #worklist=[22,23,24,25,26,27,28,29,30,31,32,31,30,29,28,27,26,27,28,29]
    #worklist=[22,23,24,25,26,27,28,29,30]
    worklist=[30,30,30,30]
    #worklist=[30]
    _start = time.time()
    results=[]
    for a in worklist:
        results.append((a,fibo(a)))
    print("results : {}".format(results))
    _end = time.time()
    print("SP runtime : {}".format(_end-_start))

    _mstart = time.time()
    work = funcmap(fibo,worklist)
    results = work.run()
    print("results : {}".format(results))
    _mend = time.time()
    print("MP runtime : {}".format(_mend-_mstart))

希望能帮助到你。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在AWS Lambda中模拟multiprocessing.Pool.map()? 的相关文章

  • 嵌套字典中的 Django 模板

    我正在使用 Django 模板 并且遇到了嵌套字典的一个问题 Dict result dict type 0 file name abc count 0 type 1 file name xyz count 50 我的 HTML 文件中的模
  • 检测到通过 ChromeDriver 启动的 Chrome 浏览器

    我正在尝试在 python 中使用 selenium chromedriver 来访问 www mouser co uk 网站 然而 从第一次拍摄开始 它就被检测为机器人 有人对此有解释吗 此后我使用的代码 options Options
  • 从 Python 下载/安装 Windows 更新

    我正在编写一个脚本来自动安装 Windows 更新 我可以将其部署在多台计算机上 这样我就不必担心手动更新它们 我想用 Python 编写这个 但找不到任何关于如何完成此操作的信息 我需要知道如何搜索更新 下载更新并从 python 脚本安
  • 字符串中的注释和注释中的字符串

    我正在尝试使用 Python 和 Regex 计算 C 代码中包含的注释中的字符数 但没有成功 我可以先删除字符串以删除字符串中的注释 但这也会删除注释中的字符串 结果会很糟糕 是否有机会通过使用正则表达式来询问不匹配注释中的字符串 反之亦
  • 使用 NLTK 在 Python 中获取大量名词(或形容词);或 Python Mad Libs

    Like 这个问题 https stackoverflow com questions 7439555 noun adjective etc word lists or dictionaries common words 我有兴趣按词性获取
  • 如何在Python中高效地添加稀疏矩阵

    我想知道如何在Python中有效地添加稀疏矩阵 我有一个程序 可以将大任务分解为子任务 并将它们分配到多个 CPU 上 每个子任务都会产生一个结果 一个 scipy 稀疏矩阵 格式为 lil matrix 稀疏矩阵尺寸为 100000x50
  • 将 numpy 代码点数组与字符串相互转换

    我有一个很长的 unicode 字符串 alphabet range 0x0FFF mystr join chr random choice alphabet for in range 100 mystr re sub W mystr 我想
  • 使用 Python 计算 Spark 中成对 (K,V) RDD 中每个 KEY 的平均值

    我想与 Python 共享这个特定的 Apache Spark 解决方案 因为它的文档非常贫乏 我想通过 KEY 计算 K V 对 存储在 Pairwise RDD 中 的平均值 示例数据如下所示 gt gt gt rdd1 take 10
  • ValueError:不支持连续[重复]

    这个问题在这里已经有答案了 我正在使用 GridSearchCV 进行线性回归的交叉验证 不是分类器也不是逻辑回归 我还使用 StandardScaler 对 X 进行标准化 我的数据框有 17 个特征 X 和 5 个目标 y 观察 约11
  • CNTK 抱怨 LSTM 中的动态轴

    我正在尝试在 CNTK 中实现 LSTM 使用 Python 来对序列进行分类 Input 特征是固定长度的数字序列 时间序列 标签是 one hot 值的向量 Network input input variable input dim
  • Alembic:如何迁移模型中的自定义类型?

    My User模型是 class User UserMixin db Model tablename users noinspection PyShadowingBuiltins uuid Column uuid GUID default
  • 如何使用 Bokeh 动态隐藏字形和图例项

    我正在尝试在散景中实现复选框 其中每个复选框应显示 隐藏与其关联的行 我知道可以通过图例来实现这一点 但我希望这种效果同时在两个图中发生 此外 图例也应该更新 在下面的示例中 出现了复选框 但不执行任何操作 我显然不明白如何更新用作源的数据
  • 如何从 JSON 响应重定向?

    所以我尝试使用 Flask 和 Javascript 上传器 Dropzone 上传文件并在上传完成后重定向 文件上传正常 但在烧瓶中使用传统的重定向 return redirect http somesite com 不执行任何操作 页面
  • Python Pandas:如何对组中的所有项目进行分组并为其分配 id?

    我有 df domain orgid csyunshu com 108299 dshu com 108299 bbbdshu com 108299 cwakwakmrg com 121303 ckonkatsunet com 121303
  • `pyqt5'错误`元数据生成失败`

    我正在尝试安装pyqt5使用带有 M1 芯片和 Python 3 9 12 的 mac 操作系统 我怀疑M1芯片可能是原因 我收到一个错误metadata generation failed 最小工作示例 directly in the t
  • 在 Spyder 的变量资源管理器中查看局部变量

    我是 python 新手 正在使用 Spyder 的 IDE 我欣赏它的一项功能是它的变量资源管理器 然而 根据一些研究 我发现它只显示全局变量 我找到的解决方法是使用检查模块 import inspect local vars def m
  • 使用 pybtex 将 bibtex 转换为格式化的 HTML 参考书目,例如哈佛风格

    我正在使用 Django 并将 bibtex 存储在我的模型中 并且希望能够以格式化 HTML 字符串的形式向我的视图传递引用 使其看起来像哈佛引用样式 使用中描述的方法Pybtex 无法识别 bibtex 条目 https stackov
  • 从 python 检测 macOS 中的暗模式

    我正在编写一个 PyQt 应用程序 我必须添加一个补丁 以便在启用暗模式的 Macos 上可以读取字体 app QApplication Fix for the font colours on macos when running dark
  • 附加两个具有相同列、不同顺序的数据框

    我有两个熊猫数据框 noclickDF DataFrame 0 123 321 0 1543 432 columns click id location clickDF DataFrame 1 123 421 1 1543 436 colu
  • 使用 Python 将对象列表转为 JSON

    我在转换时遇到问题Object实例到 JSON ob Object list name scaping myObj base url u number page for ob in list name json string json du

随机推荐