Azure 事件网格订阅控制台应用程序

2024-04-26

我想在 C# 控制台应用程序中订阅 Azure 事件网格,实际上我正在实现 eShopContainer 项目中的 EventBus 示例,我需要订阅一个主题并监听消息,处理和打印之前发送给另一个主题的消息实现 EventBus 的 C# 控制台应用程序。那么,如何使用 C# 控制台应用程序来做到这一点?

这是我的天蓝色门户,消息存储在队列存储中:

Azure 门户订阅 https://i.stack.imgur.com/qlT7p.png

这是所有消息所在的队列:

所有消息 https://i.stack.imgur.com/vA5Py.png

所以,我需要订阅并获取所有消息!


基本上可以通过三种方式在 Azure 事件网格模型中使用控制台订阅者。下图显示了它们:

请注意,混合连接 https://learn.microsoft.com/en-us/samples/azure-samples/event-grid-dotnet-hybridconnection-destination/azure-event-grid-sample/ and 恩格罗克隧道 https://ngrok.com/download用在我的Azure 事件网格测试器 https://www.codeproject.com/Articles/1254463/Azure-Event-Grid-Tester。看看他们的实现。

以下代码片段是在控制台应用程序中使用 HybridConnectionListener 的示例:

using Microsoft.Azure.Relay;
using Newtonsoft.Json.Linq;
using System;
using System.Configuration;
using System.IO;
using System.Linq;
using System.Net;
using System.Threading.Tasks;

namespace ConsoleApp3
{
    class Program
    {
        static async Task Main(string[] args)
        {
            string connectionString = ConfigurationManager.AppSettings["HybridConnection"];
            HybridConnectionListener listener = null;

            try
            {
                listener = new HybridConnectionListener(connectionString);
                listener.Connecting += (o, hce) =>
                {
                    Console.ForegroundColor = ConsoleColor.White;
                    Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:MM:ss.fff")}] HybridConnection: Connecting, listener:{listener.Address}");
                    Console.ResetColor();
                };
                listener.Online += (o, hce) =>
                {
                    Console.ForegroundColor = ConsoleColor.Green;
                    Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:MM:ss.fff")}] HybridConnection: Online, listener:{listener.Address}");
                    Console.ResetColor();
                };
                listener.Offline += (o, hce) =>
                {
                    Console.ForegroundColor = ConsoleColor.Blue;
                    Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:MM:ss.fff")}] HybridConnection: Offline, listener:{listener.Address}");
                    Console.ResetColor();
                };

                listener.RequestHandler = (context) =>
                {
                    try
                    {
                        if (!context.Request.Headers.AllKeys.Contains("Aeg-Event-Type", StringComparer.OrdinalIgnoreCase) || !string.Equals(context.Request.Headers["Aeg-Event-Type"], "Notification", StringComparison.CurrentCultureIgnoreCase))
                            throw new Exception("Received message is not for EventGrid subscriber");

                        string jsontext = null;
                        using (var reader = new StreamReader(context.Request.InputStream))
                        {
                            var jtoken = JToken.Parse(reader.ReadToEnd());
                            if (jtoken is JArray)
                                jsontext = jtoken.SingleOrDefault<JToken>().ToString(Newtonsoft.Json.Formatting.Indented);
                            else if (jtoken is JObject)
                                jsontext = jtoken.ToString(Newtonsoft.Json.Formatting.Indented);
                            else if (jtoken is JValue)
                                throw new Exception($"The payload (JValue) is not accepted. JValue={jtoken.ToString(Newtonsoft.Json.Formatting.None)}");
                        }

                        Console.ForegroundColor = ConsoleColor.DarkYellow;
                        Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:MM:ss.fff")}] Headers: {string.Join(" | ", context.Request.Headers.AllKeys.Where(i => i.StartsWith("aeg-") || i.StartsWith("Content-Type")).Select(i => $"{i}={context.Request.Headers[i]}"))}");
                        Console.ForegroundColor = ConsoleColor.Yellow;
                        Console.WriteLine($"{jsontext}");
                                             
                    }
                    catch (Exception ex)
                    {
                        Console.ForegroundColor = ConsoleColor.Red;
                        Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:MM:ss.fff")}] HybridConnection: Message processing failed - {ex.Message}");
                    }
                    finally
                    {
                        context.Response.StatusCode = HttpStatusCode.NoContent;
                        context.Response.Close();
                        Console.ResetColor();
                    }
                };
                await listener.OpenAsync(TimeSpan.FromSeconds(60));
            }
            catch (Exception ex)
            {
                Console.ForegroundColor = ConsoleColor.Red;
                Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:MM:ss.fff")}] Open HybridConnection failed - {ex.Message}");
                Console.ResetColor();
            }

            Console.ReadLine();

            if(listener != null)
                await listener.CloseAsync();
        }
    }
}

