如何为 Azure 服务编写 MassTransit Json 反序列化器

2023-12-19

这就是我将对象发布到事件网格的方式。我希望能够使用天蓝色服务总线来收听它。

        public void Publicar<T>(T model, string operation, string entity)
    {
        _nomeEvento = entity + operation;

        Boolean.TryParse(Configuration["EventGridConfig:Enabled"], out var eventGridIsActive);
        if (!eventGridIsActive)
            return;

        var primaryTopicKey = Configuration["EventGridConfig:AcessKey"];
        var primaryTopic = Configuration["EventGridConfig:Endpoint"];

        var primaryTopicHostname = new Uri(primaryTopic).Host;

        var topicCredentials = new TopicCredentials(primaryTopicKey);
        var client = new EventGridClient(topicCredentials);

        client.PublishEventsAsync(primaryTopicHostname, GetEventsList(model)).GetAwaiter().GetResult();
    }

    private List<EventGridEvent> GetEventsList<T>(T model)
    {
        return new List<EventGridEvent>
        {
            new EventGridEvent()
            {
                Id = Guid.NewGuid().ToString(),
                EventType = _nomeEvento,
                Data = model,
                EventTime = DateTime.Now,
                Subject = "MS_Clientes",
                DataVersion = "1.0",
            }
        };
    }

这就是我连接到服务总线的方式

    static class CustomExtensionsMethods
{
    public static IServiceCollection AddBus(this IServiceCollection services, IConfiguration configuration,
        IHostingEnvironment env)
    {
        services.AddMassTransit(x => { x.AddConsumer<NomeEmailChangeConsumer>(); });
        services.AddSingleton(provider => Bus.Factory.CreateUsingAzureServiceBus(cfg =>
        {
            var keyName = "RootManageSharedAccessKey";
            var busName = configuration["ServiceBus:Name"];
            var secret = configuration["ServiceBus:Secret"];
            var host = cfg.Host(
                "Endpoint=sb://" + busName + ".servicebus.windows.net/;" +
                "SharedAccessKeyName=" + keyName + ";" +
                "SharedAccessKey=" + secret,
                z =>
                {
                    TokenProvider
                        .CreateSharedAccessSignatureTokenProvider(keyName, secret);
                });
            cfg.ConfigureJsonSerializer(settings =>
            {
                settings.Converters.Add(new InterfaceConverter());

                return settings;
            });
            cfg.UseExtensionsLogging(provider.GetService<ILoggerFactory>());
            cfg.ReceiveEndpoint(host, configuration["ServiceBus:Topic"],
                e => { e.Consumer<NomeEmailChangeConsumer>(provider); });
        }));
        services.AddSingleton<IPublishEndpoint>(provider => provider.GetRequiredService<IBusControl>());
        services.AddSingleton<ISendEndpointProvider>(provider => provider.GetRequiredService<IBusControl>());
        services.AddSingleton<IBus>(provider => provider.GetRequiredService<IBusControl>());
        services.AddScoped(provider => provider.GetRequiredService<IBus>().CreateRequestClient<NomeEmailChange>());
        services.AddSingleton<IHostedService, BusService>();
        return services;
    }
}

但后来我得到了同样的错误

    fail: MassTransit.Messages[0]
      R-FAULT sb://dev.servicebus.windows.net/bff-queue 9ade19ec-238c-4c08-8e03-28bac695ea7b No deserializer was registered for the message content type: application/json; charset=utf-8. Supported content types include application/vnd.masstransit+json, application/vnd.masstransit+bson, application/vnd.masstransit+xml
