Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,40 @@
<ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally>
</PropertyGroup>
<PropertyGroup Label="Package versions for .NET 9" Condition="'$(TargetFramework)' == 'net9.0'">
<MicrosoftTestHostVer>9.0.0</MicrosoftTestHostVer>
<MicrosoftTestHostVer>9.0.6</MicrosoftTestHostVer>
</PropertyGroup>
<PropertyGroup Label="Package versions for .NET 8" Condition="'$(TargetFramework)' == 'net8.0'">
<MicrosoftTestHostVer>8.0.6</MicrosoftTestHostVer>
<MicrosoftTestHostVer>8.0.17</MicrosoftTestHostVer>
</PropertyGroup>
<PropertyGroup Label="Testcontainers version">
<TestcontainersVersion>4.1.0</TestcontainersVersion>
<TestcontainersVersion>4.6.0</TestcontainersVersion>
</PropertyGroup>
<PropertyGroup>
<NpgsqlVersion>9.0.1</NpgsqlVersion>
<NpgsqlVersion>9.0.3</NpgsqlVersion>
<TUnitVersion>0.5.18</TUnitVersion>
</PropertyGroup>
<ItemGroup>
<PackageVersion Include="BenchmarkDotNet" Version="0.14.0" />
<PackageVersion Include="FluentValidation" Version="11.11.0" />
<PackageVersion Include="Microsoft.Extensions.Configuration" Version="9.0.0" />
<PackageVersion Include="Microsoft.Extensions.Configuration.Json" Version="9.0.0" />
<PackageVersion Include="Microsoft.Extensions.Diagnostics.HealthChecks" Version="9.0.0" />
<PackageVersion Include="Microsoft.Extensions.Hosting.Abstractions" Version="9.0.0" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="9.0.0" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection" Version="9.0.0" />
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="9.0.0" />
<PackageVersion Include="Microsoft.Extensions.Diagnostics.HealthChecks" Version="9.0.6" />
<PackageVersion Include="Microsoft.Extensions.Hosting.Abstractions" Version="9.0.6" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="9.0.6" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection" Version="9.0.6" />
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="9.0.6" />
<PackageVersion Include="Microsoft.Extensions.Hosting" Version="9.0.0" />
<PackageVersion Include="Microsoft.Extensions.Logging" Version="9.0.0" />
<PackageVersion Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="9.0.0" />
<PackageVersion Include="Microsoft.Extensions.Logging" Version="9.0.6" />
<PackageVersion Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="9.0.6" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.11.1" />
<PackageVersion Include="Shouldly" Version="4.2.1" />
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="9.0.0" />
<PackageVersion Include="System.Reactive" Version="6.0.1" />
<PackageVersion Include="System.Linq.Async" Version="6.0.1" />
<PackageVersion Include="OpenTelemetry.Extensions.Hosting" Version="1.10.0" />
<PackageVersion Include="EventStore.Client.Grpc.PersistentSubscriptions" Version="23.3.7" />
<PackageVersion Include="EventStore.Client.Grpc.Streams" Version="23.3.7" />
<PackageVersion Include="MongoDB.Driver" Version="3.1.0" />
<PackageVersion Include="EventStore.Client.Grpc.PersistentSubscriptions" Version="23.3.9" />
<PackageVersion Include="EventStore.Client.Grpc.Streams" Version="23.3.9" />
<PackageVersion Include="MongoDB.Driver" Version="3.4.0" />
<PackageVersion Include="Google.Cloud.PubSub.V1" Version="3.19.0" />
<PackageVersion Include="Confluent.Kafka" Version="2.6.1" />
<PackageVersion Include="Npgsql" Version="$(NpgsqlVersion)" />
Expand Down Expand Up @@ -67,7 +67,7 @@
<PackageVersion Include="Bogus" Version="35.6.1" />
<PackageVersion Include="Fare" Version="2.2.1" />
<PackageVersion Include="FluentAssertions" Version="7.0.0" />
<PackageVersion Include="coverlet.collector" Version="6.0.2" PrivateAssets="All" />
<PackageVersion Include="coverlet.collector" Version="6.0.4" PrivateAssets="All" />
<PackageVersion Include="Microsoft.AspNetCore.Mvc.Testing" Version="$(MicrosoftTestHostVer)" />
<PackageVersion Include="Microsoft.AspNetCore.TestHost" Version="$(MicrosoftTestHostVer)" />
<PackageVersion Include="RestSharp" Version="112.1.0" />
Expand All @@ -85,7 +85,7 @@
<PackageVersion Include="System.Text.Json" Version="9.0.0" />
</ItemGroup>
<ItemGroup Label="Packages for samples">
<PackageVersion Include="MongoDB.Driver.Core.Extensions.DiagnosticSources" Version="2.0.0" />
<PackageVersion Include="MongoDB.Driver.Core.Extensions.DiagnosticSources" Version="2.1.0" />
<PackageVersion Include="MongoDB.Driver.Core" Version="2.30.0" />
<PackageVersion Include="Npgsql.OpenTelemetry" Version="$(NpgsqlVersion)" />
<PackageVersion Include="OpenTelemetry" Version="1.9.0" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public async Task<Result<TState>> Handle<TCommand>(TCommand command, Cancellatio
var result = await registeredHandler.Handler(aggregate!, command, cancellationToken).NoContext();

// Zero in the global position would mean nothing, so the receiver needs to check the Changes.Length
if (result.Changes.Count == 0) return Result<TState>.FromSuccess(result.State, Array.Empty<Change>(), 0);
if (result.Changes.Count == 0) return Result<TState>.FromSuccess(result.State, [], 0);

var proposed = new ProposedAppend(stream, new(result.OriginalVersion), result.Changes.Select(x => new ProposedEvent(x, new())).ToArray());
var final = registeredHandler.AmendAppend?.Invoke(proposed, command) ?? proposed;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

namespace Eventuous;

public static class CommandServiceDelegates {
static class CommandServiceDelegates {
internal delegate ValueTask<TAggregate> HandleUntypedCommand<TAggregate, TState>(TAggregate aggregate, object command, CancellationToken cancellationToken)
where TAggregate : Aggregate<TState> where TState : State<TState>, new();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public async Task<Result<TState>> Handle<TCommand>(TCommand command, Cancellatio
var newState = newEvents.Aggregate(loadedState.State, (current, evt) => current.When(evt.Data));

// Zero in the global position would mean nothing, so the receiver needs to check the Changes.Length
if (newEvents.Length == 0) return Result<TState>.FromSuccess(newState, Array.Empty<Change>(), 0);
if (newEvents.Length == 0) return Result<TState>.FromSuccess(newState, [], 0);

var proposed = new ProposedAppend(streamName, loadedState.StreamVersion, newEvents);
var final = registeredHandler.AmendAppend?.Invoke(proposed, command) ?? proposed;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

namespace Eventuous;

public static partial class FuncServiceDelegates {
static partial class FuncServiceDelegates {
internal static ExecuteUntypedCommand<TState> AsExecute<TCommand, TState>(this Func<TCommand, CancellationToken, Task<NewEvents>> execute)
where TState : State<TState> where TCommand : class
=> async (_, _, command, token) => await execute((TCommand)command, token).NoContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace Eventuous;

public static partial class FuncServiceDelegates {
static partial class FuncServiceDelegates {
internal delegate ValueTask<StreamName> GetStreamNameFromUntypedCommand(object command, CancellationToken cancellationToken);

internal delegate ValueTask<NewEvents> ExecuteUntypedCommand<in T>(T state, object[] events, object command, CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ protected override void GetDependencies(IServiceProvider provider) {
Log = LoggerFactory.CreateLogger(GetType());
}

protected virtual ILoggingBuilder ConfigureLogging(ILoggingBuilder builder) => builder;

public abstract Task<ulong> GetLastPosition();

public override async Task InitializeAsync() {
Expand Down
3 changes: 0 additions & 3 deletions src/Directory.Testable.targets
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,4 @@
<IsPackable>false</IsPackable>
<NoWarn>CA1816</NoWarn>
</PropertyGroup>
<Target Name="WorkaroundRider117732" AfterTargets="Build" Condition="!$([MSBuild]::IsOSPlatform('Windows'))">
<Copy Condition="Exists('$(OutputPath)$(AssemblyName)')" SourceFiles="$(OutputPath)$(AssemblyName)" DestinationFiles="$(OutputPath)$(AssemblyName).exe"/>
</Target>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;

namespace Eventuous.Tests.AspNetCore;
namespace Eventuous.Tests.DependencyInjection;

public class AggregateFactoryRegistrationTests {
readonly AggregateFactoryRegistry _registry;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<IncludeTestHost>true</IncludeTestHost>
<RootNamespace>Eventuous.Tests.AspNetCore</RootNamespace>
<OutputType>Exe</OutputType>
</PropertyGroup>

Expand Down
49 changes: 47 additions & 2 deletions src/Mongo/src/Eventuous.Projections.MongoDB/MongoProjector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ public abstract class MongoProjector<T>(IMongoDatabase database, MongoProjection
: BaseEventHandler where T : ProjectedDocument {
[PublicAPI]
protected IMongoCollection<T> Collection { get; } =
options != null ? Ensure.NotNull(database).GetCollection<T>(options?.CollectionName) : Ensure.NotNull<IMongoDatabase>(database).GetDocumentCollection<T>();
options != null
? Ensure.NotNull(database).GetCollection<T>(options?.CollectionName)
: Ensure.NotNull<IMongoDatabase>(database).GetDocumentCollection<T>();

readonly Dictionary<Type, ProjectUntypedEvent> _handlers = new();
readonly ITypeMapper _map = typeMap ?? TypeMap.Instance;
Expand Down Expand Up @@ -62,25 +64,61 @@ protected void On<TEvent>(Func<MongoOperationBuilder<TEvent, T>, MongoOperationB
On(operation);
}

/// <summary>
/// Register a handler for an event type that updates a document by ID extracted from the event
/// </summary>
/// <param name="getId">Function to extract document ID from the event</param>
/// <param name="getUpdate">Function to build the update operation</param>
/// <typeparam name="TEvent">Event type</typeparam>
[PublicAPI]
protected void On<TEvent>(GetDocumentIdFromEvent<TEvent> getId, BuildUpdate<TEvent, T> getUpdate) where TEvent : class
=> On<TEvent>(b => b.UpdateOne.Id(x => getId(x.Message)).UpdateFromContext(getUpdate));

/// <summary>
/// Register a handler for an event type that updates a document by ID extracted from the stream
/// </summary>
/// <param name="getId">Function to extract document ID from the stream</param>
/// <param name="getUpdate">Function to build the update operation</param>
/// <typeparam name="TEvent">Event type</typeparam>
protected void On<TEvent>(GetDocumentIdFromStream getId, BuildUpdate<TEvent, T> getUpdate) where TEvent : class
=> On<TEvent>(b => b.UpdateOne.Id(x => getId(x.Stream)).UpdateFromContext(getUpdate));

/// <summary>
/// Register a handler for an event type that updates documents matching a filter
/// </summary>
/// <param name="getFilter">Function to build the filter for matching documents</param>
/// <param name="getUpdate">Function to build the update operation</param>
/// <typeparam name="TEvent">Event type</typeparam>
[PublicAPI]
protected void On<TEvent>(BuildFilter<TEvent, T> getFilter, BuildUpdate<TEvent, T> getUpdate) where TEvent : class
=> On<TEvent>(b => b.UpdateOne.Filter(getFilter).UpdateFromContext(getUpdate));

/// <summary>
/// Register an asynchronous handler for an event type that updates a document by ID extracted from the event
/// </summary>
/// <param name="getId">Function to extract document ID from the event</param>
/// <param name="getUpdate">Async function to build the update operation</param>
/// <typeparam name="TEvent">Event type</typeparam>
[PublicAPI]
protected void OnAsync<TEvent>(GetDocumentIdFromEvent<TEvent> getId, BuildUpdateAsync<TEvent, T> getUpdate) where TEvent : class
=> On<TEvent>(b => b.UpdateOne.Id(x => getId(x.Message)).UpdateFromContext(getUpdate));

/// <summary>
/// Register an asynchronous handler for an event type that updates a document by ID extracted from the stream
/// </summary>
/// <param name="getId">Function to extract document ID from the stream</param>
/// <param name="getUpdate">Async function to build the update operation</param>
/// <typeparam name="TEvent">Event type</typeparam>
[PublicAPI]
protected void OnAsync<TEvent>(GetDocumentIdFromStream getId, BuildUpdateAsync<TEvent, T> getUpdate) where TEvent : class
=> On<TEvent>(b => b.UpdateOne.Id(x => getId(x.Stream)).UpdateFromContext(getUpdate));

/// <summary>
/// Register an asynchronous handler for an event type that updates documents matching a filter
/// </summary>
/// <param name="getFilter">Function to build the filter for matching documents</param>
/// <param name="getUpdate">Async function to build the update operation</param>
/// <typeparam name="TEvent">Event type</typeparam>
[PublicAPI]
protected void OnAsync<TEvent>(BuildFilter<TEvent, T> getFilter, BuildUpdateAsync<TEvent, T> getUpdate) where TEvent : class
=> On<TEvent>(b => b.UpdateOne.Filter(getFilter).UpdateFromContext(getUpdate));
Expand Down Expand Up @@ -126,6 +164,12 @@ public override async ValueTask<EventHandlingStatus> HandleEvent(IMessageConsume
return EventHandlingStatus.Success;
}

/// <summary>
/// Override this method to provide custom update logic for events that don't have registered handlers
/// </summary>
/// <param name="evt">The event object</param>
/// <param name="position">The stream position</param>
/// <returns>The projection operation to execute</returns>
[PublicAPI]
protected virtual ValueTask<MongoProjectOperation<T>> GetUpdate(object evt, ulong? position) => NoOp;

Expand All @@ -142,4 +186,5 @@ public delegate ValueTask<MongoProjectOperation<T>> ProjectTypedEvent<T, TEvent>

public record MongoProjectOperation<T>(Func<IMongoCollection<T>, CancellationToken, Task> Execute);

public delegate ValueTask<WriteModel<T>> BuildWriteModel<T, TEvent>(MessageConsumeContext<TEvent> context) where T : ProjectedDocument where TEvent : class;
public delegate ValueTask<WriteModel<T>> BuildWriteModel<T, TEvent>(MessageConsumeContext<TEvent> context)
where T : ProjectedDocument where TEvent : class;
3 changes: 2 additions & 1 deletion src/Mongo/src/Eventuous.Projections.MongoDB/Options.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
// Copyright (C) Eventuous HQ OÜ.All rights reserved
// Licensed under the Apache License, Version 2.0.

namespace Eventuous.Projections.MongoDB;

public static class Options<TOptions> where TOptions: new() {
static class Options<TOptions> where TOptions: new() {
public static TOptions New(Action<TOptions>? configure) {
var options = new TOptions();
configure?.Invoke(options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,21 @@ public async Task ShouldProjectImported(MongoProjectionOptions<BookingDocument>?

var second = await Act(projectionFixture, stream, payment);

await projectionFixture.DisposeAsync();

expected = expected with {
PaidAmount = payment.AmountPaid,
Position = second.Append.GlobalPosition,
StreamPosition = (ulong)second.Append.NextExpectedVersion
};

second.Doc.Should().BeEquivalentTo(expected);

var cancellation = new BookingCancelled();

var third = await Act(projectionFixture, stream, cancellation);

await projectionFixture.DisposeAsync();

third.Doc.Should().BeNull();
}

static async Task<(AppendEventsResult Append, BookingDocument? Doc)> Act<T>(ProjectionTestBase<SutProjection> f, StreamName stream, T evt) where T : class {
Expand Down Expand Up @@ -90,6 +96,8 @@ public SutProjection(IMongoDatabase database) : base(database) {
.DefaultId()
.Update((evt, update) => update.Set(x => x.PaidAmount, evt.AmountPaid))
);

On<BookingCancelled>(b => b.DeleteOne.DefaultId());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ public async Task ShouldProjectImported(CancellationToken cancellationToken) {
}

public class SutProjection : MongoProjector<BookingDocument> {
public SutProjection(IMongoDatabase database)
: base(database) {
public SutProjection(IMongoDatabase database) : base(database) {
On<BookingImported>(
stream => stream.GetId(),
(ctx, update) => update
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,25 @@
using Eventuous.Tests.Projections.MongoDB.Fixtures;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using static Microsoft.Extensions.Hosting.Host;

namespace Eventuous.Tests.Projections.MongoDB;

public abstract class ProjectionTestBase {
readonly string _id;
protected IHost Host = null!;
readonly IHostBuilder _builder;
readonly string _subscriptionId;
readonly IHostBuilder _builder;

protected ProjectionTestBase(string id) {
_id = id;
_builder = Microsoft.Extensions.Hosting.Host.CreateDefaultBuilder().ConfigureLogging(cfg => cfg.ForTests());
protected IHost Host = null!;

protected ProjectionTestBase(string subscriptionId) {
_subscriptionId = subscriptionId;
_builder = CreateDefaultBuilder().ConfigureLogging(cfg => cfg.ForTests());
}

protected abstract void ConfigureServices(IServiceCollection services, string id);
protected abstract void ConfigureServices(IServiceCollection services, string subscriptionId);

public async Task InitializeAsync() {
_builder.ConfigureServices(collection => ConfigureServices(collection, _id));
_builder.ConfigureServices(collection => ConfigureServices(collection, _subscriptionId));
Host = _builder.Build();
Host.Services.AddEventuousLogs();
await Host.StartAsync();
Expand All @@ -31,17 +33,17 @@ public async Task InitializeAsync() {
public async Task DisposeAsync() => await Host.StopAsync();
}

public class ProjectionTestBase<TProjection>(string id, IntegrationFixture fixture) : ProjectionTestBase(id)
public class ProjectionTestBase<TProjection>(string subscriptionId, IntegrationFixture fixture) : ProjectionTestBase(subscriptionId)
where TProjection : class, IEventHandler {
public readonly IntegrationFixture Fixture = fixture;

protected override void ConfigureServices(IServiceCollection services, string id)
protected override void ConfigureServices(IServiceCollection services, string subscriptionId)
=> services
.AddSingleton(Fixture.Client)
.AddSingleton(Fixture.Mongo)
.AddCheckpointStore<MongoCheckpointStore>()
.AddSubscription<AllStreamSubscription, AllStreamSubscriptionOptions>(
id,
subscriptionId,
builder => builder.AddEventHandler<TProjection>()
);

Expand Down
Loading