(免责声明:我是该书的作者future https://cran.r-project.org/web/packages/future框架和进步者 https://cran.r-project.org/web/packages/progressr包裹)
类似于的封闭解决方案base::lapply()
, 和你的pbapply::pblapply()
例如,是使用未来.申请 https://cran.r-project.org/web/packages/future.apply as:
library(future.apply)
## The below is same as plan(multisession, workers=4)
cl <- parallel::makeCluster(4)
plan(cluster, workers=cl)
xs <- 1:100
results <- future_lapply(xs, FUN=function(x) {
Sys.sleep(0.1)
sqrt(x)
})
Chunking:
您可以使用参数控制分块量future.chunk.size
或补充future.schedule
。要禁用分块以便在唯一的并行任务中处理每个元素,请使用future.chunk.size=1
。这样,如果有一个元素比其他元素花费的时间长得多,它就不会占用任何其他元素。
xs <- 1:100
results <- future_lapply(xs, FUN=function(x) {
Sys.sleep(0.1)
sqrt(x)
}, future.chunk.size=1)
并行更新进度:如果您想在进行并行处理时接收进度更新,您可以使用进步者 https://cran.r-project.org/web/packages/progressr打包并配置它以使用progress https://cran.r-project.org/web/packages/progress包以进度条形式报告更新(此处也带有 ETA)。
library(future.apply)
plan(multisession, workers=4)
library(progressr)
handlers(handler_progress(format="[:bar] :percent :eta :message"))
with_progress({
p <- progressor(along=xs)
results <- future_lapply(xs, FUN=function(x) {
p() ## signal progress
Sys.sleep(0.1)
sqrt(x)
}, future.chunk.size=1)
})
您可以将其包装到一个函数中,例如
my_fcn <- function(xs) {
p <- progressor(along=xs)
future_lapply(xs, FUN=function(x) {
p()
Sys.sleep(0.1)
sqrt(x)
}, future.chunk.size=1)
}
这样你就可以将它作为常规函数调用:
> result <- my_fcn(xs)
and use plan()
精确控制您希望它如何并行化。这不会报告进度。为此,您必须执行以下操作:
> with_progress(result <- my_fcn(xs))
[====>-----------------------------------------------------] 9% 1m
在后台运行所有内容:如果您的问题是如何在后台运行整个 shebang,请参阅 '未来的拓扑 https://cran.r-project.org/web/packages/future/vignettes/future-3-topologies.html'小插图。这是另一个级别的并行化,但这是可能的。