Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,21 @@
namespace Eventuous;

public class OptimisticConcurrencyException : Exception {
public OptimisticConcurrencyException(Type aggregateType, StreamName streamName, Exception inner)
: base($"Update of {aggregateType.Name} failed due to the wrong version in stream {streamName}. {inner.Message} {inner.InnerException?.Message}", inner) { }
public OptimisticConcurrencyException(Type aggregateType, StreamName streamName, Exception? inner)
: base($"Update of {aggregateType.Name} failed due to the wrong version in stream {streamName}", inner) { }

public OptimisticConcurrencyException(StreamName streamName, Exception inner)
: base($"Update failed due to the wrong version in stream {streamName}. {inner.Message} {inner.InnerException?.Message}", inner) { }
public OptimisticConcurrencyException(StreamName streamName, Exception? inner)
: base($"Update failed due to the wrong version in stream {streamName}", inner) { }
}

public class OptimisticConcurrencyException<T>(StreamName streamName, Exception inner) : OptimisticConcurrencyException(typeof(T), streamName, inner)
public class OptimisticConcurrencyException<T>(StreamName streamName, Exception? inner) : OptimisticConcurrencyException(typeof(T), streamName, inner)
where T : Aggregate;

public class AggregateNotFoundException : Exception {
public AggregateNotFoundException(Type aggregateType, StreamName streamName, Exception inner)
: base($"Aggregate {aggregateType.Name} with not found in stream {streamName}. {inner.Message} {inner.InnerException?.Message}", inner) { }
public AggregateNotFoundException(Type aggregateType, StreamName streamName, Exception? inner)
: base($"Aggregate {aggregateType.Name} with not found in stream {streamName}", inner) { }
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this sentence does not make sense, maybe remove 'with', or make it something like 'No events for Aggregate {name} found in {streamName}' ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right - I didn't catch this while going through this. I might go through the error messages and make sure they are sensible and makes sense sometime during the weekend

}

public class AggregateNotFoundException<T> : AggregateNotFoundException where T : Aggregate {
public AggregateNotFoundException(StreamName streamName, Exception inner) : base(typeof(T), streamName, inner) { }
public AggregateNotFoundException(StreamName streamName, Exception? inner) : base(typeof(T), streamName, inner) { }
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ CancellationToken cancellationToken
} catch (OptimisticConcurrencyException e) {
Log.UnableToStoreAggregate<T>(streamName, e);

throw new OptimisticConcurrencyException<T>(streamName, e.InnerException!);
throw e.InnerException is null
? new OptimisticConcurrencyException<T>(streamName, e)
: new OptimisticConcurrencyException<T>(streamName, e.InnerException);
}
}