使用 AEG 订阅中的混合连接作为事件处理程序目标,所有事件都将传递到控制台应用程序,如以下屏幕片段所示:

UPDATE:

以下示例显示了订阅者的实现,其输出绑定到 signalR 服务。在这种情况下,我们需要构建两个 HttpTrigger 函数,一个用于订阅者,另一个用于 signalR 客户端,以获取特定 userId 的 url 和访问令牌:

  1. HttpTriggerGetSignalRinfo功能:

run.csx:

#r "Microsoft.Azure.WebJobs.Extensions.SignalRService"

using System;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Primitives;
using Microsoft.Azure.WebJobs.Extensions.SignalRService;

public static async Task<IActionResult> Run(HttpRequest req, SignalRConnectionInfo connectionInfo, ILogger log)
{
    log.LogInformation($"Info.Url={connectionInfo.Url}");

    return new OkObjectResult(new 
    { 
        url = connectionInfo.Url, 
        accessToken = connectionInfo.AccessToken,
    }); 
}

函数.json:

{
  "bindings": [
    {
      "authLevel": "function",
      "name": "req",
      "type": "httpTrigger",
      "direction": "in",
      "methods": [
        "get"
      ]
    },
    {
      "type": "signalRConnectionInfo",
      "name": "connectionInfo",
      "hubName": "%AzureSignalRHubName%",
      "connectionStringSetting": "AzureSignalRConnectionString",
      "userId": "{query.userid}",
      "direction": "in"
    },
    {
      "name": "$return",
      "type": "http",
      "direction": "out"
    }
  ]
}
  1. 信号R客户端- 控制台应用程序:

     using Microsoft.AspNetCore.SignalR.Client;
     using Newtonsoft.Json;
     using Newtonsoft.Json.Linq;
     using System;
     using System.Configuration;
     using System.Net.Http;
     using System.Threading.Tasks;
    
     namespace ConsoleApp4
     {
         class Program
         {
             static async Task Main(string[] args)
             {
                 HubConnection connection = null;
                 string userId = ConfigurationManager.AppSettings.Get("userId");
                 string signalRInfo = ConfigurationManager.AppSettings.Get("signalRInfo");
    
                 try
                 {
                     using (var client = new HttpClient())
                     {
                         var rsp = await client.GetAsync($"{signalRInfo}&userid={userId}");
                         string jsontext = await rsp.Content.ReadAsStringAsync();
                         var info = JsonConvert.DeserializeAnonymousType(jsontext, new { url = "", accessToken = "" });
    
                         connection = new HubConnectionBuilder()
                             .WithUrl(info.url, option =>
                             {
                             option.AccessTokenProvider = () =>
                                 {
                                     return Task.FromResult(info.accessToken);
                                 };
                             }).Build();
    
                         Console.ForegroundColor = ConsoleColor.Green;
                         Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:MM:ss.fff")}] SignalR Client on {info.url}/users/{userId}");
                         Console.ResetColor();
                     }
    
                     connection.On<string, string>("SendMessage", (string headers, string message) =>
                     {
                         Console.ForegroundColor = ConsoleColor.DarkYellow;
                         Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:MM:ss.fff")}] {headers}");
                         Console.ForegroundColor = ConsoleColor.Yellow;
                         Console.WriteLine($"{JToken.Parse(message).ToString(Formatting.Indented)}");
                         Console.ResetColor();
                     });
    
                     await connection.StartAsync();              
                 }
                 catch (Exception ex)
                 {
                     Console.ForegroundColor = ConsoleColor.Red;
                     Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:MM:ss.fff")}] Open HybridConnection failed - {ex.Message}");
                     Console.ResetColor();
                 }
                 Console.ReadLine();
                 if (connection != null)
                     await connection.StopAsync();
             }       
         }
     }
    
  2. HttpTriggerSendMsgToSignalR功能-订阅者

run.csx:

#r "Microsoft.Azure.WebJobs.Extensions.SignalRService"
#r "Newtonsoft.Json"

