Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,30 @@ namespace Eventuous;

using static Diagnostics.ApplicationEventSource;

public abstract class FunctionalCommandService<TState>(IEventReader reader, IEventWriter writer, TypeMapper? typeMap = null)
/// <summary>
/// Base class for a functional command service for a given <seealso cref="State{T}"/> type.
/// Add your command handlers to the service using <see cref="On{TCommand}"/>.
/// </summary>
/// <param name="reader">Event reader or event store</param>
/// <param name="writer">Event writer or event store</param>
/// <param name="typeMap"><seealso cref="TypeMapper"/> instance or null to use the default type mapper</param>
/// <param name="amendEvent">Optional function to add extra information to the event before it gets stored</param>
/// <typeparam name="TState">State object type</typeparam>
public abstract class FunctionalCommandService<TState>(IEventReader reader, IEventWriter writer, TypeMapper? typeMap = null, AmendEvent? amendEvent = null)
: IFuncCommandService<TState>, IStateCommandService<TState> where TState : State<TState>, new() {
readonly TypeMapper _typeMap = typeMap ?? TypeMap.Instance;
readonly FuncHandlersMap<TState> _handlers = new();

bool _initialized;

protected FunctionalCommandService(IEventStore store, TypeMapper? typeMap = null)
: this(store, store, typeMap) { }
/// <summary>
/// Alternative constructor for the functional command service, which uses an <seealso cref="IEventStore"/> instance for both reading and writing.
/// </summary>
/// <param name="store">Event store</param>
/// <param name="typeMap"><seealso cref="TypeMapper"/> instance or null to use the default type mapper</param>
/// <param name="amendEvent">Optional function to add extra information to the event before it gets stored</param>
protected FunctionalCommandService(IEventStore store, TypeMapper? typeMap = null, AmendEvent? amendEvent = null)
: this(store, store, typeMap, amendEvent) { }

/// <summary>
/// Returns the command handler builder for the specified command type.
Expand All @@ -41,6 +56,14 @@ protected void OnExisting<TCommand>(GetStreamNameFromCommand<TCommand> getStream
protected void OnAny<TCommand>(GetStreamNameFromCommand<TCommand> getStreamName, ExecuteCommand<TState, TCommand> action) where TCommand : class
=> On<TCommand>().InState(ExpectedState.Any).GetStream(getStreamName).Act(action);

/// <summary>
/// Function to handle a command and return the resulting state and changes.
/// </summary>
/// <param name="command">Command to handle</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <typeparam name="TCommand">Command type</typeparam>
/// <returns><seealso cref="Result{TState}"/> instance</returns>
/// <exception cref="ArgumentOutOfRangeException">Throws when there's no command handler was registered for the command type</exception>
public async Task<Result<TState>> Handle<TCommand>(TCommand command, CancellationToken cancellationToken) where TCommand : class {
if (!_initialized) BuildHandlers();

Expand Down Expand Up @@ -73,7 +96,7 @@ public async Task<Result<TState>> Handle<TCommand>(TCommand command, Cancellatio
// Zero in the global position would mean nothing, so the receiver need to check the Changes.Length
if (newEvents.Length == 0) return new OkResult<TState>(newState, Array.Empty<Change>(), 0);

var storeResult = await writer.Store(streamName, (int)loadedState.StreamVersion.Value, newEvents, static e => e, cancellationToken).NoContext();
var storeResult = await writer.Store(streamName, (int)loadedState.StreamVersion.Value, newEvents, amendEvent, cancellationToken).NoContext();
var changes = newEvents.Select(x => new Change(x, _typeMap.GetTypeName(x)));
Log.CommandHandled<TCommand>();

Expand Down
34 changes: 17 additions & 17 deletions src/Core/src/Eventuous.Persistence/AggregateStore/AggregateStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ namespace Eventuous;
using static Diagnostics.PersistenceEventSource;

public class AggregateStore : IAggregateStore {
readonly Func<StreamEvent, StreamEvent> _amendEvent;
readonly AggregateFactoryRegistry _factoryRegistry;
readonly IEventReader _eventReader;
readonly IEventWriter _eventWriter;
readonly AmendEvent _amendEvent;
readonly AggregateFactoryRegistry _factoryRegistry;
readonly IEventReader _eventReader;
readonly IEventWriter _eventWriter;

/// <summary>
/// Creates a new instance of the default aggregate store
Expand All @@ -19,11 +19,11 @@ public class AggregateStore : IAggregateStore {
/// <param name="amendEvent"></param>
/// <param name="factoryRegistry"></param>
public AggregateStore(
IEventReader reader,
IEventWriter writer,
Func<StreamEvent, StreamEvent>? amendEvent = null,
AggregateFactoryRegistry? factoryRegistry = null
) {
IEventReader reader,
IEventWriter writer,
AmendEvent? amendEvent = null,
AggregateFactoryRegistry? factoryRegistry = null
) {
_amendEvent = amendEvent ?? (x => x);
_factoryRegistry = factoryRegistry ?? AggregateFactoryRegistry.Instance;
_eventReader = Ensure.NotNull(reader);
Expand All @@ -37,10 +37,11 @@ public AggregateStore(
/// <param name="amendEvent"></param>
/// <param name="factoryRegistry"></param>
public AggregateStore(
IEventStore eventStore,
Func<StreamEvent, StreamEvent>? amendEvent = null,
AggregateFactoryRegistry? factoryRegistry = null
) : this(eventStore, eventStore, amendEvent, factoryRegistry) { }
IEventStore eventStore,
AmendEvent? amendEvent = null,
AggregateFactoryRegistry? factoryRegistry = null
)
: this(eventStore, eventStore, amendEvent, factoryRegistry) { }

public Task<AppendEventsResult> Store<T>(StreamName streamName, T aggregate, CancellationToken cancellationToken) where T : Aggregate
=> _eventWriter.Store(streamName, aggregate, _amendEvent, cancellationToken);
Expand All @@ -57,12 +58,11 @@ async Task<T> LoadInternal<T>(StreamName streamName, bool failIfNotFound, Cancel
try {
var events = await _eventReader.ReadStream(streamName, StreamReadPosition.Start, failIfNotFound, cancellationToken);
aggregate.Load(events.Select(x => x.Payload));
}
catch (StreamNotFound) when (!failIfNotFound) {
} catch (StreamNotFound) when (!failIfNotFound) {
return aggregate;
}
catch (Exception e) {
} catch (Exception e) {
Log.UnableToLoadAggregate<T>(streamName, e);

throw e is StreamNotFound ? new AggregateNotFoundException<T>(streamName, e) : e;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,15 @@ namespace Eventuous;
using static Diagnostics.PersistenceEventSource;

public class AggregateStore<TReader>(
IEventStore eventStore,
TReader archiveReader,
Func<StreamEvent, StreamEvent>? amendEvent = null,
AggregateFactoryRegistry? factoryRegistry = null
IEventStore eventStore,
TReader archiveReader,
AmendEvent? amendEvent = null,
AggregateFactoryRegistry? factoryRegistry = null
) : IAggregateStore where TReader : class, IEventReader {
readonly Func<StreamEvent, StreamEvent> _amendEvent = amendEvent ?? (x => x);
readonly AggregateFactoryRegistry _factoryRegistry = factoryRegistry ?? AggregateFactoryRegistry.Instance;
readonly AggregateFactoryRegistry _factoryRegistry = factoryRegistry ?? AggregateFactoryRegistry.Instance;

public Task<AppendEventsResult> Store<T>(StreamName streamName, T aggregate, CancellationToken cancellationToken) where T : Aggregate
=> eventStore.Store(streamName, aggregate, _amendEvent, cancellationToken);
=> eventStore.Store(streamName, aggregate, amendEvent, cancellationToken);

public Task<T> Load<T>(StreamName streamName, CancellationToken cancellationToken) where T : Aggregate
=> LoadInternal<T>(streamName, true, cancellationToken);
Expand Down
9 changes: 9 additions & 0 deletions src/Core/src/Eventuous.Persistence/AmendEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Copyright (C) Ubiquitous AS.All rights reserved
// Licensed under the Apache License, Version 2.0.

namespace Eventuous;

/// <summary>
/// Function to add additional information to the event before it's stored.
/// </summary>
public delegate StreamEvent AmendEvent(StreamEvent originalEvent);
57 changes: 45 additions & 12 deletions src/Core/src/Eventuous.Persistence/EventStore/StoreFunctions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,25 @@ namespace Eventuous;
using static Diagnostics.PersistenceEventSource;

public static class StoreFunctions {
/// <summary>
/// Stores a collection of events to the event store
/// </summary>
/// <param name="eventWriter">Event writer or event store</param>
/// <param name="streamName">Name of the stream where events will be appended to</param>
/// <param name="originalVersion">Expected version of the stream in the event store</param>
/// <param name="changes">Collection of events to store</param>
/// <param name="amendEvent">Optional: function to add extra information to an event before it gets stored</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>Append events result</returns>
/// <exception cref="Exception">Any exception that occurred in the event store</exception>
/// <exception cref="OptimisticConcurrencyException">Gets thrown if the expected stream version mismatches with <see cref="originalVersion"/></exception>
public static async Task<AppendEventsResult> Store(
this IEventWriter eventWriter,
StreamName streamName,
int originalVersion,
IReadOnlyCollection<object> changes,
Func<StreamEvent, StreamEvent> amendEvent,
CancellationToken cancellationToken
this IEventWriter eventWriter,
StreamName streamName,
int originalVersion,
IReadOnlyCollection<object> changes,
AmendEvent? amendEvent,
CancellationToken cancellationToken
) {
Ensure.NotNull(changes);

Expand All @@ -39,16 +51,27 @@ CancellationToken cancellationToken
StreamEvent ToStreamEvent(object evt, int position) {
var streamEvent = new StreamEvent(Guid.NewGuid(), evt, new Metadata(), "", position);

return amendEvent(streamEvent);
return amendEvent?.Invoke(streamEvent) ?? streamEvent;
}
}

/// <summary>
/// Store aggregate changes to the event store
/// </summary>
/// <param name="eventWriter">Event writer or event store</param>
/// <param name="streamName">Stream name for the aggregate</param>
/// <param name="aggregate">Aggregate instance</param>
/// <param name="amendEvent">Optional: function to add extra information to the event before it gets stored</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <typeparam name="T">Aggregate type</typeparam>
/// <returns>Append event result</returns>
/// <exception cref="OptimisticConcurrencyException{T}"></exception>
public static async Task<AppendEventsResult> Store<T>(
this IEventWriter eventWriter,
StreamName streamName,
T aggregate,
Func<StreamEvent, StreamEvent> amendEvent,
CancellationToken cancellationToken
this IEventWriter eventWriter,
StreamName streamName,
T aggregate,
AmendEvent? amendEvent,
CancellationToken cancellationToken
) where T : Aggregate {
Ensure.NotNull(aggregate);

Expand All @@ -61,6 +84,16 @@ CancellationToken cancellationToken
}
}

/// <summary>
/// Reads a stream from the event store to a collection of <seealso cref="StreamEvent"/>
/// </summary>
/// <param name="eventReader">Event reader or event store</param>
/// <param name="streamName">Name of the stream to read from</param>
/// <param name="start">Stream version to start reading from</param>
/// <param name="failIfNotFound">Set to true if the function needs to throw when the stream isn't found. Default is false, and if there's no
/// stream with the given name found in the store, the function will return an empty collection.</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>Collection of events wrapped in <seealso cref="StreamEvent"/></returns>
public static async Task<StreamEvent[]> ReadStream(
this IEventReader eventReader,
StreamName streamName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,56 @@ namespace Eventuous;
using static Diagnostics.PersistenceEventSource;

public static class StateStoreFunctions {
/// <summary>
/// Reads the event stream and folds it into a state object. This function will fail if the stream does not exist.
/// </summary>
/// <param name="reader">Event reader or event store</param>
/// <param name="streamName">Name of the stream to read from</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <typeparam name="T">State object type</typeparam>
/// <returns>Instance of <seealso cref="FoldedEventStream{T}"/> containing events and folded state</returns>
public static Task<FoldedEventStream<T>> LoadState<T>(this IEventReader reader, StreamName streamName, CancellationToken cancellationToken)
where T : State<T>, new()
=> reader.LoadEventsInternal<T>(streamName, true, cancellationToken);

/// <summary>
/// Reads the event stream and folds it into a state object. This function will fail if the stream does not exist.
/// </summary>
/// <param name="reader">Event reader or event store</param>
/// <param name="id">State identity value</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <param name="streamNameMap">Mapper between identity and stream name</param>
/// <typeparam name="T">State object type</typeparam>
/// <typeparam name="TId">State identity type</typeparam>
/// <returns>Instance of <seealso cref="FoldedEventStream{T}"/> containing events and folded state</returns>
public static async Task<FoldedEventStream<T>> LoadState<T, TId>(this IEventReader reader, StreamNameMap streamNameMap, TId id, CancellationToken cancellationToken)
where T : State<T>, new() where TId : Id {
var foldedStream = await reader.LoadEventsInternal<T>(streamNameMap.GetStreamName(id), true, cancellationToken);
return foldedStream with { State = foldedStream.State.WithId(id) };
}

/// <summary>
/// Reads the event stream and folds it into a state object. This function will return a new state instance if the stream does not exist.
/// </summary>
/// <param name="reader">Event reader or event store</param>
/// <param name="streamName">Name of the stream to read from</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <typeparam name="T">State object type</typeparam>
/// <returns>Instance of <seealso cref="FoldedEventStream{T}"/> containing events and folded state</returns>
public static Task<FoldedEventStream<T>> LoadStateOrNew<T>(this IEventReader reader, StreamName streamName, CancellationToken cancellationToken)
where T : State<T>, new()
=> reader.LoadEventsInternal<T>(streamName, false, cancellationToken);

/// <summary>
/// Reads the event stream and folds it into a state object. This function will return a new state instance if the stream does not exist.
/// </summary>
/// <param name="reader">Event reader or event store</param>
/// <param name="id">State identity value</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <param name="streamNameMap">Mapper between identity and stream name</param>
/// <typeparam name="T">State object type</typeparam>
/// <typeparam name="TId">State identity type</typeparam>
/// <returns>Instance of <seealso cref="FoldedEventStream{T}"/> containing events and folded state</returns>
public static async Task<FoldedEventStream<T>> LoadStateOrNew<T, TId>(this IEventReader reader, StreamNameMap streamNameMap, TId id, CancellationToken cancellationToken)
where T : State<T>, new()
where TId : Id {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
namespace Eventuous.Tests.Application;

public class BookingFuncService : FunctionalCommandService<BookingState> {
public BookingFuncService(IEventStore store, TypeMapper? typeMap = null)
: base(store, typeMap) {
public BookingFuncService(IEventStore store, TypeMapper? typeMap = null, AmendEvent? amendEvent = null)
: base(store, typeMap, amendEvent) {
#pragma warning disable CS0618 // Type or member is obsolete
OnNew<BookRoom>(cmd => GetStream(cmd.BookingId), BookRoom);
#pragma warning restore CS0618 // Type or member is obsolete
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ public async Task ExecuteOnExistingStream() {

[Fact]
public async Task ExecuteOnAnyForNewStream() {
var bookRoom = GetBookRoom();
var paymentTime = DateTimeOffset.Now;
var bookRoom = GetBookRoom();

var cmd = new Commands.ImportBooking {
BookingId = "dummy",
Expand All @@ -69,6 +68,21 @@ public async Task ExecuteOnAnyForNewStream() {
result.Changes.Should().HaveCount(1);
}

[Fact]
public async Task AmendEventAddsMeta() {
var service = new BookingFuncService(_store, amendEvent: AddMeta);
var cmd = GetBookRoom();

await service.Handle(cmd, default);

var stream = await _store.ReadStream(StreamName.For<Booking>(cmd.BookingId), StreamReadPosition.Start, true, default);
stream[0].Metadata["foo"].Should().Be("bar");

return;

StreamEvent AddMeta(StreamEvent evt) => evt with { Metadata = new Metadata { ["foo"] = "bar" } };
}

static Commands.BookRoom GetBookRoom() {
var checkIn = LocalDate.FromDateTime(DateTime.Today);
var checkOut = checkIn.PlusDays(1);
Expand Down