messaging

messaging
Tudor Stanciu 2023-01-14 11:58:27 +02:00
parent f2565926f4
commit 6756ada1db
27 changed files with 1355 additions and 0 deletions

View File

@ -66,6 +66,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Netmash.Security.Authentica
EndProject EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Netmash.Test.Api", "src\test\Netmash.Test.Api\Netmash.Test.Api.csproj", "{B7B073F6-91D5-4EB1-876C-76F6BD612FD9}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Netmash.Test.Api", "src\test\Netmash.Test.Api\Netmash.Test.Api.csproj", "{B7B073F6-91D5-4EB1-876C-76F6BD612FD9}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Netmash.Infrastructure.Messaging", "src\infrastructure\Netmash.Infrastructure.Messaging\Netmash.Infrastructure.Messaging.csproj", "{DC243FBB-3F39-4DBD-857F-93F2B9DBB35E}"
EndProject
Global Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU Debug|Any CPU = Debug|Any CPU
@ -124,6 +126,10 @@ Global
{B7B073F6-91D5-4EB1-876C-76F6BD612FD9}.Debug|Any CPU.Build.0 = Debug|Any CPU {B7B073F6-91D5-4EB1-876C-76F6BD612FD9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B7B073F6-91D5-4EB1-876C-76F6BD612FD9}.Release|Any CPU.ActiveCfg = Release|Any CPU {B7B073F6-91D5-4EB1-876C-76F6BD612FD9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B7B073F6-91D5-4EB1-876C-76F6BD612FD9}.Release|Any CPU.Build.0 = Release|Any CPU {B7B073F6-91D5-4EB1-876C-76F6BD612FD9}.Release|Any CPU.Build.0 = Release|Any CPU
{DC243FBB-3F39-4DBD-857F-93F2B9DBB35E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{DC243FBB-3F39-4DBD-857F-93F2B9DBB35E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DC243FBB-3F39-4DBD-857F-93F2B9DBB35E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DC243FBB-3F39-4DBD-857F-93F2B9DBB35E}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection EndGlobalSection
GlobalSection(SolutionProperties) = preSolution GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE HideSolutionNode = FALSE
@ -155,6 +161,7 @@ Global
{5CB39BE9-0C83-4FDC-AB26-32642EA13639} = {06D5F056-4099-4636-A45C-D6C3B2CCDD66} {5CB39BE9-0C83-4FDC-AB26-32642EA13639} = {06D5F056-4099-4636-A45C-D6C3B2CCDD66}
{D926068E-7C61-4130-B7A0-C38294F13855} = {B8132F39-6677-4D70-84CA-9747DC9086B3} {D926068E-7C61-4130-B7A0-C38294F13855} = {B8132F39-6677-4D70-84CA-9747DC9086B3}
{B7B073F6-91D5-4EB1-876C-76F6BD612FD9} = {CCEE458E-02A8-42FD-8F5F-A35481A23303} {B7B073F6-91D5-4EB1-876C-76F6BD612FD9} = {CCEE458E-02A8-42FD-8F5F-A35481A23303}
{DC243FBB-3F39-4DBD-857F-93F2B9DBB35E} = {1C1D634E-06CC-4707-9564-E31A76F27D9E}
EndGlobalSection EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {87541BAB-3FAC-4ADB-A7FB-8228DA87843D} SolutionGuid = {87541BAB-3FAC-4ADB-A7FB-8228DA87843D}

View File

@ -0,0 +1,21 @@
using Microsoft.Extensions.DependencyInjection;
using Netmash.Infrastructure.Messaging.Services;
namespace Netmash.Infrastructure.Messaging.Abstractions
{
public static class DependencyInjectionExtensions
{
public static IServiceCollection AddMessageBus(this IServiceCollection services)
{
services.AddSingleton<IMessageBusPublisher, MessageBusPublisher>();
// services.AddSingleton<IMessageBusSubscriber, MessageBusSubscriber>();
services.AddSingleton<ITopicRegistry, DefaultTopicRegistry>();
services.AddSingleton<IMessageSerDes, NewtonsoftJsonMessageSerDes>();
services.AddSingleton<IMessageTypeRegistry, DefaultMessageTypeRegistry>();
// services.AddSingleton<IMessageBus, MessageBus>();
// services.AddSingleton<IDeadLetterQueue, DefaultDeadLetterQueue>();
return services;
}
}
}

View File

@ -0,0 +1,15 @@
using Netmash.Infrastructure.Messaging.Models;
using System.Threading;
using System.Threading.Tasks;
namespace Netmash.Infrastructure.Messaging.Abstractions
{
public interface IMessageBusPublisher
{
Task PublishAsync<T>(T message, MessagingPublisherOptions publisherOptions = null,
CancellationToken cancellationToken = default);
Task PublishAsync<T>(T message, CancellationToken cancellationToken) =>
PublishAsync(message, null, cancellationToken);
}
}

View File

@ -0,0 +1,22 @@
using Netmash.Infrastructure.Messaging.Models;
using System.Collections.Generic;
namespace Netmash.Infrastructure.Messaging.Abstractions
{
public interface IMessageSerDes
{
byte[] SerializeMessageEnvelope(MessagingEnvelope envelope, MessageSerDesOptions options = null);
TMessage DeserializePayload<TMessage>(byte[] payloadBytes, IDictionary<string, string> medatada = null,
MessageSerDesOptions options = null);
(byte[] payloadBytes, IDictionary<string, string> additionalMetadata)
SerializePayload<TMessage>(TMessage message, MessageSerDesOptions options = null);
MessagingEnvelope<TMessage> DeserializeMessageEnvelope<TMessage>(byte[] envelopeData,
MessageSerDesOptions options = null);
MessagingEnvelope DeserializeMessageEnvelope(byte[] envelopeData, MessageSerDesOptions options = null)
=> DeserializeMessageEnvelope<object>(envelopeData, options);
}
}

View File

@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Reflection;
namespace Netmash.Infrastructure.Messaging.Abstractions
{
public interface IMessageTypeRegistry
{
Type ResolveType(string messageTypeId, IEnumerable<Assembly> scannedAssemblies);
string GetTypeId(Type messageType);
}
}

View File

@ -0,0 +1,50 @@
using Netmash.Infrastructure.Messaging.Models;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Netmash.Infrastructure.Messaging.Abstractions
{
public interface IMessagingTransport
{
/// <summary>
/// Subscribes a message handler to the given topic
/// </summary>
/// <param name="topic">The topic/channel to subscribe to</param>
/// <param name="handler">The message handler</param>
/// <param name="options">Subscription options</param>
/// <param name="cancellationToken"></param>
/// <returns>An object that when disposed unsubscribes the handler from the topic</returns>
Task<IDisposable> SubscribeAsync(string topic, Func<TransportReceiveContext, Task> handler,
SubscriptionTransportOptions options = null, CancellationToken cancellationToken = default);
/// <summary>
/// Publishes a message to a topic
/// </summary>
/// <param name="topic">The topic/channel to publish to</param>
/// <param name="sendContext">The context: message and headers</param>
/// <param name="cancellationToken"></param>
Task PublishAsync(string topic, TransportSendContext sendContext, CancellationToken cancellationToken = default);
}
public delegate void TransportErrorHandler(Exception ex);
public interface ITransportMonitor
{
event TransportErrorHandler OnError;
}
public record TransportSendContext(
Func<(byte[] payloadData, IDictionary<string, string> additionalHeaders)> PayloadBytesAccessor,
Func<byte[]> EnvelopeBytesAccessor,
Func<IDictionary<string, string>> HeadersAccessor);
public record TransportReceiveContext(TransportReceivedData ReceivedData);
public abstract record TransportReceivedData
{
public record EnvelopeBytes(byte[] Bytes) : TransportReceivedData;
public record PayloadBytesAndHeaders(byte[] PayloadBytes, IDictionary<string, string> headers) : TransportReceivedData;
}
}

View File

@ -0,0 +1,11 @@
using System;
namespace Netmash.Infrastructure.Messaging.Abstractions
{
public interface ITopicRegistry
{
string GetTopicForMessageType(Type messageType, bool includePrefix = true);
string GetTopicForName(string topicName, bool includePrefix = true);
string GetTopicPrefix();
}
}

View File

@ -0,0 +1,15 @@
using System;
namespace Netmash.Infrastructure.Messaging.Attributes
{
[AttributeUsage(AttributeTargets.Class)]
public class MessageTypeIdAttribute : Attribute
{
public string MessageTypeId { get; }
public MessageTypeIdAttribute(string messageTypeId)
{
MessageTypeId = messageTypeId;
}
}
}

View File

@ -0,0 +1,10 @@
using Microsoft.Extensions.Configuration;
using System;
namespace Netmash.Infrastructure.Messaging.Attributes
{
public abstract class TopicNameResolverAttribute : Attribute
{
public abstract string ResolveTopicName(Type messageType, IConfiguration configuration);
}
}

View File

@ -0,0 +1,11 @@
namespace Netmash.Infrastructure.Messaging.Constants
{
public static class MessagingHeaders
{
public const string
MessageType = "X-MESSAGE-TYPE",
Source = "X-SOURCE",
MessageId = "X-MESSAGE-ID",
PublishTime = "X-PUBLISH-TIME";
}
}

View File

@ -0,0 +1,24 @@
using Netmash.Infrastructure.Messaging.Constants;
using Netmash.Infrastructure.Messaging.Models;
namespace Netmash.Infrastructure.Messaging.Extensions
{
public static class MessagingEnvelopeExtensions
{
public static string GetMessageTypeId(this MessagingEnvelope envelope)
{
return envelope.Headers.TryGetValue(MessagingHeaders.MessageType, out var value)
? value
: null;
}
public static void SetHeader(this MessagingEnvelope envelope, string header, string value, bool overwrite = false)
{
if (!overwrite && envelope.Headers.ContainsKey(header) && !string.IsNullOrEmpty(envelope.Headers[header]))
return;
envelope.Headers[header] = value;
}
}
}

View File

@ -0,0 +1,85 @@
using System;
using System.Linq;
using System.Text;
namespace Netmash.Infrastructure.Messaging.Extensions
{
internal static class TypeExtensions
{
public static string GetPrettyName(this Type type)
{
var retType = new StringBuilder();
if (type.IsGenericType)
{
var parentTypeName = type.FullName?.Split('`')[0].Split('.').Last();
// We will build the type here.
Type[] arguments = type.GetGenericArguments();
var argList = new StringBuilder();
foreach (Type t in arguments)
{
// Let's make sure we get the argument list.
string arg = t.GetPrettyName();
if (argList.Length > 0)
{
argList.AppendFormat(", {0}", arg);
}
else
{
argList.Append(arg);
}
}
if (argList.Length > 0)
{
retType.AppendFormat("{0}<{1}>", parentTypeName, argList.ToString());
}
}
else
{
return type.FullName?.Split('.').Last() ?? type.Name;
}
return retType.ToString();
}
public static string GetLongPrettyName(this Type type)
{
var retType = new StringBuilder();
if (type.IsGenericType)
{
var parentTypeName = type.FullName?.Split('`')[0];
// We will build the type here.
Type[] arguments = type.GetGenericArguments();
var argList = new StringBuilder();
foreach (Type t in arguments)
{
// Let's make sure we get the argument list.
string arg = t.GetLongPrettyName();
if (argList.Length > 0)
{
argList.AppendFormat(", {0}", arg);
}
else
{
argList.Append(arg);
}
}
if (argList.Length > 0)
{
retType.AppendFormat("{0}<{1}>", parentTypeName, argList.ToString());
}
}
else
{
return type.FullName;
}
return retType.ToString();
}
}
}

View File

@ -0,0 +1,29 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Netmash.Infrastructure.Messaging.Abstractions;
using Netmash.Infrastructure.Messaging.Host.Models;
using System;
namespace Netmash.Infrastructure.Messaging.Host
{
public static class DependencyInjectionExtensions
{
public static IServiceCollection AddMessagingHost(this IServiceCollection serviceCollection, IConfiguration configuration, Action<IMessagingHostBuilder> configure)
{
serviceCollection.Configure<MessagingHostOptions>(configuration.GetSection("Messaging").GetSection("Host"));
serviceCollection.AddSingleton<IMessagingHost, MessagingHost>();
serviceCollection.AddHostedService<MessagingHostService>();
serviceCollection.AddSingleton<MessagingContextAccessor>();
serviceCollection.Decorate<IMessageBusPublisher, MessagingContextBusPublisherDecorator>();
serviceCollection.TryAddSingleton(serviceCollection);
var builder = new MessagingHostBuilder(serviceCollection);
configure.Invoke(builder);
return serviceCollection;
}
}
}

View File

@ -0,0 +1,14 @@
namespace Netmash.Infrastructure.Messaging.Host.Models
{
public class MessagingHostOptions
{
public TransportErrorStrategy TransportErrorStrategy { get; set; } = TransportErrorStrategy.Retry;
public int StartRetryCount { get; set; } = 10;
}
public enum TransportErrorStrategy
{
Throw = 0,
Retry = 1
}
}

View File

@ -0,0 +1,28 @@
using System.Collections.Generic;
using System.Reflection;
namespace Netmash.Infrastructure.Messaging.Models
{
/// <summary>
/// Options for serialization/deserialization
/// </summary>
public record MessageSerDesOptions
{
/// <summary>
/// Options for message deserialization.
/// </summary>
/// <value>
/// The type of deserialization.
/// </value>
public bool UseDynamicDeserialization { get; init; }
/// <summary>
/// he list of assemblies to be scanned in case of the 'Dynamic' deserialization option.
/// </summary>
/// <value>
/// The scanned assemblies used in dynamic deserialization.
/// </value>
public IEnumerable<Assembly> DynamicDeserializationScannedAssemblies { get; init; }
public static MessageSerDesOptions Default { get; } = new();
}
}

View File

@ -0,0 +1,28 @@
using System;
using System.Collections.Generic;
namespace Netmash.Infrastructure.Messaging.Models
{
public record MessagingEnvelope
{
public IDictionary<string, string> Headers { get; }
public object Payload { get; }
public MessagingEnvelope(IDictionary<string, string> headers, object payload)
{
Headers = headers ?? throw new ArgumentNullException(nameof(headers));
Payload = payload ?? throw new ArgumentNullException(nameof(payload));
}
}
public record MessagingEnvelope<TMessage> : MessagingEnvelope
{
public MessagingEnvelope(IDictionary<string, string> headers, TMessage payload)
: base(headers, payload)
{
}
public new TMessage Payload => (TMessage)base.Payload;
}
}

View File

@ -0,0 +1,13 @@
using System;
namespace Netmash.Infrastructure.Messaging.Models
{
public record MessagingPublisherOptions
{
public Action<MessagingEnvelope> EnvelopeCustomizer { get; init; }
public string TopicName { get; init; }
public static MessagingPublisherOptions Default = new();
}
}

View File

@ -0,0 +1,50 @@
namespace Netmash.Infrastructure.Messaging.Models
{
public record SubscriptionTransportOptions
{
/// <summary>
/// Specifies if a durable subscription should be made (default true).
/// </summary>
public bool IsDurable { get; init; } = true;
/// <summary>
/// Specifies if a consumer group should be used (default true).
/// </summary>
public bool UseGroup { get; init; } = true;
/// <summary>
/// Specifies the maximum number of messages that can be handled concurrently (default 1).
/// </summary>
public int MaxConcurrentMessages { get; init; } = 1;
/// <summary>
/// If set, only new messages are delivered. Otherwise all available messages are delivered even if published before the subscription.
/// </summary>
public bool DeliverNewMessagesOnly { get; set; } = true;
/// <summary>
/// Timeout before message redelivery (in milliseconds)
/// </summary>
public int? AckWait { get; init; } = null;
/// <summary>
/// Default transport options
/// </summary>
public static SubscriptionTransportOptions Default => new();
/// <summary>
/// Stream processor transport options (with durable subscription)
/// </summary>
public static SubscriptionTransportOptions StreamProcessor => new()
{ DeliverNewMessagesOnly = false };
/// <summary>
/// Request/Reply transport options (non-durable
/// </summary>
public static SubscriptionTransportOptions RequestReply => new()
{ IsDurable = false, UseGroup = false };
public static SubscriptionTransportOptions PubSub => new()
{ IsDurable = false };
}
}

View File

@ -0,0 +1,20 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Netmash.Infrastructure.Messaging.Abstractions;
using Netmash.Infrastructure.Messaging.Nats.Records;
namespace Netmash.Infrastructure.Messaging.Nats
{
public static class DependencyInjectionExtensions
{
public static IServiceCollection AddNatsTransport(this IServiceCollection services, IConfiguration configuration)
{
services.Configure<NatsOptions>(configuration.GetSection("Messaging").GetSection("Nats"));
services.AddSingleton<StanConnectionProvider>();
services.AddSingleton<IMessagingTransport, StanMessagingTransport>();
services.AddSingleton<ITransportMonitor>(sp => sp.GetRequiredService<StanConnectionProvider>());
return services;
}
}
}

View File

@ -0,0 +1,35 @@
namespace Netmash.Infrastructure.Messaging.Nats.Records
{
internal record NatsOptions
{
/// <summary>
/// URL of the Streaming NATS cluster
/// </summary>
public string NatsUrl { get; set; }
/// <summary>
/// Streaming NATS cluster name
/// </summary>
public string Cluster { get; set; }
/// <summary>
/// Identifier of the Streaming NATS client
/// </summary>
public string ClientId { get; set; }
/// <summary>
/// Durable subscriptions name.
/// </summary>
public string DurableName { get; set; }
/// <summary>
/// Queue group name
/// </summary>
public string QGroup { get; set; }
/// <summary>
/// The time the server awaits for acknowledgement from the client before redelivering the message (in milliseconds)
/// </summary>
public int? AckWait { get; set; }
}
}

View File

@ -0,0 +1,163 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using NATS.Client;
using Netmash.Infrastructure.Messaging.Abstractions;
using Netmash.Infrastructure.Messaging.Nats.Records;
using STAN.Client;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Netmash.Infrastructure.Messaging.Nats
{
internal class StanConnectionProvider : IDisposable, ITransportMonitor
{
private readonly IOptions<NatsOptions> _natsOptions;
private readonly ILogger<StanConnectionProvider> _logger;
private IStanConnection _connection;
private Exception _unrecoverableException;
private AtomicLazy<IStanConnection> _lazyConnection;
public event TransportErrorHandler OnError;
public StanConnectionProvider(IOptions<NatsOptions> natsOptions, ILogger<StanConnectionProvider> logger)
{
_natsOptions = natsOptions;
_logger = logger;
_lazyConnection = new AtomicLazy<IStanConnection>(GetConnection);
}
public async Task ExecuteAsync(Func<IStanConnection, Task> action)
{
var connection = GetAndCheckConnection();
await action(connection);
}
public void Execute(Action<IStanConnection> action)
{
var connection = GetAndCheckConnection();
action(connection);
}
private IStanConnection GetAndCheckConnection()
{
ThrowIfUnrecoverableState();
return _lazyConnection.Value;
}
private IStanConnection GetConnection()
{
var clientId = _natsOptions.Value.ClientId?.Replace(".", "_");
var options = StanOptions.GetDefaultOptions();
options.NatsURL = _natsOptions.Value.NatsUrl;
options.ConnectionLostEventHandler = (_, args) =>
{
SetConnectionLostState(args.ConnectionException ?? new Exception("NATS connection was lost"));
};
//fix https://github.com/nats-io/csharp-nats-streaming/issues/28
options.PubAckWait = 30000;
var cf = new StanConnectionFactory();
_connection = cf.CreateConnection(_natsOptions.Value.Cluster, clientId + Guid.NewGuid(), options);
_logger.LogInformation($"NATS connection to {_natsOptions.Value.NatsUrl} was established");
return _connection;
}
private void SetConnectionLostState(Exception exception)
{
// Set the field to the current exception if not already set
var existingException =
Interlocked.CompareExchange(ref _unrecoverableException, exception, null);
// Send the application stop signal only once
if (existingException != null)
return;
_logger.LogError(exception, "NATS connection unrecoverable");
ResetConnection();
OnError?.Invoke(exception);
}
private void ResetConnection()
{
_connection?.Dispose();
_connection = null;
_lazyConnection = new AtomicLazy<IStanConnection>(GetConnection);
Interlocked.CompareExchange(ref _unrecoverableException, null, _unrecoverableException);
_logger.LogInformation("NATS connection was reset");
}
private void ThrowIfUnrecoverableState()
{
// For consistency, read the field using the same primitive used for writing instead of using Thread.VolatileRead
var exception = Interlocked.CompareExchange(ref _unrecoverableException, null, null);
if (exception != null)
{
throw new Exception("NATS connection encountered an unrecoverable exception", exception);
}
}
private static bool IsUnrecoverableException(Exception ex) =>
ex is NATSConnectionClosedException ||
ex is StanConnectionClosedException ||
ex is NATSConnectionException ||
ex is StanConnectionException ||
ex is NATSBadSubscriptionException ||
ex is StanBadSubscriptionException ||
ex is StanTimeoutException ||
ex is NATSTimeoutException ||
ex is NATSStaleConnectionException ||
ex is NATSNoServersException ||
ex is StanConnectRequestException ||
ex is StanMaxPingsException;
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (disposing)
{
_connection?.Dispose();
}
}
private class AtomicLazy<T>
{
private readonly Func<T> _factory;
private T _value;
private bool _initialized;
private object _lock;
public AtomicLazy(Func<T> factory)
{
_factory = factory;
}
public AtomicLazy(T value)
{
_value = value;
_initialized = true;
}
public T Value => LazyInitializer.EnsureInitialized(ref _value, ref _initialized, ref _lock, _factory);
}
}
}

