diff --git a/Netmash.sln b/Netmash.sln index 2b90a66..25e66aa 100644 --- a/Netmash.sln +++ b/Netmash.sln @@ -66,6 +66,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Netmash.Security.Authentica EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Netmash.Test.Api", "src\test\Netmash.Test.Api\Netmash.Test.Api.csproj", "{B7B073F6-91D5-4EB1-876C-76F6BD612FD9}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Netmash.Infrastructure.Messaging", "src\infrastructure\Netmash.Infrastructure.Messaging\Netmash.Infrastructure.Messaging.csproj", "{DC243FBB-3F39-4DBD-857F-93F2B9DBB35E}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -124,6 +126,10 @@ Global {B7B073F6-91D5-4EB1-876C-76F6BD612FD9}.Debug|Any CPU.Build.0 = Debug|Any CPU {B7B073F6-91D5-4EB1-876C-76F6BD612FD9}.Release|Any CPU.ActiveCfg = Release|Any CPU {B7B073F6-91D5-4EB1-876C-76F6BD612FD9}.Release|Any CPU.Build.0 = Release|Any CPU + {DC243FBB-3F39-4DBD-857F-93F2B9DBB35E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {DC243FBB-3F39-4DBD-857F-93F2B9DBB35E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {DC243FBB-3F39-4DBD-857F-93F2B9DBB35E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {DC243FBB-3F39-4DBD-857F-93F2B9DBB35E}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -155,6 +161,7 @@ Global {5CB39BE9-0C83-4FDC-AB26-32642EA13639} = {06D5F056-4099-4636-A45C-D6C3B2CCDD66} {D926068E-7C61-4130-B7A0-C38294F13855} = {B8132F39-6677-4D70-84CA-9747DC9086B3} {B7B073F6-91D5-4EB1-876C-76F6BD612FD9} = {CCEE458E-02A8-42FD-8F5F-A35481A23303} + {DC243FBB-3F39-4DBD-857F-93F2B9DBB35E} = {1C1D634E-06CC-4707-9564-E31A76F27D9E} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {87541BAB-3FAC-4ADB-A7FB-8228DA87843D} diff --git a/src/infrastructure/Netmash.Infrastructure.Messaging/Abstractions/DependencyInjectionExtensions.cs b/src/infrastructure/Netmash.Infrastructure.Messaging/Abstractions/DependencyInjectionExtensions.cs new file mode 100644 index 0000000..9f8469c --- /dev/null +++ b/src/infrastructure/Netmash.Infrastructure.Messaging/Abstractions/DependencyInjectionExtensions.cs @@ -0,0 +1,21 @@ +using Microsoft.Extensions.DependencyInjection; +using Netmash.Infrastructure.Messaging.Services; + +namespace Netmash.Infrastructure.Messaging.Abstractions +{ + public static class DependencyInjectionExtensions + { + public static IServiceCollection AddMessageBus(this IServiceCollection services) + { + services.AddSingleton(); + // services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + // services.AddSingleton(); + // services.AddSingleton(); + + return services; + } + } +} diff --git a/src/infrastructure/Netmash.Infrastructure.Messaging/Abstractions/IMessageBusPublisher.cs b/src/infrastructure/Netmash.Infrastructure.Messaging/Abstractions/IMessageBusPublisher.cs new file mode 100644 index 0000000..47258c4 --- /dev/null +++ b/src/infrastructure/Netmash.Infrastructure.Messaging/Abstractions/IMessageBusPublisher.cs @@ -0,0 +1,15 @@ +using Netmash.Infrastructure.Messaging.Models; +using System.Threading; +using System.Threading.Tasks; + +namespace Netmash.Infrastructure.Messaging.Abstractions +{ + public interface IMessageBusPublisher + { + Task PublishAsync(T message, MessagingPublisherOptions publisherOptions = null, + CancellationToken cancellationToken = default); + + Task PublishAsync(T message, CancellationToken cancellationToken) => + PublishAsync(message, null, cancellationToken); + } +} diff --git a/src/infrastructure/Netmash.Infrastructure.Messaging/Abstractions/IMessageSerDes.cs b/src/infrastructure/Netmash.Infrastructure.Messaging/Abstractions/IMessageSerDes.cs new file mode 100644 index 0000000..9c93b0e --- /dev/null +++ b/src/infrastructure/Netmash.Infrastructure.Messaging/Abstractions/IMessageSerDes.cs @@ -0,0 +1,22 @@ +using Netmash.Infrastructure.Messaging.Models; +using System.Collections.Generic; + +namespace Netmash.Infrastructure.Messaging.Abstractions +{ + public interface IMessageSerDes + { + byte[] SerializeMessageEnvelope(MessagingEnvelope envelope, MessageSerDesOptions options = null); + + TMessage DeserializePayload(byte[] payloadBytes, IDictionary medatada = null, + MessageSerDesOptions options = null); + + (byte[] payloadBytes, IDictionary additionalMetadata) + SerializePayload(TMessage message, MessageSerDesOptions options = null); + + MessagingEnvelope DeserializeMessageEnvelope(byte[] envelopeData, + MessageSerDesOptions options = null); + + MessagingEnvelope DeserializeMessageEnvelope(byte[] envelopeData, MessageSerDesOptions options = null) + => DeserializeMessageEnvelope(envelopeData, options); + } +} diff --git a/src/infrastructure/Netmash.Infrastructure.Messaging/Abstractions/IMessageTypeRegistry.cs b/src/infrastructure/Netmash.Infrastructure.Messaging/Abstractions/IMessageTypeRegistry.cs new file mode 100644 index 0000000..2101609 --- /dev/null +++ b/src/infrastructure/Netmash.Infrastructure.Messaging/Abstractions/IMessageTypeRegistry.cs @@ -0,0 +1,13 @@ +using System; +using System.Collections.Generic; +using System.Reflection; + +namespace Netmash.Infrastructure.Messaging.Abstractions +{ + public interface IMessageTypeRegistry + { + Type ResolveType(string messageTypeId, IEnumerable scannedAssemblies); + + string GetTypeId(Type messageType); + } +} diff --git a/src/infrastructure/Netmash.Infrastructure.Messaging/Abstractions/IMessagingTransport.cs b/src/infrastructure/Netmash.Infrastructure.Messaging/Abstractions/IMessagingTransport.cs new file mode 100644 index 0000000..682da1d --- /dev/null +++ b/src/infrastructure/Netmash.Infrastructure.Messaging/Abstractions/IMessagingTransport.cs @@ -0,0 +1,50 @@ +using Netmash.Infrastructure.Messaging.Models; +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace Netmash.Infrastructure.Messaging.Abstractions +{ + public interface IMessagingTransport + { + /// + /// Subscribes a message handler to the given topic + /// + /// The topic/channel to subscribe to + /// The message handler + /// Subscription options + /// + /// An object that when disposed unsubscribes the handler from the topic + Task SubscribeAsync(string topic, Func handler, + SubscriptionTransportOptions options = null, CancellationToken cancellationToken = default); + + /// + /// Publishes a message to a topic + /// + /// The topic/channel to publish to + /// The context: message and headers + /// + Task PublishAsync(string topic, TransportSendContext sendContext, CancellationToken cancellationToken = default); + } + + public delegate void TransportErrorHandler(Exception ex); + + public interface ITransportMonitor + { + event TransportErrorHandler OnError; + } + + public record TransportSendContext( + Func<(byte[] payloadData, IDictionary additionalHeaders)> PayloadBytesAccessor, + Func EnvelopeBytesAccessor, + Func> HeadersAccessor); + + public record TransportReceiveContext(TransportReceivedData ReceivedData); + + public abstract record TransportReceivedData + { + public record EnvelopeBytes(byte[] Bytes) : TransportReceivedData; + public record PayloadBytesAndHeaders(byte[] PayloadBytes, IDictionary headers) : TransportReceivedData; + } +} diff --git a/src/infrastructure/Netmash.Infrastructure.Messaging/Abstractions/ITopicRegistry.cs b/src/infrastructure/Netmash.Infrastructure.Messaging/Abstractions/ITopicRegistry.cs new file mode 100644 index 0000000..88096cf --- /dev/null +++ b/src/infrastructure/Netmash.Infrastructure.Messaging/Abstractions/ITopicRegistry.cs @@ -0,0 +1,11 @@ +using System; + +namespace Netmash.Infrastructure.Messaging.Abstractions +{ + public interface ITopicRegistry + { + string GetTopicForMessageType(Type messageType, bool includePrefix = true); + string GetTopicForName(string topicName, bool includePrefix = true); + string GetTopicPrefix(); + } +} diff --git a/src/infrastructure/Netmash.Infrastructure.Messaging/Attributes/MessageTypeIdAttribute.cs b/src/infrastructure/Netmash.Infrastructure.Messaging/Attributes/MessageTypeIdAttribute.cs new file mode 100644 index 0000000..0133b22 --- /dev/null +++ b/src/infrastructure/Netmash.Infrastructure.Messaging/Attributes/MessageTypeIdAttribute.cs @@ -0,0 +1,15 @@ +using System; + +namespace Netmash.Infrastructure.Messaging.Attributes +{ + [AttributeUsage(AttributeTargets.Class)] + public class MessageTypeIdAttribute : Attribute + { + public string MessageTypeId { get; } + + public MessageTypeIdAttribute(string messageTypeId) + { + MessageTypeId = messageTypeId; + } + } +} diff --git a/src/infrastructure/Netmash.Infrastructure.Messaging/Attributes/TopicNameResolverAttribute.cs b/src/infrastructure/Netmash.Infrastructure.Messaging/Attributes/TopicNameResolverAttribute.cs new file mode 100644 index 0000000..90cf125 --- /dev/null +++ b/src/infrastructure/Netmash.Infrastructure.Messaging/Attributes/TopicNameResolverAttribute.cs @@ -0,0 +1,10 @@ +using Microsoft.Extensions.Configuration; +using System; + +namespace Netmash.Infrastructure.Messaging.Attributes +{ + public abstract class TopicNameResolverAttribute : Attribute + { + public abstract string ResolveTopicName(Type messageType, IConfiguration configuration); + } +} diff --git a/src/infrastructure/Netmash.Infrastructure.Messaging/Constants/MessagingHeaders.cs b/src/infrastructure/Netmash.Infrastructure.Messaging/Constants/MessagingHeaders.cs new file mode 100644 index 0000000..119fa60 --- /dev/null +++ b/src/infrastructure/Netmash.Infrastructure.Messaging/Constants/MessagingHeaders.cs @@ -0,0 +1,11 @@ +namespace Netmash.Infrastructure.Messaging.Constants +{ + public static class MessagingHeaders + { + public const string + MessageType = "X-MESSAGE-TYPE", + Source = "X-SOURCE", + MessageId = "X-MESSAGE-ID", + PublishTime = "X-PUBLISH-TIME"; + } +} diff --git a/src/infrastructure/Netmash.Infrastructure.Messaging/Extensions/MessagingEnvelopeExtensions.cs b/src/infrastructure/Netmash.Infrastructure.Messaging/Extensions/MessagingEnvelopeExtensions.cs new file mode 100644 index 0000000..df676f7 --- /dev/null +++ b/src/infrastructure/Netmash.Infrastructure.Messaging/Extensions/MessagingEnvelopeExtensions.cs @@ -0,0 +1,24 @@ +using Netmash.Infrastructure.Messaging.Constants; +using Netmash.Infrastructure.Messaging.Models; + +namespace Netmash.Infrastructure.Messaging.Extensions +{ + public static class MessagingEnvelopeExtensions + { + public static string GetMessageTypeId(this MessagingEnvelope envelope) + { + return envelope.Headers.TryGetValue(MessagingHeaders.MessageType, out var value) + ? value + : null; + } + + public static void SetHeader(this MessagingEnvelope envelope, string header, string value, bool overwrite = false) + { + if (!overwrite && envelope.Headers.ContainsKey(header) && !string.IsNullOrEmpty(envelope.Headers[header])) + return; + + envelope.Headers[header] = value; + + } + } +} diff --git a/src/infrastructure/Netmash.Infrastructure.Messaging/Extensions/TypeExtensions.cs b/src/infrastructure/Netmash.Infrastructure.Messaging/Extensions/TypeExtensions.cs new file mode 100644 index 0000000..e61831c --- /dev/null +++ b/src/infrastructure/Netmash.Infrastructure.Messaging/Extensions/TypeExtensions.cs @@ -0,0 +1,85 @@ +using System; +using System.Linq; +using System.Text; + +namespace Netmash.Infrastructure.Messaging.Extensions +{ + internal static class TypeExtensions + { + public static string GetPrettyName(this Type type) + { + var retType = new StringBuilder(); + + if (type.IsGenericType) + { + var parentTypeName = type.FullName?.Split('`')[0].Split('.').Last(); + // We will build the type here. + Type[] arguments = type.GetGenericArguments(); + + var argList = new StringBuilder(); + foreach (Type t in arguments) + { + // Let's make sure we get the argument list. + string arg = t.GetPrettyName(); + if (argList.Length > 0) + { + argList.AppendFormat(", {0}", arg); + } + else + { + argList.Append(arg); + } + } + + if (argList.Length > 0) + { + retType.AppendFormat("{0}<{1}>", parentTypeName, argList.ToString()); + } + } + else + { + return type.FullName?.Split('.').Last() ?? type.Name; + } + + return retType.ToString(); + } + + public static string GetLongPrettyName(this Type type) + { + var retType = new StringBuilder(); + + if (type.IsGenericType) + { + var parentTypeName = type.FullName?.Split('`')[0]; + // We will build the type here. + Type[] arguments = type.GetGenericArguments(); + + var argList = new StringBuilder(); + foreach (Type t in arguments) + { + // Let's make sure we get the argument list. + string arg = t.GetLongPrettyName(); + if (argList.Length > 0) + { + argList.AppendFormat(", {0}", arg); + } + else + { + argList.Append(arg); + } + } + + if (argList.Length > 0) + { + retType.AppendFormat("{0}<{1}>", parentTypeName, argList.ToString()); + } + } + else + { + return type.FullName; + } + + return retType.ToString(); + } + } +} diff --git a/src/infrastructure/Netmash.Infrastructure.Messaging/Host/DependencyInjectionExtensions.cs b/src/infrastructure/Netmash.Infrastructure.Messaging/Host/DependencyInjectionExtensions.cs new file mode 100644 index 0000000..5d46c23 --- /dev/null +++ b/src/infrastructure/Netmash.Infrastructure.Messaging/Host/DependencyInjectionExtensions.cs @@ -0,0 +1,29 @@ +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Netmash.Infrastructure.Messaging.Abstractions; +using Netmash.Infrastructure.Messaging.Host.Models; +using System; + +namespace Netmash.Infrastructure.Messaging.Host +{ + public static class DependencyInjectionExtensions + { + public static IServiceCollection AddMessagingHost(this IServiceCollection serviceCollection, IConfiguration configuration, Action configure) + { + serviceCollection.Configure(configuration.GetSection("Messaging").GetSection("Host")); + + serviceCollection.AddSingleton(); + serviceCollection.AddHostedService(); + serviceCollection.AddSingleton(); + serviceCollection.Decorate(); + + + serviceCollection.TryAddSingleton(serviceCollection); + + var builder = new MessagingHostBuilder(serviceCollection); + configure.Invoke(builder); + + return serviceCollection; + } + } +} diff --git a/src/infrastructure/Netmash.Infrastructure.Messaging/Host/Models/MessagingHostOptions.cs b/src/infrastructure/Netmash.Infrastructure.Messaging/Host/Models/MessagingHostOptions.cs new file mode 100644 index 0000000..858b731 --- /dev/null +++ b/src/infrastructure/Netmash.Infrastructure.Messaging/Host/Models/MessagingHostOptions.cs @@ -0,0 +1,14 @@ +namespace Netmash.Infrastructure.Messaging.Host.Models +{ + public class MessagingHostOptions + { + public TransportErrorStrategy TransportErrorStrategy { get; set; } = TransportErrorStrategy.Retry; + public int StartRetryCount { get; set; } = 10; + } + + public enum TransportErrorStrategy + { + Throw = 0, + Retry = 1 + } +} diff --git a/src/infrastructure/Netmash.Infrastructure.Messaging/Models/MessageSerDesOptions.cs b/src/infrastructure/Netmash.Infrastructure.Messaging/Models/MessageSerDesOptions.cs new file mode 100644 index 0000000..8b8774a --- /dev/null +++ b/src/infrastructure/Netmash.Infrastructure.Messaging/Models/MessageSerDesOptions.cs @@ -0,0 +1,28 @@ +using System.Collections.Generic; +using System.Reflection; + +namespace Netmash.Infrastructure.Messaging.Models +{ + /// + /// Options for serialization/deserialization + /// + public record MessageSerDesOptions + { + /// + /// Options for message deserialization. + /// + /// + /// The type of deserialization. + /// + public bool UseDynamicDeserialization { get; init; } + /// + /// he list of assemblies to be scanned in case of the 'Dynamic' deserialization option. + /// + /// + /// The scanned assemblies used in dynamic deserialization. + /// + public IEnumerable DynamicDeserializationScannedAssemblies { get; init; } + + public static MessageSerDesOptions Default { get; } = new(); + } +} diff --git a/src/infrastructure/Netmash.Infrastructure.Messaging/Models/MessagingEnvelope.cs b/src/infrastructure/Netmash.Infrastructure.Messaging/Models/MessagingEnvelope.cs new file mode 100644 index 0000000..6b11724 --- /dev/null +++ b/src/infrastructure/Netmash.Infrastructure.Messaging/Models/MessagingEnvelope.cs @@ -0,0 +1,28 @@ +using System; +using System.Collections.Generic; + +namespace Netmash.Infrastructure.Messaging.Models +{ + public record MessagingEnvelope + { + public IDictionary Headers { get; } + + public object Payload { get; } + + public MessagingEnvelope(IDictionary headers, object payload) + { + Headers = headers ?? throw new ArgumentNullException(nameof(headers)); + Payload = payload ?? throw new ArgumentNullException(nameof(payload)); + } + } + + public record MessagingEnvelope : MessagingEnvelope + { + public MessagingEnvelope(IDictionary headers, TMessage payload) + : base(headers, payload) + { + } + + public new TMessage Payload => (TMessage)base.Payload; + } +} diff --git a/src/infrastructure/Netmash.Infrastructure.Messaging/Models/MessagingPublisherOptions.cs b/src/infrastructure/Netmash.Infrastructure.Messaging/Models/MessagingPublisherOptions.cs new file mode 100644 index 0000000..0e4f75f --- /dev/null +++ b/src/infrastructure/Netmash.Infrastructure.Messaging/Models/MessagingPublisherOptions.cs @@ -0,0 +1,13 @@ +using System; + +namespace Netmash.Infrastructure.Messaging.Models +{ + public record MessagingPublisherOptions + { + public Action EnvelopeCustomizer { get; init; } + + public string TopicName { get; init; } + + public static MessagingPublisherOptions Default = new(); + } +} diff --git a/src/infrastructure/Netmash.Infrastructure.Messaging/Models/SubscriptionTransportOptions.cs b/src/infrastructure/Netmash.Infrastructure.Messaging/Models/SubscriptionTransportOptions.cs new file mode 100644 index 0000000..aabfa21 --- /dev/null +++ b/src/infrastructure/Netmash.Infrastructure.Messaging/Models/SubscriptionTransportOptions.cs @@ -0,0 +1,50 @@ +namespace Netmash.Infrastructure.Messaging.Models +{ + public record SubscriptionTransportOptions + { + /// + /// Specifies if a durable subscription should be made (default true). + /// + public bool IsDurable { get; init; } = true; + + /// + /// Specifies if a consumer group should be used (default true). + /// + public bool UseGroup { get; init; } = true; + + /// + /// Specifies the maximum number of messages that can be handled concurrently (default 1). + /// + public int MaxConcurrentMessages { get; init; } = 1; + + /// + /// If set, only new messages are delivered. Otherwise all available messages are delivered even if published before the subscription. + /// + public bool DeliverNewMessagesOnly { get; set; } = true; + + /// + /// Timeout before message redelivery (in milliseconds) + /// + public int? AckWait { get; init; } = null; + + /// + /// Default transport options + /// + public static SubscriptionTransportOptions Default => new(); + + /// + /// Stream processor transport options (with durable subscription) + /// + public static SubscriptionTransportOptions StreamProcessor => new() + { DeliverNewMessagesOnly = false }; + + /// + /// Request/Reply transport options (non-durable + /// + public static SubscriptionTransportOptions RequestReply => new() + { IsDurable = false, UseGroup = false }; + + public static SubscriptionTransportOptions PubSub => new() + { IsDurable = false }; + } +} diff --git a/src/infrastructure/Netmash.Infrastructure.Messaging/Nats/DependencyInjectionExtensions.cs b/src/infrastructure/Netmash.Infrastructure.Messaging/Nats/DependencyInjectionExtensions.cs new file mode 100644 index 0000000..d0f867f --- /dev/null +++ b/src/infrastructure/Netmash.Infrastructure.Messaging/Nats/DependencyInjectionExtensions.cs @@ -0,0 +1,20 @@ +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Netmash.Infrastructure.Messaging.Abstractions; +using Netmash.Infrastructure.Messaging.Nats.Records; + +namespace Netmash.Infrastructure.Messaging.Nats +{ + public static class DependencyInjectionExtensions + { + public static IServiceCollection AddNatsTransport(this IServiceCollection services, IConfiguration configuration) + { + services.Configure(configuration.GetSection("Messaging").GetSection("Nats")); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(sp => sp.GetRequiredService()); + + return services; + } + } +} diff --git a/src/infrastructure/Netmash.Infrastructure.Messaging/Nats/Records/NatsOptions.cs b/src/infrastructure/Netmash.Infrastructure.Messaging/Nats/Records/NatsOptions.cs new file mode 100644 index 0000000..071db60 --- /dev/null +++ b/src/infrastructure/Netmash.Infrastructure.Messaging/Nats/Records/NatsOptions.cs @@ -0,0 +1,35 @@ +namespace Netmash.Infrastructure.Messaging.Nats.Records +{ + internal record NatsOptions + { + /// + /// URL of the Streaming NATS cluster + /// + public string NatsUrl { get; set; } + + /// + /// Streaming NATS cluster name + /// + public string Cluster { get; set; } + + /// + /// Identifier of the Streaming NATS client + /// + public string ClientId { get; set; } + + /// + /// Durable subscriptions name. + /// + public string DurableName { get; set; } + + /// + /// Queue group name + /// + public string QGroup { get; set; } + + /// + /// The time the server awaits for acknowledgement from the client before redelivering the message (in milliseconds) + /// + public int? AckWait { get; set; } + } +} diff --git a/src/infrastructure/Netmash.Infrastructure.Messaging/Nats/StanConnectionProvider.cs b/src/infrastructure/Netmash.Infrastructure.Messaging/Nats/StanConnectionProvider.cs new file mode 100644 index 0000000..0e86939 --- /dev/null +++ b/src/infrastructure/Netmash.Infrastructure.Messaging/Nats/StanConnectionProvider.cs @@ -0,0 +1,163 @@ +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using NATS.Client; +using Netmash.Infrastructure.Messaging.Abstractions; +using Netmash.Infrastructure.Messaging.Nats.Records; +using STAN.Client; +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Netmash.Infrastructure.Messaging.Nats +{ + internal class StanConnectionProvider : IDisposable, ITransportMonitor + { + private readonly IOptions _natsOptions; + private readonly ILogger _logger; + private IStanConnection _connection; + private Exception _unrecoverableException; + private AtomicLazy _lazyConnection; + + public event TransportErrorHandler OnError; + + public StanConnectionProvider(IOptions natsOptions, ILogger logger) + { + _natsOptions = natsOptions; + _logger = logger; + _lazyConnection = new AtomicLazy(GetConnection); + } + + public async Task ExecuteAsync(Func action) + { + var connection = GetAndCheckConnection(); + + await action(connection); + } + + public void Execute(Action action) + { + var connection = GetAndCheckConnection(); + + action(connection); + } + + private IStanConnection GetAndCheckConnection() + { + ThrowIfUnrecoverableState(); + + return _lazyConnection.Value; + } + + private IStanConnection GetConnection() + { + var clientId = _natsOptions.Value.ClientId?.Replace(".", "_"); + var options = StanOptions.GetDefaultOptions(); + options.NatsURL = _natsOptions.Value.NatsUrl; + + options.ConnectionLostEventHandler = (_, args) => + { + SetConnectionLostState(args.ConnectionException ?? new Exception("NATS connection was lost")); + }; + + //fix https://github.com/nats-io/csharp-nats-streaming/issues/28 + options.PubAckWait = 30000; + + var cf = new StanConnectionFactory(); + _connection = cf.CreateConnection(_natsOptions.Value.Cluster, clientId + Guid.NewGuid(), options); + + _logger.LogInformation($"NATS connection to {_natsOptions.Value.NatsUrl} was established"); + + return _connection; + } + + private void SetConnectionLostState(Exception exception) + { + // Set the field to the current exception if not already set + var existingException = + Interlocked.CompareExchange(ref _unrecoverableException, exception, null); + + // Send the application stop signal only once + if (existingException != null) + return; + + _logger.LogError(exception, "NATS connection unrecoverable"); + + ResetConnection(); + + OnError?.Invoke(exception); + } + + private void ResetConnection() + { + _connection?.Dispose(); + _connection = null; + _lazyConnection = new AtomicLazy(GetConnection); + + Interlocked.CompareExchange(ref _unrecoverableException, null, _unrecoverableException); + + _logger.LogInformation("NATS connection was reset"); + } + + private void ThrowIfUnrecoverableState() + { + // For consistency, read the field using the same primitive used for writing instead of using Thread.VolatileRead + var exception = Interlocked.CompareExchange(ref _unrecoverableException, null, null); + if (exception != null) + { + throw new Exception("NATS connection encountered an unrecoverable exception", exception); + } + } + + private static bool IsUnrecoverableException(Exception ex) => + ex is NATSConnectionClosedException || + ex is StanConnectionClosedException || + ex is NATSConnectionException || + ex is StanConnectionException || + ex is NATSBadSubscriptionException || + ex is StanBadSubscriptionException || + ex is StanTimeoutException || + ex is NATSTimeoutException || + ex is NATSStaleConnectionException || + ex is NATSNoServersException || + ex is StanConnectRequestException || + ex is StanMaxPingsException; + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + protected virtual void Dispose(bool disposing) + { + if (disposing) + { + _connection?.Dispose(); + } + } + + private class AtomicLazy + { + private readonly Func _factory; + + private T _value; + + private bool _initialized; + + private object _lock; + + public AtomicLazy(Func factory) + { + _factory = factory; + } + + public AtomicLazy(T value) + { + _value = value; + _initialized = true; + } + + public T Value => LazyInitializer.EnsureInitialized(ref _value, ref _initialized, ref _lock, _factory); + } + } +} diff --git a/src/infrastructure/Netmash.Infrastructure.Messaging/Nats/StanMessagingTransport.cs b/src/infrastructure/Netmash.Infrastructure.Messaging/Nats/StanMessagingTransport.cs new file mode 100644 index 0000000..b7e10d0 --- /dev/null +++ b/src/infrastructure/Netmash.Infrastructure.Messaging/Nats/StanMessagingTransport.cs @@ -0,0 +1,75 @@ +using Microsoft.Extensions.Options; +using Netmash.Infrastructure.Messaging.Abstractions; +using Netmash.Infrastructure.Messaging.Models; +using Netmash.Infrastructure.Messaging.Nats.Records; +using STAN.Client; +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Netmash.Infrastructure.Messaging.Nats +{ + internal class StanMessagingTransport : IMessagingTransport + { + private readonly StanConnectionProvider _stanConnectionManager; + private readonly IOptions _natsOptions; + + public StanMessagingTransport(StanConnectionProvider stanConnectionManager, IOptions natsOptions) + { + _stanConnectionManager = stanConnectionManager; + _natsOptions = natsOptions; + } + + public Task SubscribeAsync(string topic, Func handler, + SubscriptionTransportOptions options = null, + CancellationToken cancellationToken = default) + { + var opts = StanSubscriptionOptions.GetDefaultOptions(); + var subscriberOptions = options ?? SubscriptionTransportOptions.Default; + if (subscriberOptions.IsDurable) + { + opts.DurableName = _natsOptions.Value.DurableName; + opts.LeaveOpen = true; + } + + if (!subscriberOptions.DeliverNewMessagesOnly) + { + opts.DeliverAllAvailable(); + } + + // https://github.com/nats-io/stan.go#subscriber-rate-limiting + opts.MaxInflight = subscriberOptions.MaxConcurrentMessages; + opts.AckWait = subscriberOptions.AckWait ?? _natsOptions.Value.AckWait ?? 50000; + opts.ManualAcks = true; + + void StanMsgHandler(object obj, StanMsgHandlerArgs args) + { + if (cancellationToken.IsCancellationRequested) + return; + + var receiveContext = new TransportReceiveContext(new TransportReceivedData.EnvelopeBytes(args.Message.Data)); + + // Fire and forget + _ = handler(receiveContext).ContinueWith(_ => args.Message.Ack(), cancellationToken); + } + + IDisposable subscription = null; + _stanConnectionManager.Execute(stanConnection => + { + subscription = subscriberOptions.UseGroup + ? stanConnection.Subscribe(topic, _natsOptions.Value.QGroup, opts, StanMsgHandler) + : stanConnection.Subscribe(topic, opts, StanMsgHandler); + }); + + return Task.FromResult(subscription); + } + + public Task PublishAsync(string topic, TransportSendContext sendContext, CancellationToken cancellationToken = default) + { + var envelopeData = sendContext.EnvelopeBytesAccessor.Invoke(); + + return _stanConnectionManager.ExecuteAsync( + async connection => await connection.PublishAsync(topic, envelopeData)); + } + } +} diff --git a/src/infrastructure/Netmash.Infrastructure.Messaging/Netmash.Infrastructure.Messaging.csproj b/src/infrastructure/Netmash.Infrastructure.Messaging/Netmash.Infrastructure.Messaging.csproj new file mode 100644 index 0000000..0d14b61 --- /dev/null +++ b/src/infrastructure/Netmash.Infrastructure.Messaging/Netmash.Infrastructure.Messaging.csproj @@ -0,0 +1,16 @@ + + + + net6.0 + + + + + + + + + + + + diff --git a/src/infrastructure/Netmash.Infrastructure.Messaging/Services/DefaultMessageTypeRegistry.cs b/src/infrastructure/Netmash.Infrastructure.Messaging/Services/DefaultMessageTypeRegistry.cs new file mode 100644 index 0000000..50173b5 --- /dev/null +++ b/src/infrastructure/Netmash.Infrastructure.Messaging/Services/DefaultMessageTypeRegistry.cs @@ -0,0 +1,348 @@ +using Netmash.Infrastructure.Messaging.Abstractions; +using Netmash.Infrastructure.Messaging.Attributes; +using Netmash.Infrastructure.Messaging.Extensions; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using System.Text; + +namespace Netmash.Infrastructure.Messaging.Services +{ + public class DefaultMessageTypeRegistry : IMessageTypeRegistry + { + private readonly ConcurrentDictionary _typeCache = new(); + + public Type ResolveType(string messageTypeId, IEnumerable scannedAssemblies) + { + var scannedAssembliesList = scannedAssemblies?.ToList() ?? new List(); + + return _typeCache.GetOrAdd(messageTypeId, key => + GetTypeByAttribute(key, scannedAssembliesList) ?? + GetTypeByName(key, scannedAssembliesList) ?? + throw new ApplicationException($"Type could not be resolved: {key}") + ); + } + + public string GetTypeId(Type messageType) + { + return messageType.GetCustomAttribute()?.MessageTypeId + ?? TypeInfo.Build(messageType).ToString(); + } + + private static string GetTypeNameIncludingNesting(Type type) + { + return type.DeclaringType == null ? + type.Name : + $"{GetTypeNameIncludingNesting(type.DeclaringType)}{TypeInfo.NestedSeparator}{type.Name}"; + } + + private Type GetTypeByName(string messageTypeId, List scannedAssemblies) + { + var parsedType = TypeInfo.Parse(messageTypeId); + + return GetTypeByNameInternal(parsedType, scannedAssemblies); + } + + private Type GetTypeByNameInternal(TypeInfo parsedType, List scannedAssemblies) + { + var dotnetFormattedName = parsedType.GetNameForDotnetComparison(); + var foundType = WellKnownTypes.Resolve(dotnetFormattedName); + if (foundType != null) + return foundType; + + var foundTypes = scannedAssemblies + .SelectMany(x => x.ExportedTypes) + .Where(x => GetTypeNameIncludingNesting(x) == dotnetFormattedName) + .ToList(); + + if (foundTypes.Count > 1) + throw new ApplicationException($"Multiple types found with name: {foundTypes.First().GetPrettyName()}"); + + foundType = foundTypes.FirstOrDefault(); + if (foundType == null) + return null; + + if (parsedType.IsGeneric && parsedType.TypeArguments.Any()) + { + var foundTypeArgs = new List(); + foreach (var parsedTypeTypeArgument in parsedType.GetAllTypeArguments()) + { + var foundTypeArgument = GetTypeByNameInternal(parsedTypeTypeArgument, scannedAssemblies); + if (foundTypeArgument == null) + return null; + + foundTypeArgs.Add(foundTypeArgument); + } + + foundType = foundType.MakeGenericType(foundTypeArgs.ToArray()); + } + + return parsedType.ArrayDimensions.AsEnumerable() + .Reverse() + .Aggregate(foundType, (current, arrayDimension) => arrayDimension.Dimensions == 1 + ? current.MakeArrayType() + : current.MakeArrayType(arrayDimension.Dimensions)); + } + + private Type GetTypeByAttribute(string messageTypeId, List scannedAssemblies) + { + var foundTypes = scannedAssemblies + .SelectMany(x => x.ExportedTypes) + .Where(x => x.GetCustomAttribute()?.MessageTypeId == messageTypeId) + .ToList(); + + if (foundTypes.Count > 1) + throw new ApplicationException( + $"Multiple types found with the same MessageTypeIdAttribute: {messageTypeId}"); + + return foundTypes.FirstOrDefault(); + } + + private class TypeInfo + { + public const char NestedSeparator = '+'; + private string _name; + + private string Name + { + get => _name; + set => _name ??= value?.Trim(); + } + + public bool IsGeneric { get; private set; } + public List ArrayDimensions { get; private set; } + public List TypeArguments { get; private set; } + private TypeInfo DeclaringTypeInfo { get; set; } + + public class ArrayDimension + { + public int Dimensions { get; set; } + + public ArrayDimension() + { + Dimensions = 1; + } + + public override string ToString() + { + return $"[{new string(',', Dimensions - 1)}]"; + } + } + + private TypeInfo() + { + Name = null; + IsGeneric = false; + ArrayDimensions = new List(); + TypeArguments = new List(); + } + + public override string ToString() + { + var str = new StringBuilder(); + if (DeclaringTypeInfo != null) + { + str.Append(DeclaringTypeInfo).Append(NestedSeparator); + } + + str.Append(Name); + if (IsGeneric) + str.Append($"<{string.Join(",", TypeArguments.Select(tn => tn.ToString()))}>"); + + foreach (var d in ArrayDimensions) + str.Append(d); + + return str.ToString(); + } + + public string GetNameForDotnetComparison() + { + var formattedExpectedTypeName = IsGeneric ? $"{Name}`{TypeArguments.Count}" : Name; + + return DeclaringTypeInfo != null + ? $"{DeclaringTypeInfo.GetNameForDotnetComparison()}{NestedSeparator}{formattedExpectedTypeName}" + : formattedExpectedTypeName; + } + + public List GetAllTypeArguments() + { + var allArguments = new List(); + if (DeclaringTypeInfo != null) + { + allArguments.AddRange(DeclaringTypeInfo.GetAllTypeArguments()); + } + allArguments.AddRange(TypeArguments); + + return allArguments; + } + + public static TypeInfo Parse(string name) + { + var pos = 0; + return ParseInternal(name, ref pos, out _); + } + + public static TypeInfo Build(Type type) + { + if (type.IsArray) + { + var elementTypeName = Build(type.GetElementType()); + elementTypeName.ArrayDimensions.Add( + new ArrayDimension { Dimensions = type.GetArrayRank() }); + elementTypeName.ArrayDimensions.Reverse(); + return elementTypeName; + } + + var typeName = new TypeInfo + { + Name = type.IsGenericType ? type.Name.Substring(0, type.Name.IndexOf('`')) : type.Name, + IsGeneric = type.IsGenericType, + TypeArguments = type.GenericTypeArguments + .Select(Build).ToList(), + }; + + if (type.DeclaringType != null) + { + typeName.DeclaringTypeInfo = BuildDeclaringTypeName(type, typeName); + } + + return typeName; + } + + private static TypeInfo BuildDeclaringTypeName(Type type, TypeInfo typeInfo) + { + if (type.DeclaringType == null) + return null; + + var declaringTypeName = Build(type.DeclaringType); + var declaringTypeGenericParamsCount = ((System.Reflection.TypeInfo)type.DeclaringType).GenericTypeParameters.Length; + declaringTypeName.TypeArguments.AddRange(typeInfo.TypeArguments.GetRange(0, declaringTypeGenericParamsCount)); + typeInfo.TypeArguments.RemoveRange(0, declaringTypeGenericParamsCount); + + return declaringTypeName; + } + + private static TypeInfo ParseInternal(string name, ref int index, out bool listTerminated) + { + var nameBuilder = new StringBuilder(); + var typeName = new TypeInfo(); + listTerminated = true; + + while (index < name.Length) + { + char currentChar = name[index++]; + switch (currentChar) + { + case ',': + typeName.Name = nameBuilder.ToString(); + listTerminated = false; + return typeName; + case '>': + typeName.Name = nameBuilder.ToString(); + listTerminated = true; + return typeName; + case NestedSeparator: + typeName.Name = nameBuilder.ToString(); + typeName = new TypeInfo { DeclaringTypeInfo = typeName }; + nameBuilder.Clear(); + break; + case '<': + { + typeName.Name = nameBuilder.ToString(); + typeName.IsGeneric = true; + nameBuilder.Clear(); + + bool terminated = false; + while (!terminated) + { + typeName.TypeArguments.Add(ParseInternal(name, ref index, out terminated)); + } + + if (name[index - 1] != '>') + throw new Exception("Missing closing > of generic type list."); + + break; + } + case '[': + var arrayDimension = new ArrayDimension(); + typeName.ArrayDimensions.Add(arrayDimension); + + if (index >= name.Length) + throw new Exception("Expecting ] or , after [ for array type, but reached end of string."); + + bool arrayTerminated = false; + while (index < name.Length && !arrayTerminated) + { + currentChar = name[index++]; + switch (currentChar) + { + case ']': + arrayTerminated = true;//array specifier terminated + break; + case ',': //multidimensional array + arrayDimension.Dimensions++; + break; + default: + throw new Exception($@"Expecting ""]"" or "","" after ""["" for array specifier but encountered ""{currentChar}""."); + } + } + break; + default: + nameBuilder.Append(currentChar); + continue; + } + } + + typeName.Name = nameBuilder.ToString(); + return typeName; + } + } + + private static class WellKnownTypes + { + private static readonly Dictionary Map = new() + { + ["bool"] = typeof(bool), + ["Boolean"] = typeof(bool), + ["byte"] = typeof(byte), + ["Byte"] = typeof(byte), + ["sbyte"] = typeof(sbyte), + ["SByte"] = typeof(sbyte), + ["char"] = typeof(char), + ["Char"] = typeof(char), + ["decimal"] = typeof(decimal), + ["Decimal"] = typeof(decimal), + ["double"] = typeof(double), + ["Double"] = typeof(decimal), + ["float"] = typeof(float), + ["Float"] = typeof(float), + ["int"] = typeof(int), + ["Int32"] = typeof(int), + ["uint"] = typeof(uint), + ["UInt32"] = typeof(uint), + ["long"] = typeof(long), + ["Int64"] = typeof(long), + ["ulong"] = typeof(ulong), + ["UInt64"] = typeof(ulong), + ["object"] = typeof(object), + ["Object"] = typeof(object), + ["ushort"] = typeof(ushort), + ["UInt16"] = typeof(ushort), + ["short"] = typeof(short), + ["Int16"] = typeof(short), + ["string"] = typeof(string), + ["String"] = typeof(string), + + ["List`1"] = typeof(List<>), + ["Dictionary`2"] = typeof(Dictionary<,>) + }; + + public static Type Resolve(string typeName) + { + return Map.TryGetValue(typeName, out var type) ? type : null; + } + } + } +} diff --git a/src/infrastructure/Netmash.Infrastructure.Messaging/Services/DefaultTopicRegistry.cs b/src/infrastructure/Netmash.Infrastructure.Messaging/Services/DefaultTopicRegistry.cs new file mode 100644 index 0000000..644c09f --- /dev/null +++ b/src/infrastructure/Netmash.Infrastructure.Messaging/Services/DefaultTopicRegistry.cs @@ -0,0 +1,62 @@ +using Microsoft.Extensions.Configuration; +using Netmash.Infrastructure.Messaging.Abstractions; +using Netmash.Infrastructure.Messaging.Attributes; +using Netmash.Infrastructure.Messaging.Extensions; +using System; +using System.Linq; + +namespace Netmash.Infrastructure.Messaging.Services +{ + internal class DefaultTopicRegistry : ITopicRegistry + { + private readonly IConfiguration _configuration; + + public DefaultTopicRegistry(IConfiguration configuration) + { + _configuration = configuration; + } + + public string GetTopicForMessageType(Type messageType, bool includePrefix = true) + { + var topic = GetTopicNameFromAttribute(messageType) ?? messageType.GetLongPrettyName(); + + topic = GetTopicForName(topic, includePrefix); + + return topic; + } + + public string GetTopicForName(string topicName, bool includePrefix = true) + { + if (topicName == null) + { + return null; + } + + var topic = (includePrefix ? GetTopicPrefix() : string.Empty) + topicName; + topic = topic.Replace("+", "."); + topic = topic.Replace("<", "_"); + topic = topic.Replace(">", "_"); + + return topic; + } + + private string GetTopicNameFromAttribute(Type messageType) + { + var topicNameResolver = messageType.GetCustomAttributes(typeof(TopicNameResolverAttribute), true).FirstOrDefault() as TopicNameResolverAttribute; + return topicNameResolver?.ResolveTopicName(messageType, _configuration); + } + + public string GetTopicPrefix() + { + var messagingSection = _configuration.GetSection("Messaging"); + var envPrefix = messagingSection?["Env"]; + if (!string.IsNullOrWhiteSpace(envPrefix)) + { + envPrefix += "."; + } + + var topicPrefix = envPrefix ?? messagingSection?["TopicPrefix"]; + return topicPrefix ?? string.Empty; + } + } +} diff --git a/src/infrastructure/Netmash.Infrastructure.Messaging/Services/MessageBusPublisher.cs b/src/infrastructure/Netmash.Infrastructure.Messaging/Services/MessageBusPublisher.cs new file mode 100644 index 0000000..dbacfa7 --- /dev/null +++ b/src/infrastructure/Netmash.Infrastructure.Messaging/Services/MessageBusPublisher.cs @@ -0,0 +1,80 @@ +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging; +using Netmash.Infrastructure.Messaging.Abstractions; +using Netmash.Infrastructure.Messaging.Constants; +using Netmash.Infrastructure.Messaging.Models; +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Globalization; +using System.Threading; +using System.Threading.Tasks; + +namespace Netmash.Infrastructure.Messaging.Services +{ + internal class MessageBusPublisher : IMessageBusPublisher + { + private readonly ITopicRegistry _topicRegistry; + private readonly IMessageSerDes _messageSerDes; + private readonly IConfiguration _configuration; + private readonly ILogger _logger; + private readonly IMessagingTransport _messagingTransport; + + public MessageBusPublisher(IMessagingTransport messagingTransport, ITopicRegistry topicRegistry, + IMessageSerDes messageSerDes, IConfiguration configuration, ILogger logger) + { + _messagingTransport = messagingTransport; + _topicRegistry = topicRegistry; + _messageSerDes = messageSerDes; + _configuration = configuration; + _logger = logger; + } + + public async Task PublishAsync(T message, MessagingPublisherOptions publisherOptions = null, + CancellationToken cancellationToken = default) + { + var outgoingEnvelope = PrepareMessageEnvelope(message, publisherOptions?.EnvelopeCustomizer); + var sendContext = new TransportSendContext( + PayloadBytesAccessor: () => _messageSerDes.SerializePayload(outgoingEnvelope.Payload), + EnvelopeBytesAccessor: () => _messageSerDes.SerializeMessageEnvelope(outgoingEnvelope), + HeadersAccessor: () => outgoingEnvelope.Headers + ); + + var newTopicName = _topicRegistry.GetTopicForName(publisherOptions?.TopicName) ?? + _topicRegistry.GetTopicForMessageType(message.GetType()); + + await _messagingTransport.PublishAsync(newTopicName, sendContext, cancellationToken); + + _logger.LogDebug("Messaging publisher sent a message for subject {Subject}", newTopicName); + await Task.Yield(); + } + + private MessagingEnvelope PrepareMessageEnvelope(TMessage message, + Action customizer = null) + { + var outgoingEnvelope = new MessagingEnvelope(new Dictionary + { + [MessagingHeaders.MessageId] = Guid.NewGuid().ToString(), + [MessagingHeaders.PublishTime] = DateTime.Now.ToString(CultureInfo.InvariantCulture) + }, message); + + var sourceId = GetSourceId(); + if (!string.IsNullOrWhiteSpace(sourceId)) + { + outgoingEnvelope.Headers[MessagingHeaders.Source] = sourceId; + } + + customizer?.Invoke(outgoingEnvelope); + + //outgoingEnvelope.SetHeader(MessagingHeaders.CorrelationId, (CorrelationManager.GetCorrelationId() ?? Guid.NewGuid()).ToString()); + + return outgoingEnvelope; + } + + private string GetSourceId() + { + var sourceId = _configuration.GetSection("Messaging")?["Source"]; + return sourceId ?? ""; + } + } +} diff --git a/src/infrastructure/Netmash.Infrastructure.Messaging/Services/NewtonsoftJsonMessageSerDes.cs b/src/infrastructure/Netmash.Infrastructure.Messaging/Services/NewtonsoftJsonMessageSerDes.cs new file mode 100644 index 0000000..dde5280 --- /dev/null +++ b/src/infrastructure/Netmash.Infrastructure.Messaging/Services/NewtonsoftJsonMessageSerDes.cs @@ -0,0 +1,110 @@ +using Netmash.Infrastructure.Messaging.Abstractions; +using Netmash.Infrastructure.Messaging.Constants; +using Netmash.Infrastructure.Messaging.Extensions; +using Netmash.Infrastructure.Messaging.Models; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; +using System; +using System.Collections.Generic; +using System.Dynamic; + +namespace Netmash.Infrastructure.Messaging.Services +{ + public class NewtonsoftJsonMessageSerDes : IMessageSerDes + { + private readonly IMessageTypeRegistry _messageTypeRegistry; + + public NewtonsoftJsonMessageSerDes(IMessageTypeRegistry messageTypeRegistry) + { + _messageTypeRegistry = messageTypeRegistry; + } + + public TMessage DeserializePayload(byte[] payloadBytes, IDictionary metadata = null, + MessageSerDesOptions options = null) + { + options ??= MessageSerDesOptions.Default; + var payload = Deserialize(payloadBytes); + var messageTypeId = metadata == null ? null : new MessagingEnvelope(metadata, new object()).GetMessageTypeId(); + var outputType = ResolveOutputType(messageTypeId, typeof(TMessage), options); + + return (TMessage)payload.ToObject(outputType); + } + + public (byte[] payloadBytes, IDictionary additionalMetadata) + SerializePayload(TMessage message, MessageSerDesOptions options = null) + { + var messageTypeId = _messageTypeRegistry.GetTypeId(message.GetType()); + var additionalMetadata = new Dictionary { { MessagingHeaders.MessageType, messageTypeId } }; + + return (Serialize(message), additionalMetadata); + } + + public MessagingEnvelope DeserializeMessageEnvelope(byte[] envelopeData, + MessageSerDesOptions options = null) + { + options ??= MessageSerDesOptions.Default; + var partialEnvelope = Deserialize>(envelopeData); + var messageTypeId = partialEnvelope.GetMessageTypeId(); + var outputType = ResolveOutputType(messageTypeId, typeof(TMessage), options); + + return new MessagingEnvelope(partialEnvelope.Headers, + (TMessage)partialEnvelope.Payload.ToObject(outputType)); + } + + public byte[] SerializeMessageEnvelope(MessagingEnvelope envelope, MessageSerDesOptions options = null) + { + var messageTypeId = _messageTypeRegistry.GetTypeId(envelope.Payload.GetType()); + envelope.SetHeader(MessagingHeaders.MessageType, messageTypeId, true); + + return Serialize(envelope); + } + private Type ResolveOutputType(string messageTypeId, Type expectedType, MessageSerDesOptions options = null) + { + var runtimeType = ResolveRuntimeType(messageTypeId, expectedType, options); + + return runtimeType ?? (typeof(TMessage) == typeof(object) ? typeof(ExpandoObject) : typeof(TMessage)); + } + + private Type ResolveRuntimeType(string messageTypeId, Type expectedType, MessageSerDesOptions options = null) + { + if (!options.UseDynamicDeserialization) + return null; + + if (messageTypeId == null) + { + if (expectedType == null || expectedType.IsAbstract || expectedType.IsInterface) + throw new Exception("Type information was not found in MessageType header"); + + return null; + } + + var runtimeType = + _messageTypeRegistry.ResolveType(messageTypeId, options.DynamicDeserializationScannedAssemblies); + + if (runtimeType == null) + return null; + + if (expectedType != null && !expectedType.IsAssignableFrom(runtimeType)) + throw new Exception($"Incompatible types: expected {expectedType} and found {runtimeType}"); + + return runtimeType; + } + + private T Deserialize(byte[] data) + { + var envelopeString = System.Text.Encoding.UTF8.GetString(data); + + return JsonConvert.DeserializeObject(envelopeString); + } + + private byte[] Serialize(T message) + { + var json = JsonConvert.SerializeObject(message, new JsonSerializerSettings() + { + ReferenceLoopHandling = ReferenceLoopHandling.Ignore + }); + + return System.Text.Encoding.UTF8.GetBytes(json); + } + } +}