下面是一种把并行代码封装成函数的写法。
我事先按 `id` 拆分数据,而不是在每次迭代中把每个 `id` 与当前索引 `i` 逐一比较,这可以节省一些时间;此外,`results` 向量也只需提取一次。
我在并行代码中没有发现任何错误,但不知为何,最终得到的 data.frame 与顺序版本的输出 `final_a` 并不相同——它多出了几行。
具体结果因系统而异,但从下面的计时可以看出,使用 6 个核心时运行最快。
library(parallel)
library(doParallel)
#> Loading required package: foreach
#> Loading required package: iterators
# Count transitions between consecutive `results` values within each `id`,
# computing each id's contingency table on a parallel doParallel/foreach
# backend.
#
# my_data: data.frame with at least the columns `id` and `results`.
#   Transitions are taken in row order within each id, so the caller must
#   sort the rows (e.g. by exam date) beforehand.
# ncores:  number of worker processes to start (a single number >= 1).
#
# Returns a data.frame with columns `first`, `second`, `Freq` and `id`: the
# rows of as.data.frame(table(...)) for every id with at least two results.
# Ids with a single result contribute no rows. `id` holds the actual id value
# (same type as my_data$id), not the group's position.
parFun <- function(my_data, ncores) {
  stopifnot(length(ncores) == 1L, !is.na(ncores), ncores >= 1)
  split_data <- split(my_data, my_data$id)
  cl <- makeCluster(ncores)
  registerDoParallel(cl)
  # Stop the workers and fall back to the sequential backend, so later
  # %dopar% calls are not dispatched to a cluster that no longer exists.
  on.exit({
    stopCluster(cl)
    registerDoSEQ()
  }, add = TRUE)
  # Iterate over the chunks themselves rather than over indices: each task
  # then receives only its own chunk instead of the whole `split_data` list.
  # Errors propagate (foreach's default .errorhandling = "stop") rather than
  # ending up as condition objects inside the rbind() input.
  test <- foreach(chunk = split_data) %dopar% {
    start_i_results <- chunk$results
    n <- length(start_i_results)
    if (n > 1L) {
      pairs_i <- data.frame(first = start_i_results[-n],
                            second = start_i_results[-1L])
      frame_i <- as.data.frame(table(pairs_i))
      # Label with the real id. The list position only equals the id when
      # every value 1..K is present, which random sampling does not guarantee.
      frame_i$id <- chunk$id[1L]
      frame_i
    } else {
      NULL
    }
  }
  do.call(rbind.data.frame, test)
}
# Simulated data: 100,000 exam results (0/1) drawn for ids sampled from
# 1:10000 with replacement (so some values in 1:10000 may not occur at all),
# each with a random exam date between 1999-01-01 and 2020-01-01.
set.seed(2022)
id <- sample.int(10000, 100000, replace = TRUE)
res <- c(1, 0)
results <- sample(res, 100000, replace = TRUE)
date_exam_taken <- sample(
  seq(as.Date("1999/01/01"), as.Date("2020/01/01"), by = "day"),
  100000,
  replace = TRUE
)
my_data <- data.frame(id, results, date_exam_taken)
# Order by id, then by date, so consecutive rows of an id are consecutive exams.
my_data <- my_data[order(my_data$id, my_data$date_exam_taken), ]
# Running exam count (1, 2, ...) within each id, in the date order above.
# ave() only needs a vector of the right type and length to overwrite, so the
# id column itself serves; no temporary row-number column is required.
my_data$exam_number <- ave(my_data$id, my_data$id, FUN = seq_along)
# Sequential baseline, timed. For every id, select its rows by scanning the
# whole `id` column, count transitions between consecutive `results` with
# table(), and label the counts with that id. The per-id full-column scan is
# kept on purpose: it is the approach the parallel versions are compared to.
# Ids with fewer than two results have no transitions and are skipped.
t0 <- system.time({
  # Loop over the ids that actually occur. Looping over
  # 1:length(unique(id)) and testing `id == i` assumed ids are exactly 1..K,
  # so every id larger than the number of distinct ids was silently dropped.
  ids <- sort(unique(my_data$id))
  my_list <- vector("list", length(ids))
  for (k in seq_along(ids)) {
    # which() keeps NA ids from turning into all-NA rows.
    start_i <- my_data[which(my_data$id == ids[k]), ]
    if (nrow(start_i) > 1L) {
      pairs_i <- data.frame(first = head(start_i$results, -1),
                            second = tail(start_i$results, -1))
      frame_i <- as.data.frame(table(pairs_i))
      frame_i$id <- ids[k]
      my_list[[k]] <- frame_i
    }
  }
  # rbind.data.frame ignores the NULL slots left by skipped ids.
  final_a <- do.call(rbind.data.frame, my_list)
})
ncores <- detectCores()
# Leave two cores free, but always use at least one worker. detectCores() can
# return NA, and `ncores - 2L` is <= 0 on machines with two or fewer cores,
# either of which would make makeCluster() fail.
n_workers <- max(1L, ncores - 2L, na.rm = TRUE)
# run with 3 cores
t3 <- system.time(parFun(my_data, 3L))
# run with all but two of the detected cores (6 on the machine that produced
# the timings below) and save the result in `res6`
t6 <- system.time(res6 <- parFun(my_data, n_workers))
rbind(t0, t3, t6)[, 1:3]
#> user.self sys.self elapsed
#> t0 12.86 1.00 15.37
#> t3 3.50 0.22 8.37
#> t6 3.61 0.46 7.65
# Compare the sequential result (final_a) with the parallel one (res6): first
# the leading rows, then the structure of each data.frame. The `#>` lines are
# the output captured by reprex when this script was run.
head(final_a, 10)
#> first second Freq id
#> 1 0 0 2 1
#> 2 1 0 3 1
#> 3 0 1 4 1
#> 4 1 1 0 1
#> 5 0 0 5 2
#> 6 1 0 3 2
#> 7 0 1 2 2
#> 8 1 1 0 2
#> 9 0 0 0 3
#> 10 1 0 1 3
head(res6, 10)
#> first second Freq id
#> 1 0 0 2 1
#> 2 1 0 3 1
#> 3 0 1 4 1
#> 4 1 1 0 1
#> 5 0 0 5 2
#> 6 1 0 3 2
#> 7 0 1 2 2
#> 8 1 1 0 2
#> 9 0 0 0 3
#> 10 1 0 1 3
# Structure check: in the captured run the row counts differ
# (38945 vs 38949), i.e. the two results are not identical.
str(final_a)
#> 'data.frame': 38945 obs. of 4 variables:
#> $ first : Factor w/ 2 levels "0","1": 1 2 1 2 1 2 1 2 1 2 ...
#> $ second: Factor w/ 2 levels "0","1": 1 1 2 2 1 1 2 2 1 1 ...
#> $ Freq : int 2 3 4 0 5 3 2 0 0 1 ...
#> $ id : int 1 1 1 1 2 2 2 2 3 3 ...
str(res6)
#> 'data.frame': 38949 obs. of 4 variables:
#> $ first : Factor w/ 2 levels "0","1": 1 2 1 2 1 2 1 2 1 2 ...
#> $ second: Factor w/ 2 levels "0","1": 1 1 2 2 1 1 2 2 1 1 ...
#> $ Freq : int 2 3 4 0 5 3 2 0 0 1 ...
#> $ id : int 1 1 1 1 2 2 2 2 3 3 ...
Created on 2022-12-11 with reprex v2.0.2 https://reprex.tidyverse.org
编辑:
下面这个版本似乎更快。
# Faster variant of the per-id transition count. Only the `results` and `id`
# vectors are split (not whole data.frame rows), and the results are combined
# with data.table::rbindlist().
#
# my_data: data.frame with at least the columns `id` and `results`, already
#   sorted so that row order within each id is the order of the transitions.
# ncores:  number of worker processes to start (a single number >= 1).
#
# Returns a data.table with columns `first`, `second`, `Freq` and `id` for
# every id with at least two results. `id` holds the actual id value, not the
# group's position.
parFun2 <- function(my_data, ncores) {
  stopifnot(length(ncores) == 1L, !is.na(ncores), ncores >= 1)
  cl <- makeCluster(ncores)
  registerDoParallel(cl)
  # Stop the workers and fall back to the sequential backend, so later
  # %dopar% calls are not dispatched to a cluster that no longer exists.
  on.exit({
    stopCluster(cl)
    registerDoSEQ()
  }, add = TRUE)
  results_list <- split(my_data$results, my_data$id)
  # Split the ids the same way, so each task can label its table with the
  # real id. The list position only equals the id when every value 1..K is
  # present.
  id_list <- split(my_data$id, my_data$id)
  # foreach walks both lists in lockstep; each task receives only its own
  # pair of vectors instead of the whole lists.
  test <- foreach(start_i_results = results_list,
                  ids_i = id_list) %dopar% {
    n <- length(start_i_results)
    if (n > 1L) {
      tbl <- table(first = start_i_results[-n],
                   second = start_i_results[-1L])
      frame_i <- as.data.frame(tbl)
      frame_i$id <- ids_i[1L]
      frame_i
    } else {
      NULL
    }
  }
  data.table::rbindlist(test)
}