View File

@ -0,0 +1,75 @@
using Microsoft.Extensions.Options;
using Netmash.Infrastructure.Messaging.Abstractions;
using Netmash.Infrastructure.Messaging.Models;
using Netmash.Infrastructure.Messaging.Nats.Records;
using STAN.Client;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Netmash.Infrastructure.Messaging.Nats
{
internal class StanMessagingTransport : IMessagingTransport
{
private readonly StanConnectionProvider _stanConnectionManager;
private readonly IOptions<NatsOptions> _natsOptions;
public StanMessagingTransport(StanConnectionProvider stanConnectionManager, IOptions<NatsOptions> natsOptions)
{
_stanConnectionManager = stanConnectionManager;
_natsOptions = natsOptions;
}
public Task<IDisposable> SubscribeAsync(string topic, Func<TransportReceiveContext, Task> handler,
SubscriptionTransportOptions options = null,
CancellationToken cancellationToken = default)
{
var opts = StanSubscriptionOptions.GetDefaultOptions();
var subscriberOptions = options ?? SubscriptionTransportOptions.Default;
if (subscriberOptions.IsDurable)
{
opts.DurableName = _natsOptions.Value.DurableName;
opts.LeaveOpen = true;
}
if (!subscriberOptions.DeliverNewMessagesOnly)
{
opts.DeliverAllAvailable();
}
// https://github.com/nats-io/stan.go#subscriber-rate-limiting
opts.MaxInflight = subscriberOptions.MaxConcurrentMessages;
opts.AckWait = subscriberOptions.AckWait ?? _natsOptions.Value.AckWait ?? 50000;
opts.ManualAcks = true;
void StanMsgHandler(object obj, StanMsgHandlerArgs args)
{
if (cancellationToken.IsCancellationRequested)
return;
var receiveContext = new TransportReceiveContext(new TransportReceivedData.EnvelopeBytes(args.Message.Data));
// Fire and forget
_ = handler(receiveContext).ContinueWith(_ => args.Message.Ack(), cancellationToken);
}
IDisposable subscription = null;
_stanConnectionManager.Execute(stanConnection =>
{
subscription = subscriberOptions.UseGroup
? stanConnection.Subscribe(topic, _natsOptions.Value.QGroup, opts, StanMsgHandler)
: stanConnection.Subscribe(topic, opts, StanMsgHandler);
});
return Task.FromResult(subscription);
}
public Task PublishAsync(string topic, TransportSendContext sendContext, CancellationToken cancellationToken = default)
{
var envelopeData = sendContext.EnvelopeBytesAccessor.Invoke();
return _stanConnectionManager.ExecuteAsync(
async connection => await connection.PublishAsync(topic, envelopeData));
}
}
}

