Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
dotnet: [ 'net6.0', 'net7.0' ]
env:
NUGET_PACKAGES: ${{ github.workspace }}/.nuget/packages
# TC_CLOUD_TOKEN: ${{ secrets.TC_TOKEN }}
TC_CLOUD_TOKEN: ${{ secrets.TC_TOKEN }}
steps:
-
name: Checkout
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,22 @@ public sealed class CheckpointCommitHandler : IAsyncDisposable {

internal record CommitEvent(string Id, CommitPosition CommitPosition, CommitPosition? FirstPending);

public CheckpointCommitHandler(string subscriptionId, ICheckpointStore checkpointStore, TimeSpan delay, int batchSize = 1, ILoggerFactory? loggerFactory = null)
public CheckpointCommitHandler(
string subscriptionId,
ICheckpointStore checkpointStore,
TimeSpan delay,
int batchSize = 1,
ILoggerFactory? loggerFactory = null
)
: this(subscriptionId, checkpointStore.StoreCheckpoint, delay, batchSize, loggerFactory) { }

public CheckpointCommitHandler(string subscriptionId, CommitCheckpoint commitCheckpoint, TimeSpan delay, int batchSize = 1, ILoggerFactory? loggerFactory = null) {
public CheckpointCommitHandler(
string subscriptionId,
CommitCheckpoint commitCheckpoint,
TimeSpan delay,
int batchSize = 1,
ILoggerFactory? loggerFactory = null
) {
_subscriptionId = subscriptionId;
_commitCheckpoint = commitCheckpoint;
_loggerFactory = loggerFactory;
Expand Down Expand Up @@ -63,6 +75,8 @@ public CheckpointCommitHandler(string subscriptionId, CommitCheckpoint commitChe

_worker = new ChannelWorker<CommitPosition>(channel, Process, true);

return;

[MethodImpl(MethodImplOptions.AggressiveInlining)]
ValueTask Process(CommitPosition position, CancellationToken cancellationToken) {
position.LogContext.PositionReceived(position);
Expand All @@ -73,9 +87,10 @@ ValueTask Process(CommitPosition position, CancellationToken cancellationToken)

[MethodImpl(MethodImplOptions.AggressiveInlining)]
CommitPosition AddBatchAndGetLast(IList<CommitPosition> list) {
_semaphore.Wait();
_positions.UnionWith(list);
var next = GetCommitPosition(false);

_semaphore.Release();
return next;
}
}
Expand Down Expand Up @@ -115,8 +130,13 @@ CommitPosition GetCommitPosition(bool force) {
return _positions.FirstBeforeGap();
}

readonly SemaphoreSlim _semaphore = new(1);

async Task CommitInternal(CommitPosition position, bool force, CancellationToken cancellationToken) {
if (_semaphore.CurrentCount == 0) return;
try {
await _semaphore.WaitAsync(cancellationToken).NoContext();

if (_lastCommit == position && !force) {
Log.CheckpointAlreadyCommitted(_subscriptionId, position);

Expand All @@ -132,6 +152,8 @@ async Task CommitInternal(CommitPosition position, bool force, CancellationToken
_positions.RemoveWhere(x => x.Sequence <= position.Sequence);
} catch (Exception e) {
position.LogContext.UnableToCommitPosition(position, e);
} finally {
_semaphore.Release();
}
}

Expand All @@ -153,12 +175,16 @@ await _worker.Stop(
}
}

public record struct CommitPosition(ulong Position, ulong Sequence, DateTime Timestamp) {
public readonly record struct CommitPosition(ulong Position, ulong Sequence, DateTime Timestamp) {
public bool Valid { get; private init; } = true;

public LogContext LogContext { get; init; }

public static readonly CommitPosition None = new(0, 0, DateTime.MinValue) { Valid = false };

public bool Equals(CommitPosition other) => Valid == other.Valid && Position == other.Position && Sequence == other.Sequence;

public override int GetHashCode() => HashCode.Combine(Valid, Position, Sequence);
}

public delegate ValueTask<Checkpoint> CommitCheckpoint(Checkpoint checkpoint, bool force, CancellationToken cancellationToken);
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ static ConsumePipe ConfigurePipe(ConsumePipe pipe, int concurrencyLimit)
CheckpointCommitHandler CheckpointCommitHandler { get; } = new(
options.SubscriptionId,
checkpointStore,
TimeSpan.FromMilliseconds(options.CommitDelayMs),
options.BatchSize,
TimeSpan.FromMilliseconds(options.CheckpointCommitDelayMs),
options.CheckpointCommitBatchSize,
loggerFactory
);
ICheckpointStore CheckpointStore { get; } = Ensure.NotNull(checkpointStore);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public static void MessageReceived(this LogContext log, IMessageConsumeContext c
);

public static void MessageHandled(this LogContext log, string handlerType, IBaseConsumeContext context)
=> log.DebugLog?.Log(
=> log.TraceLog?.Log(
"{Handler} handled {MessageType} {Stream}:{Position} seq {Sequence}",
handlerType,
context.MessageType,
Expand All @@ -28,7 +28,7 @@ public static void MessageHandled(this LogContext log, string handlerType, IBase
);

public static void MessageIgnored(this LogContext log, string handlerType, IBaseConsumeContext context)
=> log.DebugLog?.Log(
=> log.TraceLog?.Log(
"{Handler} ignored {MessageType} {Stream}:{Position} seq {Sequence}",
handlerType,
context.MessageType,
Expand Down
4 changes: 2 additions & 2 deletions src/Core/src/Eventuous.Subscriptions/SubscriptionOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@ public abstract record SubscriptionOptions {
}

public abstract record SubscriptionWithCheckpointOptions : SubscriptionOptions {
public int BatchSize { get; set; } = 100;
public int CommitDelayMs { get; set; } = 5000;
public int CheckpointCommitBatchSize { get; set; } = 100;
public int CheckpointCommitDelayMs { get; set; } = 5000;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
event: {
roomId: Guid_2,
price: 100,
checkIn: 2023-10-02,
checkOut: 2023-10-03
checkIn: 2023-10-01,
checkOut: 2023-10-02
},
eventType: V1.BookingImported
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
event: {
roomId: Guid_2,
price: 100,
checkIn: 2023-10-02,
checkOut: 2023-10-03
checkIn: 2023-10-01,
checkOut: 2023-10-02
},
eventType: V1.BookingImported
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
event: {
roomId: Guid_2,
price: 100,
checkIn: 2023-10-02,
checkOut: 2023-10-03
checkIn: 2023-10-01,
checkOut: 2023-10-02
},
eventType: V1.BookingImported
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
event: {
roomId: Guid_2,
price: 100,
checkIn: 2023-10-02,
checkOut: 2023-10-03
checkIn: 2023-10-01,
checkOut: 2023-10-02
},
eventType: V1.BookingImported
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
{
event: {
roomId: Guid_2,
checkIn: 2023-10-02,
checkOut: 2023-10-03,
checkIn: 2023-10-01,
checkOut: 2023-10-02,
price: 100,
guestId: test guest
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public Task<StreamEvent[]> ReadStream<T>(string id)
=> Resolve<IEventStore>().ReadEvents(StreamName.For<T>(id), StreamReadPosition.Start, 100, default);

internal BookRoom GetBookRoom() {
var date = LocalDate.FromDateTime(DateTime.Now);
var now = new DateTime(2023, 10, 1);
var date = LocalDate.FromDateTime(now);

return new(_fixture.Create<string>(), _fixture.Create<string>(), date, date.PlusDays(1), 100, "guest");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
using Eventuous.Diagnostics.Logging;
using Eventuous.SqlServer.Subscriptions;
using Eventuous.Subscriptions.Filters;
using Eventuous.Sut.Subs;
using Eventuous.Tests.SqlServer.Fixtures;

namespace Eventuous.Tests.SqlServer.Checkpointing;

public class CheckpointTest : IClassFixture<IntegrationFixture>, IDisposable {
readonly IDisposable _listener;
readonly IntegrationFixture _fixture;
readonly ILoggerFactory _loggerFactory;

public CheckpointTest(IntegrationFixture fixture, ITestOutputHelper output) {
_fixture = fixture;
_loggerFactory = new LoggerFactory().AddXunit(output, LogLevel.Debug);
_listener = new LoggingEventListener(_loggerFactory);
}

[Fact]
public async Task EmitMassiveNumberOfEventsAndEnsureCheckpointingWorks() {
TypeMap.RegisterKnownEventTypes();

var aggregateStore = new AggregateStore(_fixture.EventStore);

var checkpointStore = new SqlServerCheckpointStore(
_fixture.GetConnection,
new SqlServerCheckpointStoreOptions {
Schema = _fixture.SchemaName
}
);
var service = new TestCommandService(aggregateStore);
var pipe = new ConsumePipe();
var handler = new TestHandler();
pipe.AddDefaultConsumer(handler);

var sub = new SqlServerAllStreamSubscription(
_fixture.GetConnection,
new SqlServerAllStreamSubscriptionOptions {
Schema = _fixture.SchemaName,
SubscriptionId = "TestSubscription",
CheckpointCommitBatchSize = 1000,
},
checkpointStore,
pipe,
_loggerFactory
);
await sub.SubscribeWithLog(_loggerFactory.CreateLogger(sub.SubscriptionId));
var accounts = Enumerable.Range(0, 100000).Select(n => new TestAccount($"user{n:D4}")).ToList();
await service.Handle(new InjectTestAccounts(accounts), default);

while (handler.HandledCount < 100000) {
await Task.Delay(100);
}

await sub.Unsubscribe(id => { }, default);

var checkpoint = await checkpointStore.GetLastCheckpoint(sub.SubscriptionId, default);
checkpoint.Position.Should().Be(99999);
}

public void Dispose() => _listener.Dispose();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
namespace Eventuous.Tests.SqlServer.Checkpointing;

public record TestAccount(string UserName);

public record InjectTestAccounts(IList<TestAccount> Accounts);

[EventType("V1.TestAccountInserted")]
public record TestAccountInserted(TestAccount Account);

public class TestAccounts : Aggregate<TestAccountsState> {
public void InjectAccounts(IList<TestAccount> accounts) {
foreach (var insertion in accounts) {
Apply(new TestAccountInserted(insertion));
}
}
}

public record TestAccountsState : State<TestAccountsState>;

public record TestAccountsId() : Id("$$Singleton$$") {
public static readonly TestAccountsId Instance = new();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace Eventuous.Tests.SqlServer.Checkpointing;

class TestCommandService : CommandService<TestAccounts, TestAccountsState, TestAccountsId> {
public TestCommandService(IAggregateStore store)
: base(store) {
On<InjectTestAccounts>()
.InState(ExpectedState.Any)
.GetId(_ => TestAccountsId.Instance)
.Act((accounts, cmd) => accounts.InjectAccounts(cmd.Accounts));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using Eventuous.Subscriptions;
using Eventuous.Subscriptions.Context;

namespace Eventuous.Tests.SqlServer.Checkpointing;

public class TestHandler : IEventHandler {
public string DiagnosticName => nameof(TestHandler);

public ValueTask<EventHandlingStatus> HandleEvent(IMessageConsumeContext context) {
_count++;

if (_count % 1000 == 0) {
context.LogContext.DebugLog?.Log("Handled {Count} events", _count);
}

return ValueTask.FromResult(EventHandlingStatus.Success);
}

int _count;

public int HandledCount => _count;
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ public async Task InitializeAsync() {
await schema.CreateSchema(GetConnection);
DefaultEventSerializer.SetDefaultSerializer(Serializer);
EventStore = new SqlServerStore(GetConnection, new SqlServerStoreOptions(SchemaName), Serializer);
new AggregateStore(EventStore);
ActivitySource.AddActivityListener(_listener);

return;
Expand Down
5 changes: 1 addition & 4 deletions test/Eventuous.TestHelpers/EventStoreDbContainerBuilder.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
// Copyright (C) Ubiquitous AS.All rights reserved
// Licensed under the Apache License, Version 2.0.

using System.Runtime.InteropServices;
using Docker.DotNet.Models;
using DotNet.Testcontainers.Builders;
Expand All @@ -11,7 +8,7 @@ namespace Eventuous.TestHelpers;

public sealed class EventStoreDbContainerBuilder : ContainerBuilder<EventStoreDbContainerBuilder, EventStoreDbContainer, EventStoreDbConfiguration> {
public static readonly string ContainerTag = RuntimeInformation.ProcessArchitecture switch {
Architecture.Arm64 => "22.10.2-alpha-arm64v8",
// Architecture.Arm64 => "22.10.2-alpha-arm64v8",
_ => "22.10.2-buster-slim"
};

Expand Down