Skip to content
Merged

Redis #198

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/Redis/src/Eventuous.Redis/Extensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using StackExchange.Redis;
namespace Eventuous.Redis.Extension;

public static class Extensions
{
public static long ToLong(this RedisValue value) {
var parts = ((string?)value)?.Split('-');
return Convert.ToInt64(parts?[0]) * 10 + Convert.ToInt64(parts?[1]);
}

public static ulong ToULong(this RedisValue value) {
var parts = ((string?)value)?.Split('-');
return Convert.ToUInt64(parts?[0]) * 10 + Convert.ToUInt64(parts?[1]);
}

public static RedisValue ToRedisValue(this long position) {
if (position == 0) return "0-0";
return new RedisValue($"{position / 10}-{position % 10}");
}
}
30 changes: 30 additions & 0 deletions src/Redis/src/Eventuous.Redis/Module.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using System.Reflection;
using Eventuous.Tools;

namespace Eventuous.Redis;

public class Module {

static readonly Assembly Assembly = typeof(Module).Assembly;
public async Task LoadModule(GetRedisDatabase getDatabase) {
var names = Assembly.GetManifestResourceNames()
.Where(x => x.EndsWith(".lua"))
.OrderBy(x => x);

var db = getDatabase();

foreach (var name in names) {
await using var stream = Assembly.GetManifestResourceStream(name);
using var reader = new StreamReader(stream!);
var script = await reader.ReadToEndAsync().NoContext();

try {
await db.ExecuteAsync("FUNCTION", "LOAD", "REPLACE", script);
}
catch (Exception e) {
Console.WriteLine(e);
if (!e.Message.Contains("'append_events' already exists")) throw;
}
}
}
}
66 changes: 24 additions & 42 deletions src/Redis/src/Eventuous.Redis/RedisStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,32 @@
using System.Text;
using Eventuous.Diagnostics;
using StackExchange.Redis;
using Eventuous.Redis.Extension;

namespace Eventuous.Redis;

public delegate IDatabase GetRedisDatabase();