View File

@ -0,0 +1,16 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.3" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="6.0.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
<PackageReference Include="STAN.Client" Version="0.3.0" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,348 @@
using Netmash.Infrastructure.Messaging.Abstractions;
using Netmash.Infrastructure.Messaging.Attributes;
using Netmash.Infrastructure.Messaging.Extensions;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
namespace Netmash.Infrastructure.Messaging.Services
{
public class DefaultMessageTypeRegistry : IMessageTypeRegistry
{
private readonly ConcurrentDictionary<string, Type> _typeCache = new();
public Type ResolveType(string messageTypeId, IEnumerable<Assembly> scannedAssemblies)
{
var scannedAssembliesList = scannedAssemblies?.ToList() ?? new List<Assembly>();
return _typeCache.GetOrAdd(messageTypeId, key =>
GetTypeByAttribute(key, scannedAssembliesList) ??
GetTypeByName(key, scannedAssembliesList) ??
throw new ApplicationException($"Type could not be resolved: {key}")
);
}
public string GetTypeId(Type messageType)
{
return messageType.GetCustomAttribute<MessageTypeIdAttribute>()?.MessageTypeId
?? TypeInfo.Build(messageType).ToString();
}
private static string GetTypeNameIncludingNesting(Type type)
{
return type.DeclaringType == null ?
type.Name :
$"{GetTypeNameIncludingNesting(type.DeclaringType)}{TypeInfo.NestedSeparator}{type.Name}";
}
private Type GetTypeByName(string messageTypeId, List<Assembly> scannedAssemblies)
{
var parsedType = TypeInfo.Parse(messageTypeId);
return GetTypeByNameInternal(parsedType, scannedAssemblies);
}
private Type GetTypeByNameInternal(TypeInfo parsedType, List<Assembly> scannedAssemblies)
{
var dotnetFormattedName = parsedType.GetNameForDotnetComparison();
var foundType = WellKnownTypes.Resolve(dotnetFormattedName);
if (foundType != null)
return foundType;
var foundTypes = scannedAssemblies
.SelectMany(x => x.ExportedTypes)
.Where(x => GetTypeNameIncludingNesting(x) == dotnetFormattedName)
.ToList();
if (foundTypes.Count > 1)
throw new ApplicationException($"Multiple types found with name: {foundTypes.First().GetPrettyName()}");
foundType = foundTypes.FirstOrDefault();
if (foundType == null)
return null;
if (parsedType.IsGeneric && parsedType.TypeArguments.Any())
{
var foundTypeArgs = new List<Type>();
foreach (var parsedTypeTypeArgument in parsedType.GetAllTypeArguments())
{
var foundTypeArgument = GetTypeByNameInternal(parsedTypeTypeArgument, scannedAssemblies);
if (foundTypeArgument == null)
return null;
foundTypeArgs.Add(foundTypeArgument);
}
foundType = foundType.MakeGenericType(foundTypeArgs.ToArray());
}
return parsedType.ArrayDimensions.AsEnumerable()
.Reverse()
.Aggregate(foundType, (current, arrayDimension) => arrayDimension.Dimensions == 1
? current.MakeArrayType()
: current.MakeArrayType(arrayDimension.Dimensions));
}
private Type GetTypeByAttribute(string messageTypeId, List<Assembly> scannedAssemblies)
{
var foundTypes = scannedAssemblies
.SelectMany(x => x.ExportedTypes)
.Where(x => x.GetCustomAttribute<MessageTypeIdAttribute>()?.MessageTypeId == messageTypeId)
.ToList();
if (foundTypes.Count > 1)
throw new ApplicationException(
$"Multiple types found with the same MessageTypeIdAttribute: {messageTypeId}");
return foundTypes.FirstOrDefault();
}
private class TypeInfo
{
public const char NestedSeparator = '+';
private string _name;
private string Name
{
get => _name;
set => _name ??= value?.Trim();
}
public bool IsGeneric { get; private set; }
public List<ArrayDimension> ArrayDimensions { get; private set; }
public List<TypeInfo> TypeArguments { get; private set; }
private TypeInfo DeclaringTypeInfo { get; set; }
public class ArrayDimension
{
public int Dimensions { get; set; }
public ArrayDimension()
{
Dimensions = 1;
}
public override string ToString()
{
return $"[{new string(',', Dimensions - 1)}]";
}
}
private TypeInfo()
{
Name = null;
IsGeneric = false;
ArrayDimensions = new List<ArrayDimension>();
TypeArguments = new List<TypeInfo>();
}
public override string ToString()
{
var str = new StringBuilder();
if (DeclaringTypeInfo != null)
{
str.Append(DeclaringTypeInfo).Append(NestedSeparator);
}
str.Append(Name);
if (IsGeneric)
str.Append($"<{string.Join(",", TypeArguments.Select(tn => tn.ToString()))}>");
foreach (var d in ArrayDimensions)
str.Append(d);
return str.ToString();
}
public string GetNameForDotnetComparison()
{
var formattedExpectedTypeName = IsGeneric ? $"{Name}`{TypeArguments.Count}" : Name;
return DeclaringTypeInfo != null
? $"{DeclaringTypeInfo.GetNameForDotnetComparison()}{NestedSeparator}{formattedExpectedTypeName}"
: formattedExpectedTypeName;
}
public List<TypeInfo> GetAllTypeArguments()
{
var allArguments = new List<TypeInfo>();
if (DeclaringTypeInfo != null)
{
allArguments.AddRange(DeclaringTypeInfo.GetAllTypeArguments());
}
allArguments.AddRange(TypeArguments);
return allArguments;
}
public static TypeInfo Parse(string name)
{
var pos = 0;
return ParseInternal(name, ref pos, out _);
}
public static TypeInfo Build(Type type)
{
if (type.IsArray)
{
var elementTypeName = Build(type.GetElementType());
elementTypeName.ArrayDimensions.Add(
new ArrayDimension { Dimensions = type.GetArrayRank() });
elementTypeName.ArrayDimensions.Reverse();
return elementTypeName;
}
var typeName = new TypeInfo
{
Name = type.IsGenericType ? type.Name.Substring(0, type.Name.IndexOf('`')) : type.Name,
IsGeneric = type.IsGenericType,
TypeArguments = type.GenericTypeArguments
.Select(Build).ToList(),
};
if (type.DeclaringType != null)
{
typeName.DeclaringTypeInfo = BuildDeclaringTypeName(type, typeName);
}
return typeName;
}
private static TypeInfo BuildDeclaringTypeName(Type type, TypeInfo typeInfo)
{
if (type.DeclaringType == null)
return null;
var declaringTypeName = Build(type.DeclaringType);
var declaringTypeGenericParamsCount = ((System.Reflection.TypeInfo)type.DeclaringType).GenericTypeParameters.Length;
declaringTypeName.TypeArguments.AddRange(typeInfo.TypeArguments.GetRange(0, declaringTypeGenericParamsCount));
typeInfo.TypeArguments.RemoveRange(0, declaringTypeGenericParamsCount);
return declaringTypeName;
}
private static TypeInfo ParseInternal(string name, ref int index, out bool listTerminated)
{
var nameBuilder = new StringBuilder();
var typeName = new TypeInfo();
listTerminated = true;
while (index < name.Length)
{
char currentChar = name[index++];
switch (currentChar)
{
case ',':
typeName.Name = nameBuilder.ToString();
listTerminated = false;
return typeName;
case '>':
typeName.Name = nameBuilder.ToString();
listTerminated = true;
return typeName;
case NestedSeparator:
typeName.Name = nameBuilder.ToString();
typeName = new TypeInfo { DeclaringTypeInfo = typeName };
nameBuilder.Clear();
break;
case '<':
{
typeName.Name = nameBuilder.ToString();
typeName.IsGeneric = true;
nameBuilder.Clear();
bool terminated = false;
while (!terminated)
{
typeName.TypeArguments.Add(ParseInternal(name, ref index, out terminated));
}
if (name[index - 1] != '>')
throw new Exception("Missing closing > of generic type list.");
break;
}
case '[':
var arrayDimension = new ArrayDimension();
typeName.ArrayDimensions.Add(arrayDimension);
if (index >= name.Length)
throw new Exception("Expecting ] or , after [ for array type, but reached end of string.");
bool arrayTerminated = false;
while (index < name.Length && !arrayTerminated)
{
currentChar = name[index++];
switch (currentChar)
{
case ']':
arrayTerminated = true;//array specifier terminated
break;
case ',': //multidimensional array
arrayDimension.Dimensions++;
break;
default:
throw new Exception($@"Expecting ""]"" or "","" after ""["" for array specifier but encountered ""{currentChar}"".");
}
}
break;
default:
nameBuilder.Append(currentChar);
continue;
}
}
typeName.Name = nameBuilder.ToString();
return typeName;
}
}
private static class WellKnownTypes
{
private static readonly Dictionary<string, Type> Map = new()
{
["bool"] = typeof(bool),
["Boolean"] = typeof(bool),
["byte"] = typeof(byte),
["Byte"] = typeof(byte),
["sbyte"] = typeof(sbyte),
["SByte"] = typeof(sbyte),
["char"] = typeof(char),
["Char"] = typeof(char),
["decimal"] = typeof(decimal),
["Decimal"] = typeof(decimal),
["double"] = typeof(double),
["Double"] = typeof(decimal),
["float"] = typeof(float),
["Float"] = typeof(float),
["int"] = typeof(int),
["Int32"] = typeof(int),
["uint"] = typeof(uint),
["UInt32"] = typeof(uint),
["long"] = typeof(long),
["Int64"] = typeof(long),
["ulong"] = typeof(ulong),
["UInt64"] = typeof(ulong),
["object"] = typeof(object),
["Object"] = typeof(object),
["ushort"] = typeof(ushort),
["UInt16"] = typeof(ushort),
["short"] = typeof(short),
["Int16"] = typeof(short),
["string"] = typeof(string),
["String"] = typeof(string),
["List`1"] = typeof(List<>),
["Dictionary`2"] = typeof(Dictionary<,>)
};
public static Type Resolve(string typeName)
{
return Map.TryGetValue(typeName, out var type) ? type : null;
}
}
}
}

