处理聚合的所有事件

2024-05-14

请参阅下面我的第一个持久订阅:

 namespace PersistentSubscription
    {
        internal class Program
        {
            private static void Main()
            {
                var subscription = new PersistentSubscriptionClient();
                subscription.Start();
            }
        }

        public class PersistentSubscriptionClient
        {
            private IEventStoreConnection _conn;
            private const string STREAM = "$ce-customer";
            private const string GROUP = "a_test_group";
            private const int DEFAULTPORT = 1113;
            private static readonly UserCredentials User = new UserCredentials("admin", "changeit");
            private EventStorePersistentSubscriptionBase _subscription;

            public void Start()
            {
                var settings = ConnectionSettings.Create(); 

                using (_conn = EventStoreConnection.Create(settings, new IPEndPoint(IPAddress.Loopback, DEFAULTPORT)))
                {
                    _conn.ConnectAsync().Wait();

                    CreateSubscription(); 
                    ConnectToSubscription();

                    Console.WriteLine("waiting for events. press enter to exit");
                    Console.ReadLine();
                }
            }

            private void ConnectToSubscription()
            {
                var bufferSize = 10;
                var autoAck = true;

                Action<EventStorePersistentSubscriptionBase, ResolvedEvent> eventAppeared = EventAppeared; 
                _subscription = _conn.ConnectToPersistentSubscription(STREAM, GROUP, eventAppeared, SubscriptionDropped, User, bufferSize, autoAck);
            }

            private void SubscriptionDropped(EventStorePersistentSubscriptionBase eventStorePersistentSubscriptionBase,
                SubscriptionDropReason subscriptionDropReason, Exception ex)
            {
                ConnectToSubscription();
            }

            private static void EventAppeared(EventStorePersistentSubscriptionBase eventStorePersistentSubscriptionBase,
                ResolvedEvent resolvedEvent)
            {
                MemoryStream stream = new MemoryStream(resolvedEvent.Event.Data);
                IFormatter formatter = new BinaryFormatter();
                stream.Seek(0, SeekOrigin.Begin);
                try
                {
                    CustomerCreated customerCreated = (CustomerCreated)formatter.Deserialize(stream); 
                    Console.WriteLine(customerCreated);
                }
                catch (Exception e)
                {
                    var test = "test";
                }

            }

            private void CreateSubscription()
            {
                PersistentSubscriptionSettings settings = PersistentSubscriptionSettings.Create()
                    .DoNotResolveLinkTos()
                    .StartFromCurrent();

                try
                {
                    _conn.CreatePersistentSubscriptionAsync(STREAM, GROUP, settings, User).Wait();
                }
                catch (AggregateException ex)
                {
                    if (ex.InnerException.GetType() != typeof(InvalidOperationException)
                        && ex.InnerException?.Message != $"Subscription group {GROUP} on stream {STREAM} already exists")
                    {
                        throw;
                    }
                }
            }
        }
    }

下面是我的第一个客户:

using System;
using System.IO;
using System.Net;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;
using System.Text;
using EventStore.ClientAPI;

namespace WritingEvents
{
    class Program
    {
        static void Main(string[] args)
        {
            const int DEFAULTPORT = 1113;
            var settings = ConnectionSettings.Create();
            using (var conn = EventStoreConnection.Create(settings, new IPEndPoint(IPAddress.Loopback, DEFAULTPORT)))
            {
                conn.ConnectAsync().Wait();
                CustomerCreated c1 = new CustomerCreated { Id = Guid.NewGuid(), Name = "Maria" };
                EventData customerCreated1 = GetEventDataFor(c1);
                conn.AppendToStreamAsync("customer-100", ExpectedVersion.Any, customerCreated1).Wait();
            }
        }

        private static EventData GetEventDataFor(CustomerCreated customerCreated)
        {
            IFormatter formatter = new BinaryFormatter();
            MemoryStream stream = new MemoryStream();
            formatter.Serialize(stream, customerCreated);
            byte[] customerCreatedEventByteArray = stream.ToArray();



            return new EventData(
                Guid.NewGuid(),
                "eventType",
                true,
                customerCreatedEventByteArray,
                null
                );
        }
    }

    [Serializable]
    public class CustomerCreated
    {
        public Guid Id { get; set; }
        public string Name { get; set; }
    }
}

