显然,如果您的任务没有任何await
或其他暂停点,您只需使用演员,而不是制作该方法async
,它会自动按顺序执行它们。
但是,在处理异步 Actor 方法时,必须认识到 Actor 是可重入的(请参阅SE-0306:演员 - 演员重入 https://github.com/apple/swift-evolution/blob/main/proposals/0306-actors.md#actor-reentrancy)。如果您确实尝试串行运行一系列异步任务,您将需要手动让每个后续任务等待前一个任务。例如。,
actor Foo {
private var previousTask: Task<Void, Error>?
func add(block: @Sendable @escaping () async throws -> Void) {
previousTask = Task { [previousTask] in
let _ = await previousTask?.result
return try await block()
}
}
}
上述内容有两个微妙的方面:
-
我使用的捕获列表[previousTask]
确保获得先前任务的副本。
-
我表演await previousTask?.value
inside新任务,而不是之前的任务。
如果您在创建新任务之前等待,则会发生竞赛,如果您启动三个任务,第二个和第三个任务都将等待first任务,即第三个任务不等待第二个任务。
而且,也许不用说,因为这是在一个 actor 内,所以它避免了对分离任务的需要,同时保持主线程空闲。
注意,使用时非结构化并发 https://docs.swift.org/swift-book/documentation/the-swift-programming-language/concurrency#Unstructured-Concurrency (i.e., Task {…}
or Task.detached {…}
),您自行承担处理取消的责任,例如使用withTaskCancellationHandler https://developer.apple.com/documentation/swift/withtaskcancellationhandler(operation:oncancel:)/:
actor Foo<Value: Sendable> {
private var previousTask: Task<Value, Error>?
func add(block: @Sendable @escaping () async throws -> Value) async throws -> Value {
let task = Task { [previousTask] in
try await withTaskCancellationHandler {
let _ = try await previousTask?.value
} onCancel: {
previousTask?.cancel()
}
return try await block()
}
previousTask = task
return try await withTaskCancellationHandler {
try await task.value
} onCancel: {
task.cancel()
}
}
}
我还将其扩展为可能返回值的块。
因此,例如,我在这里添加了四个任务(即Task.sleep
两秒然后返回一个随机值):
或者,如果您在第三个任务中途取消第四个任务:
(不用说,这假设你添加的任务支持取消,抛出CancellationError https://developer.apple.com/documentation/swift/cancellationerror如果取消等。标准Apple API,例如URLSession
,执行所有这些操作,但正如您所看到的,如果引入非结构化并发,则需要小心。)
上面的内容有点脆弱,所以我可能建议异步序列(例如,任何符合AsyncSequence https://developer.apple.com/documentation/swift/asyncsequence/协议,例如AsyncStream https://developer.apple.com/documentation/swift/asyncstream或您自己的自定义异步序列),这也可以为您提供串行行为。
Or, AsyncChannel https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/AsyncAlgorithms.docc/Guides/Channel.md from Swift 异步算法 https://github.com/apple/swift-async-algorithms是处理触发某些代码块的串行执行的请求管道的另一种好方法。
例如,这是一个使用的串行下载管理器AsyncChannel
和一个简单的for
-await
-in
循环实现串行行为:
actor SerialDownloadManager {
static let shared = SerialDownloadManager()
private let session: URLSession = …
private let urls = AsyncChannel<URL>()
private init() {
Task { try await startDownloader() }
}
// this sends URLs on the channel
func append(_ url: URL) async {
await urls.send(url)
}
}
private extension SerialDownloadManager {
func startDownloader() async throws {
let folder = try FileManager.default
.url(for: .applicationSupportDirectory, in: .userDomainMask, appropriateFor: nil, create: true)
.appending(component: "downloads")
try? FileManager.default.createDirectory(at: folder, withIntermediateDirectories: true)
// this consumes the URLs on the channel
for await url in urls {
// if you want to observe in "points of interest"
//
// let id = OSSignpostID(log: poi)
// os_signpost(.begin, log: poi, name: "Download", signpostID: id, "%{public}@", url.lastPathComponent)
// defer { os_signpost(.end, log: poi, name: "Download", signpostID: id) }
// download
let (location, response) = try await self.session.download(from: url, delegate: nil)
if let response = response as? HTTPURLResponse, 200 ..< 300 ~= response.statusCode {
let destination = folder.appending(component: url.lastPathComponent)
try? FileManager.default.removeItem(at: destination)
try FileManager.default.moveItem(at: location, to: destination)
}
}
}
}
然后你可以做这样的事情:
func appendUrls() async {
for i in 0 ..< 10 {
await SerialDownloadManager.shared.append(baseUrl.appending(component: "\(i).jpg"))
}
}
产量:
或者,如果您愿意,您可以允许任务组进行受限并发,例如,一次执行 4 个任务:
actor DownloadManager {
static let shared = DownloadManager()
private let session: URLSession = …
private let urls = AsyncChannel<URL>()
private var count = 0
private let maxConcurrency = 4 // change to 1 for serial downloads, but 4-6 is a good balance between benefits of concurrency, but not overtaxing server
private init() {
Task {
do {
try await startDownloader()
} catch {
logger.error("\(error, privacy: .public)")
}
}
}
func append(_ url: URL) async {
await urls.send(url)
}
}
private extension DownloadManager {
func startDownloader() async throws {
let folder = try FileManager.default
.url(for: .applicationSupportDirectory, in: .userDomainMask, appropriateFor: nil, create: true)
.appending(component: "downloads")
try? FileManager.default.createDirectory(at: folder, withIntermediateDirectories: true)
try await withThrowingTaskGroup(of: Void.self) { group in
for await url in urls {
count += 1
if count > maxConcurrency { try await group.next() }
group.addTask {
// if you want to observe in "points of interest"
//
// let id = OSSignpostID(log: poi)
// os_signpost(.begin, log: poi, name: "Download", signpostID: id, "%{public}@", url.lastPathComponent)
// defer { os_signpost(.end, log: poi, name: "Download", signpostID: id) }
// download
let (location, response) = try await self.session.download(from: url, delegate: nil)
if let response = response as? HTTPURLResponse, 200 ..< 300 ~= response.statusCode {
let destination = folder.appending(component: url.lastPathComponent)
try? FileManager.default.removeItem(at: destination)
try FileManager.default.moveItem(at: location, to: destination)
}
}
}
try await group.waitForAll()
}
}
}
产量:
有关异步序列的更多信息,一般而言,请参阅 WWDC 2021 视频认识异步序列 https://developer.apple.com/videos/play/wwdc2021/10058/.