Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ csharp_empty_block_style = together
csharp_wrap_parameters_style = chop_if_long
csharp_max_formal_parameters_on_line = 5
csharp_place_constructor_initializer_on_same_line = false
csharp_place_simple_initializer_on_single_line = true
csharp_max_initializer_elements_on_line = 0

resharper_max_attribute_length_for_same_line = 80
resharper_nested_ternary_style = expanded
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,13 @@ namespace Eventuous.Diagnostics;
using static Tracing.Constants.Components;

public sealed class CommandServiceMetrics : IWithCustomTags, IDisposable {
const string Category = "application";

public static readonly string MeterName = EventuousDiagnostics.GetMeterName(Category);
public readonly static string MeterName = EventuousDiagnostics.GetMeterName(Category);

public const string ListenerName = $"{DiagnosticName.BaseName}.{Category}";

public const string AppServiceTag = "command-service";
public const string CommandTag = "command-type";
const string Category = "application";
const string AppServiceTag = "command-service";
const string CommandTag = "command-type";

readonly Meter _meter;
readonly MetricsListener<CommandServiceMetricsContext> _listener;
Expand All @@ -43,8 +42,7 @@ public void Dispose() {
_meter.Dispose();
}

public void SetCustomTags(TagList customTags)
=> _customTags = customTags.ToArray();
public void SetCustomTags(TagList customTags) => _customTags = customTags.ToArray();
}

