使 Swift 并发中的任务串行运行

2024-03-21

我有一个基于文档的应用程序,它使用结构作为其主要数据/模型。由于模型是(的子类)的属性NSDocument需要从主线程访问它。到目前为止一切都很好。

但对数据的某些操作可能需要相当长的时间,我想为用户提供一个进度条。这就是问题开始的地方。特别是当用户从 GUI 快速连续启动两个操作时。

如果我在模型上同步运行操作(或以“正常”方式)Task {})我得到了正确的串行行为,但主线程被阻止,因此我无法显示进度条。 (选项A)

如果我在模型上运行操作Task.detached {}我可以更新进度条,但根据模型上操作的运行时间,用户的第二个操作可能会在第一个操作之前完成,从而导致模型的无效/意外状态。这是由于await分离任务中需要的语句(我认为)。 (选项B)。

所以我想要 a) 释放主线程来更新 GUI,b) 确保每个任务在另一个(排队的)任务开始之前运行完全完成。使用后台串行调度队列很有可能实现这一点,但我正在尝试切换到新的 Swift 并发系统,该系统也用于在访问模型之前执行任何准备工作。

我尝试使用全局演员,因为这似乎是某种串行后台队列,但它也需要await声明。尽管模型中出现意外状态的可能性降低了,但这仍然是可能的。

我写了一些小代码来演示这个问题:

该模型:

struct Model {
    var doneA = false
    var doneB = false

    mutating func updateA() {
        Thread.sleep(forTimeInterval: 5)
        doneA = true
    }

    mutating func updateB() {
        Thread.sleep(forTimeInterval: 1)
        doneB = true
    }
}

和文件(省略标准NSDocument覆盖):

@globalActor
struct ModelActor {
    actor ActorType { }

    static let shared: ActorType = ActorType()
}

class Document: NSDocument {
    var model = Model() {
        didSet {
            Swift.print(model)
        }
    }

    func update(model: Model) {
        self.model = model
    }

    @ModelActor
    func updateModel(with operation: (Model) -> Model) async {
        var model = await self.model
        model = operation(model)
        await update(model: model)
    }

    @IBAction func operationA(_ sender: Any?) {
        //Option A
//        Task {
//            Swift.print("Performing some A work...")
//            self.model.updateA()
//        }

        //Option B
//        Task.detached {
//            Swift.print("Performing some A work...")
//            var model = await self.model
//            model.updateA()
//            await self.update(model: model)
//        }

        //Option C
        Task.detached {
            Swift.print("Performing some A work...")
            await self.updateModel { model in
                var model = model
                model.updateA()
                return model
            }
        }
    }

    @IBAction func operationB(_ sender: Any?) {
        //Option A
//        Task {
//            Swift.print("Performing some B work...")
//            self.model.updateB()
//        }

        //Option B
//        Task.detached {
//            Swift.print("Performing some B work...")
//            var model = await self.model
//            model.updateB()
//            await self.update(model: model)
//        }

        //Option C
        Task.detached {
            Swift.print("Performing some B work...")
            await self.updateModel { model in
                var model = model
                model.updateB()
                return model
            }
        }
    }
}

单击“操作 A”,然后单击“操作 B”应该会生成一个包含两个的模型true的。但情况并非总是如此。

有没有办法确保操作 A 在我进行操作 B 之前完成并让主线程可用于 GUI 更新?

EDIT根据罗布的回答,我得出以下结论。我这样修改它是因为我可以等待创建的操作并向原始调用者报告任何错误。我认为通过将所有代码包含在一个单一的代码中更容易理解正在发生的事情update函数,所以我选择执行独立任务而不是actor。我还从任务中返回中间模型,否则可能会使用旧模型。