View File

@ -0,0 +1,62 @@
using Microsoft.Extensions.Configuration;
using Netmash.Infrastructure.Messaging.Abstractions;
using Netmash.Infrastructure.Messaging.Attributes;
using Netmash.Infrastructure.Messaging.Extensions;
using System;
using System.Linq;
namespace Netmash.Infrastructure.Messaging.Services
{
internal class DefaultTopicRegistry : ITopicRegistry
{
private readonly IConfiguration _configuration;
public DefaultTopicRegistry(IConfiguration configuration)
{
_configuration = configuration;
}
public string GetTopicForMessageType(Type messageType, bool includePrefix = true)
{
var topic = GetTopicNameFromAttribute(messageType) ?? messageType.GetLongPrettyName();
topic = GetTopicForName(topic, includePrefix);
return topic;
}
public string GetTopicForName(string topicName, bool includePrefix = true)
{
if (topicName == null)
{
return null;
}
var topic = (includePrefix ? GetTopicPrefix() : string.Empty) + topicName;
topic = topic.Replace("+", ".");
topic = topic.Replace("<", "_");
topic = topic.Replace(">", "_");
return topic;
}
private string GetTopicNameFromAttribute(Type messageType)
{
var topicNameResolver = messageType.GetCustomAttributes(typeof(TopicNameResolverAttribute), true).FirstOrDefault() as TopicNameResolverAttribute;
return topicNameResolver?.ResolveTopicName(messageType, _configuration);
}
public string GetTopicPrefix()
{
var messagingSection = _configuration.GetSection("Messaging");
var envPrefix = messagingSection?["Env"];
if (!string.IsNullOrWhiteSpace(envPrefix))
{
envPrefix += ".";
}
var topicPrefix = envPrefix ?? messagingSection?["TopicPrefix"];
return topicPrefix ?? string.Empty;
}
}
}

