您应该使用结构化并发来适当地确定并发范围。如果您不这样做,您的协程可能会泄漏。就您而言,将它们的范围限定为单个消息的处理似乎是合适的。
这是一个例子:
/* I don't know Kafka, but let's pretend this function gets
* called when you receive a new message
*/
suspend fun onMessage(msg: Message) {
val ids: List<Int> = msg.getIds()
val jobs = ids.map { id ->
GlobalScope.launch { restService.post(id) }
}
jobs.joinAll()
}
如果其中一个调用restService.post(id)
如果因异常而失败,该示例将立即重新抛出异常,并且所有尚未完成的作业都将泄漏。他们将继续执行(可能无限期地),如果他们失败,你不会知道。
为了解决这个问题,您需要确定协程的范围。这是没有泄漏的相同示例:
suspend fun onMessage(msg: Message) = coroutineScope {
val ids: List<Int> = msg.getIds()
ids.forEach { id ->
// launch is called on "this", which is the coroutineScope.
launch { restService.post(id) }
}
}
在这种情况下,如果其中一个调用restService.post(id)
失败,那么协程范围内的所有其他未完成的协程将被取消。当您离开范围时,您可以确定您没有泄漏任何协程。
另外,因为coroutineScope
将等到所有子协程完成,您可以删除jobs.joinAll()
call.
边注:
编写启动某些协程的函数时的一个约定是让调用者使用接收者参数来决定协程范围。这样做与onMessage
函数可能如下所示:
fun CoroutineScope.onMessage(msg: Message): List<Job> {
val ids: List<Int> = msg.getIds()
return ids.map { id ->
// launch is called on "this", which is the coroutineScope.
launch { restService.post(id) }
}
}