Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Eventuous.sln
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Bookings", "samples\esdb\Bo
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Tests.Persistence", "src\Core\test\Eventuous.Tests.Persistence\Eventuous.Tests.Persistence.csproj", "{F24F066B-FC7A-4298-B007-19CC86BB31E1}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Brokers", "Brokers", "{86D92758-EBB6-4B8C-94B7-BD91AF1E31D2}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -652,6 +654,9 @@ Global
{7D24DAB3-FD49-443C-811A-96F0CA6A6F9A} = {75F337AF-7E15-4ED1-8E4F-A582DABEA373}
{C666D8E7-FB55-4435-A8D4-CF9815660E85} = {75F337AF-7E15-4ED1-8E4F-A582DABEA373}
{F24F066B-FC7A-4298-B007-19CC86BB31E1} = {0ED6785B-60EF-46B4-B938-EF04189FC8BC}
{1970CA0D-C5E8-4384-8485-82D712289002} = {86D92758-EBB6-4B8C-94B7-BD91AF1E31D2}
{6E545DFE-FE70-4486-92E0-E47E86E66210} = {86D92758-EBB6-4B8C-94B7-BD91AF1E31D2}
{2E59C5F8-3E5A-4450-B902-7648AD7ECC0F} = {86D92758-EBB6-4B8C-94B7-BD91AF1E31D2}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {0691467B-C257-46DB-BC4F-88EB7CD615B8}
Expand Down
2 changes: 1 addition & 1 deletion src/Core/src/Eventuous.Diagnostics/EventuousDiagnostics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public static ActivitySource ActivitySource {
get {
if (activitySource != null) return activitySource;

activitySource = new ActivitySource(InstrumentationName, Version?.ToString());
activitySource = new(InstrumentationName, Version?.ToString());

listener = DummyActivityListener.Create();
ActivitySource.AddActivityListener(listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public readonly record struct ExpectedStreamVersion(long Value) {

public record struct StreamReadPosition(long Value) {
public static readonly StreamReadPosition Start = new(0L);
public static readonly StreamReadPosition End = new(long.MaxValue);
}

public record struct StreamTruncatePosition(long Value);
2 changes: 2 additions & 0 deletions src/Core/src/Eventuous.Producers/BaseProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ protected BaseProducer(ProducerTracingOptions? tracingOptions = null) {

protected abstract Task ProduceMessages(StreamName stream, IEnumerable<ProducedMessage> messages, TProduceOptions? options, CancellationToken cancellationToken = default);

protected void SetActivityMessageType(string messageType) => Activity.Current?.SetTag(Message.Type, messageType);

public Task Produce(StreamName stream, IEnumerable<ProducedMessage> messages, CancellationToken cancellationToken = default)
=> Produce(stream, messages, null, cancellationToken);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ public static (Activity? act, ProducedMessage msgs) Start(ProducedMessage messag
var messageId = message.MessageId.ToString();

activity?
.SetTag(TelemetryTags.Message.Type, message.MessageType)
.SetTag(TelemetryTags.Message.Id, messageId)
.SetTag(TelemetryTags.Messaging.MessageId, messageId)
.CopyParentTag(TelemetryTags.Messaging.ConversationId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class ProducerEventSource<T> : EventSource where T : class {
[NonEvent]
public void ProduceAcknowledged(ProducedMessage message) {
if (IsEnabled(EventLevel.Verbose, EventKeywords.All)) {
ProduceAcknowledged(ProducerName, message.MessageType);
ProduceAcknowledged(ProducerName, message);
}
}

Expand All @@ -29,14 +29,14 @@ public void ProduceNotAcknowledged(ProducedMessage message, string error, Except
if (!IsEnabled(EventLevel.Verbose, EventKeywords.All)) return;

var errorMessage = $"{error} {e?.Message}";
ProduceNotAcknowledged(ProducerName, message.MessageType, errorMessage);
ProduceNotAcknowledged(ProducerName, message, errorMessage);
}

[Event(ProduceAcknowledgedId, Level = EventLevel.Verbose, Message = "[{0}] Produce acknowledged: {1}")]
void ProduceAcknowledged(string producer, string message)
=> WriteEvent(ProduceAcknowledgedId, producer, message);
void ProduceAcknowledged(string producer, object message)
=> WriteEvent(ProduceAcknowledgedId, producer, message.GetType().Name);

[Event(ProduceNotAcknowledgedId, Level = EventLevel.Verbose, Message = "[{0}] Produce not acknowledged: {1} {2}")]
void ProduceNotAcknowledged(string producer, string message, string error)
=> WriteEvent(ProduceNotAcknowledgedId, producer, message, error);
void ProduceNotAcknowledged(string producer, object message, string error)
=> WriteEvent(ProduceNotAcknowledgedId, producer, message.GetType().Name, error);
}
2 changes: 0 additions & 2 deletions src/Core/src/Eventuous.Producers/ProducedMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,12 @@ public ProducedMessage(object message, Metadata? metadata, Metadata? additionalH
Metadata = metadata;
AdditionalHeaders = additionalHeaders;
MessageId = messageId ?? Guid.NewGuid();
MessageType = TypeMap.GetTypeName(message, false);
}

public object Message { get; }
public Metadata? Metadata { get; init; }
public Metadata? AdditionalHeaders { get; }
public Guid MessageId { get; }
public string MessageType { get; }
public AcknowledgeProduce? OnAck { get; init; }
public ReportFailedProduce? OnNack { get; init; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,14 @@ protected override async Task ProduceMessages(
foreach (var chunk in Ensure.NotNull(messages).Chunks(options.MaxAppendEventsCount)) {
var chunkMessages = chunk.ToArray();

var setMessageType = chunkMessages.Length == 1;

try {
await _client.AppendToStreamAsync(
stream,
options.ExpectedState,
// ReSharper disable once ConvertClosureToMethodGroup
chunkMessages.Select(message => CreateMessage(message)),
chunkMessages.Select(message => CreateMessage(message, setMessageType)),
null,
options.Deadline,
options.Credentials,
Expand All @@ -85,10 +87,14 @@ await chunkMessages
}
}

EventData CreateMessage(ProducedMessage message) {
EventData CreateMessage(ProducedMessage message, bool setMessageType) {
var msg = Ensure.NotNull(message.Message);
var (eventType, contentType, payload) = _serializer.SerializeEvent(msg);
message.Metadata!.Remove(MetaTags.MessageId);

if (setMessageType) {
SetActivityMessageType(eventType);
}
var metaBytes = _metaSerializer.Serialize(message.Metadata);

return new(Uuid.FromGuid(message.MessageId), eventType, payload, metaBytes, contentType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ public static class StreamRevisionExtensions {
/// </summary>
/// <param name="position">Position for stream reads</param>
/// <returns></returns>
public static StreamPosition AsStreamPosition(this StreamReadPosition position) => StreamPosition.FromInt64(position.Value);
public static StreamPosition AsStreamPosition(this StreamReadPosition position) => position == StreamReadPosition.End ? StreamPosition.End : StreamPosition.FromInt64(position.Value);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ protected override async Task ProduceMessages(
var documents = messagesList.Select(x => x.Message);
var mode = options?.ProduceMode ?? ProduceMode.Create;

var bulk = GetOp(new BulkDescriptor(stream.ToString()));
var bulk = GetOp(new(stream.ToString()));
var result = await elasticClient.BulkAsync(bulk, cancellationToken);

if (!result.IsValid) {
Expand All @@ -48,6 +48,8 @@ protected override async Task ProduceMessages(

await Task.WhenAll(messagesList.Select(x => x.Ack<ElasticProducer>().AsTask())).NoContext();

return;

BulkDescriptor GetOp(BulkDescriptor descriptor)
=> mode switch {
ProduceMode.Create => GetCreateDescriptor(descriptor),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,15 @@ async Task ProduceLocal(ProducedMessage x) {
}

PubsubMessage CreateMessage(ProducedMessage message, PubSubProduceOptions? options) {
var (eventType, contentType, payload) = _serializer.SerializeEvent(message.Message);
var (messageType, contentType, payload) = _serializer.SerializeEvent(message.Message);
SetActivityMessageType(messageType);

var psm = new PubsubMessage {
Data = ByteString.CopyFrom(payload),
OrderingKey = options?.OrderingKey ?? "",
Attributes = {
{ _attributes.ContentType, contentType },
{ _attributes.EventType, eventType },
{ _attributes.EventType, messageType },
{ _attributes.MessageId, message.MessageId.ToString() }
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ protected override async Task ProduceMessages(
.AddHeader(KafkaHeaderKeys.MessageTypeHeader, serialized.EventType)
.AddHeader(KafkaHeaderKeys.ContentTypeHeader, serialized.ContentType);

SetActivityMessageType(serialized.EventType);
await ProduceLocal();

continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ void Publish(string stream, ProducedMessage message, RabbitMqProduceOptions? opt
var (msg, metadata) = (message.Message, message.Metadata);
var (eventType, contentType, payload) = _serializer.SerializeEvent(msg);

SetActivityMessageType(eventType);
var prop = _channel.CreateBasicProperties();
prop.ContentType = contentType;
prop.Persistent = options?.Persisted != false;
Expand Down