class Document {
    func updateModel(operation: @escaping (Model) throws -> Model) async throws {
        //Update the model in the background
        let modelTask = Task.detached { [previousTask, model] () throws -> Model in
            var model = model

            //Check whether we're cancelled
            try Task.checkCancellation()

            //Check whether we need to wait on earlier task(s)
            if let previousTask = previousTask {
                //If the preceding task succeeds we use its model
                do {
                    model = try await previousTask.value
                } catch {
                    throw CancellationError()
                }
            }

            return try operation(model)
        }


        previousTask = modelTask
        defer { previousTask = nil } //Make sure a later task can always start if we throw

        //Wait for the operation to finish and store the model
        do {
            self.model = try await modelTask.value
        } catch {
            if error is CancellationError { return }
            else { throw error }
        }
    }
}

呼叫方:

@IBAction func operationA(_ sender: Any?) {
    //Option D
    Task {
        do {
            try await updateModel { model in
                var model = model
                model.updateA()
                return model
            }
        } catch {
            presentError(error)
        }
    }
}

它似乎做了我需要的任何事情,即对文档上的属性进行排队更新,可以等待并返回错误,就像所有事情都发生在主线程上一样。 唯一的缺点似乎是在调用方,由于需要使模型成为一个闭包,所以闭包非常冗长。var并明确返回它。


显然,如果您的任务没有任何await或其他暂停点,您只需使用演员,而不是制作该方法async,它会自动按顺序执行它们。