Expand Down
30 changes: 19 additions & 11 deletions src/Core/src/Eventuous.Subscriptions/Consumers/DefaultConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,30 @@ namespace Eventuous.Subscriptions.Consumers;
// ReSharper disable once ParameterTypeCanBeEnumerable.Local
public class DefaultConsumer(IEventHandler[] eventHandlers) : IMessageConsumer {
public async ValueTask Consume(IMessageConsumeContext context) {
try {
if (context.Message == null) {
context.Ignore<DefaultConsumer>();
var scope = new Dictionary<string, object> {
{"SubscriptionId", context.SubscriptionId},
{"Stream", context.Stream},
{"MessageType", context.MessageType},
};

return;
using (context.LogContext.Logger.BeginScope(scope)) {
try {
if (context.Message == null) {
context.Ignore<DefaultConsumer>();

return;
}

var typedContext = context.ConvertToGeneric();
var tasks = eventHandlers.Select(handler => Handle(typedContext, handler));
await tasks.WhenAll().NoContext();
} catch (Exception e) {
context.Nack<DefaultConsumer>(e);
}

var typedContext = context.ConvertToGeneric();
var tasks = eventHandlers.Select(handler => Handle(typedContext, handler));
await tasks.WhenAll().NoContext();
} catch (Exception e) {
context.Nack<DefaultConsumer>(e);
return;
}

return;

async ValueTask Handle(IMessageConsumeContext typedContext, IEventHandler handler) {
try {
var status = await handler.HandleEvent(typedContext).NoContext();
Expand Down
83 changes: 46 additions & 37 deletions src/Core/src/Eventuous.Subscriptions/EventSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,59 +69,68 @@ public async ValueTask Unsubscribe(OnUnsubscribed onUnsubscribed, CancellationTo

// ReSharper disable once CognitiveComplexity
protected async ValueTask Handler(IMessageConsumeContext context) {
var activity = EventuousDiagnostics.Enabled
? SubscriptionActivity.Create(
$"{Constants.Components.Subscription}.{SubscriptionId}/{context.MessageType}",
ActivityKind.Internal,
context,
EventuousDiagnostics.Tags
)
: null;
var scope = new Dictionary<string, object> {
{"SubscriptionId", SubscriptionId},
{"Stream", context.Stream},
{"MessageType", context.MessageType},
};

// ReSharper disable once NullCoalescingConditionIsAlwaysNotNullAccordingToAPIContract
Logger.Current ??= Log;
var isAsync = context is AsyncConsumeContext;
if (!isAsync) activity?.Start();

Log.MessageReceived(context);
using (Log.Logger.BeginScope(scope)) {
var activity = EventuousDiagnostics.Enabled
? SubscriptionActivity.Create(
$"{Constants.Components.Subscription}.{SubscriptionId}/{context.MessageType}",
ActivityKind.Internal,
context,
EventuousDiagnostics.Tags
)
: null;

try {
if (context.Message != null) {
if (activity != null) {
context.ParentContext = activity.Context;
var isAsync = context is AsyncConsumeContext;
if (!isAsync) activity?.Start();

if (isAsync) { context.Items.AddItem(ContextItemKeys.Activity, activity); }
}
Log.MessageReceived(context);

await Pipe.Send(context).NoContext();
}
else {
context.Ignore(SubscriptionId);
try {
if (context.Message != null) {
if (activity != null) {
context.ParentContext = activity.Context;

if (isAsync) { context.Items.AddItem(ContextItemKeys.Activity, activity); }
}

if (isAsync) {
var asyncContext = context as AsyncConsumeContext;
await asyncContext!.Acknowledge().NoContext();
await Pipe.Send(context).NoContext();
}
}
else {
context.Ignore(SubscriptionId);

if (context.WasIgnored() && activity != null) activity.ActivityTraceFlags = ActivityTraceFlags.None;
} catch (OperationCanceledException e) when (Stopping.IsCancellationRequested) {
Log.DebugLog?.Log("Message ignored because subscription is stopping: {Message}", e.Message);
} catch (Exception e) { context.Nack(SubscriptionId, e); }
if (isAsync) {
var asyncContext = context as AsyncConsumeContext;
await asyncContext!.Acknowledge().NoContext();
}
}

if (context.WasIgnored() && activity != null) activity.ActivityTraceFlags = ActivityTraceFlags.None;
} catch (OperationCanceledException e) when (Stopping.IsCancellationRequested) {
Log.DebugLog?.Log("Message ignored because subscription is stopping: {Message}", e.Message);
} catch (Exception e) { context.Nack(SubscriptionId, e); }

if (context.HasFailed()) {
if (activity != null) activity.ActivityTraceFlags = ActivityTraceFlags.Recorded;
if (context.HasFailed()) {
if (activity != null) activity.ActivityTraceFlags = ActivityTraceFlags.Recorded;

var exception = context.HandlingResults.GetException();
var exception = context.HandlingResults.GetException();

if (Options.ThrowOnError) {
activity?.Dispose();
if (Options.ThrowOnError) {
activity?.Dispose();

throw new SubscriptionException(context.Stream, context.MessageType, context.Message, exception ?? new InvalidOperationException());
throw new SubscriptionException(context.Stream, context.MessageType, context.Message, exception ?? new InvalidOperationException());
}
}
}

if (!isAsync) activity?.Dispose();
if (!isAsync) activity?.Dispose();
}
}

protected object? DeserializeData(string eventContentType, string eventType, ReadOnlyMemory<byte> data, string stream, ulong position = 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ namespace Eventuous.Subscriptions.Logging;

public class InternalLogger(ILogger logger, LogLevel logLevel, string subscriptionId) {
#pragma warning disable CA2254
public void Log(string message, params object[] args) => logger.Log(logLevel, GetMessage(message), args);
public void Log(string message, params object[] args) =>
logger.Log(logLevel, GetMessage(message), args);

public void Log(Exception? exception, string message, params object[] args) => logger.Log(logLevel, exception, GetMessage(message), args);
public void Log(Exception? exception, string message, params object[] args) =>
logger.Log(logLevel, exception, GetMessage(message), args);
#pragma warning restore CA2254

string GetMessage(string message) => $"[{subscriptionId}] {message}";
Expand Down
2 changes: 1 addition & 1 deletion src/Core/src/Eventuous.Subscriptions/Logging/Logger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,6 @@ public LogContext(string subscriptionId, ILoggerFactory loggerFactory) {
ErrorLog = GetLogger(LogLevel.Error);

InternalLogger? GetLogger(LogLevel logLevel)
=> Logger.IsEnabled(logLevel) ? new InternalLogger(Logger, logLevel, SubscriptionId) : null;
=> Logger.IsEnabled(logLevel) ? new InternalLogger(Logger, logLevel, SubscriptionId) : null;;
}
}