View File

@ -0,0 +1,80 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Netmash.Infrastructure.Messaging.Abstractions;
using Netmash.Infrastructure.Messaging.Constants;
using Netmash.Infrastructure.Messaging.Models;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.Threading;
using System.Threading.Tasks;
namespace Netmash.Infrastructure.Messaging.Services
{
internal class MessageBusPublisher : IMessageBusPublisher
{
private readonly ITopicRegistry _topicRegistry;
private readonly IMessageSerDes _messageSerDes;
private readonly IConfiguration _configuration;
private readonly ILogger<MessageBusPublisher> _logger;
private readonly IMessagingTransport _messagingTransport;
public MessageBusPublisher(IMessagingTransport messagingTransport, ITopicRegistry topicRegistry,
IMessageSerDes messageSerDes, IConfiguration configuration, ILogger<MessageBusPublisher> logger)
{
_messagingTransport = messagingTransport;
_topicRegistry = topicRegistry;
_messageSerDes = messageSerDes;
_configuration = configuration;
_logger = logger;
}
public async Task PublishAsync<T>(T message, MessagingPublisherOptions publisherOptions = null,
CancellationToken cancellationToken = default)
{
var outgoingEnvelope = PrepareMessageEnvelope(message, publisherOptions?.EnvelopeCustomizer);
var sendContext = new TransportSendContext(
PayloadBytesAccessor: () => _messageSerDes.SerializePayload(outgoingEnvelope.Payload),
EnvelopeBytesAccessor: () => _messageSerDes.SerializeMessageEnvelope(outgoingEnvelope),
HeadersAccessor: () => outgoingEnvelope.Headers
);
var newTopicName = _topicRegistry.GetTopicForName(publisherOptions?.TopicName) ??
_topicRegistry.GetTopicForMessageType(message.GetType());
await _messagingTransport.PublishAsync(newTopicName, sendContext, cancellationToken);
_logger.LogDebug("Messaging publisher sent a message for subject {Subject}", newTopicName);
await Task.Yield();
}
private MessagingEnvelope<TMessage> PrepareMessageEnvelope<TMessage>(TMessage message,
Action<MessagingEnvelope> customizer = null)
{
var outgoingEnvelope = new MessagingEnvelope<TMessage>(new Dictionary<string, string>
{
[MessagingHeaders.MessageId] = Guid.NewGuid().ToString(),
[MessagingHeaders.PublishTime] = DateTime.Now.ToString(CultureInfo.InvariantCulture)
}, message);
var sourceId = GetSourceId();
if (!string.IsNullOrWhiteSpace(sourceId))
{
outgoingEnvelope.Headers[MessagingHeaders.Source] = sourceId;
}
customizer?.Invoke(outgoingEnvelope);
//outgoingEnvelope.SetHeader(MessagingHeaders.CorrelationId, (CorrelationManager.GetCorrelationId() ?? Guid.NewGuid()).ToString());
return outgoingEnvelope;
}
private string GetSourceId()
{
var sourceId = _configuration.GetSection("Messaging")?["Source"];
return sourceId ?? "";
}
}
}

