Skip to content

Commit

Permalink
Add support for async streams from command execution
Browse files Browse the repository at this point in the history
Adds:

```
await foreach (var file in bus.ExecuteStream(new FindDocuments("*.json")))
    Console.WriteLine(file);
```

Fixes #113
  • Loading branch information
kzu committed Jan 29, 2024
1 parent 5efc5c7 commit e6efbd9
Show file tree
Hide file tree
Showing 11 changed files with 290 additions and 39 deletions.
41 changes: 41 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ for execution:
| value-returning synchronous command | `ICommand<TResult>` | `var result = await IMessageBus.Execute(command)` |
| void asynchronous command | `IAsyncCommand` | `await IMessageBus.ExecuteAsync(command)` |
| value-returning asynchronous command | `IAsyncCommand<TResult>` | `var result = await IMessageBus.ExecuteAsync(command)` |
| async stream command | `IStreamCommand<TResult>` | `await foreach(var item in IMessageBus.ExecuteStream(command))` |

The sample command shown before can be executed using the following code:

Expand Down Expand Up @@ -134,6 +135,8 @@ public interface IMessageBus
Task ExecuteAsync(IAsyncCommand command, CancellationToken cancellation);
// async value-returning
Task<TResult> ExecuteAsync<TResult>(IAsyncCommand<TResult> command, CancellationToken cancellation);
// async stream
IAsyncEnumerable<TResult> ExecuteStream<TResult>(IStreamCommand<TResult> command, CancellationToken cancellation);
}
```

Expand Down Expand Up @@ -182,6 +185,9 @@ public interface ICommandHandler<in TCommand, out TResult> : ... where TCommand
// async
public interface IAsyncCommandHandler<in TCommand> : ... where TCommand : IAsyncCommand;
public interface IAsyncCommandHandler<in TCommand, TResult> : ... where TCommand : IAsyncCommand<TResult>

// async stream
public interface IStreamCommandHandler<in TCommand, out TResult>: ... where TCommand : IStreamCommand<TResult>
```

This design choice also makes it impossible to end up executing a command
Expand All @@ -194,6 +200,41 @@ as a validation mechanism via `CanExecute<T>`, as shown above in the `FindDocume
Commands can notify new events, and event observers/subscribers can in turn
execute commands.

### Async Streams

For .NET6+ apps, *Merq* also supports [async streams](https://learn.microsoft.com/en-us/dotnet/csharp/asynchronous-programming/generate-consume-asynchronous-stream)
as a command invocation style. This is useful for scenarios where the command
execution produces a potentially large number of results, and the consumer
wants to process them as they are produced, rather than waiting for the entire
sequence to be produced.

For example, the filter documents command above could be implemented as an
async stream command instead:

```csharp
record FindDocuments(string Filter) : IStreamCommand<string>;

