Azure ServiceBus 和异步 - 是还是不是?

2024-04-29

我正在 Azure 上运行服务总线,泵送每秒 10-100 条消息.

最近我已经切换到.net 4.5所有人都兴奋地重构了所有代码“异步”和“等待”' 每行至少两次,以确保它“正确”完成:)

现在我想知道这是否真的是为了是好是坏。如果您可以查看代码片段并告诉我您的想法。我特别担心如果线程上下文切换由于所有的异步性,并没有给我带来更多的悲伤而不是好处......(看看 !dumpheap 这绝对是一个因素)

只是一点描述 - 我将发布 2 种方法 - 一种在 ConcurrentQueue 上执行 while 循环,等待新消息,另一种方法一次发送一条消息。我还完全按照 Azure 博士的规定使用瞬态故障处理块。

发送循环(从头开始,等待新消息):

private async void SendingLoop()
    {
        try
        {
            await this.RecreateMessageFactory();

            this.loopSemaphore.Reset();
            Buffer<SendMessage> message = null;

            while (true)
            {
                if (this.cancel.Token.IsCancellationRequested)
                {
                    break;
                }
                this.semaphore.WaitOne();
                if (this.cancel.Token.IsCancellationRequested)
                {
                    break;
                }

                while (this.queue.TryDequeue(out message))
                {                       
                    try
                    {
                        using (message)
                        {
                            //only take send the latest message
                            if (!this.queue.IsEmpty)
                            {
                                this.Log.Debug("Skipping qeued message, Topic: " + message.Value.Topic);
                                continue;
                            }
                            else
                            {
                                if (this.Topic == null || this.Topic.Path != message.Value.Topic)
                                    await this.EnsureTopicExists(message.Value.Topic, this.cancel.Token);

                                if (this.cancel.Token.IsCancellationRequested)
                                    break;
                                await this.SendMessage(message, this.cancel.Token);
                            }
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        break;
                    }
                    catch (Exception ex)
                    {
                        ex.LogError();
                    }
                }
            }
        }
        catch (OperationCanceledException)
        { }
        catch (Exception ex)
        {
            ex.LogError();
        }
        finally
        {
            if (this.loopSemaphore != null)
                this.loopSemaphore.Set();
        }
    }

发送消息:

private async Task SendMessage(Buffer<SendMessage> message, CancellationToken cancellationToken)
    {
        //this.Log.Debug("MessageBroadcaster.SendMessage to " + this.GetTopic());
        bool entityNotFound = false;

        if (this.MessageSender.IsClosed)
        {
            //this.Log.Debug("MessageBroadcaster.SendMessage MessageSender closed, recreating " + this.GetTopic());
            await this.EnsureMessageSender(cancellationToken);
        }

        try
        {
            await this.sendMessageRetryPolicy.ExecuteAsync(async () =>
            {
                message.Value.Body.Seek(0, SeekOrigin.Begin);
                using (var msg = new BrokeredMessage(message.Value.Body, false))
                {
                    await Task.Factory.FromAsync(this.MessageSender.BeginSend, this.MessageSender.EndSend, msg, null);
                }
            }, cancellationToken);
        }
        catch (MessagingEntityNotFoundException)
        {
            entityNotFound = true;                
        }
        catch (OperationCanceledException)
        { }
        catch (ObjectDisposedException)
        { }
        catch (Exception ex)
        {
            ex.LogError();
        }

        if (entityNotFound)
        {
            if (!cancellationToken.IsCancellationRequested)
            {
                await this.EnsureTopicExists(message.Value.Topic, cancellationToken);
            }
        }
    }

上面的代码来自每秒发送 1 条消息的“Sender”类。我在任何给定时间都有大约 50-100 个实例在运行,因此可能有相当多的线程。

顺便说一句,不必担心 EnsureMessageSender、RecreateMessageFactory、EnsureTopicExists 太多,它们不会经常被调用。

如果我只需要一个后台线程处理消息队列并同步发送消息,如果我只需要一次发送一条消息,而不用担心异步内容并避免随之而来的开销,那不是更好吗?

请注意,通常将一条消息发送到 Azure 服务总线只需几毫秒,这并不是很昂贵。 (除非有时速度缓慢、超时或服务总线后端出现问题,否则它可能会在尝试发送内容时挂起一段时间)。

感谢并抱歉发了这么长的帖子,

Stevo

建议的解决方案

这个例子可以解决我的情况吗?

static void Main(string[] args)
    {
        var broadcaster = new BufferBlock<int>(); //queue
        var cancel = new CancellationTokenSource();

        var run = Task.Run(async () =>
        {
            try
            {
                while (true)
                {
                    //check if we are not finished
                    if (cancel.IsCancellationRequested)
                        break;                       

                    //async wait until a value is available
                    var val = await broadcaster.ReceiveAsync(cancel.Token).ConfigureAwait(false);
                    int next = 0;

                    //greedy - eat up and ignore all the values but last
                    while (broadcaster.TryReceive(out next))
                    {
                        Console.WriteLine("Skipping " + val);
                        val = next;
                    }

                    //check if we are not finished
                    if (cancel.IsCancellationRequested)
                        break;

                    Console.WriteLine("Sending " + val);

                    //simulate sending delay
                    await Task.Delay(1000).ConfigureAwait(false); 

                    Console.WriteLine("Value sent " + val);                        
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }

        }, cancel.Token);

        //simulate sending messages. One every 200mls 
        for (int i = 0; i < 20; i++)
        {
            Console.WriteLine("Broadcasting " + i);
            broadcaster.Post(i);
            Thread.Sleep(200);
        }

        cancel.Cancel();
        run.Wait();
    }

You say:

上面的代码来自每秒发送 1 条消息的“Sender”类。我 在任何给定时间都有大约 50-100 个实例在运行,因此可能是 相当多的线程。

这是异步的一个很好的例子。您可以在这里节省大量线程。异步reduces上下文切换,因为它是not基于线程。如果出现需要等待的情况,它不会进行上下文切换。相反,下一个工作项正在同一线程上处理(如果有的话)。

因此,异步解决方案肯定会比同步解决方案具有更好的扩展性。需要衡量在 50-100 个工作流实例中它是否实际上使用了更少的 CPU。实例越多,异步速度变得更快的可能性就越高。

现在,实现存在一个问题:您正在使用ConcurrentQueue这还不是异步就绪的。因此,即使在异步版本中,您实际上也使用了 50-100 个线程。它们要么阻塞(您想避免),要么忙等待燃烧 100% CPU(您的实现中似乎就是这种情况!)。您需要摆脱这个问题并使排队也异步。也许是一个SemaphoreSlim在这里很有帮助,因为它可以异步等待。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Azure ServiceBus 和异步 - 是还是不是? 的相关文章

随机推荐

  • Bootstrap 轮播下一个和上一个功能不起作用

    使用最新版本并具有基本的轮播 我已经让它可以使用所有默认设置 但是当尝试添加或停止某些功能时 事情会中断或根本不起作用 我希望能够手动循环浏览图像 不希望它自动循环 我只想使用下一个和上一个按钮来循环 我在这里读过一些帖子 但解决方案不起作
  • 将闪亮应用程序部署到 Shinyapps.io 时出错

    我有一个闪亮的应用程序 它在server R file library shiny Creating the app library ggplot2 library plyr library reshape2 library scales
  • Python 3 urllib 与请求性能

    我正在使用 python 3 5 并且正在检查 urllib 模块与 requests 模块的性能 我用 python 编写了两个客户端 第一个使用 urllib 模块 第二个使用 request 模块 它们都生成二进制数据 我将其发送到基
  • 在循环中初始化变量[重复]

    这个问题在这里已经有答案了 我试图弄清楚初始化某些变量时的最佳实践是什么 我的代码现在看起来像这样 int nHexCount 0 int prevState sc state bool bOnlySpaces true bool bIsV
  • 编译器处理包含保护头的开销有多大?

    为了加速大型源文件的编译 修剪翻译单元中使用的标头数量是否更有意义 或者编译代码的成本是否远远超过处理包含保护的时间标头 如果后者是真的 那么工程工作最好花在创建更多 轻量级的标头上 而不是更少 那么 现代编译器需要多长时间才能处理有效包含
  • 比较 nginx+Apache+mod_wsgi 与 nginx+uWSGI?

    在生产中使用 nginx Apache mod wsgi 与 nginx uWSGI vurtualenv 有何优缺点 我在自 2007 年以来开发的 mod wsgi 中看到了第一个变体的优点 并且具有更稳定的版本和易于管理 第二种变体的
  • Magento 以编程方式删除产品图像

    这肯定是一个非常简单的编程任务 我绝对无法在网上找到任何有关它的信息 基本上 我正在尝试删除产品图像 我想删除产品媒体库中的所有图像 我可以在不为如此简单的任务编写一百万行代码的情况下完成此操作吗 请注意 我已经尝试过 attributes
  • ActiveModel::ForbiddenAttributesError + cancan + Rails 4 + 具有作用域控制器的模型

    我正在使用 cancan 1 6 10 和 Rails 4 0 0 我有一个名为 App 未限定范围 的模型和一个控制器 Admin AppsController 其限定范围 即 app controllers admin apps con
  • 如何为json可序列化对象设置默认值?

    我想设置一个默认值AvailableService 对于原语来说它足够简单 我将如何使用自定义对象来做到这一点 class Submenu extends Equatable JsonKey defaultValue final Strin
  • Identity Server 4 上授权客户端的自定义端点

    我希望我的 Identity Server 4 服务器提供附加服务 例如 MyAdditionalService 对于一些注册客户 他们将通过在服务器上定义的自定义端点来使用该服务 我正在考虑为我的该服务定义一个 API 例如 名为 myA
  • 如何捕获 UITextField 文本的变化?

    我查看了所有类似的问题 它们与我所问的不同 我需要捕捉 UITextField 中实际文本的变化 而不仅仅是编辑状态 当视图加载时 它将成为第一响应者 我需要知道何时输入文本 以便我可以在导航栏中启用 下一步 如果可以的话请帮助我 这确实阻
  • 寻求:如何将 jsDoc 与 Webstorm IDE 一起使用的示例 (v 4)

    我仍在学习 js 并尝试 Webstorm IDE 这看起来很不错 包括跳转到 var function 声明 我可以看到如何获取 jsdoc 评论的模板 但我对此没有经验 并且正在寻找如何在评论中提供更多详细信息以及如何提供的示例view
  • 类型“Observable>”不可分配给类型“Observable<>”

    我有这个代码片段 SubmitTransaction transNumber string transactionRequest ITransactionRequestObj Observable
  • MySQL 1443:这是什么意思?

    我正在尝试在 MySQL 5 0 中进行以下形式的更新 update mytable myfield t set f blah where t id in select v id from myview v where MySQL 告诉我
  • 如何在 Delphi 中更改 TabControl 中活动 TAB 的颜色

    如何更改 TabControl 在 FireMonkey 上 中活动 TAB 的颜色 如下所示 有两种方法可以实现这一点 1 第一个选项是您可以创建定制风格 for 选项卡控件 from T样本 风格设计师 然后您可以添加您想要在自定义设计
  • 如何分别用其名称替换逗号分隔的部门 ID?

    我的桌子是这些 员工表 id name department 1 Carrera 1 2 Taylor 1 2 部门表 id name 1 CS 2 IT
  • 从 mongoDB 数组中获取特定元素[重复]

    这个问题在这里已经有答案了 我有像下面这样的 mongo 集合 auther xyz location zzz books book1 b1 date 2 3 00 book1 b2 date 4 9 00 auther pqr locat
  • 从命令行将 clojure 源代码编译为类(AOT)(不使用 lein)

    我正在尝试将 clojure 源代码编译成类文件 并仅使用命令行运行它 没有 lein 也没有 可能 回复 我有 core cljsrc hello目录 src hello core clj 这是源代码 ns hello core defn
  • ASP.NET core 获取 URL 绑定

    ASP NET core webhost 启动后 我想获取它的绑定 URL 即 http 0 0 0 0 5001 http 0 0 0 0 5001 https 192 168 42 42 8081 https 192 168 42 42
  • Azure ServiceBus 和异步 - 是还是不是?

    我正在 Azure 上运行服务总线 泵送每秒 10 100 条消息 最近我已经切换到 net 4 5所有人都兴奋地重构了所有代码 异步 和 等待 每行至少两次 以确保它 正确 完成 现在我想知道这是否真的是为了是好是坏 如果您可以查看代码片