record CommandServiceMetricsContext(string ServiceName, string CommandName);
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ namespace Eventuous.Diagnostics;
public sealed class PersistenceMetrics : IWithCustomTags, IDisposable {
const string Category = "persistence";

public static readonly string MeterName = EventuousDiagnostics.GetMeterName(Category);
readonly static string MeterName = EventuousDiagnostics.GetMeterName(Category);

public const string ListenerName = $"{DiagnosticName.BaseName}.{Category}";
public const string OperationTag = "operation";

const string OperationTag = "operation";

readonly Meter _meter;
readonly MetricsListener<EventStoreMetricsContext> _listener;
Expand All @@ -25,17 +26,9 @@ public sealed class PersistenceMetrics : IWithCustomTags, IDisposable {
public PersistenceMetrics() {
_meter = EventuousDiagnostics.GetMeter(MeterName);

var duration = _meter.CreateHistogram<double>(
EventStore,
"ms",
"Event store operation duration, milliseconds"
);
var duration = _meter.CreateHistogram<double>(EventStore, "ms", "Event store operation duration, milliseconds");

var errorCount = _meter.CreateCounter<long>(
$"{EventStore}.errors",
"errors",
"Number of failed event store operations"
);
var errorCount = _meter.CreateCounter<long>($"{EventStore}.errors", "errors", "Number of failed event store operations");

_listener = new MetricsListener<EventStoreMetricsContext>(ListenerName, duration, errorCount, GetTags);

Expand All @@ -49,8 +42,7 @@ public void Dispose() {
_meter.Dispose();
}

public void SetCustomTags(TagList customTags)
=> _customTags = customTags.ToArray();
public void SetCustomTags(TagList customTags) => _customTags = customTags.ToArray();
}

record EventStoreMetricsContext(string Operation);
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ TagList GetTags(SubscriptionMetricsContext ctx) {
try {
var t = getEndOfStream(cts.Token);

var endOfStream = t.IsCompleted ? t.Result : t.NoContext().GetAwaiter().GetResult();
var endOfStream = t.IsCompletedSuccessfully ? t.Result : t.NoContext().GetAwaiter().GetResult();
streams[endOfStream.SubscriptionId] = endOfStream;
var lastProcessed = _checkpointMetrics.GetLastCommitPosition(endOfStream.SubscriptionId);
return (endOfStream, lastProcessed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,23 @@ protected EventSubscriptionWithCheckpoint(
ConsumePipe consumePipe,
int concurrencyLimit,
ILoggerFactory? loggerFactory
) : base(Ensure.NotNull(options), ConfigurePipe(consumePipe, concurrencyLimit), loggerFactory) {
)
: base(Ensure.NotNull(options), ConfigurePipe(consumePipe, concurrencyLimit), loggerFactory) {
CheckpointStore = Ensure.NotNull(checkpointStore);

CheckpointCommitHandler = new CheckpointCommitHandler(
options.SubscriptionId,
checkpointStore,
10,
loggerFactory
);
CheckpointCommitHandler = new CheckpointCommitHandler(options.SubscriptionId, checkpointStore, 10, loggerFactory);
}

static bool PipelineIsAsync(ConsumePipe pipe)
=> pipe.RegisteredFilters.Any(x => x is AsyncHandlingFilter);
static bool PipelineIsAsync(ConsumePipe pipe) => pipe.RegisteredFilters.Any(x => x is AsyncHandlingFilter);

// It's not ideal, but for now if there's any filter added on top of the default one,
// we won't add the concurrent filter, so it won't clash with any custom setup
static ConsumePipe ConfigurePipe(ConsumePipe pipe, int concurrencyLimit)
=> PipelineIsAsync(pipe)
? pipe
: pipe.AddFilterFirst(new AsyncHandlingFilter((uint)concurrencyLimit));
=> PipelineIsAsync(pipe) ? pipe : pipe.AddFilterFirst(new AsyncHandlingFilter((uint)concurrencyLimit));

protected EventPosition? LastProcessed { get; set; }
protected CheckpointCommitHandler CheckpointCommitHandler { get; }
protected ICheckpointStore CheckpointStore { get; }
EventPosition? LastProcessed { get; set; }
CheckpointCommitHandler CheckpointCommitHandler { get; }
ICheckpointStore CheckpointStore { get; }

protected abstract EventPosition GetPositionFromContext(IMessageConsumeContext context);

Expand All @@ -51,9 +44,9 @@ protected async ValueTask HandleInternal(IMessageConsumeContext context) {
Logger.Current = Log;
var ctx = new AsyncConsumeContext(context, Ack, Nack);
await Handler(ctx).NoContext();
}
catch (Exception e) {
} catch (Exception e) {
context.LogContext.MessageHandlingFailed(Options.SubscriptionId, context, e);

if (Options.ThrowOnError) throw;
}
}
Expand All @@ -65,46 +58,28 @@ ValueTask Ack(IMessageConsumeContext context) {
context.LogContext.TraceLog?.Log("Message {Type} acknowledged at {Position}", context.MessageType, context.GlobalPosition);

return CheckpointCommitHandler.Commit(
new CommitPosition(eventPosition.Position!.Value, context.Sequence, eventPosition.Created)
{ LogContext = context.LogContext },
new CommitPosition(eventPosition.Position!.Value, context.Sequence, eventPosition.Created) { LogContext = context.LogContext },
context.CancellationToken
);
}

ValueTask Nack(IMessageConsumeContext context, Exception exception) {
context.LogContext.WarnLog?.Log(exception, "Message {Type} not acknowledged at {Position}", context.MessageType, context.GlobalPosition);

return Options.ThrowOnError ? throw exception : Ack(context);
}

protected async Task<Checkpoint> GetCheckpoint(CancellationToken cancellationToken) {
if (IsRunning && LastProcessed != null) {
return new Checkpoint(Options.SubscriptionId, LastProcessed.Position);
}
if (IsRunning && LastProcessed != null) { return new Checkpoint(Options.SubscriptionId, LastProcessed.Position); }

Logger.Current = Log;

var checkpoint = await CheckpointStore
.GetLastCheckpoint(Options.SubscriptionId, cancellationToken)
.NoContext();
var checkpoint = await CheckpointStore.GetLastCheckpoint(Options.SubscriptionId, cancellationToken).NoContext();

LastProcessed = new EventPosition(checkpoint.Position, DateTime.Now);

return checkpoint;
}

protected async Task StoreCheckpoint(EventPosition eventPosition, CancellationToken cancellationToken) {
LastProcessed = eventPosition;

Logger.Current = Log;

await CheckpointStore.StoreCheckpoint(
new Checkpoint(SubscriptionId, eventPosition.Position),
true,
cancellationToken
)
.NoContext();
}

protected override ValueTask Finalize(CancellationToken cancellationToken)
=> CheckpointCommitHandler.DisposeAsync();
protected override ValueTask Finalize(CancellationToken cancellationToken) => CheckpointCommitHandler.DisposeAsync();
}
4 changes: 2 additions & 2 deletions src/Mongo/src/Eventuous.Projections.MongoDB/MongoProjector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public override async ValueTask<EventHandlingStatus> HandleEvent(IMessageConsume

var update = updateTask == NoOp
? null
: updateTask.IsCompleted
: updateTask.IsCompletedSuccessfully
? updateTask.Result
: await updateTask.NoContext();

Expand All @@ -118,7 +118,7 @@ public override async ValueTask<EventHandlingStatus> HandleEvent(IMessageConsume

var task = update.Execute(Collection, context.CancellationToken);

if (!task.IsCompleted) await task.NoContext();
if (!task.IsCompletedSuccessfully) await task.NoContext();
return EventHandlingStatus.Success;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace Eventuous.Projections.MongoDB;

public partial class MongoOperationBuilder<TEvent, T> where T : ProjectedDocument where TEvent : class {
public class UpdateOneBuilder : UpdateBuilder<UpdateOneBuilder>, IMongoProjectorBuilder {
[PublicAPI]
public UpdateOneBuilder IdFromStream(GetDocumentIdFromStream getId)
=> Id(x => getId(x.Stream));

Expand Down Expand Up @@ -74,6 +75,7 @@ public TBuilder Filter(BuildFilter<TEvent, T> buildFilter) {
return Self;
}

[PublicAPI]
public TBuilder Filter(Func<IMessageConsumeContext<TEvent>, T, bool> filter) {
_filter.Filter(filter);
return Self;
Expand All @@ -84,6 +86,7 @@ public TBuilder UpdateFromContext(BuildUpdateAsync<TEvent, T> buildUpdate) {
return Self;
}

[PublicAPI]
public TBuilder Update(BuildUpdateFromEventAsync<TEvent, T> buildUpdate) {
_buildUpdate = (ctx, update) => buildUpdate(ctx.Message, update);
return Self;
Expand All @@ -99,6 +102,7 @@ public TBuilder Update(BuildUpdateFromEvent<TEvent, T> buildUpdate) {
return Self;
}

[PublicAPI]
public TBuilder Configure(Action<UpdateOptions> configure) {
_configureOptions = configure;
return Self;
Expand Down