这是我到目前为止所想到的。
问题澄清
问题不清楚。没有这样的东西并行序列我可能把它们搞混了Java并行流。我的意思是一个序列同时消耗.
序列是同步的
正如 @LouisWasserman 在评论中指出的那样,序列并不是为并行执行而设计的。特别是SequenceBuilder
注释为@RestrictSuspension
。引用自Kotlin 协程 repo:
这意味着在其范围内 lambda 的任何 SequenceBuilder 扩展都不能调用 suspendContinuation 或其他通用挂起函数
话虽如此,正如 @MarkoTopolnik 评论的那样,它们仍然可以像任何其他对象一样在并行程序中使用。
并行使用的序列
作为示例,这是并行使用序列的第一次尝试
fun launchProcessor(id: Int, iterator: Iterator<Int>) = launch {
println("[${Thread.currentThread().name}] Processor #$id received ${iterator.next()}")
}
fun main(args: Array<String>) {
val s = sequenceOf(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
runBlocking {
val iterator = s.iterator()
repeat(10) { launchProcessor(it, iterator) }
}
}
此代码打印:
[ForkJoinPool.commonPool-worker-2] 处理器 #1 收到 1
[ForkJoinPool.commonPool-worker-1] 处理器 #0 收到 0
[ForkJoinPool.commonPool-worker-3] 处理器 #2 收到 2
[ForkJoinPool.commonPool-worker-2] 处理器 #3 收到 3
[ForkJoinPool.commonPool-worker-1] 处理器 #4 收到 3
[ForkJoinPool.commonPool-worker-3] 处理器 #5 收到 3
[ForkJoinPool.commonPool-worker-1] 处理器 #7 收到 5
[ForkJoinPool.commonPool-worker-2] 处理器 #6 收到 4
[ForkJoinPool.commonPool-worker-1] 处理器 #9 收到 7
[ForkJoinPool.commonPool-worker-3] 处理器 #8 收到 6
哪个当然是不是我们想要的。因为有些数字会被消耗两次。
进入频道
另一方面,如果我们要使用通道,我们可以编写如下内容:
fun produceNumbers() = produce {
var x = 1 // start from 1
while (true) {
send(x++) // produce next
delay(100) // wait 0.1s
}
}
fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
channel.consumeEach {
println("[${Thread.currentThread().name}] Processor #$id received $it")
}
}
fun main(args: Array<String>) = runBlocking<Unit> {
val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(1000)
producer.cancel() // cancel producer coroutine and thus kill them all
}
那么输出是:
[ForkJoinPool.commonPool-worker-2] 处理器 #0 收到 1
[ForkJoinPool.commonPool-worker-2] 处理器 #0 收到 2
[ForkJoinPool.commonPool-worker-1] 处理器 #1 收到 3
[ForkJoinPool.commonPool-worker-2] 处理器 #2 收到 4
[ForkJoinPool.commonPool-worker-1] 处理器 #3 收到 5
[ForkJoinPool.commonPool-worker-2] 处理器 #4 收到 6
[ForkJoinPool.commonPool-worker-2] 处理器 #0 收到 7
[ForkJoinPool.commonPool-worker-1] 处理器 #1 收到 8
[ForkJoinPool.commonPool-worker-1] 处理器 #2 收到 9
[ForkJoinPool.commonPool-worker-2] 处理器 #3 收到 10
此外,我们可以实施takeWhileInclusive
像这样的频道方法:
fun <E> ReceiveChannel<E>.takeWhileInclusive(
context: CoroutineContext = Unconfined,
predicate: suspend (E) -> Boolean
): ReceiveChannel<E> = produce(context) {
var shouldContinue = true
consumeEach {
val currentShouldContinue = shouldContinue
shouldContinue = predicate(it)
if (!currentShouldContinue) return@produce
send(it)
}
}
它按预期工作。