我正在尝试在 go 中实现一个简单的工作池,但不断遇到问题。我想做的就是让一定数量的工人先完成一定数量的工作,然后再做更多的工作。我正在使用的代码类似于:
jobs := make(chan imageMessage, 1)
results := make(chan imageMessage, 1)
for w := 0; w < 2; w++ {
go worker(jobs, results)
}
for j := 0; j < len(images); j++ {
jobs <- imageMessage{path: paths[j], img: images[j]}
}
close(jobs)
for r := 0; r < len(images); r++ {
<-results
}
}
func worker(jobs <-chan imageMessage, results chan<- imageMessage) {
for j := range jobs {
processImage(j.path, j.img)
results <- j
}
}
我的理解是,这应该创建 2 个工人,一次可以做 1 件“事情”,并且当他们完成这 1 件事情时,他们将继续获得更多的工作,直到没有其他事情可做。但是,我得到fatal error: all goroutines are asleep - deadlock!
如果我将缓冲区设置为像 100 这样的大值,这可以工作,但我希望能够限制一次完成的工作。
我觉得我很接近,但显然缺少一些东西。
问题是你只是开始“耗尽”results
频道,一旦您成功发送了频道上的所有作业jobs
渠道。但为了让您能够发送所有作业,无论是jobs
通道必须有足够大的缓冲区,或者工作协程必须能够从中消耗作业。
但是,worker goroutine 在使用一项工作时,在执行下一项工作之前,会将结果发送到results
渠道。如果缓冲区的results
通道已满,发送结果会阻塞。
但最后一部分——工人 goroutine 在发送结果时被阻止——只能通过从results
通道 – 在您可以发送所有作业之前,您不需要这样做。死锁如果jobs
频道和results
频道不能容纳你所有的工作。这也解释了为什么如果将缓冲区大小增加到一个大值,它就会起作用:如果作业可以放入缓冲区,则不会发生死锁,并且在所有作业都成功发送后,您的最终循环将耗尽results
渠道。
解决方案?在它自己的 goroutine 中运行生成和发送作业,这样您就可以开始从results
“立即”通道,无需等待发送所有作业,这意味着工作协程在尝试发送结果时不会永远被阻塞:
go func() {
for j := 0; j < len(images); j++ {
jobs <- imageMessage{path: paths[j], img: images[j]}
}
close(jobs)
}()
尝试一下去游乐场 https://play.golang.org/p/jvitOGmzpl.
另请查看类似的实现这是 Go 中惯用的工作线程池吗? https://stackoverflow.com/questions/38170852/is-this-an-idiomatic-worker-thread-pool-in-go/38172204#38172204
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)