如何暂停后续任务直到第一次完成然后与等待的任务共享其响应?

2023-12-27

我有一个actor它以第一个请求暂停后续请求直到完成的方式限制请求,然后与它们共享其响应,这样它们就不必发出相同的请求。

这就是我想做的:

let cache = Cache()
let operation = OperationStatus()

func execute() async {
    if await operation.isExecuting else {
        await operation.waitUntilFinished()
    } else {
        await operation.set(isExecuting: true)
    }

    if let data = await cache.data {
        return data
    }

    let request = myRequest()
    let response = await myService.send(request)
    await cache.set(data: response)

    await operation.set(isExecuting: false)
}

actor Cache {
    var data: myResponse?

    func set(data: myResponse?) {
        self.data = data
    }
}

actor OperationStatus {
    @Published var isExecuting = false
    private var cancellable = Set<AnyCancellable>()

    func set(isExecuting: Bool) {
        self.isExecuting = isExecuting
    }

    func waitUntilFinished() async {
        guard isExecuting else { return }

        return await withCheckedContinuation { continuation in
            $isExecuting
                .first { !$0 } // Wait until execution toggled off
                .sink { _ in continuation.resume() }
                .store(in: &cancellable)
        }
    }
}

// Do something
DispatchQueue.concurrentPerform(iterations: 1_000_000) { _ in execute() }

这确保一次一个请求,后续调用将等待直到完成。看起来这可行,但想知道是否有一种纯粹的并发方式而不是混合Combine,我该如何测试这个?这是我开始的测试,但我很困惑如何测试它:

final class OperationStatusTests: XCTestCase {
    private let iterations = 10_000 // 1_000_000
    private let outerIterations = 10

    actor Storage {
        var counter: Int = 0

        func increment() {
            counter += 1
        }
    }

    func testConcurrency() {
        // Given
        let storage = Storage()
        let operation = OperationStatus()
        let promise = expectation(description: "testConcurrency")
        promise.expectedFulfillmentCount = outerIterations * iterations

        @Sendable func execute() async {
            guard await !operation.isExecuting else {
                await operation.waitUntilFinished()
                promise.fulfill()
                return
            }

            await operation.set(isExecuting: true)
            try? await Task.sleep(seconds: 8)
            await storage.increment()
            await operation.set(isExecuting: false)
            promise.fulfill()
        }

        waitForExpectations(timeout: 10)

        // When
        DispatchQueue.concurrentPerform(iterations: outerIterations) { _ in
            (0..<iterations).forEach { _ in
                Task { await execute() }
            }
        }

        // Then
        // XCTAssertEqual... how to test?
    }
}

在处理更一般的示例之前,让我们首先放弃一些异步任务顺序执行的自然示例,将一个任务的结果作为下一个任务的参数传递。考虑:

func entireProcess() async throws {
    let value = try await first()

    let value2 = try await subsequent(with: value)
    let value3 = try await subsequent(with: value2)
    let value4 = try await subsequent(with: value3)

    // do something with `value4`
}

Or

func entireProcess() async throws {
    var value = try await first()

    for _ in 0 ..< 4 {
        value = try await subsequent(with: value)
    }

    // do something with `value`
}

这是声明一系列的最简单的方法async函数,每个函数都将先前的结果作为下一次迭代的输入。因此,让我们扩展上述内容,包括 Instruments 的“兴趣点”工具的一些路标:

import os.log

private let log = OSLog(subsystem: "Test", category: .pointsOfInterest)