using System.Net;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Primitives;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Microsoft.Azure.WebJobs.Extensions.SignalRService;

public static async Task<IActionResult> Run(HttpRequest req, IAsyncCollector<SignalRMessage> signalRMessages, ILogger log)
{   
    string headers = string.Join(" | ", req.Headers.Where(h => h.Key.StartsWith("aeg-") || h.Key.StartsWith("Content-Type")).Select(i => $"{i.Key}={i.Value.First()}")); 
    log.LogInformation($"Method: {req.Method} Headers: {headers}");    
          
    if (req.Method == HttpMethod.Options.ToString())
    {
        log.LogInformation("CloudEventSchema validation");               
        req.HttpContext.Response.Headers.Add("Webhook-Allowed-Origin", req.Headers["WebHook-Request-Origin"].FirstOrDefault()?.Trim());
        return (ActionResult)new OkResult();
    }
    
    var jtoken = JToken.Parse(await new StreamReader(req.Body).ReadToEndAsync());
    string eventTypeHeader = req.Headers["aeg-event-type"].FirstOrDefault()?.Trim(); 

    if(eventTypeHeader == "SubscriptionValidation") 
    {       
        if(jtoken is JArray)
            jtoken = jtoken.SingleOrDefault<JToken>();

        if(jtoken["eventType"].Value<string>() == "Microsoft.EventGrid.SubscriptionValidationEvent")
        {
            log.LogInformation("EventGridSchema validation");
            return (ActionResult)new OkObjectResult(new { validationResponse = ((dynamic)jtoken["data"]["validationCode"])});         
        }           
        return new BadRequestObjectResult($"Not valid event schema");
    }   
    else if(eventTypeHeader == "Notification") 
    {          
        await signalRMessages.AddAsync(
            new SignalRMessage
            {
                // the message will only be sent to these user IDs or if this property not exit, the bindig path will be used it
                Target = "SendMessage",
                Arguments = new[] { headers, jtoken.ToString() }
            });        
        return (ActionResult)new OkResult();  
    }
     
    return new BadRequestObjectResult($"{eventTypeHeader} is not a valid type");
}

函数.json:

{
  "bindings": [
    {
      "authLevel": "function",
      "name": "req",
      "type": "httpTrigger",
      "direction": "in",
      "methods": [
        "options",
        "post"
      ]
    },
    {
      "type": "signalR",
      "name": "signalRMessages",
      "hubName": "%AzureSignalRHubName%/users/{query.userid}",
      "connectionStringSetting": "AzureSignalRConnectionString",
      "direction": "out"
    },
    {
      "name": "$return",
      "type": "http",
      "direction": "out"
    }
  ]
}

请注意,Webhook 事件处理程序用于订阅者有两个原因,例如传递 CloudEvent 消息和通过 url 查询字符串参数配置 signalR 客户端 userId。

  1. 显示事件用户 ID=abcd在控制台应用程序上:

