Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
41308b3
feat(mssql): Minor SQL changes; no functional changes
nealmummau Sep 22, 2025
0f1e5f6
feat(mssql): Minor SQL changes; no functional changes
nealmummau Sep 22, 2025
fcb0b01
feat(mssql): Minor SQL changes; no functional changes
nealmummau Sep 22, 2025
dedcfeb
feat(mssql): Minor SQL changes; no functional changes
nealmummau Sep 22, 2025
0427f04
feat(mssql): Minor SQL changes; no functional changes
nealmummau Sep 22, 2025
f5cc536
feat(mssql): Minor SQL changes; no functional changes
nealmummau Sep 22, 2025
44d86e7
feat(mssql): Minor SQL changes; no functional changes
nealmummau Sep 22, 2025
9e2b609
feat(mssql): Minor SQL updates
nealmummau Sep 22, 2025
70df88e
fix(mssql): Messages.Created NOT NULL
nmummau Sep 27, 2025
7539557
feat(mssql): json is not null
nealmummau Oct 7, 2025
df2a662
feat(mssql): DATETIME2 is actually a DATETIME2(7) so explicitly say t…
nealmummau Oct 7, 2025
b4fb0c5
feat(mssql): ORDER BY StreamPosition per bito-code-review, and and re…
nealmummau Oct 8, 2025
596563e
feat(postgresql): ORDER BY stream_position per bito-code-review
nealmummau Oct 8, 2025
7b38d67
refactor(mssql): remove square brackets to satisfy 'inconsistent tabl…
nealmummau Oct 8, 2025
2a37166
feat(mssql): append_events improve
nealmummau Oct 8, 2025
f84fbc4
perf(mssql): enable OPTIMIZE_FOR_SEQUENTIAL_KEY on primary keys for S…
nealmummau Oct 8, 2025
1599c02
chore(mssql): enable XACT_ABORT for all stored procedures used by Eve…
nealmummau Oct 8, 2025
6718606
fix(mssql): truncate_stream wasn't properly formatting and and sendin…
nealmummau Oct 9, 2025
1dbaf93
refactor(mssql): use SCOPE_IDENTITY() to get new StreamId rather than…
nealmummau Oct 9, 2025
73c56ce
fix(mssql): correct backwards read bounds check
nealmummau Oct 10, 2025
aec84c8
fix(mssql): moved stream version update inside TRY block
nealmummau Oct 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ begin
m.json_data, m.json_metadata, m.created
from __schema__.messages m
where m.stream_id = _stream_id and m.stream_position >= _from_position
order by m.global_position
order by m.stream_position
limit _count;
end;

Expand Down
23 changes: 12 additions & 11 deletions src/SqlServer/src/Eventuous.SqlServer/Scripts/1_Schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ IF OBJECT_ID('__schema__.Streams', 'U') IS NULL
StreamId INT IDENTITY (1,1) NOT NULL,
StreamName NVARCHAR(850) NOT NULL,
[Version] INT DEFAULT (-1) NOT NULL,
CONSTRAINT PK_Streams PRIMARY KEY CLUSTERED (StreamId),
CONSTRAINT PK_Streams PRIMARY KEY CLUSTERED (StreamId) WITH (OPTIMIZE_FOR_SEQUENTIAL_KEY = ON),
CONSTRAINT UQ_StreamName UNIQUE NONCLUSTERED (StreamName),
CONSTRAINT CK_VersionGteNegativeOne CHECK ([version] >= -1)
CONSTRAINT CK_VersionGteNegativeOne CHECK ([Version] >= -1)
);
END

