Skip to content

Commit d84b01d

Browse files
author
Rafał Maciąg
committed
AddPlumberd as an extension method.
1 parent 032a780 commit d84b01d

File tree

8 files changed

+152
-59
lines changed

8 files changed

+152
-59
lines changed

Diff for: Checkers/Program.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
builder.Services.AddMudServices();
1414
var tmp = builder.Logging;
15-
builder.Services.AddCheckers(builder.Configuration, builder.Environment.IsDevelopment());
15+
builder.Services.AddCheckers();
1616
var app = builder.Build();
1717

1818
// Configure the HTTP request pipeline.
@@ -31,5 +31,5 @@
3131

3232
app.MapBlazorHub();
3333
app.MapFallbackToPage("/_Host");
34-
app.ConfigureCheckers();
34+
3535
app.Run();

Diff for: Checkers/StartupExtensions.cs

+7-30
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ public object GetService(Type serviceType)
2020
}
2121
}
2222

23+
2324
class LazyLogProvider : ILoggerFactory
2425
{
2526
IServiceProvider _provider;
@@ -73,32 +74,14 @@ public ILogger CreateLogger(string categoryName)
7374
}
7475
public static class Startup
7576
{
76-
private static ServiceProviderProxy _serviceProvider;
77+
7778
private static IPlumberRuntime _plumberRuntime;
7879

79-
public static void AddCheckers(this IServiceCollection services,
80-
IConfiguration Configuration, bool isDevelopment)
80+
public static void AddCheckers(this IServiceCollection services)
8181
{
82-
_serviceProvider = new ServiceProviderProxy();
82+
services.AddPlumberd(x => x.WithGrpc(y => y.IgnoreServerCert().InSecure().WithDevelopmentEnv(true)));
8383

8484

85-
86-
87-
var b = new PlumberBuilder()
88-
.WithDefaultServiceProvider(_serviceProvider)
89-
.WithLoggerFactory(new LazyLogProvider(_serviceProvider))
90-
.WithGrpc(x => x
91-
.WithConfig(Configuration)
92-
.WithWrittenEventsToLog(isDevelopment)
93-
.IgnoreServerCert() // <---
94-
.InSecure()
95-
.WithDevelopmentEnv(isDevelopment)
96-
.WithProjectionsConfigFrom(typeof(Startup).Assembly));
97-
_plumberRuntime = b.Build();
98-
services.AddSingleton(_plumberRuntime.DefaultCommandInvoker);
99-
services.AddSingleton(_plumberRuntime.DefaultEventStore);
100-
services.AddSingleton(_plumberRuntime);
101-
10285
services.AddSingleton<ValidatorFactory>();
10386

10487
_processingUnitTypes = GetTypes()
@@ -107,6 +90,8 @@ public static void AddCheckers(this IServiceCollection services,
10790
services.AddSingletons(_processingUnitTypes);
10891
services.AddSingletons(GetTypes().IsAssignableToClass<IModel>());
10992

93+
services.RegisterControllers(_processingUnitTypes);
94+
11095
bool hasAggregates = false;
11196
foreach (var at in GetTypes().IsAssignableToClass<IRootAggregate>())
11297
{
@@ -134,15 +119,7 @@ public static void AddCheckers(this IServiceCollection services,
134119
services.AddScoped<ILiveQueryExecutor, LiveQueryExecutor>();
135120
}
136121

137-
public static void ConfigureCheckers(this WebApplication app)
138-
{
139-
_serviceProvider.SetProvider(app.Services);
140-
141-
foreach (var pu in _processingUnitTypes)
142-
_plumberRuntime.RegisterController(pu);
143-
144-
Task.Run(() => _plumberRuntime.StartAsync()).GetAwaiter().GetResult();
145-
}
122+
146123
private static Type[] _types;
147124
private static Type[] _processingUnitTypes;
148125

Diff for: ModelingEvolution.Plumberd.EventStore/EventStoreExtensions.cs

+70-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,14 @@
11
using System.Runtime.CompilerServices;
22
using System;
3+
using System.Collections.Generic;
4+
using System.Linq;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using Microsoft.Extensions.Configuration;
8+
using Microsoft.Extensions.DependencyInjection;
9+
using Microsoft.Extensions.Hosting;
10+
using Microsoft.Extensions.Logging;
11+
using ModelingEvolution.Plumberd.EventProcessing;
312

413
[assembly: InternalsVisibleTo("ModelingEvolution.Plumberd.Tests")]
514

@@ -19,5 +28,65 @@ public static PlumberBuilder WithGrpc(this PlumberBuilder builder,
1928
}
2029

2130
}
22-
31+
public class PlumberdProjectionStarter : BackgroundService
32+
{
33+
private readonly IServiceProvider _serviceProvider;
34+
35+
public PlumberdProjectionStarter(IServiceProvider serviceProvider)
36+
{
37+
_serviceProvider = serviceProvider;
38+
}
39+
40+
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
41+
{
42+
var plumberd = _serviceProvider.GetRequiredService<IPlumberRuntime>();
43+
var processingUnits = _serviceProvider.GetRequiredService<IEnumerable<IProcessingUnitRegistration>>();
44+
foreach (var r in processingUnits)
45+
plumberd.RegisterController(r.ProcessingUnit);
46+
await plumberd.StartAsync();
47+
}
48+
}
49+
interface IProcessingUnitRegistration { Type ProcessingUnit { get; }}
50+
record ProcessingUnitRegistration(Type ProcessingUnit) : IProcessingUnitRegistration { }
51+
52+
public static class ContainerExtensions
53+
{
54+
public static IServiceCollection RegisterController(this IServiceCollection services, Type processingUnit)
55+
{
56+
57+
services.AddSingleton(typeof(IProcessingUnitRegistration), new ProcessingUnitRegistration(processingUnit));
58+
return services;
59+
}
60+
public static IServiceCollection RegisterControllers(this IServiceCollection services, params Type[] processingUnits)
61+
{
62+
foreach (var i in processingUnits) services.RegisterController(i);
63+
return services;
64+
}
65+
public static IServiceCollection RegisterController<TController>(this IServiceCollection services)
66+
{
67+
return services.RegisterController(typeof(TController));
68+
}
69+
public static IServiceCollection AddPlumberd(this IServiceCollection services, Action<PlumberBuilder> builder)
70+
{
71+
// Check in WASM - this might not work.
72+
services.AddHostedService<PlumberdProjectionStarter>();
73+
74+
services.AddSingleton<IPlumberRuntime>(sp =>
75+
{
76+
PlumberBuilder b = new PlumberBuilder()
77+
.WithDefaultServiceProvider(sp)
78+
.WithLoggerFactory(sp.GetRequiredService<ILoggerFactory>())
79+
.WithGrpc(x => x.WithConfig(sp.GetRequiredService<IConfiguration>())
80+
.WithLoggerFactory(sp.GetRequiredService<ILoggerFactory>())
81+
.WithStartupProjections(StartupProjection.All));
82+
83+
builder(b);
84+
return b.Build();
85+
});
86+
services.AddSingleton<ICommandInvoker>(sp => sp.GetRequiredService<IPlumberRuntime>().DefaultCommandInvoker);
87+
services.AddSingleton<IEventStore>(sp => sp.GetRequiredService<IPlumberRuntime>().DefaultEventStore);
88+
services.AddSingleton<TypeRegister>();
89+
return services;
90+
}
91+
}
2392
}

Diff for: ModelingEvolution.Plumberd.EventStore/ModelingEvolution.Plumberd.EventStore.csproj

+1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ This package is the implementation of EventStory from EventStore.</Description>
2727
<PackageReference Include="EventStore.Client.Grpc.ProjectionManagement" Version="23.1.0" />
2828
<PackageReference Include="EventStore.Client.Grpc.Streams" Version="23.1.0" />
2929
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="8.0.0" />
30+
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="8.0.0" />
3031
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.0" />
3132

3233
</ItemGroup>

Diff for: ModelingEvolution.Plumberd.EventStore/NativeEventStore.Subscription.cs

+15-3
Original file line numberDiff line numberDiff line change
@@ -56,22 +56,34 @@ public async Task Subscribe()
5656
try
5757
{
5858
var position = _fromBeginning ? StreamPosition.Start : StreamPosition.End;
59-
PersistentSubscriptionSettings s = new PersistentSubscriptionSettings(true,position,
60-
checkPointLowerBound:1);
59+
PersistentSubscriptionSettings s = new PersistentSubscriptionSettings(true, position,
60+
checkPointLowerBound: 1);
6161

6262
_streamIterator = position.ToUInt64();
6363
var subs = (await _parent.PersistentSubscriptions.ListAllAsync()).ToArray();
6464

6565
if (subs.All(x => x.GroupName != group))
6666
await _parent.PersistentSubscriptions.CreateToStreamAsync(_streamName, group, s);
67-
67+
6868
_log.LogInformation("Connecting to persistent subscription {subscriptionName}.", _streamName);
6969
this._subscription = await _parent.PersistentSubscriptions.SubscribeToStreamAsync(_streamName,
7070
group /*Environment.MachineName*/,
7171
OnEventAppeared,
7272
userCredentials: _parent._credentials,
7373
subscriptionDropped: OnSubscriptionDropped);
7474
}
75+
catch (PersistentSubscriptionNotFoundException ex)
76+
{
77+
var position = _fromBeginning ? StreamPosition.Start : StreamPosition.End;
78+
PersistentSubscriptionSettings s = new PersistentSubscriptionSettings(true, position,
79+
checkPointLowerBound: 1);
80+
await _parent.PersistentSubscriptions.CreateToStreamAsync(_streamName, group, s);
81+
this._subscription = await _parent.PersistentSubscriptions.SubscribeToStreamAsync(_streamName,
82+
group /*Environment.MachineName*/,
83+
OnEventAppeared,
84+
userCredentials: _parent._credentials,
85+
subscriptionDropped: OnSubscriptionDropped);
86+
}
7587
catch (ArgumentException)
7688
{
7789
await Task.Delay(2000);

Diff for: ModelingEvolution.Plumberd.Tests/NativeEventStoreBuilderTests.cs

-22
This file was deleted.

Diff for: ModelingEvolution.Plumberd.Tests/TransitionUnitTests.cs

+56
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,68 @@
22
using System.Threading.Tasks;
33
using FluentAssertions;
44
using Microsoft.Extensions.Logging;
5+
using ModelingEvolution.Plumberd.EventStore;
56
using ModelingEvolution.Plumberd.Tests.Models;
67
using Xunit;
78
#pragma warning disable 1998
89

910
namespace ModelingEvolution.Plumberd.Tests
1011
{
12+
public class UnitTest1
13+
{
14+
[Fact]
15+
public void Router()
16+
{
17+
// This won't work because apparently HAProxy cannot connect to backend using HTTP2
18+
var b = new PlumberBuilder()
19+
.WithDefaultServiceProvider(NSubstitute.Substitute.For<IServiceProvider>())
20+
.WithLoggerFactory(new LazyLogProvider(NSubstitute.Substitute.For<IServiceProvider>()))
21+
.WithGrpc(x => x
22+
// .WithConfig(Configuration)
23+
.WithCredentials("admin", "3KLE81YCdbG6nDnSH9oyr4IU")
24+
.WithHttpUrl(new Uri("https://es.welder.ai"))
25+
.InSecure()
26+
.WithWrittenEventsToLog(true)
27+
.IgnoreServerCert() // <---
28+
.WithDevelopmentEnv(true));
29+
var _plumberRuntime = b.Build();
30+
31+
}
32+
[Fact]
33+
public void RouterNat()
34+
{
35+
var b = new PlumberBuilder()
36+
.WithDefaultServiceProvider(NSubstitute.Substitute.For<IServiceProvider>())
37+
.WithLoggerFactory(new LazyLogProvider(NSubstitute.Substitute.For<IServiceProvider>()))
38+
.WithGrpc(x => x
39+
// .WithConfig(Configuration)
40+
.WithCredentials("admin", "3KLE81YCdbG6nDnSH9oyr4IU")
41+
.WithHttpUrl(new Uri("https://10.2.0.1:8080"))
42+
.InSecure()
43+
.WithWrittenEventsToLog(true)
44+
.IgnoreServerCert() // <---
45+
.WithDevelopmentEnv(true));
46+
var _plumberRuntime = b.Build();
47+
48+
}
49+
[Fact]
50+
public void Direct()
51+
{
52+
var b = new PlumberBuilder()
53+
.WithDefaultServiceProvider(NSubstitute.Substitute.For<IServiceProvider>())
54+
.WithLoggerFactory(new LazyLogProvider(NSubstitute.Substitute.For<IServiceProvider>()))
55+
.WithGrpc(x => x
56+
// .WithConfig(Configuration)
57+
.WithCredentials("admin", "3KLE81YCdbG6nDnSH9oyr4IU")
58+
.WithHttpUrl(new Uri("https://10.2.0.13:5009"))
59+
.InSecure()
60+
.WithWrittenEventsToLog(true)
61+
.IgnoreServerCert() // <---
62+
.WithDevelopmentEnv(true));
63+
var _plumberRuntime = b.Build();
64+
65+
}
66+
}
1167
public class TransitionUnitTests
1268
{
1369
[Fact]

Diff for: build.cake

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
var target = Argument("target", "Publish");
44
var configuration = Argument("configuration", "Release");
5-
var version = Argument("version", "1.3.35.10");
5+
var version = Argument("version", "1.3.35.11");
66

77

88
var commits = GitLog("./", int.MaxValue);

0 commit comments

Comments
 (0)