diff --git a/src/Kafka/src/Eventuous.Kafka/Subscriptions/KafkaBasicSubscription.cs b/src/Kafka/src/Eventuous.Kafka/Subscriptions/KafkaBasicSubscription.cs
index b14f1162..e9b7f7ea 100644
--- a/src/Kafka/src/Eventuous.Kafka/Subscriptions/KafkaBasicSubscription.cs
+++ b/src/Kafka/src/Eventuous.Kafka/Subscriptions/KafkaBasicSubscription.cs
@@ -1,17 +1,250 @@
 // Copyright (C) Eventuous HQ OÜ. All rights reserved
 // Licensed under the Apache License, Version 2.0.
 
+using System.Text;
 using Eventuous.Subscriptions;
+using Eventuous.Subscriptions.Context;
 using Eventuous.Subscriptions.Filters;
+using Eventuous.Subscriptions.Logging;
 using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
 
 namespace Eventuous.Kafka.Subscriptions;
 
-public class KafkaBasicSubscription(KafkaSubscriptionOptions options, ConsumePipe consumePipe, ILoggerFactory? loggerFactory, IEventSerializer? eventSerializer)
-    : EventSubscription<KafkaSubscriptionOptions>(options, consumePipe, loggerFactory, eventSerializer) {
-    protected override ValueTask Subscribe(CancellationToken cancellationToken)
-        => throw new NotImplementedException();
+/// <summary>
+/// Kafka subscription service that consumes messages with byte[] payload without using the schema registry.
+/// The message type is specified in the headers, so the type mapping is required.
+/// </summary>
+[PublicAPI]
+public class KafkaBasicSubscription : EventSubscription<KafkaSubscriptionOptions> {
+    const string DynamicSerializationMessage = "Using dynamic serialization";
 
-    protected override ValueTask Unsubscribe(CancellationToken cancellationToken)
-        => throw new NotImplementedException();
+    readonly HandleConsumeError        _failureHandler;
+    readonly IConsumer<string, byte[]> _consumer;
+    CancellationTokenSource?           _consumerCts;
+    Task?                              _consumeTask;
+
+    /// <summary>
+    /// Creates Kafka subscription service instance
+    /// </summary>
+    /// <param name="options">Subscription options</param>
+    /// <param name="consumePipe">Pre-constructed consume pipe</param>
+    /// <param name="loggerFactory">Logger factory</param>
+    /// <param name="eventSerializer">Event serializer</param>
+    public KafkaBasicSubscription(
+        IOptions<KafkaSubscriptionOptions> options,
+        ConsumePipe                        consumePipe,
+        ILoggerFactory?                    loggerFactory,
+        IEventSerializer?                  eventSerializer = null
+    ) : this(options.Value, consumePipe, loggerFactory, eventSerializer) { }
+
+    /// <summary>
+    /// Creates Kafka subscription service instance
+    /// </summary>
+    /// <param name="options">Subscription options</param>
+    /// <param name="consumePipe">Pre-constructed consume pipe</param>
+    /// <param name="loggerFactory">Logger factory</param>
+    /// <param name="eventSerializer">Event serializer</param>
+    public KafkaBasicSubscription(
+        KafkaSubscriptionOptions options,
+        ConsumePipe              consumePipe,
+        ILoggerFactory?          loggerFactory,
+        IEventSerializer?        eventSerializer = null
+    )
+        : base(
+            Ensure.NotNull(options),
+            consumePipe.AddFilterFirst(new AsyncHandlingFilter(options.ConcurrencyLimit)),
+            loggerFactory,
+            eventSerializer
+        ) {
+        _failureHandler = options.FailureHandler ?? DefaultFailureHandler;
+
+        var consumerConfig = Ensure.NotNull(options.ConsumerConfig);
+
+        // Ensure GroupId is set, defaulting to SubscriptionId if not provided
+        if (string.IsNullOrWhiteSpace(consumerConfig.GroupId)) {
+            consumerConfig.GroupId = options.SubscriptionId;
+        }
+
+        _consumer = new ConsumerBuilder<string, byte[]>(consumerConfig)
+            .SetErrorHandler((_, error) => {
+                if (error.IsFatal) {
+                    Log.WarnLog?.Log("Fatal Kafka error: {Error}", error.Reason);
+                    Dropped(DropReason.ServerError, new KafkaException(error));
+                }
+                else {
+                    Log.WarnLog?.Log("Kafka error: {Error}", error.Reason);
+                }
+            })
+            .Build();
+
+        if (options is { FailureHandler: not null, ThrowOnError: false }) {
+            Log.WarnLog?.Log("ThrowOnError is false but custom FailureHandler is set. FailureHandler will be called on errors.");
+        }
+    }
+
+    const string ConsumeResultKey = "consumeResult";
+
+    [RequiresUnreferencedCode(DynamicSerializationMessage)]
+    [RequiresDynamicCode(DynamicSerializationMessage)]
+    protected override ValueTask Subscribe(CancellationToken cancellationToken) {
+        var topic = Ensure.NotEmptyString(Options.Topic);
+
+        Log.InfoLog?.Log("Subscribing to Kafka topic {Topic}", topic);
+
+        _consumer.Subscribe(topic);
+
+        _consumerCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+        _consumeTask = Task.Run(() => ConsumeLoop(_consumerCts.Token), _consumerCts.Token);
+
+        return default;
+    }
+
+    [RequiresUnreferencedCode(DynamicSerializationMessage)]
+    [RequiresDynamicCode(DynamicSerializationMessage)]
+    async Task ConsumeLoop(CancellationToken cancellationToken) {
+        while (!cancellationToken.IsCancellationRequested) {
+            try {
+                var result = _consumer.Consume(cancellationToken);
+
+                if (result?.Message == null) continue;
+
+                Logger.Current = Log;
+                await HandleConsumed(result).NoContext();
+            }
+            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) {
+                // Normal shutdown
+                break;
+            }
+            catch (ConsumeException ex) {
+                Log.WarnLog?.Log("Kafka consume error: {Error}", ex.Error.Reason);
+
+                if (ex.Error.IsFatal) {
+                    Dropped(DropReason.ServerError, ex);
+                    break;
+                }
+            }
+            catch (Exception ex) {
+                Log.WarnLog?.Log("Unexpected error in Kafka consume loop: {Error}", ex.Message);
+
+                if (Options.ThrowOnError) throw;
+            }
+        }
+    }
+
+    [RequiresUnreferencedCode(DynamicSerializationMessage)]
+    [RequiresDynamicCode(DynamicSerializationMessage)]
+    async Task HandleConsumed(ConsumeResult<string, byte[]> result) {
+        try {
+            var ctx      = CreateContext(result).WithItem(ConsumeResultKey, result);
+            var asyncCtx = new AsyncConsumeContext(ctx, Ack, Nack);
+            await Handler(asyncCtx).NoContext();
+        }
+        catch (Exception ex) {
+            if (Options.ThrowOnError) throw;
+
+            Log.WarnLog?.Log("Error handling Kafka message: {Error}", ex.Message);
+        }
+    }
+
+    ValueTask Ack(IMessageConsumeContext ctx) {
+        var result = ctx.Items.GetItem<ConsumeResult<string, byte[]>>(ConsumeResultKey)!;
+
+        try {
+            _consumer.Commit(result);
+        }
+        catch (KafkaException ex) {
+            Log.WarnLog?.Log("Failed to commit Kafka offset: {Error}", ex.Error.Reason);
+        }
+
+        return default;
+    }
+
+    ValueTask Nack(IMessageConsumeContext ctx, Exception exception) {
+        if (Options.ThrowOnError) throw exception;
+
+        var result = ctx.Items.GetItem<ConsumeResult<string, byte[]>>(ConsumeResultKey)!;
+        _failureHandler(_consumer, result, exception);
+
+        return default;
+    }
+
+    [RequiresUnreferencedCode(DynamicSerializationMessage)]
+    [RequiresDynamicCode(DynamicSerializationMessage)]
+    MessageConsumeContext CreateContext(ConsumeResult<string, byte[]> result) {
+        var headers = result.Message.Headers;
+
+        var messageType = GetHeaderValue(headers, KafkaHeaderKeys.MessageTypeHeader) ?? "unknown";
+        var contentType = GetHeaderValue(headers, KafkaHeaderKeys.ContentTypeHeader) ?? "application/json";
+
+        var evt = DeserializeData(
+            contentType,
+            messageType,
+            result.Message.Value,
+            result.Topic,
+            (ulong)result.Offset.Value
+        );
+
+        var meta = headers.AsMetadata();
+
+        return new MessageConsumeContext(
+            result.Message.Key ?? Guid.NewGuid().ToString(),
+            messageType,
+            contentType,
+            result.Topic,
+            (ulong)result.Offset.Value,
+            (ulong)result.Offset.Value,
+            (ulong)result.Offset.Value,
+            Interlocked.Increment(ref Sequence),
+            result.Message.Timestamp.UtcDateTime,
+            evt,
+            meta,
+            SubscriptionId,
+            Stopping.Token
+        );
+    }
+
+    static string? GetHeaderValue(Headers headers, string key) {
+        var header = headers.FirstOrDefault(h => h.Key == key);
+
+        return header == null ? null : Encoding.UTF8.GetString(header.GetValueBytes());
+    }
+
+    protected override async ValueTask Unsubscribe(CancellationToken cancellationToken) {
+        Log.InfoLog?.Log("Unsubscribing from Kafka topic {Topic}", Options.Topic);
+
+        if (_consumerCts != null) {
+            await _consumerCts.CancelAsync();
+
+            if (_consumeTask != null) {
+                try {
+                    await _consumeTask.WaitAsync(TimeSpan.FromSeconds(10), cancellationToken).NoContext();
+                }
+                catch (TimeoutException) {
+                    Log.WarnLog?.Log("Kafka consume task did not complete within timeout");
+                }
+                catch (OperationCanceledException) {
+                    // Expected
+                }
+            }
+
+            _consumerCts.Dispose();
+            _consumerCts = null;
+        }
+
+        _consumer.Unsubscribe();
+        _consumer.Close();
+        _consumer.Dispose();
+    }
+
+    void DefaultFailureHandler(IConsumer<string, byte[]> consumer, ConsumeResult<string, byte[]> result, Exception? exception) {
+        Log.WarnLog?.Log(
+            "Error processing Kafka message from topic {Topic}, partition {Partition}, offset {Offset}: {Error}",
+            result.Topic,
+            result.Partition.Value,
+            result.Offset.Value,
+            exception?.Message ?? "Unknown error"
+        );
+        // By default, we don't seek back - the message is lost.
+        // Users can implement custom FailureHandler for different behavior.
+    }
 }
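Note (illustrative, not part of the patch): the class summary above says the message type is resolved from a header, so every event type must be registered in the Eventuous type map before this subscription can deserialize it. A minimal sketch of that registration; the `RoomBooked` record and its type name are hypothetical:

```csharp
using Eventuous;

// Register all annotated events in an assembly (as the test below does)...
TypeMap.Instance.RegisterKnownEventTypes(typeof(RoomBooked).Assembly);

// ...or register a single type explicitly under its wire name.
TypeMap.Instance.AddType<RoomBooked>("V1.RoomBooked");

// Hypothetical event: the [EventType] string must match the message-type
// header value written by the producer.
[EventType("V1.RoomBooked")]
public record RoomBooked(string RoomId, DateTimeOffset CheckIn);
```
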
diff --git a/src/Kafka/src/Eventuous.Kafka/Subscriptions/KafkaSubscriptionExtensions.cs b/src/Kafka/src/Eventuous.Kafka/Subscriptions/KafkaSubscriptionExtensions.cs
new file mode 100644
index 00000000..de25b010
--- /dev/null
+++ b/src/Kafka/src/Eventuous.Kafka/Subscriptions/KafkaSubscriptionExtensions.cs
@@ -0,0 +1,25 @@
+// Copyright (C) Eventuous HQ OÜ. All rights reserved
+// Licensed under the Apache License, Version 2.0.
+
+using Eventuous.Kafka.Subscriptions;
+using Eventuous.Subscriptions.Registrations;
+
+// ReSharper disable CheckNamespace
+namespace Microsoft.Extensions.DependencyInjection;
+
+[PublicAPI]
+public static class KafkaSubscriptionExtensions {
+    /// <summary>
+    /// Registers a Kafka subscription with the specified configuration.
+    /// </summary>
+    /// <param name="services">The service collection</param>
+    /// <param name="subscriptionId">Unique subscription identifier</param>
+    /// <param name="configureSubscription">Action to configure the subscription builder</param>
+    /// <returns>The service collection for chaining</returns>
+    public static IServiceCollection AddKafkaSubscription(
+        this IServiceCollection services,
+        string subscriptionId,
+        Action<SubscriptionBuilder<KafkaBasicSubscription, KafkaSubscriptionOptions>> configureSubscription
+    )
+        => services.AddSubscription<KafkaBasicSubscription, KafkaSubscriptionOptions>(subscriptionId, configureSubscription);
+}
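Note (illustrative, not part of the patch): a sketch of how `AddKafkaSubscription` might be wired up at startup, assuming the standard Eventuous `SubscriptionBuilder` API (`Configure` plus `AddEventHandler`); the handler type, topic, and broker address are placeholders. One caveat: the options record below declares init-only setters, so assigning properties inside `Configure` presumes they are settable at that point.

```csharp
using Confluent.Kafka;
using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection();

services.AddKafkaSubscription(
    "bookings-subscription",
    builder => builder
        .Configure(options => {
            options.Topic          = "bookings";
            options.ConsumerConfig = new ConsumerConfig {
                BootstrapServers = "localhost:9092",
                // The subscription commits offsets through its Ack path,
                // so auto-commit stays disabled.
                EnableAutoCommit = false
            };
        })
        .AddEventHandler<BookingsEventHandler>() // hypothetical IEventHandler implementation
);
```
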
diff --git a/src/Kafka/src/Eventuous.Kafka/Subscriptions/KafkaSubscriptionOptions.cs b/src/Kafka/src/Eventuous.Kafka/Subscriptions/KafkaSubscriptionOptions.cs
index c13fd0ef..81e69929 100644
--- a/src/Kafka/src/Eventuous.Kafka/Subscriptions/KafkaSubscriptionOptions.cs
+++ b/src/Kafka/src/Eventuous.Kafka/Subscriptions/KafkaSubscriptionOptions.cs
@@ -5,6 +5,36 @@
 
 namespace Eventuous.Kafka.Subscriptions;
 
+/// <summary>
+/// Options for Kafka subscription.
+/// </summary>
+[PublicAPI]
 public record KafkaSubscriptionOptions : SubscriptionOptions {
+    /// <summary>
+    /// Confluent.Kafka consumer configuration.
+    /// </summary>
     public ConsumerConfig ConsumerConfig { get; init; } = null!;
+
+    /// <summary>
+    /// Topic name to subscribe to.
+    /// </summary>
+    public string Topic { get; init; } = null!;
+
+    /// <summary>
+    /// Number of concurrent consumers, default is one.
+    /// </summary>
+    public uint ConcurrencyLimit { get; init; } = 1;
+
+    /// <summary>
+    /// A function to handle event processing failure. If not specified, the default handler will be used.
+    /// </summary>
+    public HandleConsumeError? FailureHandler { get; init; }
 }
+
+/// <summary>
+/// Delegate for handling Kafka consume errors.
+/// </summary>
+/// <param name="consumer">The Kafka consumer</param>
+/// <param name="result">The consume result containing the message</param>
+/// <param name="exception">The exception that occurred during processing</param>
+public delegate void HandleConsumeError(IConsumer<string, byte[]> consumer, ConsumeResult<string, byte[]> result, Exception? exception);
diff --git a/src/Kafka/test/Eventuous.Tests.Kafka/SubscriptionSpec.cs b/src/Kafka/test/Eventuous.Tests.Kafka/SubscriptionSpec.cs
new file mode 100644
index 00000000..c1d1d6ee
--- /dev/null
+++ b/src/Kafka/test/Eventuous.Tests.Kafka/SubscriptionSpec.cs
@@ -0,0 +1,88 @@
+using Confluent.Kafka;
+using Eventuous.Kafka.Producers;
+using Eventuous.Kafka.Subscriptions;
+using Eventuous.Producers;
+using Eventuous.Subscriptions.Filters;
+using Eventuous.TestHelpers.TUnit;
+using Eventuous.TestHelpers.TUnit.Logging;
+using Eventuous.Tests.Subscriptions.Base;
+
+namespace Eventuous.Tests.Kafka;
+
+[ClassDataSource<KafkaFixture>]
+public class SubscriptionSpec {
+    static SubscriptionSpec() => TypeMap.Instance.RegisterKnownEventTypes(typeof(TestEvent).Assembly);
+
+    KafkaBasicProducer _producer = null!;
+    TestEventHandler   _handler  = null!;
+#pragma warning disable TUnit0023
+    KafkaBasicSubscription _subscription = null!;
+    TestEventListener      _es           = null!;
+#pragma warning restore TUnit0023
+    readonly StreamName     _topic;
+    readonly ILogger        _log;
+    readonly ILoggerFactory _loggerFactory;
+    readonly KafkaFixture   _fixture;
+
+    public SubscriptionSpec(KafkaFixture fixture) {
+        _fixture       = fixture;
+        _topic         = new(Guid.NewGuid().ToString());
+        _loggerFactory = LoggingExtensions.GetLoggerFactory();
+        _log           = _loggerFactory.CreateLogger<SubscriptionSpec>();
+    }
+
+    [Test]
+    public async Task SubscribeAndProduce(CancellationToken cancellationToken) {
+        var testEvent = TestEvent.Create();
+        await _producer.Produce(_topic, testEvent, new(), cancellationToken: cancellationToken);
+        await _handler.AssertThat().Timebox(10.Seconds()).Any().Match(x => x as TestEvent == testEvent).Validate(cancellationToken);
+    }
+
+    [Test]
+    public async Task SubscribeAndProduceMany(CancellationToken cancellationToken) {
+        const int count = 200;
+
+        var testEvents = TestEvent.CreateMany(count);
+        await _producer.Produce(_topic, testEvents, new(), cancellationToken: cancellationToken);
+        await _handler.AssertCollection(30.Seconds(), [..testEvents]).Validate(cancellationToken);
+    }
+
+    [Before(Test)]
+    public async ValueTask InitializeAsync() {
+        _es       = new();
+        _handler  = new();
+        _producer = new(new KafkaProducerOptions(new() { BootstrapServers = _fixture.BootstrapServers }));
+
+        var subscriptionId = Guid.NewGuid().ToString();
+
+        var options = new KafkaSubscriptionOptions {
+            ConsumerConfig = new() {
+                BootstrapServers   = _fixture.BootstrapServers,
+                EnableAutoCommit   = false,
+                AutoOffsetReset    = AutoOffsetReset.Earliest,
+                EnablePartitionEof = true
+            },
+            Topic            = _topic,
+            ConcurrencyLimit = 4,
+            SubscriptionId   = subscriptionId,
+            ThrowOnError     = true
+        };
+
+        _subscription = new(
+            options,
+            new ConsumePipe().AddDefaultConsumer(_handler),
+            _loggerFactory
+        );
+
+        await _subscription.SubscribeWithLog(_log);
+        await _producer.StartAsync(cancellationToken: default);
+    }
+
+    [After(Test)]
+    public async ValueTask DisposeAsync() {
+        await _producer.StopAsync(cancellationToken: default);
+        await _subscription.UnsubscribeWithLog(_log);
+        _es.Dispose();
+        await _subscription.DisposeAsync();
+    }
+}
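Note (illustrative, not part of the patch): the default failure handler logs the error and moves on, so the failed message is effectively dropped. The `FailureHandler` option allows different behavior; below is a sketch that seeks the consumer back to the failed offset so Confluent.Kafka redelivers the same message on the next `Consume` call. Topic and broker address are placeholders, and without an extra retry limit or dead-letter step a permanently failing message would be retried forever.

```csharp
using Confluent.Kafka;
using Eventuous.Kafka.Subscriptions;

var options = new KafkaSubscriptionOptions {
    ConsumerConfig = new ConsumerConfig { BootstrapServers = "localhost:9092" },
    Topic          = "bookings",
    FailureHandler = (consumer, result, exception) => {
        // Rewind the partition to the failed message's offset;
        // the next Consume call returns the same message again.
        consumer.Seek(result.TopicPartitionOffset);
    }
};
```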