我先运行服务器,然后运行客户端。在服务器端反序列化 CustomerCreated 事件时,我看到错误。错误是:“在解析完成之前遇到流结尾”。

如果我改变这一行:

private const string STREAM = "$ce-customer";

to this:

private const string STREAM = "customer-100";

然后反序列化在服务器端正常工作。

我如何处理所有客户事件 - 而不仅仅是客户 100?

I have --run-projections=all启动事件存储时。我还启用了所有投影:


这个问题帮助了我:使用事件存储客户端 API (.NET),如何写入流并将一个事件链接到另一个事件? https://stackoverflow.com/questions/50906995/using-the-event-store-client-api-net-how-to-i-write-to-a-stream-and-link-one

我只需要改变这一点:

PersistentSubscriptionSettings settings = PersistentSubscriptionSettings.Create()
                .DoNotResolveLinkTos() //Specifically this line
                .StartFromCurrent();

to this:

PersistentSubscriptionSettings settings = PersistentSubscriptionSettings.Create()
                .ResolveLinkTos() //Specifically this line
                .StartFromCurrent();

DoNotResolveLinkTos gets a link到原始事件,而 ResolveLinkTos 获取实际事件本身。因此,我试图反序列化链接对象,这导致了异常。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

处理聚合的所有事件 的相关文章

  • std::vector 的复制构造函数如何运行?

    一个如何std vector
  • C# 中的 Culture 相当于 Java 中的 Locale 吗?

    C 使用文化的概念 这在操作上与 Java 中的 Locale 类似吗 或者底层概念是否存在显着差异 从文化而不是语言环境的角度进行工作是一种寻找正确抽象层次的尝试 从以类似方式做事的人群的角度来考虑事物 而不是谈论地理区域和语言 并有点疯
  • 使用 QTextCursor 选择一段文本

    使用 Qt 框架选择文本片段时遇到问题 例如 如果我有这个文件 没有时间休息 我想选择 ime for r 并从文档中删除这段文本 我应该如何使用 QTextCursor 来做到这一点 这是我的代码 QTextCursor cursor n
  • 函数原型和数组参数

    我正在学习 C 语法 并且已经开始研究数组了 我想问你一个问题 但首先让我回顾一下 这样我就知道我已经弄清楚了 我知道您可以使用以下语法将变量定义为数组 name
  • 将内核链接到 PTX 函数

    我可以使用 PTX 文件中包含的 PTX 函数作为外部设备函数 将其链接到另一个应调用该函数的 cu 文件吗 这是另一个问题CUDA 将内核链接在一起 https stackoverflow com questions 20636800 c
  • 设置外部应用程序焦点

    在 VB NET 中 您可以使用以下命令将焦点设置到外部应用程序 AppActivate Windows Name or AppActivate processID As Integer 现在 如果您这样做 则效果很好 Dim intNot
  • 为什么测试在 TeamCity 中运行比直接在 NUnit 中运行需要更长的时间?

    我进行了一些 C 性能测试 基本上运行两种不同的方法 并检查一种方法的运行速度是否比另一种方法快得多 当我在 NUnit 本地运行它们时 其中一个测试的运行速度是另一个测试的十倍 因此我有一个 NUnit 测试 它使用Stopwatch检查
  • 为什么Boost在“程序选项”中使用全局函数覆盖来实现自定义验证器

    这个例子 http www boost org doc libs 1 55 0 doc html program options howto html idp163429032显示一个名为validate在全局范围内定义重载函数boost
  • 编译器在函数名称前添加下划线前缀的原因是什么?

    当我看到 C 应用程序的汇编代码时 如下所示 emacs hello c clang S O hello c o hello s cat hello s 函数名称以下划线作为前缀 例如callq printf 为什么这样做以及它有什么优点
  • 适用于 Windows 的键值数据库?

    除了 MongoDB 和 Memcached 之外 Windows 上还运行哪些键值存储 我见过的大多数似乎只能在 Linux 上运行 Hypertable Redis Lightcloud 相关链接 是否有经过商业验证的云存储 Key g
  • 使用 for 循环创建链表

    这是我的结构 struct ListItem int data struct ListItem next 假设链表的第一个节点的 data 0 我想编写一个 for 循环来创建大小为 5 的链表 但我不知道如何工作 我尝试了以下方法 int
  • 复杂的 C 声明

    我刚刚在互联网上浏览了一些代码 发现了这个 float foo SIZE SIZE 我如何阅读这份声明 是否有一套特定的规则来阅读如此复杂的声明 我有一段时间没做这个了 从 开始foo然后向右走 float foo SIZE SIZE fo
  • C语言:如何获取使用strtok()一次后剩余的字符串

    我的字符串是 A B C D E 分隔符是 如何获取执行 strtok 一次后剩余的字符串 即 B C D E char a A B C D E char separator char b strtok a separator printf
  • MPI_Gatherv:根数组中收到的垃圾值

    我正在尝试实施MPI Gatherv函数于C 根据我的程序 包括 root 在内的每个进程都应该创建一个大小等于 进程的等级 1 这将在所有单元格中保持进程的等级 然后这个本地数组被收集到根的 rcv array 中 不知何故 我得到了垃圾
  • 将函数作为函数参数传递

    Unity C 似乎无法识别Func lt gt 作为函数委托的符号 那么 如何将函数作为函数参数传递呢 我有一个想法Invoke functionName 0 可能有帮助 但我不确定它是否实际上立即调用该函数 或者等待帧结束 还有别的办法
  • 使用std::begin()、std::end()将ArrayXd转换为stl向量,

    在我看来我应该能够使用std begin and std end 转换ArrayXd to std vector
  • C# 记录类型:记录子类之间的相等比较

    给定父记录类型 public record Foo string Value 和两个记录子类Bar and Bee我想知道是否可以实施Equals在基类中 因此 Foo Bar 或 Bee 的实例都被考虑equal基于Value 两者都与E
  • 即使对于新上下文,OnModelCreating 也仅调用一次

    我有多个相同但内容不同的 SQL Server 表 在编写代码优先 EF6 程序时 我尝试为每个程序重用相同的数据库上下文 并将表名称传递给上下文构造函数 然而 虽然每次都会调用构造函数 但尽管每次都是从 new 创建数据库上下文 但 On
  • 如何通过Task.ContinueWith创建传递?

    我想在原始任务结束时添加一个任务 但想保留原始结果和类型 附加任务仅用于记录目的 例如写入控制台等 例如 Task Run gt DateTime Now Hour gt 12 Hey throw new Exception Continu
  • 字符串常量之前应有非限定 ID

    我目前正在编写一个 C 应用程序 它与 math h 结合实现了振荡器 我拥有的代码应该可以很好地用于该应用程序 尝试编译目标文件 但是我遇到编译器错误 很可能与语法 等有关 我认为这与命名空间有关 错误 终端输出 User Name Ma

