将阻塞调用包装为异步,以实现更好的线程重用和响应式 UI

2024-04-28

我有一个类负责通过调用遗留类来检索产品可用性。该遗留类本身通过进行 BLOCKING 网络调用在内部收集产品数据。 请注意,我无法修改旧版 API 的代码。由于所有产品都是相互独立的,因此我希望并行收集信息,而不会创建任何不必要的线程,也不会阻塞在调用此旧版 API 时被阻塞的线程。有了这个背景,这是我的基础课程。

class Product
    {
        public int ID { get; set; }
        public int  VendorID { get; set; }
        public string Name { get; set; }
    }

    class ProductSearchResult
    {
        public int ID { get; set; }
        public int AvailableQuantity { get; set; }
        public DateTime ShipDate { get; set; }
        public bool Success { get; set; }
        public string Error { get; set; }
    }

class ProductProcessor
    {
        List<Product> products;
        private static readonly SemaphoreSlim mutex = new SemaphoreSlim(2);
        CancellationTokenSource cts = new CancellationTokenSource();
        public ProductProcessor()
        {
            products = new List<Product>()
            {
                new Product() { ID = 1, VendorID = 100, Name = "PC" },
                new Product() { ID = 2, VendorID = 101, Name = "Tablet" },
                new Product() { ID = 3, VendorID = 100, Name = "Laptop" },
                new Product() { ID = 4, VendorID = 102, Name = "GPS" },
                new Product() { ID = 5, VendorID = 107, Name = "Mars Rover" }
            };

        }

        public async void Start()
        {
            Task<ProductSearchResult>[] tasks = new Task<ProductSearchResult>[products.Count];
            Parallel.For(0, products.Count(), async i =>
            {
                tasks[i] = RetrieveProductAvailablity(products[i].ID, cts.Token);

            });



            Task<ProductSearchResult> results = await Task.WhenAny(tasks);

            // Logic for waiting on indiviaul tasks and reporting results

        }

        private async Task<ProductSearchResult> RetrieveProductAvailablity(int productId, CancellationToken cancellationToken)
        {
            ProductSearchResult result = new ProductSearchResult();
            result.ID = productId;

            if (cancellationToken.IsCancellationRequested)
            {
                result.Success = false;
                result.Error = "Cancelled.";
                return result;
            }

            try
            {
                await mutex.WaitAsync();
                if (cancellationToken.IsCancellationRequested)
                {
                    result.Success = false;
                    result.Error = "Cancelled.";
                    return result;
                }

                LegacyApp app = new LegacyApp();
                bool success = await Task.Run(() => app.RetrieveProductAvailability(productId));
                if (success)
                {
                    result.Success = success;
                    result.AvailableQuantity = app.AvailableQuantity;
                    result.ShipDate = app.ShipDate;
                }
                else
                {
                    result.Success = false;
                    result.Error = app.Error;
                }
            }
            finally
            {
                mutex.Release();
            }

            return result;

        }

    }

鉴于我正在尝试将异步封装在同步 API 上,我有两个问题。

  1. 通过使用 Parallel.For 并将传统 API 调用包装在 Task.Run 中,我是否创建了任何不必要的线程,这些线程本可以在不阻塞调用线程的情况下避免,因为我们将在 UI 中使用此代码。
  2. 这段代码看起来仍然是线程安全的吗?

编译器会给你关于你的警告async拉姆达。仔细阅读;它告诉你它不是异步的。使用没有意义async那里。另外,不要使用async void.

由于您的底层 API 是阻塞的 - 并且无法更改它 - 异步代码不是一个选项。我建议使用几个Task.Run calls or Parallel.For,但不能两者兼而有之。所以让我们使用并行。实际上,让我们使用并行 LINQ,因为您转变一个序列。

没有意义做RetrieveProductAvailablity异步;它只做除节流之外的阻塞工作,并且并行方法具有更自然的节流支持。这使得你的方法看起来像:

private ProductSearchResult RetrieveProductAvailablity(int productId, CancellationToken cancellationToken)
{
  ... // no mutex code
  LegacyApp app = new LegacyApp();
  bool success = app.RetrieveProductAvailability(productId);
  ... // no mutex code
}

然后您可以进行并行处理,如下所示:

public void Start()
{
  ProductSearchResult[] results = products.AsParallel().AsOrdered()
      .WithCancellation(cts.Token).WithDegreeOfParallelism(2)
      .Select(product => RetrieveProductAvailability(product.ID, cts.Token))
      .ToArray();
  // Logic for waiting on indiviaul tasks and reporting results
}

从你的 UI 线程中,你可以call该方法使用Task.Run:

async void MyUiEventHandler(...)
{
  await Task.Run(() => processor.Start());
}

这使您的业务逻辑保持干净(仅同步/并行代码),并负责将这项工作移出 UI 线程(使用Task.Run)属于UI层。

Update:我添加了一个电话AsOrdered确保结果数组的顺序与产品序列相同。这可能是必要的,也可能不是必要的,但由于原始代码保留了顺序,因此该代码现在也这样做了。

Update:由于您需要在每次检索后更新 UI,因此您可能应该使用Task.Run对于每一个而不是AsParallel:

public async Task Start()
{
  var tasks = products.Select(product =>
      ProcessAvailabilityAsync(product.ID, cts.Token));
  await Task.WhenAll(tasks);
}