func entireProcess() async throws {
    let id = OSSignpostID(log: log)
    os_signpost(.begin, log: log, name: #function, signpostID: id, "start")

    var value = try await first()

    for _ in 0 ..< 4 {
        os_signpost(.event, log: log, name: #function, "Scheduling: %d with input of %d", i, value)
        value = try await subsequent(with: value)
    }

    os_signpost(.end, log: log, name: #function, signpostID: id, "%d", value)
}

func first() async throws -> Int {
    let id = OSSignpostID(log: log)
    os_signpost(.begin, log: log, name: #function, signpostID: id, "start")

    try await Task.sleep(seconds: 1)

    let value = 42
    os_signpost(.end, log: log, name: #function, signpostID: id, "%d", value)

    return value
}

func subsequent(with value: Int) async throws -> Int {
    let id = OSSignpostID(log: log)
    os_signpost(.begin, log: log, name: #function, signpostID: id, "%d", value)

    try await Task.sleep(seconds: 1)

    let newValue = value + 1
    defer { os_signpost(.end, log: log, name: #function, signpostID: id, "%d", newValue) }

    return newValue
}

因此,您会看到一系列请求,它们将其结果传递给后续请求。所有的os_signpost路标的内容是这样我们可以直观地看到它们在 Instrument 的“兴趣点”工具中按顺序运行:

你可以看到计划每个任务时的事件路标,并且间隔说明了这些异步任务的顺序执行。

这是在任务之间建立依赖关系的最简单方法,将值从一个任务传递到另一个任务。


现在,问题是如何概括上述内容,即我们在开始下一个任务之前等待上一个任务。

一种模式是编写一个等待前一个演员结果的演员。考虑:

actor SerialTasks<Success> {
    private var previousTask: Task<Success, Error>?

    func add(block: @Sendable @escaping () async throws -> Success) {
        previousTask = Task { [previousTask] in
            let _ = await previousTask?.result
            return try await block()
        }
    }
}

与前面的示例不同,这不需要您有一个函数来启动后续任务。例如,当某些单独的用户交互要求我将新任务添加到先前提交的任务列表的末尾时,我使用了上述方法。

上述演员有两个微妙但关键的方面:

  1. The add方法本身必须not是一个异步函数。我们需要避开演员可重入性 https://github.com/apple/swift-evolution/blob/main/proposals/0306-actors.md#actor-reentrancy。如果这是一个async函数(如您的示例中),我们将失去任务的顺序执行。

  2. The Task has a [previousTask]捕获列表捕获先前任务的副本。这样,每个任务都会await前一个,避免任何比赛。

上述可用于使一系列任务按顺序运行。但它本身并不是在任务之间传递值。我承认我已经使用了这种模式,我只需要顺序执行很大程度上独立的任务(例如,发送单独的命令发送给某些Process)。但它可能适合您的场景,在该场景中您希望“与[后续请求]共享其响应”。

我建议您使用 MCVE 发布一个单独的问题,并提供一个实际示例,准确说明您想要从一个异步函数传递到另一个异步函数的内容。例如,我已经完成了上述的排列,将整数从一个任务传递到另一个任务。但在实践中,这并没有多大用处,因为当您开始处理异构结果解析的现实时,它会变得更加复杂。在实践中,我开始这个问题的简单例子是最实用的模式。

关于处理/围绕演员重入的更广泛问题,我建议密切关注SE-0306 - 未来方向 https://github.com/apple/swift-evolution/blob/main/proposals/0306-actors.md#future-directions其中明确考虑了一些潜在的优雅的即将到来的替代方案。如果看到一些改进,无论是在语言本身还是在Swift 异步算法 https://github.com/apple/swift-async-algorithms图书馆。


tl;dr

我不想在上面讨论有关您的代码片段的内容,但存在很多问题。所以,如果你原谅我,这里有一些观察结果:

  • 尝试使用OperationStatus强制顺序执行async通话无法进行,因为演员有可重入性 https://github.com/apple/swift-evolution/blob/main/proposals/0306-actors.md#actor-reentrancy。如果你有一个async功能,每次你点击一个await,这是一个暂停点,在此点允许对该异步函数的另一个调用继续进行。您的诚信度OperationStatus逻辑就会被违反。您不会遇到串行行为。

    如果您对悬挂点感兴趣,我建议您观看 WWDC 2021 视频Swift 并发:幕后花絮 https://developer.apple.com/videos/play/wwdc2021/10254/.

  • The testConcurrency正在打电话waitForExpectations在它实际开始任何能够满足任何期望的任务之前。那总是会超时。

  • The testConcurrency正在使用GCDconcurrentPerform,反过来,它只是调度一个异步任务并立即返回。这违背了整个目的concurrentPerform(这是一种运行一系列的节流机制同步并行任务,但不能超过 CPU 上的最大内核数)。此外,Swift 并发具有自己的模拟功能concurrentPerform,即受约束的“协作线程池”(也在该讨论中讨论过)video https://developer.apple.com/videos/play/wwdc2021/10254/,IIRC),渲染concurrentPerform在 Swift 并发领域已经过时了。

    最重要的是,包含在内没有意义concurrentPerform在 Swift 并发代码库中。使用它也没有意义concurrentPerform启动异步任务(无论是 Swift 并发还是 GCD)。它用于并行启动一系列同步任务。

  • In execute在您的测试中,您有两种执行路径,一种将等待某种状态更改并满足期望,而无需增加存储。这意味着您将失去一些增加该值的尝试。您的总计将与所需的结果值不匹配。现在,如果您的目的是在另一个请求待处理时放弃请求,那没问题。但我不认为那是你的意图。

  • 回答你关于最后如何测试成功的问题。你可能会这样做:

    actor Storage {
        private var counter: Int = 0
    
        func increment() {
            counter += 1
        }
    
        var value: Int { counter }
    }
    
    func testConcurrency() async {
        let storage = Storage()
        let operation = OperationStatus()
        let promise = expectation(description: "testConcurrency")
        let finalCount = outerIterations * iterations
        promise.expectedFulfillmentCount = finalCount
    
        @Sendable func execute() async {
            guard await !operation.isExecuting else {
                await operation.waitUntilFinished()
                promise.fulfill()
                return
            }
    
            await operation.set(isExecuting: true)
            try? await Task.sleep(seconds: 1)
            await storage.increment()
            await operation.set(isExecuting: false)
            promise.fulfill()
        }
    
        // waitForExpectations(timeout: 10)                                      // this is not where you want to wait; moved below, after the tasks started
    
        // DispatchQueue.concurrentPerform(iterations: outerIterations) { _ in   // no point in this
    
        for _ in 0 ..< outerIterations {
            for _ in 0 ..< iterations {
                Task { await execute() }
            }
        }
    
        await waitForExpectations(timeout: 10)
    
        // test the success to see if the store value was correct
    
        let value = await storage.value                                          // to test that you got the right count, fetch the value; note `await`, thus we need to make this an `async` test
    
        // Then
        XCTAssertEqual(finalCount, value, "Count")
    }
    

    现在,此测试将因多种原因而失败,但希望这说明了您将如何验证测试的成功或失败。但是,请注意,这只会测试最终结果是否正确,而不是测试它们是否按顺序执行。事实是Storage是一个演员将隐藏他们没有真正按顺序调用的事实。也就是说,如果您确实需要一个请求的结果来准备下一个请求,则此处不会进行测试。

  • 如果在您经历此过程时,您想真正确认您的行为OperationStatus模式,我建议使用os_signpost间隔(或任务开始和结束的简单日志记录语句)。您将看到异步的单独调用execute方法不是按顺序运行的。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何暂停后续任务直到第一次完成然后与等待的任务共享其响应? 的相关文章

  • 是否可以快速读取权利文件

    我正在我的应用程序中实现通用深度链接 当我注册不同的域时 它会创建一个 AppName entitlements 文件 我想像 plist 一样读取这个文件的值 I tried if let path NSBundle mainBundle
  • 使用 CommonCrypto 的 Swift AES 加密

    我正在开发一个 iOS 应用程序代码7 1 with 斯威夫特2 1我正在尝试进行简单的加密AES 128 位 and PKCS7填充使用通用加密库 该代码有效 但每次我尝试投射NSData反对NSString然后对于 String 我得到
  • 获取键盘高度在 iOS 11 beta 上不起作用

    我有以下在 IOS 10 上运行的代码 但现在在 IOS 11 beta 上运行时不再运行 if let userInfo notification userInfo if let keyboardSize userInfo UIKeybo
  • Swift 3 中的 _ArrayType 去了哪里?

    我有一些用于中继的代码 ArrayType在 Swift 3 之前 我试图了解公共协议发生了什么 ArrayType 任何想法 ArrayType被重命名 ArrayProtocol 您可以在ArrayType swift https gi
  • 在 iMessage 应用程序中检查横向/纵向方向(扩展)

    见过很多检查方向的解决方案 但奇怪的是 没有一个有效 下面是代码片段 override func viewWillTransition to size CGSize with coordinator UIViewControllerTran
  • 在 Swift 中使用显式对象类型迭代数组

    我有一个数组 let individualScores 75 43 103 87 12 我这样迭代 for score in individualScores 但是 有没有办法显式声明对象类型呢 我认为以后使用自定义对象或其他原因它会派上用
  • iPad 的自适应布局

    我正在关注这篇文章在 ios 中构建自适应布局为 iOS 8 构建自适应用户界面 http www sitepoint com building adaptive user interfaces ios 8 它在 iPhone 上运行良好
  • 如何在 iOS 11 上的 Swift 中获取 FLAC 文件元数据?

    我需要获取 FLAC 文件的元数据 我尝试了以下代码 let item AVPlayerItem url URL fileURLWithPath path let commonMetadata item asset commonMetada
  • 如何在 UIImagePickerController 捕获图像的瞬间获取当前位置?

    我研究了如何从返回的图像中获取位置数据UIImagePickerController相机 但是 我认为最简单的方法是获取当前位置CLLocationManager此刻UIImagePickerController捕获图像 有办法做到这一点吗
  • 身份验证后如何退出 Google

    所以我的应用程序可以选择使用 Google 登录 单击 Google 提供的按钮后 将打开一个 Web 视图并让用户输入其凭据 允许应用程序访问其信息后 应用程序将用户登录并将 SignInViewController 更改为 TabBar
  • 如何为具有圆角的精灵设置物理体

    我创建了一个SKShapeNode通过以下方式 let sprite SKShapeNode rect CGRect x 20 y 10 width 40 height 20 cornerRadius 10 我也像这样设置了一个物理体 sp
  • ARKit – 无法在 MCSession 中解码 ARAnchor

    我正在使用 WWDC 2018 中推出的 ARKit 2 0 测试 Apple 的多用户 AR 演示应用程序 创建多用户 AR 体验 https developer apple com documentation arkit creatin
  • NSURLConnection 完成时出现错误 - 代码 -1002 修复不起作用

    我收到此错误 NSURLConnection 已完成 错误代码 1002 我已将下面的代码添加到我的 info plist 中 有谁知道为什么 提前致谢
  • 一个 AVPlayerItem 一次只能在玩家队列中占据一个位置?

    我见过许多问题 https stackoverflow com questions 6605771 an avplayeritem can occupy only one position in a players queue at a t
  • 减缓 Push Segue 的过渡

    我想知道是否有一种方法可以像 Tinder 和 Snapchat 那样减慢 推进和返回 速度 这就像正常的转场 但不知怎的 它过渡得很慢 我通常通过在 Storyboard 上设置推送通知并以编程方式调用 segue 来处理推送 Segue
  • Swift 中的“is”关键字

    据我所知 似乎共识是is在 Swift 关键字中是同义词isKindOfClass method 但是 我在执行以下操作时遇到困难 inside of a method in UITabViewController check if the
  • 使用 Swift 的 UIPopoverController、Xcode 6、IOS 8

    我在使用 swift 使 UIPopover 出现时遇到一些麻烦 注释掉的代码在 Objective C 中工作正常 但在 Swift 中不起作用 当我点击视图控制器中的 时 我确实在调试器中得到了 点击 但是没有出现弹出窗口 class
  • 在 Swift 中在地图上显示路线

    我试图在苹果地图上绘制两点之间的路线 Swift 代码 下面的结构体用于存储坐标 struct GeoLocation var latitude Double var longitude Double func distanceBetwee
  • 为什么NWPathMonitor状态总是满足?

    当没有连接时 我从 URL 会话中收到一条错误消息 指出请求超时 我正在使用网络协议来事先检查连接情况 但显然当我在内部调用它时这不起作用viewDidLoad static func startUpdateProcess let moni
  • 三元运算符结合性

    我无法理解三元运算符上下文中的结合性概念 在大多数情况下 三元运算符如下所示 a b c 在这种情况下 不需要结合性来计算表达式 但有时 三元运算符是嵌套的 a b c d e a b c d e is right associative

随机推荐