在处理更一般的示例之前,让我们首先放弃一些异步任务顺序执行的自然示例,将一个任务的结果作为下一个任务的参数传递。考虑:
func entireProcess() async throws {
let value = try await first()
let value2 = try await subsequent(with: value)
let value3 = try await subsequent(with: value2)
let value4 = try await subsequent(with: value3)
// do something with `value4`
}
Or
func entireProcess() async throws {
var value = try await first()
for _ in 0 ..< 4 {
value = try await subsequent(with: value)
}
// do something with `value`
}
这是声明一系列的最简单的方法async
函数,每个函数都将先前的结果作为下一次迭代的输入。因此,让我们扩展上述内容,包括 Instruments 的“兴趣点”工具的一些路标:
import os.log
private let log = OSLog(subsystem: "Test", category: .pointsOfInterest)
func entireProcess() async throws {
let id = OSSignpostID(log: log)
os_signpost(.begin, log: log, name: #function, signpostID: id, "start")
var value = try await first()
for _ in 0 ..< 4 {
os_signpost(.event, log: log, name: #function, "Scheduling: %d with input of %d", i, value)
value = try await subsequent(with: value)
}
os_signpost(.end, log: log, name: #function, signpostID: id, "%d", value)
}
func first() async throws -> Int {
let id = OSSignpostID(log: log)
os_signpost(.begin, log: log, name: #function, signpostID: id, "start")
try await Task.sleep(seconds: 1)
let value = 42
os_signpost(.end, log: log, name: #function, signpostID: id, "%d", value)
return value
}
func subsequent(with value: Int) async throws -> Int {
let id = OSSignpostID(log: log)
os_signpost(.begin, log: log, name: #function, signpostID: id, "%d", value)
try await Task.sleep(seconds: 1)
let newValue = value + 1
defer { os_signpost(.end, log: log, name: #function, signpostID: id, "%d", newValue) }
return newValue
}
因此,您会看到一系列请求,它们将其结果传递给后续请求。所有的os_signpost
路标的内容是这样我们可以直观地看到它们在 Instrument 的“兴趣点”工具中按顺序运行:
你可以看到ⓢ
计划每个任务时的事件路标,并且间隔说明了这些异步任务的顺序执行。
这是在任务之间建立依赖关系的最简单方法,将值从一个任务传递到另一个任务。
现在,问题是如何概括上述内容,即我们在开始下一个任务之前等待上一个任务。
一种模式是编写一个等待前一个演员结果的演员。考虑:
actor SerialTasks<Success> {
private var previousTask: Task<Success, Error>?
func add(block: @Sendable @escaping () async throws -> Success) {
previousTask = Task { [previousTask] in
let _ = await previousTask?.result
return try await block()
}
}
}
与前面的示例不同,这不需要您有一个函数来启动后续任务。例如,当某些单独的用户交互要求我将新任务添加到先前提交的任务列表的末尾时,我使用了上述方法。
上述演员有两个微妙但关键的方面:
-
The add
方法本身必须not是一个异步函数。我们需要避开演员可重入性 https://github.com/apple/swift-evolution/blob/main/proposals/0306-actors.md#actor-reentrancy。如果这是一个async
函数(如您的示例中),我们将失去任务的顺序执行。
-
The Task
has a [previousTask]
捕获列表捕获先前任务的副本。这样,每个任务都会await
前一个,避免任何比赛。
上述可用于使一系列任务按顺序运行。但它本身并不是在任务之间传递值。我承认我已经使用了这种模式,我只需要顺序执行很大程度上独立的任务(例如,发送单独的命令发送给某些Process
)。但它可能适合您的场景,在该场景中您希望“与[后续请求]共享其响应”。
我建议您使用 MCVE 发布一个单独的问题,并提供一个实际示例,准确说明您想要从一个异步函数传递到另一个异步函数的内容。例如,我已经完成了上述的排列,将整数从一个任务传递到另一个任务。但在实践中,这并没有多大用处,因为当您开始处理异构结果解析的现实时,它会变得更加复杂。在实践中,我开始这个问题的简单例子是最实用的模式。
关于处理/围绕演员重入的更广泛问题,我建议密切关注SE-0306 - 未来方向 https://github.com/apple/swift-evolution/blob/main/proposals/0306-actors.md#future-directions其中明确考虑了一些潜在的优雅的即将到来的替代方案。如果看到一些改进,无论是在语言本身还是在Swift 异步算法 https://github.com/apple/swift-async-algorithms图书馆。
tl;dr
我不想在上面讨论有关您的代码片段的内容,但存在很多问题。所以,如果你原谅我,这里有一些观察结果:
-
尝试使用OperationStatus
强制顺序执行async
通话无法进行,因为演员有可重入性 https://github.com/apple/swift-evolution/blob/main/proposals/0306-actors.md#actor-reentrancy。如果你有一个async
功能,每次你点击一个await
,这是一个暂停点,在此点允许对该异步函数的另一个调用继续进行。您的诚信度OperationStatus
逻辑就会被违反。您不会遇到串行行为。
如果您对悬挂点感兴趣,我建议您观看 WWDC 2021 视频Swift 并发:幕后花絮 https://developer.apple.com/videos/play/wwdc2021/10254/.
-
The testConcurrency
正在打电话waitForExpectations
在它实际开始任何能够满足任何期望的任务之前。那总是会超时。
-
The testConcurrency
正在使用GCDconcurrentPerform
,反过来,它只是调度一个异步任务并立即返回。这违背了整个目的concurrentPerform
(这是一种运行一系列的节流机制同步并行任务,但不能超过 CPU 上的最大内核数)。此外,Swift 并发具有自己的模拟功能concurrentPerform
,即受约束的“协作线程池”(也在该讨论中讨论过)video https://developer.apple.com/videos/play/wwdc2021/10254/,IIRC),渲染concurrentPerform
在 Swift 并发领域已经过时了。
最重要的是,包含在内没有意义concurrentPerform
在 Swift 并发代码库中。使用它也没有意义concurrentPerform
启动异步任务(无论是 Swift 并发还是 GCD)。它用于并行启动一系列同步任务。
-
In execute
在您的测试中,您有两种执行路径,一种将等待某种状态更改并满足期望,而无需增加存储。这意味着您将失去一些增加该值的尝试。您的总计将与所需的结果值不匹配。现在,如果您的目的是在另一个请求待处理时放弃请求,那没问题。但我不认为那是你的意图。
-
回答你关于最后如何测试成功的问题。你可能会这样做:
actor Storage {
private var counter: Int = 0
func increment() {
counter += 1
}
var value: Int { counter }
}
func testConcurrency() async {
let storage = Storage()
let operation = OperationStatus()
let promise = expectation(description: "testConcurrency")
let finalCount = outerIterations * iterations
promise.expectedFulfillmentCount = finalCount
@Sendable func execute() async {
guard await !operation.isExecuting else {
await operation.waitUntilFinished()
promise.fulfill()
return
}
await operation.set(isExecuting: true)
try? await Task.sleep(seconds: 1)
await storage.increment()
await operation.set(isExecuting: false)
promise.fulfill()
}
// waitForExpectations(timeout: 10) // this is not where you want to wait; moved below, after the tasks started
// DispatchQueue.concurrentPerform(iterations: outerIterations) { _ in // no point in this
for _ in 0 ..< outerIterations {
for _ in 0 ..< iterations {
Task { await execute() }
}
}
await waitForExpectations(timeout: 10)
// test the success to see if the store value was correct
let value = await storage.value // to test that you got the right count, fetch the value; note `await`, thus we need to make this an `async` test
// Then
XCTAssertEqual(finalCount, value, "Count")
}
现在,此测试将因多种原因而失败,但希望这说明了您将如何验证测试的成功或失败。但是,请注意,这只会测试最终结果是否正确,而不是测试它们是否按顺序执行。事实是Storage
是一个演员将隐藏他们没有真正按顺序调用的事实。也就是说,如果您确实需要一个请求的结果来准备下一个请求,则此处不会进行测试。
-
如果在您经历此过程时,您想真正确认您的行为OperationStatus
模式,我建议使用os_signpost
间隔(或任务开始和结束的简单日志记录语句)。您将看到异步的单独调用execute
方法不是按顺序运行的。