public class RedisStore : IEventStore {
readonly GetRedisDatabase _getDatabase;
readonly IEventSerializer _serializer;
readonly IMetadataSerializer _metaSerializer;
public record RedisStoreOptions();

public class RedisStore : IEventStore
{
readonly GetRedisDatabase _getDatabase;
readonly IEventSerializer _serializer;
readonly IMetadataSerializer _metaSerializer;

public RedisStore(
GetRedisDatabase getDatabase,
IEventSerializer? serializer = null,
IMetadataSerializer? metaSerializer = null
) {
GetRedisDatabase getDatabase,
RedisStoreOptions options,
IEventSerializer? serializer = null,
IMetadataSerializer? metaSerializer = null
)
{
_serializer = serializer ?? DefaultEventSerializer.Instance;
_metaSerializer = metaSerializer ?? DefaultMetadataSerializer.Instance;
_getDatabase = Ensure.NotNull(getDatabase, "Connection factory");
_getDatabase = Ensure.NotNull(getDatabase, "Connection factory");

}

const string ContentType = "application/json";

public async Task<StreamEvent[]> ReadEvents(
Expand All @@ -31,10 +37,10 @@ public async Task<StreamEvent[]> ReadEvents(
int count,
CancellationToken cancellationToken
) {
var result = await _getDatabase().StreamRangeAsync(stream.ToString(), ULongToStreamId(start), null, count);
if (result == null) throw new StreamNotFound(stream);

return result.Select(x => ToStreamEvent(x)).ToArray();
var result = await _getDatabase().StreamReadAsync(stream.ToString(), start.Value.ToRedisValue(), count);
if (result == null)
throw new StreamNotFound(stream);
return result.Select(x => ToStreamEvent(x)).ToArray();
}

public async Task<AppendEventsResult> AppendEvents(
Expand Down Expand Up @@ -70,7 +76,9 @@ CancellationToken cancellationToken

try {
var response = (string[]?)await database.ExecuteAsync("FCALL", fCallParams);
return new AppendEventsResult(StreamIdToULong(response?[1]), Convert.ToInt64(response?[0]));
var streamPosition = response?[0];
var globalPosition = response?[1];
return new AppendEventsResult(new RedisValue(globalPosition!).ToULong(), Convert.ToInt64(streamPosition));
}
catch (Exception e) when (e.Message.Contains("WrongExpectedVersion")) {
PersistenceEventSource.Log.UnableToAppendEvents(stream, e);
Expand Down Expand Up @@ -129,33 +137,7 @@ StreamEvent ToStreamEvent(StreamEntry evt) {
),
_ => throw new Exception("Unknown deserialization result")
};

StreamEvent AsStreamEvent(object payload)
=> new(Guid.Parse(evt["message_id"].ToString()), payload, meta ?? new Metadata(), ContentType, StreamIdToLong(evt.Id));
}

/*
16761513606580 -> 1676151360658-0
*/
static RedisValue ULongToStreamId(StreamReadPosition position) {
if (position == StreamReadPosition.Start) return "0-0";

return new RedisValue($"{position.Value / 10}-{position.Value % 10}");
}

/*
1676151360658-0 -> 16761513606580
*/
static ulong StreamIdToULong(RedisValue value) {
var parts = ((string?)value)?.Split('-');
return Convert.ToUInt64(parts?[0]) * 10 + Convert.ToUInt64(parts?[1]);
}

/*
1676151360658-0 -> 16761513606580
*/
static long StreamIdToLong(RedisValue value) {
var parts = ((string?)value)?.Split('-');
return Convert.ToInt64(parts?[0]) * 10 + Convert.ToInt64(parts?[1]);
=> new(Guid.Parse(evt["message_id"].ToString()), payload, meta ?? new Metadata(), ContentType, evt.Id.ToLong());
}
}
12 changes: 12 additions & 0 deletions src/Redis/src/Eventuous.Redis/Subscriptions/PersistentEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace Eventuous.Redis;

public record PersistentEvent(
Guid MessageId,
string MessageType,
long StreamPosition,
long GlobalPosition,
string JsonData,
string? JsonMetadata,
DateTime Created,
string StreamName
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
using System.Linq;
using Eventuous.Subscriptions;
using Eventuous.Subscriptions.Checkpoints;
using Eventuous.Subscriptions.Context;
using Eventuous.Subscriptions.Filters;
using Eventuous.Redis;
using Microsoft.Extensions.Logging;
using StackExchange.Redis;
using Eventuous.Redis.Extension;

namespace Eventuous.Redis.Subscriptions;

public class RedisAllStreamSubscription : RedisSubscriptionBase<RedisSubscriptionBaseOptions> {

public RedisAllStreamSubscription(
GetRedisDatabase getDatabase,
RedisAllStreamSubscriptionOptions options,
ICheckpointStore checkpointStore,
ConsumePipe consumePipe,
ILoggerFactory? loggerFactory
) : base (getDatabase, options, checkpointStore, consumePipe, loggerFactory) {

}

protected override async Task<PersistentEvent[]> ReadEvents(IDatabase database, long position)
{
var linkedEvents = await database.StreamReadAsync("_all", position.ToRedisValue(), 100);
var persistentEvents = new List<PersistentEvent>();

foreach(var linkEvent in linkedEvents) {
var stream = linkEvent["stream"];
var streamPosition = linkEvent["position"];

var streamEvents = await database.StreamRangeAsync(new RedisKey(stream), streamPosition);
var entry = streamEvents[0];

DateTime date;
System.DateTime.TryParse(entry["created"]!, out date);
persistentEvents.Add(new PersistentEvent(
Guid.Parse(entry["message_id"]!),
entry["message_type"]!,
entry.Id.ToLong(),
entry.Id.ToLong(),
entry["json_data"]!,
entry["json_metadata"],
date,
stream!));
}
return persistentEvents.ToArray();
}

protected override EventPosition GetPositionFromContext(IMessageConsumeContext context)
=> EventPosition.FromContext(context);

}

public record RedisAllStreamSubscriptionOptions : RedisSubscriptionBaseOptions;


Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
using Eventuous.Subscriptions.Checkpoints;
using Eventuous.Subscriptions.Logging;
using Eventuous.Tools;
using Microsoft.Extensions.Logging;

namespace Eventuous.Redis.Subscriptions;

public class RedisCheckpointStore : ICheckpointStore {

readonly GetRedisDatabase _getDatabase;
readonly ILoggerFactory? _loggerFactory;

public RedisCheckpointStore(GetRedisDatabase getDatabase, ILoggerFactory? loggerFactory) {
_getDatabase = getDatabase;
_loggerFactory = loggerFactory;
}

public async ValueTask<Checkpoint> GetLastCheckpoint(
string checkpointId,
CancellationToken cancellationToken
) {
Checkpoint checkpoint;
Logger.ConfigureIfNull(checkpointId, _loggerFactory);
var position = await _getDatabase().StringGetAsync(checkpointId);
checkpoint = position.IsNull ? Checkpoint.Empty(checkpointId) : new Checkpoint(checkpointId, Convert.ToUInt64(position));
Logger.Current.CheckpointLoaded(this, checkpoint);
return checkpoint;
}

public async ValueTask<Checkpoint> StoreCheckpoint(
Checkpoint checkpoint,
bool force,
CancellationToken cancellationToken
) {
if (checkpoint.Position == null) return checkpoint;
await _getDatabase().StringSetAsync(checkpoint.Id, checkpoint.Position);
return checkpoint;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
using System.Linq;
using System.Text;
using Eventuous;
using Eventuous.Subscriptions;
using Eventuous.Subscriptions.Checkpoints;
using Eventuous.Subscriptions.Context;
using Eventuous.Subscriptions.Filters;
using Microsoft.Extensions.Logging;
using StackExchange.Redis;
using Eventuous.Redis.Extension;

namespace Eventuous.Redis.Subscriptions;

public class RedisStreamSubscription : RedisSubscriptionBase<RedisSubscriptionBaseOptions> {

public RedisStreamSubscription(
GetRedisDatabase getDatabase,
RedisStreamSubscriptionOptions options,
ICheckpointStore checkpointStore,
ConsumePipe consumePipe,
ILoggerFactory? loggerFactory
) : base (getDatabase, options, checkpointStore, consumePipe, loggerFactory) {
_streamName = options.Stream.ToString();
}

protected override async Task BeforeSubscribe(CancellationToken cancellationToken) {
var info = await GetDatabase().StreamInfoAsync(_streamName);
if (info.Length <= 0)
throw new StreamNotFound(_streamName);
}

protected override async Task<PersistentEvent[]> ReadEvents(IDatabase database, long position)
{
var evts = await database.StreamReadAsync(_streamName, position.ToRedisValue(), 100);
return evts.Select( evt => {

DateTime date;
System.DateTime.TryParse(evt["created"]!, out date);
return new PersistentEvent (
Guid.Parse(evt["message_id"]!),
evt["message_type"]!,
evt.Id.ToLong(),
evt.Id.ToLong(),
evt["json_data"]!,
evt["json_metadata"],
date,
_streamName);
}
).ToArray();
}

readonly string _streamName;

protected override EventPosition GetPositionFromContext(IMessageConsumeContext context)
=> EventPosition.FromContext(context);

}

public record RedisStreamSubscriptionOptions(StreamName Stream) : RedisSubscriptionBaseOptions;


Loading