Expand All @@ -25,14 +25,16 @@ IF OBJECT_ID('__schema__.Messages', 'U') IS NULL
StreamId INT NOT NULL,
StreamPosition INT NOT NULL,
GlobalPosition BIGINT IDENTITY (0,1) NOT NULL,
JsonData NVARCHAR(max) NOT NULL,
JsonMetadata NVARCHAR(max),
Created DATETIME2,
CONSTRAINT PK_Events PRIMARY KEY CLUSTERED (GlobalPosition),
JsonData NVARCHAR(MAX) NOT NULL,
JsonMetadata NVARCHAR(MAX) NOT NULL,
Created DATETIME2(7) NOT NULL,
CONSTRAINT PK_Events PRIMARY KEY CLUSTERED (GlobalPosition) WITH (OPTIMIZE_FOR_SEQUENTIAL_KEY = ON),
CONSTRAINT FK_MessageStreamId FOREIGN KEY (StreamId) REFERENCES __schema__.Streams (StreamId),
CONSTRAINT UQ_StreamIdAndStreamPosition UNIQUE NONCLUSTERED (StreamId, StreamPosition),
CONSTRAINT UQ_StreamIdAndMessageId UNIQUE NONCLUSTERED (StreamId, MessageId),
CONSTRAINT CK_StreamPositionGteZero CHECK (Messages.StreamPosition >= 0),
CONSTRAINT CK_StreamPositionGteZero CHECK (StreamPosition >= 0),
CONSTRAINT CK_JsonDataIsJson CHECK (ISJSON(JsonData) = 1),
CONSTRAINT CK_JsonMetadataIsJson CHECK (ISJSON(JsonMetadata) = 1),
INDEX IDX_EventsStream (StreamId)
);
END
Expand All @@ -42,7 +44,7 @@ IF OBJECT_ID('__schema__.Checkpoints', 'U') IS NULL
CREATE TABLE __schema__.Checkpoints
(
Id NVARCHAR(128) NOT NULL,
Position BIGINT NULL,
Position BIGINT NULL,
CONSTRAINT PK_Checkpoints PRIMARY KEY CLUSTERED (Id),
);
END
Expand All @@ -53,8 +55,7 @@ IF TYPE_ID('__schema__.StreamMessage') IS NULL
(
message_id UNIQUEIDENTIFIER NOT NULL,
message_type NVARCHAR(128) NOT NULL,
json_data NVARCHAR(max) NOT NULL,
json_metadata NVARCHAR(max)
json_data NVARCHAR(MAX) NOT NULL,
json_metadata NVARCHAR(MAX) NOT NULL
)
END

129 changes: 95 additions & 34 deletions src/SqlServer/src/Eventuous.SqlServer/Scripts/2_AppendEvents.sql
Original file line number Diff line number Diff line change
@@ -1,50 +1,111 @@
CREATE OR ALTER PROCEDURE __schema__.append_events
@stream_name VARCHAR(850),
@stream_name NVARCHAR(850),
@expected_version INT,
@created DATETIME2 NULL,
@created DATETIME2(7) NULL,
@messages __schema__.StreamMessage READONLY
AS
BEGIN
DECLARE @current_version INT,
SET NOCOUNT ON;
SET XACT_ABORT ON;

-- Note: This procedure is wrapped in a transaction by the caller. This explains why there is no explicit transaction here within the procedure.

DECLARE
@current_version INT,
@stream_id INT,
@position BIGINT,
@customErrorMessage NVARCHAR(200),
@newMessagesCount INT,
@expected_StreamVersionAfterUpdate INT,
@actual_StreamVersionAfterUpdate INT
@count_messages INT,
@new_version INT;

if @created is null
BEGIN
SET @created = SYSUTCDATETIME()
END
-- capture inserted rows to compute final position
DECLARE @inserted TABLE (
GlobalPosition BIGINT
);

EXEC [__schema__].[check_stream] @stream_name, @expected_version, @current_version = @current_version OUTPUT, @stream_id = @stream_id OUTPUT
SELECT @count_messages = COUNT(1) FROM @messages;

EXEC __schema__.check_stream
@stream_name = @stream_name,
@expected_version = @expected_version,
@current_version = @current_version OUTPUT,
@stream_id = @stream_id OUTPUT;

SET @new_version = @current_version + @count_messages;

BEGIN TRY
INSERT INTO __schema__.Messages (MessageId, MessageType, StreamId, StreamPosition, JsonData, JsonMetadata, Created)
SELECT message_id, message_type, @stream_id, @current_version + (ROW_NUMBER() OVER(ORDER BY (SELECT NULL))), json_data, json_metadata, @created
FROM @messages

/*
If another writer raced us, the unique constraint (StreamId,StreamPosition) will throw here.
Translate to WrongExpectedVersion in the CATCH below.
*/
INSERT INTO __schema__.Messages (
MessageId,
MessageType,
StreamId,
StreamPosition,
JsonData,
JsonMetadata,
Created
)
OUTPUT inserted.GlobalPosition
INTO @inserted (GlobalPosition)
SELECT
message_id,
message_type,
@stream_id,
@current_version + CAST(ROW_NUMBER() OVER (ORDER BY (SELECT NULL)) AS INT),
json_data,
json_metadata,
ISNULL(@created, SYSUTCDATETIME())
FROM @messages;

UPDATE s
SET [Version] = @new_version
FROM __schema__.Streams s
WHERE s.StreamId = @stream_id
AND s.[Version] = @current_version;