System.Runtime.Serialization.SerializationException: No deserializer was registered for the message content type: application/json; charset=utf-8. Supported content types include application/vnd.masstransit+json, application/vnd.masstransit+bson, application/vnd.masstransit+xml
   at MassTransit.Serialization.SupportedMessageDeserializers.Deserialize(ReceiveContext receiveContext)
   at MassTransit.Pipeline.Filters.DeserializeFilter.Send(ReceiveContext context, IPipe`1 next)
   at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)

我尝试添加在网上找到的 JsonConverter,但没有成功

    public class InterfaceConverter : JsonConverter
    {
        public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
        {
            serializer.Serialize(writer, value);
        }

        public override object ReadJson(JsonReader reader, Type objectType, object existingValue,
            JsonSerializer serializer)
        {
            // Set TypeNameHandling to Auto for deserializing objects with $type
            // Should be set directly in ConfigureJsonDeserializer when setting up MT Service bus
            serializer.TypeNameHandling = TypeNameHandling.Auto;
            return serializer.Deserialize(reader);
        }

        public override bool CanConvert(Type objectType)
        {
            return objectType.IsInterface;
        }
    }

我尝试了几个测试并想出了一个可行的解决方案。在我的测试用例中,我将消息从事件网格主题重定向到服务总线队列,就像您的情况一样 - 如果我理解得很好的话。

由于 MassTransit 要求消息采用某种格式才能解释它们,因此我们需要确保具有以下内容:

  1. 事件网格消息的自定义解串器,其类型为事件网格事件
  2. 确保 MassTransit 必须使用的所有消息有内容类型- 没有这个,它将无法工作

因此,我构建了一个示例,该示例在您从 EventGrid 重定向消息以及将消息直接通过管道传送到服务总线时都可以工作。 以下代码是如何为 EventGrid 消息实现反序列化器的示例:

public class EventGridMessgeDeserializer : IMessageDeserializer
    {
        private string _contentType;

        public EventGridMessgeDeserializer(string contentType)
        {
            _contentType = contentType;
        }
        public ContentType ContentType => new ContentType(_contentType);

        public ConsumeContext Deserialize(ReceiveContext receiveContext)
        {
            var body = Encoding.UTF8.GetString(receiveContext.GetBody());
            var customMessage = JsonConvert.DeserializeObject<EventGridEvent>(body);
            var serviceBusSendContext = new AzureServiceBusSendContext<EventGridEvent>(customMessage, CancellationToken.None);

            // this is the default scheme, that has to match in order messages to be processed
            // EventGrid messages type of EventGridEvent within namespace Microsoft.Azure.EventGrid.Models
            string[] messageTypes = { "urn:message:Microsoft.Azure.EventGrid.Models:EventGridEvent" };
            var serviceBusContext = receiveContext as ServiceBusReceiveContext;
            serviceBusSendContext.ContentType = new ContentType(JsonMessageSerializer.JsonContentType.ToString());
            serviceBusSendContext.SourceAddress = serviceBusContext.InputAddress;
            serviceBusSendContext.SessionId = serviceBusContext.SessionId;

            // sending JToken because we are using default Newtonsoft deserializer/serializer
            var messageEnv = new JsonMessageEnvelope(serviceBusSendContext, JObject.Parse(body), messageTypes);
            return new JsonConsumeContext(JsonSerializer.CreateDefault(), receiveContext, messageEnv);
        }

        public void Probe(ProbeContext context)
        {
        }
    }

这里重要的部分是您在自定义反序列化器中指定消息类型是什么。由于 MassTransit 需要某种格式并忽略不符合要求的消息,因此我们在此处指定 MassTransit 所需的信息。

string[] messageTypes = { "urn:message:Microsoft.Azure.EventGrid.Models:EventGridEvent" }

这是默认方案,必须匹配才能处理消息

最后,您可以找到完整的代码Github: https://github.com/kgalic/MassTransitSample https://github.com/kgalic/MassTransitSample

边注:如果您直接向 SB 队列发送消息并想要反序列化它们,如前所述,您需要指定 ContentType,如下所示:

var message = new Message(UTF8Encoding.UTF8.GetBytes(request));
message.ContentType = "application/json"; //must have
await _senderClient.SendAsync(message);

如果您有类似的情况,您需要编写类似于 EventGridEvent 的反序列化器,您可以将其用作示例。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何为 Azure 服务编写 MassTransit Json 反序列化器 的相关文章

  • Http 标头已删除 Azure Web 应用程序

    我在 Azure 上托管的 Web 应用程序遇到问题 该应用程序是一个用于身份验证 授权的identityserver4应用程序 asp net core 此应用程序可以在本地运行 但不能在 Azure 上运行 通过跟踪来自服务器的响应标头
  • 使用 PySpark 从 azure blob 存储读取 csv 文件

    我正在尝试使用 Microsoft Azure 上的 PySpark HDInsight 集群来做一个机器学习项目 要在我的集群上进行操作 请使用 Jupyter 笔记本 另外 我的数据 一个 csv 文件 存储在 Azure Blob 存
  • Elm:如何从 JSON API 解码数据

    我有这个数据使用http jsonapi org http jsonapi org format data type prospect id 1 attributes provider user id 1 provider facebook
  • 如何使用 JSON_TABLE 从 Oracle JSON 列获取键值作为结果集

    我用谷歌搜索了很多 似乎无法找到适合我的简单用例的简单解决方案 我在 Oracle 12C 数据库中有一个 json 列 当然实际上是一个带有 json 约束的 varchar 在该列中我存储了这样的 Map 表示 a 9 0847 b 8
  • 如何在单元测试中使用 JSON 发送请求

    我的 Flask 应用程序中有在请求中使用 JSON 的代码 我可以像这样获取 JSON 对象 Request request get json 这一直工作得很好 但是我正在尝试使用 Python 的 unittest 模块创建单元测试 但
  • 有没有办法扩展 angular.json 中的配置?

    在构建 Angular 6 应用程序时 我需要同时指定两件事 如果是生产或开发版本 我正在使用的区域设置 In my angular json I have build configurations production fileRepla
  • 实体创建无用的 id 字段

    我有一个CrudRepository与两个实体 Problem 特征实体总是创建一个附加的id数据库中的字段但未选择正确的characteristic id要生成的字段JSON machine entity machine id name
  • 检索 Steam 市场上物品的价格历史记录

    关于 Steam 市场上的物品 我想知道是否有办法检索某物品在一段时间内的价格历史记录 我知道 Steam 为想要将市场特定数据集成到自己网站中的开发人员提供了一个特殊的 api 但我还没有找到任何有关以 json 形式检索商品价格历史记录
  • 将 RequestBody json 转换为对象 - Spring Boot

    我是 java 开发的初学者 但之前有 PHP 和 Python 等编程语言的经验 对于如何进行 Spring Boot 的开发几乎没有什么困惑 我正在开发一个rest API 它有以下请求 key value key1 value1 pl
  • 在 Node.js 中创建 JSON 数组

    我需要在用 Node js 编写的服务器中创建一个 JSON 字符串 以便在请求时发送到客户端 问题是这个 JSON 取决于服务器中的可用数据 因此 JSON 数组的大小并不总是相同 我已经尝试了一整天 但尽管我感觉很接近 但我仍然不明白
  • 使用 JSON 解析问题警告

    我正在尝试从网站读取 JSON 数据 我在 Windows 10 上使用 Dev C 和 mingw 编译器 这是我尝试在静态项目中运行的教程中的 JSON 解析器 define CURL STATICLIB include
  • 反序列化动态 JSON 文件 C# NewtonSoft.JSON

    正在反序列化一个动态 JSON 文件 该文件可能包含 2 个单独的类 我不知道数组中将包含哪种类型的数据 问题是 我将根对象反序列化为 Base 类型 subtests 对象被反序列化为 Subtest 但 subtests 数组可能是 B
  • Elasticsearch GET API 获取分片大小

    在 Elasticsearch 2 3 3 中 有没有办法使用返回 JSON 的 GET API 获取分片大小 目前我找到了以下几种获取shard size的方法 这两种方法都存在问题 recovery gt 使用 JSON 进行响应并提供
  • 具有 Windows Azure 托管网页 + IFrame + X-Frame-Options 的 Microsoft Dynamics CRM Online

    我正在尝试使用 Microsoft Dynamics CRM Online 其中 Windows Azure 托管一个自定义网页 该网页显示在 Microsoft Dynamics CRM Web 应用程序的 IFRAME 中 我读过了ht
  • Node.js - 异步 JSON 查询

    如果这是一个愚蠢的问题 我深表歉意 但我对 Javascript 很陌生 而 Node js 确实让我很头疼 因为它是异步的 我的目标是从 API 查询 JSON 对象并能够使用它 我试图寻找关于我应该做什么的问题和答案 但它们对我来说都没
  • Azure Functions [JavaScript / Node.js] - HTTP 调用,良好实践

    从我的 Azure 函数 在 Node js 中运行 由 EventHub 消息触发 中 我想向某个外部页面发出发布请求 就像是 module exports function context eventHubMessages var ht
  • 气流:如何将读取 json 文件的方法放入本地库中

    我必须产生一些DAG 我已将 json 表架构文件保存在GCP铲斗 https cloud google com storage docs json api v1 buckets GCP 存储桶上的文件关联到composer将被重新映射到
  • 如何在 Redis 缓存中存储机器人状态

    虽然有一些文章描述了如何在 Redis 缓存中存储机器人状态 但这是推荐的方法吗 https ankitbko github io 2016 10 Microsoft Bot Framework Use Redis to store con
  • 如何将 yii2 Restful api 中两个表的关系数据显示为 json 格式

    我遇到了将两个表中的数据显示为 JSON 格式并在 yii2 Restful api 上工作的问题 这是我的结构数据库 TABLE volunteer volunteer id int 11 NOT NULL auto increment
  • 如何获取 ADLS Gen2 中存储的文件的 MD5?

    我通过 sFTP 将每日文件接收到 ADLS gen 2 存储帐户 我需要通过检查 ADLS gen2 中存储的文件的 MD5 来验证文件 我尝试使用 BLOB API 目前它不支持 ADLS gen2 如果文件存储在 Blob 存储中 我

随机推荐