将消息定向给消费者

2023-12-11

我的客户端正在尝试向接收者发送消息。但是我注意到接收者有时没有收到客户端发送的所有消息,因此丢失了一些消息(不确定问题出在哪里?客户端还是接收者)。 关于为什么会发生这种情况的任何建议。这就是我目前正在做的事情

在接收方,这就是我正在做的事情。

这是事件处理器

        async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
        {
            foreach (var eventData in messages)
            {
                var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
            }
        }

这是客户端连接到事件中心的方式

var StrBuilder = new EventHubsConnectionStringBuilder(eventHubConnectionString)
{
 EntityPath = eventHubName,
};
this.eventHubClient = EventHubClient.CreateFromConnectionString(StrBuilder.ToString());

如何将消息定向给特定消费者


我正在使用来自 eventhub 官方文档的示例代码,用于sending and 接收.

我有两个消费者群体:$Default and newcg。假设您有 2 个客户端,client_1 使用默认消费者组($Default),client_2 使用另一个消费者组(newcg)

首先,创建发送客户端后,在SendMessagesToEventHub方法中,我们需要添加一个具有值的属性。该值应该是消费者组名称。示例代码如下:

    private static async Task SendMessagesToEventHub(int numMessagesToSend)
    {
        for (var i = 0; i < numMessagesToSend; i++)
        {
            try
            {
                var message = "444 Message";
                Console.WriteLine($"Sending message: {message}");
                EventData mydata = new EventData(Encoding.UTF8.GetBytes(message));

                //here, we add a property named "cg", it's value is the consumer group. By setting this property, then we can read this message via this specified consumer group.
                mydata.Properties.Add("cg", "newcg");

                await eventHubClient.SendAsync(mydata);

            }
            catch (Exception exception)
            {
                Console.WriteLine($"{DateTime.Now} > Exception: {exception.Message}");
            }

            await Task.Delay(10);
        }

        Console.WriteLine($"{numMessagesToSend} messages sent.");
    }

然后在client_1中,创建接收器项目后,使用默认消费者组($Default)-> 在SimpleEventProcessor类->ProcessEventsAsync方法,我们可以过滤掉不必要的事件数据。示例代码为ProcessEventsAsync method:

        public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
        {
            foreach (var eventData in messages)
            {
                //filter the data here
                if (eventData.Properties["cg"].ToString() == "$Default")
                {                    
                    var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);

                    Console.WriteLine($"Message received. Partition: '{context.PartitionId}', Data: '{data}'");
                    Console.WriteLine(context.ConsumerGroupName);
                }
            }

            return context.CheckpointAsync();
        }

在另一个客户端中,例如 client_2,它使用另一个消费者组,就像它的名称一样newcg,我们可以按照client_1中的步骤进行,只需稍加改动ProcessEventsAsync方法,如下:

            public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
            {
                foreach (var eventData in messages)
                {
                    //filter the data here, using another consumer group name
                    if (eventData.Properties["cg"].ToString() == "newcg")
                    {  
                       //other code
                    }
                   }

                 return context.CheckpointAsync();
               }
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

将消息定向给消费者 的相关文章