随机推荐

  • 如何创建Python Egg文件

    我对 Python 中的 Egg 文件有疑问 我有很多按包组织的Python代码 我正在尝试创建egg文件 我正在跟进指示 http peak telecommunity com DevCenter PythonEggs building
  • 如何加载位于 Windows Phone 7 中应用程序文件夹内的 XML 文件?

    我正在开发 Windows Phone 7 应用程序 我是 Windows Phone 7 应用程序的新手 我通过右键单击项目并选择 添加 gt 新项目 在项目中添加了 XML 文件 然后 我可以使用以下代码轻松地将 XML 文件加载到我的
  • 背景颜色的不透明度,但不是文本的不透明度[重复]

    这个问题在这里已经有答案了 如何使背景的跨浏览器 包括 Internet Explorer 6 透明div而文本仍然不透明 我需要在不使用任何库 例如 jQuery 等 的情况下完成此操作 但如果您知道有一个库可以实现此目的 我很想知道 这
  • 嵌入 IronRuby 和 IronPython

    我正在尝试在示例应用程序中运行ironruby 和ironpython 我遇到了一个例外 未处理的异常 System Reflection TargetInitationException 异常有 被调用的目标抛出 gt System Re
  • 在Windows Server 2003下如何在本地系统帐户下运行jvisualvm.exe?

    我在带有 Java 1 6 u 20 的 Windows Server 2003 下将 GlassFish 3 0 1 作为 Windows 服务运行 总体上我很满意 我希望能够在这个 JVM 上使用 VisualVM 并使用无法在 Tom
  • 在单个显示器中绘制多个 jpeg 图像

    我需要在单个组合显示器 或画布 中绘制和显示多个 jpeg 图像 例如 假设我有图像 a b c d jpg 每个图像的大小不同 我想将它们绘制在 2x2 网格的一页上 能够为每个子图设置标题也很好 我一直在彻底寻找解决方案 但不知道如何去
  • C# 证书生成框架

    有谁知道可以生成公钥 私钥 X 509 证书并签署这些证书的 C 框架 BouncyCastleCrypto 虽然这个名字很疯狂 但我很确定它具有所有这些功能 几乎所有 RFC 标准均已实施 当我几年前使用它时 它的文档记录很少 但是单元测
  • ElasticSearch 映射对分组文档进行折叠/执行操作的结果

    有一个对话列表 每个对话都有一个消息列表 每条消息都有不同的字段和action场地 我们需要考虑到在对话的第一条消息中使用了动作A 在几条消息之后有使用的动作A 1过了一会儿A 1 1等等 有一个聊天机器人意图列表 对对话的消息操作进行分组
  • 从图像坐标获取对象的世界坐标

    I have been following this http docs opencv org modules calib3d doc camera calibration and 3d reconstruction html docume
  • 使用 cfchart 标签在单个饼图中显示多个查询的数据

    请考虑以下代码 现在我的代码中有以下代码 cfm页面内的 tag DataSource xx xx x xx Name of the database sgemail Name of the relevant column event vc
  • 如何通过我的 ios 应用程序的指示打开苹果地图应用程序

    我的目标是从 ios 应用程序打开带有方向的地图应用程序 我可以打开地图应用程序 但它没有显示方向 我编写的代码如下 NSString mystr NSString alloc initWithFormat http maps apple
  • 使用无图像按钮有哪些优点?

    讨论关于这个答案 https stackoverflow com questions 520640 how can i use googles new imageless button how could i reverse enginee
  • 警告变量值

    如何在警报框中显示 javascript 中变量的值 例如 我有一个变量 x 100 并且alert x 不起作用 油脂猴中使用的脚本在这里 var inputs document getElementsByTagName input va
  • PHP实现的机票预订系统

    如何防止预订系统中的座位被重复预订 我正在用 PHP 和 MYSQL 制作一个航空旅行预订系统模型作为一个项目 我有一个小问题 仅在付款后 门票和座位详细信息才会永久存储在此处 座位号在付款前分配 假设人 1 预订了飞机上的座位 x 并支付
  • 连接外部 Accumulo 实例和 java

    我正在尝试使用 Accumulo 连接到虚拟机 问题是 我无法将其连接到 Java 中 我可以看到 Apache 抛出的网页 但我无法让它与代码一起工作 我认为这是缺乏知识的问题而不是真正的问题 但我找不到这方面的文档 所有示例都使用 lo
  • Ioncube 编码的文件是否可以解码?

    我是一名 php 开发人员 我的客户计划分发一个使用 Php 开发的软件 计划使用 ioncube 或类似软件对文件进行编码 在谷歌搜索时 我发现很少有人解码这些文件 这些文件使用 ioncube 甚至其他软件进行编码 如果您询问是否可以破
  • getappdata 在 MATLAB 中返回空矩阵

    我有一段代码 我在其中使用setappdata然后我使用以下方式调用数据getappdata即使它不为空 它也会返回一个空矩阵 我的一段简化代码如下 function edit1 Callback hObject eventdata han
  • 在哪里可以获得几乎所有英语单词的列表? [关闭]

    Closed 这个问题不符合堆栈溢出指南 help closed questions 目前不接受答案 我想生成一些随机文本 我尝试写一个基本的Java程序 int nowords r nextInt 2000 int i j for i 0
  • Windows 上的 Node.js 和 Express

    今晚 我决定尝试在我的 Windows 7 计算机上使用 Express 构建一个简单的 Node js 应用程序 安装过程还算顺利 但 Express 拒绝配合 以下是我已采取的步骤 使用以下位置提供的 MSI 安装 Node jshtt
  • 处理聚合的所有事件

    请参阅下面我的第一个持久订阅 namespace PersistentSubscription internal class Program private static void Main var subscription new Per