class FindDocumentsHandler : IStreamCommandHandler<FindDocument, string>
{
public bool CanExecute(FindDocument command) => !string.IsNullOrEmpty(command.Filter);

public async IAsyncEnumerable<string> ExecuteAsync(FindDocument command, [EnumeratorCancellation] CancellationToken cancellation)
{
await foreach (var file in FindFilesAsync(command.Filter, cancellation))
yield return file;
}
}
```

In order to execute such command, the only execute method the compiler will allow
is:

```csharp
await foreach (var file in bus.ExecuteStream(new FindDocuments("*.json")))
Console.WriteLine(file);
```


## Analyzers and Code Fixes

Beyond the compiler complaining, *Merq* also provides a set of analyzers and
Expand Down
4 changes: 2 additions & 2 deletions src/Merq.CodeAnalysis.Tests/Merq.CodeAnalysis.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Merq.Core\Merq.Core.csproj" />
<ProjectReference Include="..\Merq\Merq.csproj" />
<ProjectReference Include="..\Merq.Core\Merq.Core.csproj" AdditionalProperties="TargetFramework=netstandard2.0" />
<ProjectReference Include="..\Merq\Merq.csproj" AdditionalProperties="TargetFramework=netstandard2.0" />
<ProjectReference Include="..\Merq.CodeAnalysis\Merq.CodeAnalysis.csproj" />
<ProjectReference Include="..\Merq.CodeFixes\Merq.CodeFixes.csproj" />
</ItemGroup>
Expand Down
6 changes: 3 additions & 3 deletions src/Merq.CodeFixes/ModuleInit.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ public static void Init()
AppDomain.CurrentDomain.AssemblyResolve += (s, e) =>
{
var name = new AssemblyName(e.Name);
if (name.Name == "Superpower" &&
Path.GetDirectoryName(typeof(ModuleInit).Assembly.ManifestModule.FullyQualifiedName) is string dir &&
if (name.Name == "Superpower" &&
Path.GetDirectoryName(typeof(ModuleInit).Assembly.ManifestModule.FullyQualifiedName) is string dir &&
Path.Combine(dir, "Superpower.dll") is string path &&
File.Exists(path) &&
File.Exists(path) &&
Assembly.LoadFrom(path) is Assembly asm)
{
return asm;
Expand Down
2 changes: 1 addition & 1 deletion src/Merq.Core/Merq.Core.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<TargetFrameworks>netstandard2.0;net6.0</TargetFrameworks>
<PackageId>Merq.Core</PackageId>
<Title>
Merq: Default Message Bus (Commands + Events) Implementation, for internal application architecture via command and event messages.
Expand Down
128 changes: 96 additions & 32 deletions src/Merq.Core/MessageBus.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Disposables;
Expand Down Expand Up @@ -53,7 +54,7 @@ namespace Merq;
public class MessageBus : IMessageBus
{
static readonly ConcurrentDictionary<Type, Type> handlerTypeMap = new();
static readonly MethodInfo canExecuteMethod = typeof(MessageBus).GetMethod(nameof(CanExecute)) ??
static readonly MethodInfo canExecuteMethod = typeof(MessageBus).GetMethod(nameof(CanExecute)) ??
throw new InvalidOperationException($"{nameof(MessageBus)}.{nameof(CanExecute)} not found");

// All subjects active in the event stream.
Expand Down Expand Up @@ -293,6 +294,37 @@ public Task<TResult> ExecuteAsync<TResult>(IAsyncCommand<TResult> command, Cance
}
}

#if NET6_0_OR_GREATER
/// <inheritdoc/>
public IAsyncEnumerable<TResult> ExecuteStream<TResult>(IStreamCommand<TResult> command, CancellationToken cancellation = default, [CallerMemberName] string? callerName = default, [CallerFilePath] string? callerFile = default, [CallerLineNumber] int? callerLine = default)
{
var type = GetCommandType(command);
using var activity = StartCommandActivity(type, command, callerName, callerFile, callerLine);

try
{
if (type.IsPublic || type.IsNestedPublic)
// For public types, we can use the faster dynamic dispatch approach
return WithResult<TResult>().ExecuteStream((dynamic)command, cancellation);

return (IAsyncEnumerable<TResult>)resultAsyncExecutors.GetOrAdd(type, type
=> (ResultAsyncDispatcher)Activator.CreateInstance(
typeof(ResultStreamDispatcher<,>).MakeGenericType(type, typeof(TResult)),
this)!)
.ExecuteAsync(command, cancellation);
}
catch (Exception e)
{
activity.RecordException(e);
// Rethrow original exception to preserve stacktrace.
ExceptionDispatchInfo.Capture(e).Throw();
throw;
}
}
#endif



/// <inheritdoc/>
public void Notify<TEvent>(TEvent e, [CallerMemberName] string? callerName = default, [CallerFilePath] string? callerFile = default, [CallerLineNumber] int? callerLine = default)
{
Expand Down Expand Up @@ -420,6 +452,12 @@ static Type GetHandlerType(Type commandType)
i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IAsyncCommand<>)) is Type iface2)
return typeof(IAsyncCommandHandler<,>).MakeGenericType(type, iface2.GetGenericArguments()[0]);

#if NET6_0_OR_GREATER
if (type.GetInterfaces().FirstOrDefault(i =>
i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IStreamCommand<>)) is Type iface3)
return typeof(IStreamCommandHandler<,>).MakeGenericType(type, iface3.GetGenericArguments()[0]);
#endif

throw new InvalidOperationException($"Type {type} does not implement any command interface.");
});
}
Expand Down Expand Up @@ -453,7 +491,11 @@ static Type GetCommandType(IExecutable command)
generic != typeof(ICommandHandler<>) &&
generic != typeof(ICommandHandler<,>) &&
generic != typeof(IAsyncCommandHandler<>) &&
generic != typeof(IAsyncCommandHandler<,>))
generic != typeof(IAsyncCommandHandler<,>)
#if NET6_0_OR_GREATER
&& generic != typeof(IStreamCommandHandler<,>)
#endif
)
continue;

var arg = descriptor.ServiceType.GetGenericArguments()[0];
Expand Down Expand Up @@ -596,32 +638,62 @@ commandType is not null &&
throw new InvalidOperationException($"No service for type '{typeof(IAsyncCommandHandler<TCommand, TResult>)}' has been registered.");
}

#if NET6_0_OR_GREATER
IAsyncEnumerable<TResult> ExecuteStreamCore<TCommand, TResult>(TCommand command, CancellationToken cancellation) where TCommand : IStreamCommand<TResult>
{
var handler = services.GetService<IStreamCommandHandler<TCommand, TResult>>();
if (handler != null)
{
var watch = Stopwatch.StartNew();
try
{
return handler.ExecuteSteam(command, cancellation);
}
finally
{
Processing.Record(watch.ElapsedMilliseconds,
new Tag("Command", typeof(TCommand).FullName),
new Tag("Handler", handler.GetType().FullName));
}
}

// See if we can convert from the TCommand to a compatible type with
// a registered command handler
if (FindCommandMapper(typeof(TCommand), out var commandType) is Func<dynamic, object> factory &&
commandType is not null &&
factory.Invoke(command) is IStreamCommand<TResult> converted)
{
return ExecuteStream(converted, cancellation);
}

throw new InvalidOperationException($"No service for type '{typeof(IStreamCommandHandler<TCommand, TResult>)}' has been registered.");
}
#endif

// dynamic dispatch cannot infer TResult from TCommand, so we need to use a generic method
// that first "sets" the TResult and then use dynamic dispatch on the resulting instance.
With<TResult> WithResult<TResult>() => new(this);

readonly struct With<TResult>
readonly struct With<TResult>(MessageBus bus)
{
readonly MessageBus bus;

public With(MessageBus bus) => this.bus = bus;
readonly MessageBus bus = bus;

public TResult Execute<TCommand>(TCommand command) where TCommand : ICommand<TResult>
=> bus.ExecuteCore<TCommand, TResult>(command);

public Task<TResult> ExecuteAsync<TCommand>(TCommand command, CancellationToken cancellation) where TCommand : IAsyncCommand<TResult>
=> bus.ExecuteAsyncCore<TCommand, TResult>(command, cancellation);

#if NET6_0_OR_GREATER
public IAsyncEnumerable<TResult> ExecuteStream<TCommand>(TCommand command, CancellationToken cancellation) where TCommand : IStreamCommand<TResult>
=> bus.ExecuteStreamCore<TCommand, TResult>(command, cancellation);
#endif
}

#region Event Helpers

class CompositeObservable<T> : IObservable<T>
class CompositeObservable<T>(params IObservable<T>[] observables) : IObservable<T>
{
readonly IObservable<T>[] observables;

public CompositeObservable(params IObservable<T>[] observables)
=> this.observables = observables;

public IDisposable Subscribe(IObserver<T> observer)
=> new CompositeDisposable(observables
.Select(observable => observable.Subscribe(observer)).ToArray());
Expand Down Expand Up @@ -660,12 +732,8 @@ abstract class VoidDispatcher
public abstract void Execute(IExecutable command);
}

class VoidDispatcher<TCommand> : VoidDispatcher where TCommand : ICommand
class VoidDispatcher<TCommand>(MessageBus bus) : VoidDispatcher where TCommand : ICommand
{
readonly MessageBus bus;

public VoidDispatcher(MessageBus bus) => this.bus = bus;

public override void Execute(IExecutable command) => bus.ExecuteCore((TCommand)command);
}

Expand All @@ -674,12 +742,8 @@ abstract class ResultDispatcher
public abstract object? Execute(IExecutable command);
}

class ResultDispatcher<TCommand, TResult> : ResultDispatcher where TCommand : ICommand<TResult>
class ResultDispatcher<TCommand, TResult>(MessageBus bus) : ResultDispatcher where TCommand : ICommand<TResult>
{
readonly MessageBus bus;

public ResultDispatcher(MessageBus bus) => this.bus = bus;

public override object? Execute(IExecutable command) => bus.ExecuteCore<TCommand, TResult>((TCommand)command);
}

Expand All @@ -688,12 +752,8 @@ abstract class VoidAsyncDispatcher
public abstract Task ExecuteAsync(IExecutable command, CancellationToken cancellation);
}

class VoidAsyncDispatcher<TCommand> : VoidAsyncDispatcher where TCommand : IAsyncCommand
class VoidAsyncDispatcher<TCommand>(MessageBus bus) : VoidAsyncDispatcher where TCommand : IAsyncCommand
{
readonly MessageBus bus;

public VoidAsyncDispatcher(MessageBus bus) => this.bus = bus;

public override Task ExecuteAsync(IExecutable command, CancellationToken cancellation) => bus.ExecuteAsyncCore((TCommand)command, cancellation);
}

Expand All @@ -702,15 +762,19 @@ abstract class ResultAsyncDispatcher
public abstract object ExecuteAsync(IExecutable command, CancellationToken cancellation);
}

class ResultAsyncDispatcher<TCommand, TResult> : ResultAsyncDispatcher where TCommand : IAsyncCommand<TResult>
class ResultAsyncDispatcher<TCommand, TResult>(MessageBus bus) : ResultAsyncDispatcher where TCommand : IAsyncCommand<TResult>
{
readonly MessageBus bus;

public ResultAsyncDispatcher(MessageBus bus) => this.bus = bus;

public override object ExecuteAsync(IExecutable command, CancellationToken cancellation)
=> bus.ExecuteAsyncCore<TCommand, TResult>((TCommand)command, cancellation);
}

#if NET6_0_OR_GREATER
class ResultStreamDispatcher<TCommand, TResult>(MessageBus bus) : ResultAsyncDispatcher where TCommand : IStreamCommand<TResult>
{
public override object ExecuteAsync(IExecutable command, CancellationToken cancellation)
=> bus.ExecuteStreamCore<TCommand, TResult>((TCommand)command, cancellation);
}
#endif

#endregion
}
Loading

0 comments on commit e6efbd9

Please sign in to comment.