IF @@ROWCOUNT = 0
BEGIN
DECLARE @streamUpdateErrorMessage NVARCHAR(4000) = CONCAT(
N'WrongExpectedVersion: concurrent update detected for stream ',
CAST(@stream_id AS NVARCHAR(20))
);
;THROW 50000, @streamUpdateErrorMessage, 1;
END

END TRY
BEGIN CATCH
IF (ERROR_NUMBER() = 2627 OR ERROR_NUMBER() = 2601) AND (SELECT CHARINDEX(N'UQ_StreamIdAndStreamPosition', ERROR_MESSAGE())) > 0
DECLARE @errmsg NVARCHAR(2048) = ERROR_MESSAGE();

IF ERROR_NUMBER() IN (
2627, -- Violation of PRIMARY KEY or UNIQUE constraint
2601 -- Cannot insert duplicate key row in object with unique index
)
AND (@errmsg LIKE N'%UQ_StreamIdAndStreamPosition%')
BEGIN
DECLARE @streamIdFromError nvarchar(20) = SUBSTRING(ERROR_MESSAGE(), PATINDEX(N'%[0-9]%,%', ERROR_MESSAGE()), PATINDEX(N'%, [0-9]%).', ERROR_MESSAGE()) - PATINDEX(N'%[0-9]%,%', ERROR_MESSAGE()))
DECLARE @streamPositionFromError nvarchar(20) = SUBSTRING(ERROR_MESSAGE(), (PATINDEX(N'%, [0-9]%).', ERROR_MESSAGE())) + 2, PATINDEX(N'%).', ERROR_MESSAGE()) - (PATINDEX(N'%, [0-9]%).', ERROR_MESSAGE()) + 2))
-- Must BEGIN with "WrongExpectedVersion" for the client detection of OptimisticConcurrencyException
DECLARE @clientMsg NVARCHAR(4000) =
N'WrongExpectedVersion: duplicate append for stream '
+ CAST(@stream_id AS NVARCHAR(20))
+ N' with expected_version=' + CAST(@expected_version AS NVARCHAR(20))
+ N'. SQL: ' + @errmsg;

-- TODO: There are multiple causes of OptimisticConcurrencyExceptions, but current client code is hard-coded to check for 'WrongExpectedVersion' in message and 50000 as error number.
SELECT @customErrorMessage = FORMATMESSAGE(N'WrongExpectedVersion, another message has already been written at stream position %s on stream %s.', @streamIdFromError, @streamPositionFromError);
THROW 50000, @customErrorMessage, 1;
END
THROW 50000, @clientMsg, 1;
END;
ELSE
THROW
END CATCH

SELECT TOP 1 @current_version = StreamPosition, @position = GlobalPosition
FROM __schema__.Messages
WHERE StreamId = @stream_id
ORDER BY GlobalPosition DESC

UPDATE __schema__.Streams SET Version = @current_version WHERE StreamId = @stream_id

SELECT @current_version AS current_version, @position AS position
END
BEGIN
;THROW;
END;
END CATCH;

-- final GlobalPosition value to return
SELECT @position = (
SELECT MAX(GlobalPosition)
FROM @inserted
);

SELECT
@new_version current_version,
@position position;
END;
58 changes: 38 additions & 20 deletions src/SqlServer/src/Eventuous.SqlServer/Scripts/3_CheckStream.sql
Original file line number Diff line number Diff line change
@@ -1,43 +1,61 @@
CREATE OR ALTER PROCEDURE __schema__.check_stream @stream_name NVARCHAR(850),
@expected_version int,
@current_version INT OUTPUT,
@stream_id INT OUTPUT
CREATE OR ALTER PROCEDURE __schema__.check_stream
@stream_name NVARCHAR(850),
@expected_version INT,
@current_version INT OUTPUT,
@stream_id INT OUTPUT
AS
BEGIN
DECLARE @customErrorMessage NVARCHAR(200)
SET NOCOUNT ON;
SET XACT_ABORT ON;

SELECT @current_version = [Version], @stream_id = StreamId
DECLARE @customErrorMessage NVARCHAR(200);

SELECT
@current_version = [Version],
@stream_id = StreamId
FROM [__schema__].Streams
WHERE StreamName = @stream_name
WHERE StreamName = @stream_name;