请注意,signalR 客户端实例允许为同一用户 ID 多播消息,这与混合连接相反,其中消息在侦听器实例之间进行平衡。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Azure 事件网格订阅控制台应用程序 的相关文章

  • C# 和 Javascript SHA256 哈希的代码示例

    我有一个在服务器端运行的 C 算法 它对 Base64 编码的字符串进行哈希处理 byte salt Convert FromBase64String serverSalt Step 1 SHA256Managed sha256 new S
  • ASP.NET Core Serilog 未将属性推送到其自定义列

    我有这个设置appsettings json对于我的 Serilog 安装 Serilog MinimumLevel Information Enrich LogUserName Override Microsoft Critical Wr
  • UML类图:抽象方法和属性是这样写的吗?

    当我第一次为一个小型 C 项目创建 uml 类图时 我在属性方面遇到了一些麻烦 最后我只是将属性添加为变量 lt
  • 使闭包捕获的变量变得易失性

    闭包捕获的变量如何与不同线程交互 在下面的示例代码中 我想将totalEvents 声明为易失性的 但C 不允许这样做 是的 我知道这是错误的代码 这只是一个例子 private void WaitFor10Events volatile
  • WPF 中的调度程序和异步等待

    我正在尝试学习 WPF C 中的异步编程 但我陷入了异步编程和使用调度程序的困境 它们是不同的还是在相同的场景中使用 我愿意简短地回答这个问题 以免含糊不清 因为我知道我混淆了 WPF 中的概念和函数 但还不足以在功能上正确使用它 我在这里
  • 在 Visual Studio 2008 上设置预调试事件

    我想在 Visual Studio 中开始调试程序之前运行一个任务 我每次调试程序时都需要运行此任务 因此构建后事件还不够好 我查看了设置的 调试 选项卡 但没有这样的选项 有什么办法可以做到这一点吗 你唯一可以尝试的 IMO 就是尝试Co
  • 获取没有非标准端口的原始 url (C#)

    第一个问题 环境 MVC C AppHarbor Problem 我正在调用 openid 提供商 并根据域生成绝对回调 url 在我的本地机器上 如果我点击的话 效果很好http localhost 12345 login Request
  • Qt moc 在头文件中实现?

    是否可以告诉 Qt MOC 我想声明该类并在单个文件中实现它 而不是将它们拆分为 h 和 cpp 文件 如果要在 cpp 文件中声明并实现 QObject 子类 则必须手动包含 moc 文件 例如 文件main cpp struct Sub
  • 指针减法混乱

    当我们从另一个指针中减去一个指针时 差值不等于它们相距多少字节 而是等于它们相距多少个整数 如果指向整数 为什么这样 这个想法是你指向内存块 06 07 08 09 10 11 mem 18 24 17 53 7 14 data 如果你有i
  • vector 超出范围后不清除内存

    我遇到了以下问题 我不确定我是否错了或者它是一个非常奇怪的错误 我填充了一个巨大的字符串数组 并希望在某个点将其清除 这是一个最小的例子 include
  • Qt表格小部件,删除行的按钮

    我有一个 QTableWidget 对于所有行 我将一列的 setCellWidget 设置为按钮 我想将此按钮连接到删除该行的函数 我尝试了这段代码 它不起作用 因为如果我只是单击按钮 我不会将当前行设置为按钮的行 ui gt table
  • Discord.net 无法在 Linux 上运行

    我正在尝试让在 Linux VPS 上运行的 Discord net 中编码的不和谐机器人 我通过单声道运行 但我不断收到此错误 Unhandled Exception System Exception Connection lost at
  • 将 unsigned char * (uint8_t *) 转换为 const char *

    我有一个带有 uint8 t 参数的函数 uint8 t ihex decode uint8 t in size t len uint8 t out uint8 t i hn ln for i 0 i lt len i 2 hn in i
  • 需要哪个版本的 Visual C++ 运行时库?

    microsoft 的最新 vcredist 2010 版 是否包含以前的版本 2008 SP1 和 2005 SP1 还是我需要安装全部 3 个版本 谢谢 你需要所有这些
  • WCF:将随机数添加到 UsernameToken

    我正在尝试连接到用 Java 编写的 Web 服务 但有些东西我无法弄清楚 使用 WCF 和 customBinding 几乎一切似乎都很好 除了 SOAP 消息的一部分 因为它缺少 Nonce 和 Created 部分节点 显然我错过了一
  • 为什么我收到“找不到编译动态表达式所需的一种或多种类型。”?

    我有一个已更新的项目 NET 3 5 MVC v2 到 NET 4 0 MVC v3 当我尝试使用或设置时编译出现错误 ViewBag Title财产 找不到编译动态表达式所需的一种或多种类型 您是否缺少对 Microsoft CSharp
  • 限制C#中的并行线程数

    我正在编写一个 C 程序来生成并通过 FTP 上传 50 万个文件 我想并行处理4个文件 因为机器有4个核心 文件生成需要更长的时间 是否可以将以下 Powershell 示例转换为 C 或者是否有更好的框架 例如 C 中的 Actor 框
  • 防止索引超出范围错误

    我想编写对某些条件的检查 而不必使用 try catch 并且我想避免出现 Index Out of Range 错误的可能性 if array Element 0 Object Length gt 0 array Element 1 Ob
  • 使用 libcurl 检查 SFTP 站点上是否存在文件

    我使用 C 和 libcurl 进行 SFTP FTPS 传输 在上传文件之前 我需要检查文件是否存在而不实际下载它 如果该文件不存在 我会遇到以下问题 set up curlhandle for the public private ke
  • 恢复上传文件控制

    我确实阅读了以下帖子 C 暂停 恢复上传 https stackoverflow com questions 1048330 pause resume upload in c 使用 HTTP 恢复上传 https stackoverflow

随机推荐