Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions src/AppCoreNet.EventStore.Abstractions/StreamPosition.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Licensed under the MIT license.
// Licensed under the MIT license.
// Copyright (c) The AppCore .NET project.

using System;
Expand All @@ -18,12 +18,12 @@ namespace AppCoreNet.EventStore;
/// <summary>
/// Specifies to read from the start of the stream.
/// </summary>
public static readonly StreamPosition Start = new (StartValue);
public static readonly StreamPosition Start = new(StartValue);

/// <summary>
/// Specifies to read from the end of the stream.
/// </summary>
public static readonly StreamPosition End = new (EndValue);
public static readonly StreamPosition End = new(EndValue);

/// <summary>
/// Gets the value.
Expand Down Expand Up @@ -102,6 +102,18 @@ public string ToString(string? format, IFormatProvider formatProvider)
public static bool operator !=(StreamPosition left, StreamPosition right)
=> !left.Equals(right);

/// <summary>
/// If the position is a specific value then adds the supplied amount to the value.
/// Otherwise if the position is a special value, then returns the existing special value.
/// </summary>
/// <param name="left">The position to which the amount should be added.</param>
/// <param name="right">The amount to add.</param>
/// <returns>A new position containing the updated value.</returns>
public static StreamPosition operator +(StreamPosition left, long right)
{
return left.Value < 0 ? left : new StreamPosition(left.Value + right);
}

/// <summary>
/// Implicitly converts a <c>long</c> to an instance of <see cref="StreamPosition"/>.
/// </summary>
Expand Down
13 changes: 9 additions & 4 deletions src/AppCoreNet.EventStore/Subscriptions/SubscriptionService.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Licensed under the MIT license.
// Licensed under the MIT license.
// Copyright (c) The AppCore .NET project.

using System;
Expand All @@ -8,6 +8,7 @@
using AppCoreNet.Diagnostics;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace AppCoreNet.EventStore.Subscriptions;
Expand All @@ -20,6 +21,7 @@ public sealed class SubscriptionService : BackgroundService
private readonly SubscriptionManager _subscriptionManager;
private readonly IServiceScopeFactory _scopeFactory;
private readonly IOptionsMonitor<SubscriptionOptions> _optionsMonitor;
private readonly ILogger<SubscriptionService> _logger;

/// <summary>
/// Initializes a new instance of the <see cref="SubscriptionService"/> class.
Expand All @@ -30,7 +32,8 @@ public sealed class SubscriptionService : BackgroundService
public SubscriptionService(
SubscriptionManager subscriptionManager,
IServiceScopeFactory scopeFactory,
IOptionsMonitor<SubscriptionOptions> optionsMonitor)
IOptionsMonitor<SubscriptionOptions> optionsMonitor,
ILogger<SubscriptionService> logger)
{
Ensure.Arg.NotNull(subscriptionManager);
Ensure.Arg.NotNull(scopeFactory);
Expand All @@ -39,6 +42,7 @@ public SubscriptionService(
_subscriptionManager = subscriptionManager;
_scopeFactory = scopeFactory;
_optionsMonitor = optionsMonitor;
_logger = logger;
}

/// <inheritdoc />
Expand Down Expand Up @@ -106,7 +110,7 @@ private async Task ProcessAsync(
IReadOnlyCollection<EventEnvelope> events =
await store.ReadAsync(
watchResult.StreamId,
watchResult.Position,
watchResult.Position + 1,
maxCount: options.BatchSize,
cancellationToken: cancellationToken)
.ConfigureAwait(false);
Expand All @@ -127,8 +131,9 @@ await listener.HandleAsync(watchResult.SubscriptionId, @event, cancellationToken
? @event.Metadata.Sequence
: @event.Metadata.Index;
}
catch
catch (Exception ex)
{
_logger.LogError(ex, "Error processing event. This action will be retried.");
break;
}
}
Expand Down