IF @stream_id is null
IF @stream_id IS NULL
BEGIN
IF @expected_version = -2 -- Any
OR @expected_version = -1 -- NoStream
BEGIN
BEGIN TRY
INSERT INTO [__schema__].Streams (StreamName, Version) VALUES (@stream_name, -1);
SELECT @current_version = Version, @stream_id = StreamId
FROM [__schema__].Streams
WHERE StreamName = @stream_name
SET @current_version = -1
INSERT INTO [__schema__].Streams (
StreamName,
[Version]
) VALUES (
@stream_name,
@current_version
);

SET @stream_id = SCOPE_IDENTITY();
END TRY
BEGIN CATCH
IF (ERROR_NUMBER() = 2627 OR ERROR_NUMBER() = 2601) AND (SELECT CHARINDEX(N'UQ_StreamName', ERROR_MESSAGE())) > 0
BEGIN
SELECT @customErrorMessage = FORMATMESSAGE(N'WrongExpectedVersion %i, stream already exists', @expected_version);
THROW 50000, @customErrorMessage, 1;
END
END;
ELSE
THROW
END CATCH
END
BEGIN
;THROW;
END;
END CATCH;
END;
ELSE
THROW 50001, N'StreamNotFound', 1;
BEGIN
;THROW 50001, N'StreamNotFound', 1;
END;
END
ELSE
IF @expected_version != -2 and @expected_version != @current_version
BEGIN
IF @expected_version != -2 AND @expected_version != @current_version
BEGIN
SELECT @customErrorMessage = FORMATMESSAGE(N'WrongExpectedVersion %i, current version %i', @expected_version, @current_version);
THROW 50000, @customErrorMessage, 1;
END
END
END;
END;
END;
31 changes: 19 additions & 12 deletions src/SqlServer/src/Eventuous.SqlServer/Scripts/4_ReadAllForwards.sql
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
CREATE OR ALTER PROCEDURE __schema__.read_all_forwards
@from_position bigint,
@count int
AS
@from_position BIGINT,
@count INT
AS
BEGIN

SELECT TOP (@count)
MessageId, MessageType, StreamPosition, GlobalPosition,
JsonData, JsonMetadata, Created, StreamName
FROM __schema__.Messages
INNER JOIN __schema__.Streams ON Messages.StreamId = Streams.StreamId
WHERE Messages.GlobalPosition >= @from_position
ORDER BY Messages.GlobalPosition
SET NOCOUNT ON;
SET XACT_ABORT ON;

END
SELECT TOP (@count)
m.MessageId,
m.MessageType,
m.StreamPosition,
m.GlobalPosition,
m.JsonData,
m.JsonMetadata,
m.Created,
s.StreamName
FROM __schema__.Messages m
JOIN __schema__.Streams s ON m.StreamId = s.StreamId
WHERE m.GlobalPosition >= @from_position
ORDER BY m.GlobalPosition;
END;
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,47 @@ CREATE OR ALTER PROCEDURE __schema__.read_stream_backwards
@count INT
AS
BEGIN
SET NOCOUNT ON;
SET XACT_ABORT ON;

DECLARE @current_version int, @stream_id int
DECLARE
@current_version INT,
@stream_id INT;

SELECT @current_version = Version, @stream_id = StreamId
FROM __schema__.Streams
WHERE StreamName = @stream_name
SELECT
@current_version = [Version],
@stream_id = StreamId
FROM __schema__.Streams
WHERE StreamName = @stream_name;

IF @stream_id IS NULL
THROW 50001, 'StreamNotFound', 1;
IF @stream_id IS NULL
BEGIN
;THROW 50001, 'StreamNotFound', 1;
END;

IF @current_version < @from_position + @count
RETURN
-- nothing to read / invalid request
IF @count <= 0
BEGIN
RETURN;
END;

SELECT TOP (@count)
MessageId, MessageType, StreamPosition, GlobalPosition,
JsonData, JsonMetadata, Created
FROM __schema__.Messages
WHERE StreamId = @stream_id AND StreamPosition <= @from_position
ORDER BY Messages.StreamPosition DESC
-- Validate the starting position for backwards read.
IF @from_position < 0 -- A negative starting position is invalid
OR @from_position > @current_version -- A starting position greater than the current version means we're trying to read from beyond the head of the stream
BEGIN
RETURN;
END;

END
SELECT TOP (@count)
MessageId,
MessageType,
StreamPosition,
GlobalPosition,
JsonData,
JsonMetadata,
Created
FROM __schema__.Messages
WHERE StreamId = @stream_id
AND StreamPosition <= @from_position
ORDER BY StreamPosition DESC;
END;
Loading
Loading