SQL Server Subscription skips an event on non-initial startup #291

@PehrGit

Describe the bug
When the database already contains events, so that the checkpoint for the subscription is not NULL, and we start the application and append two events, the first of them is not retrieved and is therefore never processed by the projections. In our case, a single command produces an Add event followed by an Update event. When we start the application and handle this command, the Add is skipped, and the Update then tries to edit a record that does not exist.

To Reproduce

  1. Have SQL Server as the event store.
  2. Have SQL Server as the projection db.
  3. Start application
  4. Produce events 1 and 2
  5. Stop application
  6. Start application
  7. Produce events 3 and 4
  8. See error

Expected behavior
Events 3 and 4 are sequentially retrieved from the db and processed by the projections.

Screenshots
N/A

Desktop (please complete the following information):

  • OS: Windows
  • Version 0.15.0-beta.6

Additional context
We believe we have found the problem and will create a PR shortly.

In SqlServerSubscriptionBase we see the following Subscribe method:

protected override async ValueTask Subscribe(CancellationToken cancellationToken) {
    await BeforeSubscribe(cancellationToken).NoContext();
    var (_, position) = await GetCheckpoint(cancellationToken).NoContext();
    _runner = Task.Run(() => PollingQuery(position + 1, _cts.Token), _cts.Token);
}

It gets the current checkpoint, adds 1 to it, and calls PollingQuery:

async Task PollingQuery(ulong? position, CancellationToken cancellationToken) {
    var start = position.HasValue ? (long)position : -1;
    while (!cancellationToken.IsCancellationRequested) {
        try {
            await using var connection = await OpenConnection(cancellationToken).NoContext();
            await using var cmd = PrepareCommand(connection, start);
            await using var reader = await cmd.ExecuteReaderAsync(cancellationToken).NoContext();
            var result = reader.ReadEvents(cancellationToken);

PollingQuery receives the <checkpoint + 1> value as position and then calls PrepareCommand in SqlServerAllStreamSubscription (the same happens in SqlServerStreamSubscription):

protected override SqlCommand PrepareCommand(SqlConnection connection, long start)
    => connection.GetStoredProcCommand(Schema.ReadAllForwards)
        .Add("@from_position", SqlDbType.BigInt, start + 1)
        .Add("@count", SqlDbType.Int, Options.MaxPageSize);

This takes the <checkpoint + 1> value, adds 1 to it again, and passes <checkpoint + 2> to the stored procedure ReadAllForwards:

CREATE OR ALTER PROCEDURE __schema__.read_all_forwards
    @from_position bigint,
    @count int
AS
BEGIN
    SELECT TOP (@count)
        MessageId, MessageType, StreamPosition, GlobalPosition,
        JsonData, JsonMetadata, Created, StreamName
    FROM __schema__.Messages
    INNER JOIN __schema__.Streams ON Messages.StreamId = Streams.StreamId
    WHERE Messages.GlobalPosition >= @from_position
    ORDER BY Messages.GlobalPosition
END

This retrieves all events whose GlobalPosition is greater than or equal to the incoming <checkpoint + 2> value, so the event at <checkpoint + 1> is never returned.

We believe that adding 1 to the position makes sense, because only events ahead of the checkpoint should be read. However, applying the +1 twice means that only events at least 2 positions ahead of the checkpoint are retrieved.
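
To make the arithmetic concrete, here is a minimal, self-contained sketch that mimics the three steps quoted above (Subscribe, PollingQuery, PrepareCommand) and the `>=` filter of the stored procedure. The class `OffsetDemo` and all of its method names are hypothetical and exist only for this illustration. Zero-based global positions are an assumption, and `ReadIncrementOnce` is just one possible way to remove the double increment, not necessarily what the PR will do.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the code paths quoted in this issue.
public static class OffsetDemo {
    // Mirrors Subscribe: the checkpoint is incremented before polling starts.
    // With a null checkpoint the lifted + operator yields null.
    public static ulong? SubscribeStart(ulong? checkpoint) => checkpoint + 1;

    // Mirrors PollingQuery: a missing position becomes -1.
    public static long PollingStart(ulong? position)
        => position.HasValue ? (long)position.Value : -1;

    // Mirrors PrepareCommand: the start value is incremented once more.
    public static long FromPosition(long start) => start + 1;

    // Current flow: both increments, then GlobalPosition >= @from_position.
    public static long[] Read(IEnumerable<long> globalPositions, ulong? checkpoint) {
        var from = FromPosition(PollingStart(SubscribeStart(checkpoint)));
        return globalPositions.Where(p => p >= from).OrderBy(p => p).ToArray();
    }

    // Assumed fix for illustration only: skip the increment in Subscribe.
    public static long[] ReadIncrementOnce(IEnumerable<long> globalPositions, ulong? checkpoint) {
        var from = FromPosition(PollingStart(checkpoint));
        return globalPositions.Where(p => p >= from).OrderBy(p => p).ToArray();
    }
}
```

With positions 0 to 3 and a checkpoint of 1 (events 1 and 2 already processed), `Read` returns only position 3, while `ReadIncrementOnce` returns positions 2 and 3. With a null checkpoint both variants start from position 0, which would explain why the problem only shows up on a non-initial startup.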

We see the same code flow in the Postgres implementation, so it might suffer from the same problem:

_runner = Task.Run(() => PollingQuery(position + 1, _cts.Token), _cts.Token);

.Add("_from_position", NpgsqlDbType.Bigint, start + 1)