随机推荐

  • 如何使用 Google Apps 脚本将表格置于 Google 文档页面的中心

    我已使用 Google Apps 脚本功能在 google 文档中插入了一个表格 var grg body appendTable griglia 我可以设置表格中文本的格式 字体大小 粗细 对齐方式 也可以设置单个单元格的格式 背景 前景
  • 我应该如何使用 HttpRequest.GetBufferlessInputStream?

    我在 WCF 服务中接收发布数据时遇到问题 如果我尝试使用 InputStream 则会出现异常 调用 HttpRequest GetBufferlessInputStream 后不支持此方法或属性 我相信我明白为什么会抛出这个错误 但我还
  • OpenGL 中的厚贝塞尔曲线

    我正在使用 jogl opengl 绑定在 java 中编写一个程序 我需要创建一条厚度沿曲线变化的贝塞尔曲线 到目前为止 我只管理了一条细的单点贝塞尔曲线 我很确定这不是一件容易的事 但我不知道从哪里开始寻找解决方案 如果有人能指出我如何
  • 当屏幕关闭时,MediaPlayer 在 Lollipop 上过早切断播放

    我在 Lollipop 设备上遇到了 MediaPlayer 的问题 基本上 当设备屏幕关闭 即用户锁定设备 时 播放会继续 但提前结束约 1 2 秒 但屏幕打开时不会发生这种情况 我在 MediaPlayer 上有一个 onComplet
  • 计算素数时堆栈空间溢出

    我正在学习 Real World Haskell 我在第 4 章 为了进行一些课外练习 我创建了以下程序来计算第 n 个素数 import System Environment isPrime primes test loop primes
  • Android 2.1:如何在 GridView 上放大/缩小和滚动

    背景 我的工作应用程序包含一个 GridView 它有 5 行 11 列 并带有一个用于显示的覆盖适配器 它非常适合我对大显示屏平板电脑的需求 移植到小型智能手机后 我意识到网格由于尺寸小而无法使用 我决定使用缩放功能 而不是实现横向 问题
  • 当使用 tcp 套接字执行 async_write 时,何时调用处理程序?

    这只是 async write 如何与 tcp 套接字配合的简单问题 基本上 当使用 tcp 套接字时 当数据写入套接字时 或者从目标接收到 ack 时 写入处理程序是否会被调用 AFAIK 一旦数据写入套接字的内核缓冲区 处理程序就会被调
  • Access 2007 SQL 中的 Group By 聚合函数中的不同计数

    您好 我浏览论坛有一段时间了 在这里问我的第一个问题 我有点陷入困境 想知道是否可以获得一些帮助 我正在使用 Access 2007 尚未在网上找到该问题的良好答案 我的数据是诊断代码和客户 ID 我正在寻找的是为什么要查找每个诊断代码的客
  • Django 表单未提交

    我有一个在模板中正确呈现的 Django 模型 视图 表单 但它没有提交输入到数据库的数据 任何对此的帮助将不胜感激 models py from django db import models from django forms impo
  • 汇编程序可以在 Linux 发行版之间移植吗?

    以汇编程序格式提供的程序是否可以在 Linux 发行版之间移植 模 CPU 架构差异 这是我的问题的背景 我正在开发一种新的编程语言 名为 Aklo 其操作方式将是经典的编译为 s 并将结果提供给 GNU 汇编器 显然最终最好能自己编写实现
  • 如何在javascript中创建txt文件

    if window XMLHttpRequest xmlhttp new XMLHttpRequest else xmlhttp new ActiveXObject Microsoft XMLHTTP xmlhttp open GET t1
  • 如何访问动态列表中的项目?

    我试图弄清楚如何枚举动态 LINQ 的结果 Select string selectors 在 NET 4 5 中 动态 linq 来自System Linq Dynamic命名空间 Edit 我还包括System Linq 我有一个看起来
  • 双向 WeakMap 保持对象存活?

    假设我有两个 WeakMap a2b new WeakMap
  • java.lang.NoSuchMethodError: org.hibernate.cfg.Configuration.addAnnotatedClass

    当我尝试这个时 我是 JPA 和 hibernate 的新手tutorial 我在 persistence xml 中添加了以下提供程序
  • 类型不匹配无法从元素类型对象转换为字符串

    在我的代码中创建搜索方法来搜索字符串时 我不断收到此错误 我已经通过很多例子试图解决这个问题 但我找不到任何例子 感谢您提供的任何帮助和建议 public class runNote public static void main Stri
  • 尝试针对 ManagementObjectNotFoundException 和 ActiveDirectory/Outlook 进行 Catch/Exception

    这可能是一个非常基本的问题 但我还没有在表格上看到它 请耐心等待 我是 powershell 新手 当我们的 Active Directory 数据库中找不到用户名时 我试图捕获此异常 ManagementObjectNotFoundExc
  • 如何使用 Selenium 允许位置访问?

    我试图在Java中使用Selenium来获取用户的地理坐标 但是使用IP地址不够准确 所以我想使用这个网站http www whataremycooperatives com 但它不起作用 我猜这是因为你必须允许位置使用 所以无论如何我可以
  • 是什么导致了“Base-64 字符数组的长度无效”

    我在这里没什么可说的 我无法在本地重现此问题 但是当用户收到错误时 我会收到自动电子邮件异常通知 Invalid length for a Base 64 char array at System Convert FromBase64Str
  • 为什么spring-boot-starter项目的github项目是空的?

    看着spring boot 启动器 web spring boot starter 安全性github 上的项目 我发现它们是空的 只有一个 build gradle 文件存在 我希望这符合预期 但这让我了解在哪里可以找到实际的源代码 而且
  • 将消息定向给消费者

    我的客户端正在尝试向接收者发送消息 但是我注意到接收者有时没有收到客户端发送的所有消息 因此丢失了一些消息 不确定问题出在哪里 客户端还是接收者 关于为什么会发生这种情况的任何建议 这就是我目前正在做的事情 在接收方 这就是我正在做的事情