View File

@ -0,0 +1,110 @@
using Netmash.Infrastructure.Messaging.Abstractions;
using Netmash.Infrastructure.Messaging.Constants;
using Netmash.Infrastructure.Messaging.Extensions;
using Netmash.Infrastructure.Messaging.Models;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Dynamic;
namespace Netmash.Infrastructure.Messaging.Services
{
public class NewtonsoftJsonMessageSerDes : IMessageSerDes
{
private readonly IMessageTypeRegistry _messageTypeRegistry;
public NewtonsoftJsonMessageSerDes(IMessageTypeRegistry messageTypeRegistry)
{
_messageTypeRegistry = messageTypeRegistry;
}
public TMessage DeserializePayload<TMessage>(byte[] payloadBytes, IDictionary<string, string> metadata = null,
MessageSerDesOptions options = null)
{
options ??= MessageSerDesOptions.Default;
var payload = Deserialize<JObject>(payloadBytes);
var messageTypeId = metadata == null ? null : new MessagingEnvelope(metadata, new object()).GetMessageTypeId();
var outputType = ResolveOutputType<TMessage>(messageTypeId, typeof(TMessage), options);
return (TMessage)payload.ToObject(outputType);
}
public (byte[] payloadBytes, IDictionary<string, string> additionalMetadata)
SerializePayload<TMessage>(TMessage message, MessageSerDesOptions options = null)
{
var messageTypeId = _messageTypeRegistry.GetTypeId(message.GetType());
var additionalMetadata = new Dictionary<string, string> { { MessagingHeaders.MessageType, messageTypeId } };
return (Serialize(message), additionalMetadata);
}
public MessagingEnvelope<TMessage> DeserializeMessageEnvelope<TMessage>(byte[] envelopeData,
MessageSerDesOptions options = null)
{
options ??= MessageSerDesOptions.Default;
var partialEnvelope = Deserialize<MessagingEnvelope<JObject>>(envelopeData);
var messageTypeId = partialEnvelope.GetMessageTypeId();
var outputType = ResolveOutputType<TMessage>(messageTypeId, typeof(TMessage), options);
return new MessagingEnvelope<TMessage>(partialEnvelope.Headers,
(TMessage)partialEnvelope.Payload.ToObject(outputType));
}
public byte[] SerializeMessageEnvelope(MessagingEnvelope envelope, MessageSerDesOptions options = null)
{
var messageTypeId = _messageTypeRegistry.GetTypeId(envelope.Payload.GetType());
envelope.SetHeader(MessagingHeaders.MessageType, messageTypeId, true);
return Serialize(envelope);
}
private Type ResolveOutputType<TMessage>(string messageTypeId, Type expectedType, MessageSerDesOptions options = null)
{
var runtimeType = ResolveRuntimeType(messageTypeId, expectedType, options);
return runtimeType ?? (typeof(TMessage) == typeof(object) ? typeof(ExpandoObject) : typeof(TMessage));
}
private Type ResolveRuntimeType(string messageTypeId, Type expectedType, MessageSerDesOptions options = null)
{
if (!options.UseDynamicDeserialization)
return null;
if (messageTypeId == null)
{
if (expectedType == null || expectedType.IsAbstract || expectedType.IsInterface)
throw new Exception("Type information was not found in MessageType header");
return null;
}
var runtimeType =
_messageTypeRegistry.ResolveType(messageTypeId, options.DynamicDeserializationScannedAssemblies);
if (runtimeType == null)
return null;
if (expectedType != null && !expectedType.IsAssignableFrom(runtimeType))
throw new Exception($"Incompatible types: expected {expectedType} and found {runtimeType}");
return runtimeType;
}
private T Deserialize<T>(byte[] data)
{
var envelopeString = System.Text.Encoding.UTF8.GetString(data);
return JsonConvert.DeserializeObject<T>(envelopeString);
}
private byte[] Serialize<T>(T message)
{
var json = JsonConvert.SerializeObject(message, new JsonSerializerSettings()
{
ReferenceLoopHandling = ReferenceLoopHandling.Ignore
});
return System.Text.Encoding.UTF8.GetBytes(json);
}
}
}