我正在尝试处理保存在磁盘上的 > 10000 个 xts 对象,当加载到 R 中时,每个对象的大小约为 0.2 GB。我想使用 foreach 并行处理这些对象。我的代码适用于类似 100 个 xts 对象的情况,我将这些对象预加载到内存中、导出等。但是在超过 100 个 xts 对象之后,我的机器就达到了内存限制。
我正在尝试做的示例:
require(TTR)
require(doMPI)
require(foreach)
test.data <- runif(n=250*10*60*24)
xts.1 <- xts(test.data, order.by=as.Date(1:length(test.data)))
xts.1 <- cbind(xts.1, xts.1, xts.1, xts.1, xts.1, xts.1)
colnames(xts.1) <- c("Open", "High", "Low", "Close", "Volume", "Adjusted")
print(object.size(xts.1), units="Gb")
xts.2 <- xts.1
xts.3 <- xts.1
xts.4 <- xts.1
save(xts.1, file="xts.1.rda")
save(xts.2, file="xts.2.rda")
save(xts.3, file="xts.3.rda")
save(xts.4, file="xts.4.rda")
names <- c("xts.1", "xts.2", "xts.3", "xts.4")
rm(xts.1)
rm(xts.2)
rm(xts.3)
rm(xts.4)
cl <- startMPIcluster(count=2) # Use 2 cores
registerDoMPI(cl)
result <- foreach(name=names,
.combine=cbind,
.multicombine=TRUE,
.inorder=FALSE,
.packages=c("TTR")) %dopar% {
# TODO: Move following line out of worker. One (or 5, 10,
# 20, ... but not all) object at a time should be loaded
# by master and exported to worker "just in time"
load(file=paste0(name, ".rda"))
return(last(SMA(get(name)[, 1], 10)))
}
closeCluster(cl)
print(result)
所以我想知道如何能够在发送/需要之前从磁盘“及时”加载每个(或几个,例如 5、10、20、100,...但不是一次全部)xts 对象出口给工人。我无法在工作人员中加载对象(基于名称和存储在磁盘上的文件夹),因为工作人员可以位于远程计算机上,而无需访问存储在磁盘上的对象的文件夹。所以我需要能够在主进程中“及时”读取/加载它们......
我使用 doMPI 和 doRedis 作为并行后端。 doMPI 看起来内存效率更高,但比 doRedis 慢(在 100 个对象上)。
所以我想了解什么是解决这个问题的正确“策略”/“模式”。