我已经编写了一个事件源聚合,现在实现了一个事件源传奇...我注意到两者是相似的,并创建了一个事件源对象作为两者派生的基类。
我在这里看过一个演示http://blog.jonathanoliver.com/cqrs-sagas-with-event-source-part-ii-of-ii/ http://blog.jonathanoliver.com/cqrs-sagas-with-event-sourcing-part-ii-of-ii/但感觉可能存在问题,因为命令的发送在写入事务之外,因此如果进程崩溃,命令可能会丢失?
public void Save(ISaga saga)
{
var events = saga.GetUncommittedEvents();
eventStore.Write(new UncommittedEventStream
{
Id = saga.Id,
Type = saga.GetType(),
Events = events,
ExpectedVersion = saga.Version - events.Count
});
foreach (var message in saga.GetUndispatchedMessages())
bus.Send(message); // can be done in different ways
saga.ClearUncommittedEvents();
saga.ClearUndispatchedMessages();
}
相反,我使用 Greg Young 的 EventStore,当我保存 EventSourcedObject(聚合或传奇)时,顺序如下:
- 存储库获取新 MutatingEvents 的列表。
- 将它们写入流。
- 当流被写入并提交到流时,EventStore 会触发新事件。
- 我们监听来自 EventStore 的事件并在 EventHandler 中处理它们。
我正在实现传奇的两个方面:
- 参加events, 这可能过渡态,这反过来又可能发出命令.
- 拥有一个alarm在未来的某个时刻(通过外部计时器服务)我们可以被回调)。
问题
我认为事件处理程序不应发出命令(如果命令失败会发生什么?) - 但我对上述内容是否满意,因为 Saga 是通过此控制命令创建(对事件做出反应)的实际事物eventproxy,任何命令发送失败都可以在外部处理(在处理事件的外部EventHandler中)CommandEmittedFromSaga
如果命令失败则重新发送)?
或者我忘记包装事件并存储本机Commands
and Events
在同一个流中(与基类消息混合 - Saga 将消耗命令和事件,聚合只会消耗事件)?
网上还有其他用于实现事件溯源 Sagas 的参考资料吗?有什么可以让我理智地检查我的想法吗?
一些背景代码如下。
Saga 发出一个运行命令(包含在 CommandEmissedFromSaga 事件中)
下面的命令包含在事件中:
public class CommandEmittedFromSaga : Event
{
public readonly Command Command;
public readonly Identity SagaIdentity;
public readonly Type SagaType;
public CommandEmittedFromSaga(Identity sagaIdentity, Type sagaType, Command command)
{
Command = command;
SagaType = sagaType;
SagaIdentity = sagaIdentity;
}
}
Saga 在未来某个时刻请求回调(AlarmRequestedBySaga 事件)
警报回调请求被包装在一个事件中,并将在请求的时间或之后向 Saga 触发事件:
public class AlarmRequestedBySaga : Event
{
public readonly Event Event;
public readonly DateTime FireOn;
public readonly Identity Identity;
public readonly Type SagaType;
public AlarmRequestedBySaga(Identity identity, Type sagaType, Event @event, DateTime fireOn)
{
Identity = identity;
SagaType = sagaType;
Event = @event;
FireOn = fireOn;
}
}
或者,我可以将命令和事件存储在同一基本类型消息流中
public abstract class EventSourcedSaga
{
protected EventSourcedSaga() { }
protected EventSourcedSaga(Identity id, IEnumerable<Message> messages)
{
Identity = id;
if (messages == null) throw new ArgumentNullException(nameof(messages));
var count = 0;
foreach (var message in messages)
{
var ev = message as Event;
var command = message as Command;
if(ev != null) Transition(ev);
else if(command != null) _messages.Add(command);
else throw new Exception($"Unsupported message type {message.GetType()}");
count++;
}
if (count == 0)
throw new ArgumentException("No messages provided");
// All we need to know is the original number of events this
// entity has had applied at time of construction.
_unmutatedVersion = count;
_constructing = false;
}
readonly IEventDispatchStrategy _dispatcher = new EventDispatchByReflectionStrategy("When");
readonly List<Message> _messages = new List<Message>();
readonly int _unmutatedVersion;
private readonly bool _constructing = true;
public readonly Identity Identity;
public IList<Message> GetMessages()
{
return _messages.ToArray();
}
public void Transition(Event e)
{
_messages.Add(e);
_dispatcher.Dispatch(this, e);
}
protected void SendCommand(Command c)
{
// Don't add a command whilst we are in the constructor. Message
// state transition during construction must not generate new
// commands, as those command will already be in the message list.
if (_constructing) return;
_messages.Add(c);
}
public int UnmutatedVersion() => _unmutatedVersion;
}