private SemaphoreSlim mutex = new SempahoreSlim(2);
private async Task ProcessAvailabilityAsync(int id, CancellationToken token)
{
  await mutex.WaitAsync();
  try
  {
    var result = await RetrieveProductAvailability(id, token);
    // Logic for reporting results
  }
  finally
  {
    mutex.Release();
  }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

将阻塞调用包装为异步,以实现更好的线程重用和响应式 UI 的相关文章

随机推荐

  • 如何使用相机谷歌地图 xcode 移动标记(图钉)

    我在我的应用程序中使用谷歌地图 API 我的应用程序中有两个按钮 第一个按钮在我的地图中添加一个标记 图钉 现在我想要第二个按钮将添加的图钉水平移动到页面中心 并使其移动到页面顶部的 25 我希望相机 用户正在查看的区域 也移动它 这是我的
  • 使用 python 从 XSD 文件创建特定的 XML 文件

    我有一个现有的 xsd 架构 并且需要创建 希望使用 Python 带有一些特定输入的 XML 文件 最好的方法是什么 我尝试了 Element Tree 和 xmlschema 但我无法判断它们是否允许从已知的 XSD 架构开始生成 XM
  • 您应该通过属性访问同一类中的变量吗?

    如果您有一个获取和设置实例变量的属性 那么通常您总是使用该类外部的属性来访问它 我的问题是你也应该在课堂上这样做吗 如果有的话 我总是使用该属性 即使是在班级内 但我想听到一些支持和反对的论据 以确定哪个是最正确的以及为什么 或者这只是项目
  • 使 HTML5 视频海报与视频本身大小相同

    有谁知道如何调整 HTML5 视频海报的大小 使其适合视频本身的确切尺寸 这是一个显示问题的 jsfiddle http jsfiddle net zPacg 7 http jsfiddle net zPacg 7 这是代码 HTML
  • Console.ReadLine() 末尾没有换行符?

    问题很简单 当我使用 Console ReadLine 控制台上打印的下一个内容将在下一行 有什么办法可以继续打印该行吗 提前致谢 请检查 控制台 Read 这不会导致新行或换行
  • MySQL 监听通知等效项

    是否有相当于 PostgresQL 的notify http www postgresql org docs 9 1 static sql notify html and listen http www postgresql org doc
  • 如何在 C# 中将 IEnumerable 转换为 Enum?

    我已将多个字符串解析为枚举标志 但看不到将它们合并为单个枚举位字段的巧妙方法 我使用的方法循环遍历字符串值 然后 将值转换为 Enum 对象 如下所示 Flags public enum MyEnum None 0 First 1 Seco
  • Spring - 使用 new 是一种不好的做法吗?

    正在创建对象by hand 即使用new操作员而不是注册Springbean 和使用依赖注入被认为是不好的做法吗 我的意思是 确实Spring IoC容器必须了解应用程序中的所有对象吗 如果是这样 为什么 你希望 Spring 创建 bea
  • dask groupby 不合并分区

    我有一组数据 我想要对其进行一些简单的 groupby count 操作 但我似乎无法使用 dask 来完成此操作 我很可能不理解 dask 中执行 groupby reduce 的方式 特别是当索引位于分组键中时 所以我将用玩具数据来说明
  • 当父类也实现 IDisposable 时,在子类上实现 IDisposable

    我有一个父类和子类都需要实现IDisposable 应该在哪里virtual and base Dispose 通话发挥作用 当我刚刚覆盖Dispose bool disposing 打电话 说我实现了感觉真的很奇怪IDisposable没
  • PyCharm 虚拟环境和 Anaconda 环境有什么区别?

    当我在 PyCharm 中创建新项目时 它会创建一个新的虚拟环境 我读到 当我执行Python脚本时 它们是使用此环境中的解释器而不是系统环境来执行的 因此 如果我需要安装一些软件包 我只能将它们安装在这个环境中 而不是在系统环境中 这很酷
  • 绘图 Deedle 框架

    我有以下代码 let mychart frame GetAllSeries gt Seq iter fun key value gt Chart Line value Name key gt Chart Combine where fram
  • 如何使用 haproxy 负载均衡器 Kafka Bootstrap?

    我有一个 kafka 集群 由 3 台在 AWS 上运行的机器组成 卡夫卡1到卡夫卡3 我正在使用新型卡夫卡消费者 gt 0 8 我知道kafka客户端连接到其中一台kafka服务器 获取服务器元数据 然后直接连接到代理 我想确保在代理发生
  • 执行aapt失败

    请帮忙解决这个错误 Failed to execute aapt com android ide common process ProcessException Failed to execute aapt at com android b
  • Hibernate 在更新集合时删除孤儿

    我发现从 Hibernate 中的集合中删除时 孤立记录不会被删除 我一定是做了一些简单的错误 这是 Hibernate 101 但我找不到它 鉴于以下情况 public class Book ManyToOne NotNull Autho
  • Javascript Replace() 仅替换第一个匹配项[重复]

    这个问题已经存在了 你好 请参阅这里的 jsfiddle http jsfiddle net moolood jU9QY http jsfiddle net moolood jU9QY var toto bien address 1 bie
  • 在 Java 中从 Json 字符串中提取字段

    我正在尝试从以下 Json 字符串中提取每个 company id 的 id String test company id 4100 data drm user id 572901936637129135 direct status id
  • 什么是window.onpaint?

    有人推荐here https stackoverflow com questions 6944037 how to run java script code before page load that window onpaint用于在页面
  • 分层边缘捆绑:添加父组标签

    我对 HTML 和 JavaScript 还很陌生 我面临着著名的分层边缘捆绑可用here https bl ocks org mbostock 7607999 由 D3 js 库生成 My goal is to add a semi ci
  • 将阻塞调用包装为异步,以实现更好的线程重用和响应式 UI

    我有一个类负责通过调用遗留类来检索产品可用性 该遗留类本身通过进行 BLOCKING 网络调用在内部收集产品数据 请注意 我无法修改旧版 API 的代码 由于所有产品都是相互独立的 因此我希望并行收集信息 而不会创建任何不必要的线程 也不会