但是,在处理异步 Actor 方法时,必须认识到 Actor 是可重入的(请参阅SE-0306:演员 - 演员重入 https://github.com/apple/swift-evolution/blob/main/proposals/0306-actors.md#actor-reentrancy)。如果您确实尝试串行运行一系列异步任务,您将需要手动让每个后续任务等待前一个任务。例如。,

actor Foo {
    private var previousTask: Task<Void, Error>?

    func add(block: @Sendable @escaping () async throws -> Void) {
        previousTask = Task { [previousTask] in
            let _ = await previousTask?.result

            return try await block()
        }
    }
}

上述内容有两个微妙的方面:

  1. 我使用的捕获列表[previousTask]确保获得先前任务的副本。

  2. 我表演await previousTask?.value inside新任务,而不是之前的任务。

    如果您在创建新任务之前等待,则会发生竞赛,如果您启动三个任务,第二个和第三个任务都将等待first任务,即第三个任务不等待第二个任务。

而且,也许不用说,因为这是在一个 actor 内,所以它避免了对分离任务的需要,同时保持主线程空闲。


注意,使用时非结构化并发 https://docs.swift.org/swift-book/documentation/the-swift-programming-language/concurrency#Unstructured-Concurrency (i.e., Task {…} or Task.detached {…}),您自行承担处理取消的责任,例如使用withTaskCancellationHandler https://developer.apple.com/documentation/swift/withtaskcancellationhandler(operation:oncancel:)/:

actor Foo<Value: Sendable> {
    private var previousTask: Task<Value, Error>?

    func add(block: @Sendable @escaping () async throws -> Value) async throws -> Value {
        let task = Task { [previousTask] in
            try await withTaskCancellationHandler {
                let _ = try await previousTask?.value
            } onCancel: {
                previousTask?.cancel()
            }

            return try await block()
        }

        previousTask = task

        return try await withTaskCancellationHandler {
            try await task.value
        } onCancel: {
            task.cancel()
        }
    }
}

我还将其扩展为可能返回值的块。

因此,例如,我在这里添加了四个任务(即Task.sleep两秒然后返回一个随机值):

或者,如果您在第三个任务中途取消第四个任务:

(不用说,这假设你添加的任务支持取消,抛出CancellationError https://developer.apple.com/documentation/swift/cancellationerror如果取消等。标准Apple API,例如URLSession,执行所有这些操作,但正如您所看到的,如果引入非结构化并发,则需要小心。)


上面的内容有点脆弱,所以我可能建议异步序列(例如,任何符合AsyncSequence https://developer.apple.com/documentation/swift/asyncsequence/协议,例如AsyncStream https://developer.apple.com/documentation/swift/asyncstream或您自己的自定义异步序列),这也可以为您提供串行行为。

Or, AsyncChannel https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/AsyncAlgorithms.docc/Guides/Channel.md from Swift 异步算法 https://github.com/apple/swift-async-algorithms是处理触发某些代码块的串行执行的请求管道的另一种好方法。

例如,这是一个使用的串行下载管理器AsyncChannel和一个简单的for-await-in循环实现串行行为:

actor SerialDownloadManager {
    static let shared = SerialDownloadManager()

    private let session: URLSession = …
    private let urls = AsyncChannel<URL>()

    private init() {
        Task { try await startDownloader() }
    }

    // this sends URLs on the channel

    func append(_ url: URL) async {
        await urls.send(url)
    }
}

private extension SerialDownloadManager {
    func startDownloader() async throws {
        let folder = try FileManager.default
            .url(for: .applicationSupportDirectory, in: .userDomainMask, appropriateFor: nil, create: true)
            .appending(component: "downloads")

        try? FileManager.default.createDirectory(at: folder, withIntermediateDirectories: true)

        // this consumes the URLs on the channel

        for await url in urls {
            // if you want to observe in "points of interest"
            //
            // let id = OSSignpostID(log: poi)
            // os_signpost(.begin, log: poi, name: "Download", signpostID: id, "%{public}@", url.lastPathComponent)
            // defer { os_signpost(.end, log: poi, name: "Download", signpostID: id) }

            // download

            let (location, response) = try await self.session.download(from: url, delegate: nil)

            if let response = response as? HTTPURLResponse, 200 ..< 300 ~= response.statusCode {
                let destination = folder.appending(component: url.lastPathComponent)
                try? FileManager.default.removeItem(at: destination)
                try FileManager.default.moveItem(at: location, to: destination)
            }
        }
    }
}

然后你可以做这样的事情:

func appendUrls() async {
    for i in 0 ..< 10 {
        await SerialDownloadManager.shared.append(baseUrl.appending(component: "\(i).jpg"))
    }
}

产量:

或者,如果您愿意,您可以允许任务组进行受限并发,例如,一次执行 4 个任务:

actor DownloadManager {
    static let shared = DownloadManager()

    private let session: URLSession = …
    private let urls = AsyncChannel<URL>()
    private var count = 0
    private let maxConcurrency = 4       // change to 1 for serial downloads, but 4-6 is a good balance between benefits of concurrency, but not overtaxing server

    private init() {
        Task {
            do {
                try await startDownloader()
            } catch {
                logger.error("\(error, privacy: .public)")
            }
        }
    }

    func append(_ url: URL) async {
        await urls.send(url)
    }
}

private extension DownloadManager {
    func startDownloader() async throws {
        let folder = try FileManager.default
            .url(for: .applicationSupportDirectory, in: .userDomainMask, appropriateFor: nil, create: true)
            .appending(component: "downloads")

        try? FileManager.default.createDirectory(at: folder, withIntermediateDirectories: true)

        try await withThrowingTaskGroup(of: Void.self) { group in
            for await url in urls {
                count += 1
                if count > maxConcurrency { try await group.next() }

                group.addTask {
                    // if you want to observe in "points of interest"
                    //
                    // let id = OSSignpostID(log: poi)
                    // os_signpost(.begin, log: poi, name: "Download", signpostID: id, "%{public}@", url.lastPathComponent)
                    // defer { os_signpost(.end, log: poi, name: "Download", signpostID: id) }

                    // download

                    let (location, response) = try await self.session.download(from: url, delegate: nil)

                    if let response = response as? HTTPURLResponse, 200 ..< 300 ~= response.statusCode {
                        let destination = folder.appending(component: url.lastPathComponent)
                        try? FileManager.default.removeItem(at: destination)
                        try FileManager.default.moveItem(at: location, to: destination)
                    }
                }
            }

            try await group.waitForAll()
        }
    }
}

产量:

有关异步序列的更多信息,一般而言,请参阅 WWDC 2021 视频认识异步序列 https://developer.apple.com/videos/play/wwdc2021/10058/.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使 Swift 并发中的任务串行运行 的相关文章

  • iOS 上的本地通知需要用户许可吗?

    我在我的应用程序中使用 UILocalNotification 来安排通知 通知工作正常 并在我需要时显示 我对此没有意见 我没有做任何远程 推送通知 让我想知道的是 我从未见过您通常在多个应用程序中看到的推送通知的著名权限对话框 我什至重
  • 设置属性文本后防止 UILabel 字体自动更改

    我发现如果我将属性文本设置为UILabel 预定义字体将更改为属性文本第一个字符的字体 例如 the font size is set to 20 in Interface Builder println theLabel font poi
  • 如何并行运行一组函数并等待完成结果?

    我需要同时异步运行一组繁重的函数并将结果填充到列表中 这是伪代码 List
  • 如何停止 dbus gobject 循环

    我试图阻止gobject MainLoop 几秒钟后 我不知道是否可以为这种循环设置超时 这将是完美的 但我还没有找到 因此 我尝试通过线程来解决这个问题 但不幸的是 主循环阻塞了其他线程 这是我的代码 我正在使用 python 2 7 i
  • Swift 3 中来自路径或文件名的 UIImage

    我的应用程序的文档目录中有一堆图像 我想将其中之一加载到我认为的 UIImage 中 这就是我所做的 myImage image UIImage named image jpg the file exist but this returns
  • Spring Batch 多线程

    我正在编写一个 Spring Batch 并希望在需要时对其进行扩展 我的 ApplicationContext 看起来像这样 Configuration EnableBatchProcessing EnableTransactionMan
  • 创建一个扩展来从 Swift 中的数组中过滤 nils

    我正在尝试编写一个 Array 扩展 它将允许可选 T 数组转换为非可选 T 数组 例如这可以写成一个自由函数 如下所示 func removeAllNils array T gt T return array filter 0 nil r
  • 如果两个线程同时访问同一个 bool 变量会发生什么?

    我有一个跨平台 C 程序 其中使用 boost 库创建异步计时器 我有一个全局变量 bool receivedInput false 一个线程等待并处理输入 string argStr while 1 getline cin argStr
  • 如果在系统设置中关闭隐藏式字幕,则不会显示字幕

    我正在尝试显示字幕 无论设备在辅助功能下设置了什么 目前 如果设备设置为英语并在设置中启用隐藏式字幕 则将播放英语字幕 如果设备设置为西班牙语 则将播放西班牙语字幕 我希望无论隐藏式字幕是否打开 都能播放字幕 我尝试添加这段代码 https
  • Handler、MessageQueue、Looper,它们都是运行在UI线程上的吗?

    我正在尝试解决线程问题 并且我知道我可能会使用Handler将消息 可运行对象发布到MessageQueue 这又被Looper并发送回Handler进行加工 如果我发帖到Handler在我的活动中 是Activity Handler Me
  • Alamofire 2.0 和 Swift 2 - 标头不起作用。看看如何修复它

    当我将项目升级到swift 2 with Alamofire 2 headers停止工作 代码中没有任何错误 原因是headers不按旧方式工作 login with Alamofire 1 and Swift 1 2 WITH HEADE
  • UITableView 干扰状态栏

    我正在开发一个具有 UITableViewController 的应用程序 该应用程序显示值列表 如下所示 如何将表格向下移动 使其不会与状态栏冲突 看来我无法对这个场景施加任何限制 所以我不知所措 使用以下 3 个属性UIViewCont
  • 使用随机初始密钥解码字典

    我正在接收并尝试解析包含事件数据的 json 文件 它是一个字典的字典 其组织方式如下 其中每个事件的键都是随机事件 id 19374176 122 event title Cool Fun Thing to Do description
  • xcode 9 中的 Facebook 登录按钮终止

    我正在使用 Xcode 9 并将编译器设置为 Swift 3 2 我使用 Cocoapods 安装了 Facebook Login 一切都编译没有任何错误 但每当我在模拟器中运行它时 我单击登录按钮 它就会崩溃 控制台中仅打印以下内容 li
  • Java 中的 64 位赋值在 32 位机器上是原子的吗?

    如果我有这样的代码 long x x 0xFFFFFFFFL 如果我在 32 位机器上运行此代码 它是否保证是原子的 或者读取 x 的不同线程是否可能获得不完整 垃圾值 这是简短的摘要 作为参考 读 写是ALWAYS原子 即使在 64 位实
  • Interlocked 类可以安全地与 lock() 混合吗?

    当您将互锁操作与 lock 和其他更高级别的锁 混合使用时 是否可以保证原子读取 我对混合这样的锁定机制时的一般行为以及 Int32 和 Int64 之间的任何差异感兴趣 private Int64 count private object
  • Health 处理多个步骤源的方式与 HealthKit 不同——swift

    我的 Swift iOS 应用程序与 HealthKit 连接 向用户显示他们当天到目前为止已经走了多少步 在大多数情况下 这是成功的 当步数的唯一来源是 iPhone 内置计步器功能记录的步数时 一切正常 并且我的应用程序显示的步数与健康
  • Core Data + CloudKit 无法在其他设备上自动刷新?

    我的 SwiftUI 应用程序与 Core Data CloudKit 一起使用 我可以从 Mac 或 iPhone 将新条目保存到数据库中 该应用程序对所有设备使用完全相同的项目 代码库 唯一的问题是我需要关闭应用程序并重新打开它才能查看
  • 带 cookie 的 Alamofire 请求

    我是初学者 我不知道如何使用 Alamofire 发出 GET 请求 但它需要身份验证 我设法用其他网络服务 登录 来做到这一点 因为它需要参数参数 parameters username username password passwor
  • Python 线程与 Linux 中的多处理

    基于此question https stackoverflow com questions 807506 threads vs processes in linux我假设创建新流程应该几乎和创造新线程在Linux中 然而 很少的测试显示出截

随机推荐

  • 如何使用 argparse 将列表作为命令行参数传递?

    我正在尝试将列表作为参数传递给命令行程序 有没有一个argparse https docs python org 3 library argparse html将列表作为选项传递的选项 parser add argument l list
  • 为什么 Pyglet 不能正确绘制多边形?

    我随机创建点用于使用 Pyglet 绘制多边形 但 Pyglet 大多数时候都不能正确完成工作 好吧 我尝试用另一个图形模块绘制多边形 实际上它有效 但如果 Pyglet 工作正常 它会让我的工作变得更容易 我用它来绘制多边形和点 以方便您
  • 无需插件的 jQuery 链式动画

    在使用 jQuery 之前 我可以做一个带有延迟的链式动画 如下所示 element delay 45 animate 45 delay 45 animate 45 delay 45 animate 45 现在 自从更新到 v1 6 1 以
  • 子查询是邪恶的吗?

    这个问题是在一位朋友的评论之后提出的 他说 当一个查询有很多子查询时 就表明数据库存在设计缺陷 必须避免它们 他还表示 许多书籍都提出了同样的建议 我部分同意 但我认为这些查询具有复杂的逻辑 需要大量子查询 或者为了避免子查询 查询的物化视
  • 带 read 和 IFS 的 Bash while 循环

    我必须解析以下格式的文件 line1 param1 line1 param2 line1 param3 line1 param2 line2 param2 line2 param3 line1 param3 line3 param2 lin
  • 我可以在 python 中“伪造”一个包(或至少一个模块)以用于测试目的吗?

    我想用 python 伪造一个包 我想定义一些东西以便代码可以做 from somefakepackage morefakestuff import somethingfake somefakepackage 是在代码中定义的 其下面的所有
  • 如何处理drive api的最大导出限制大小文件

    我正在尝试下载一些 google doc 文件 但之后我需要使用导出方法转换为 microsoft word mimetype 它工作正常 直到找到一个大小超过 10 mb 的文件 api 文档说这是限制导出文档的大小 但我确实需要下载这些
  • 在 ElasticSearch 中获取 SearchResponse 的结果

    我正在尝试使用 ES 作为我的 MongoDB 的索引 我已经成功地集成了它们 但我发现搜索 API 相当复杂且令人困惑 Java API 也没有太大帮助 我能够找到完全匹配的内容 但是如何才能得到这个结果呢 这是我的代码 Node nod
  • JAX-WS Web 服务线程安全和性能问题

    我从其他一些帖子以及我对 JAX WS Web 服务的理解中了解到它们不是线程安全的 我的 Web 服务将被 100 个客户端调用 我们需要能够每秒处理大约 200 个事务 我的网络服务将与数据库交互以执行其工作 如果我在访问数据库的代码周
  • WebBrowser 控制会话中下载文件

    我在用着WebBrowser control浏览登录页面并下载文件 由于我找不到使用我正在使用的控件自动管理下载的方法WebClient类来尝试实现这一目标 问题是自从WebClient与浏览器不在同一上下文 会话中 我下载的只是安全错误屏
  • 如何使用树表显示 Oracle SQL 表中的所有行?

    我有这张表 为每个区域创建表 id area INT 主键 名称 VARCHAR2 200 id areapadre INT 引用 perarea id area 添加以下内容是为了访问数据 我的目的是创建一个层次结构 在树视图中显示区域及
  • 从 python 中的文本文件中读取特定列

    我有一个文本文件 其中包含一个由数字组成的表格 例如 5 10 6 6 20 1 7 30 4 8 40 3 9 23 1 4 13 6 例如 如果我想要仅包含在第二列中的数字 我如何将该列提取到列表中 f open file r line
  • MPI 中的相同发送和接收缓冲区

    在我的代码中 每个进程都作用于数组的特定部分 我希望每个进程将其处理的部分发送到其他进程 并从其他进程接收其他部分 为此我使用了MPI Allgatherv但我保持发送和接收缓冲区相同 MPI Allgatherv vel 0 localS
  • 如何将自定义的 Firefox 配置文件与 PHPUnit 的 Webdriver 框架结合使用?

    我知道使用 selenium RC 我曾经传递一个命令行操作符 firefoxProfileTemplate 这样就可以了 现在使用 Selenium2 Webdriver 这似乎不再起作用了 由于我正在使用 PHPUnit 进行其他测试
  • AWS Step Functions 是否登录 CloudWatch

    我想知道 AWS 步骤函数执行的输出是否记录在 CloudWatch 日志组中 我是not讨论由 step 函数调用的 lambda 函数的输出 我对状态机本身的输出感兴趣 我问这个问题是因为我们通常将所有日志集中在 loggly 中 以便
  • 是否可以将代码隐藏分成多个部分文件?

    我有一个带有 aspx cs 代码隐藏的 aspx Web 表单 隐藏的代码有近 2000 行长 而且已经到了轻松导航的唯一方法就是在各部分之间放置大量空格 缩小以便我可以看到代码的物理外观 并且然后放大我要编辑的地方 换句话说 这是一个很
  • 使用 Qt Designer 调整 Qt 拆分器布局大小行为

    我在 Qt 中通过拖放制作的视图中存在尺寸问题 让我从一张图片开始来帮助我解释 这是我的表单的主窗口 发生的情况是 我们有 4 个选项卡小部件 左侧选项卡小部件有一个到 2 个中间小部件的水平分割器 2 个中间小部件有一个垂直分离器 左侧和
  • 如何检测请求是否被中止?

    我正在提出请求 然后立即中止 var x get url function d e xhr alert d x abort 问题是它执行success函数并返回空数据 这里的例子 http jsfiddle net e5NBT 有没有 jQ
  • 将 ASM 转换为 C(不是逆向工程)

    我用谷歌搜索 发现数量惊人的轻率回复 基本上都是在嘲笑提出这样问题的提问者 Microchip 免费提供一些源代码 我不想将其发布在这里 以防万一 基本上 谷歌 AN937 单击第一个链接 其中有一个 源代码 链接及其压缩文件 它在 ASM
  • 使 Swift 并发中的任务串行运行

    我有一个基于文档的应用程序 它使用结构作为其主要数据 模型 由于模型是 的子类 的属性NSDocument需要从主线程访问它 到目前为止一切都很好 但对数据的某些操作可能需要相当长的时间 我想为用户提供一个进度条 这就是问题开始的地方 特别