发生的事情相当简单。如何正确编写它还不太清楚,我的建议是“不要这样做”。
首先,一个不是问题的小问题:
async let collectValues = await subject.values.reduce(into: []) { $0.append($1) }
你不应该使用await
这里。如果没有其他问题的话,这可能会成为一个问题。
根本问题是,如果没有订阅者,PassthroughSubject 就会丢弃消息。在您当前的代码中,这绝对会发生,但也很难修复。
// Taking out the extra `await`
async let collectValues = subject.values.reduce(into: []) { $0.append($1) }
// That line is pretty close to:
let collectValues = Task {
var values: [Int] = []
for await value in subject.values {
values.append(value)
}
return values
}
问题是这会启动一项可能不会立即启动的任务。所以你的下一行代码,subject.send(10)
没有订阅者(它甚至还没有到达for-await
线),然后就被扔掉了。
你可以通过添加一个来修复它try await Task.sleep(for: .seconds(1))
创建任务后,但没有多大帮助。 PassthroughSubject 上没有缓冲。当你打电话时append
,没有什么在听。该值将被丢弃,并且您将删除 20。
你可以通过缓冲来改善事情,但你仍然需要睡觉(我认为这是不可接受的)。尽管如此,以下内容对我来说非常可靠:
func test_collectingPassthroughValues() async throws {
// In the real test this is injected in to the unit under test.
let subject = PassthroughSubject<Int, Never>()
let readSubject = subject.buffer(size: 10, prefetch: .keepFull, whenFull: .dropOldest)
async let collectValues = readSubject.values.reduce(into: []) { $0.append($1) }
try await Task.sleep(for: .seconds(1))
subject.send(10)
subject.send(20)
subject.send(completion: .finished)
// Await the values so we can check we got what's expected.
let values = await collectValues
XCTAssertEqual(values, [10, 20])
}
但在我看来,这是一种完全失败的方法。
我不会尝试将 PassthroughSubject 与.values
。我只是看不出有什么方法可以让它变得健壮。更广泛地说,我建议非常小心地混合组合和结构化并发。他们对于事情应该如何运作往往有非常不同的想法。