Skip to content

Commit

Permalink
Enable replicaSet support for MongoDb
Browse files Browse the repository at this point in the history
  • Loading branch information
twsouthwick committed Sep 14, 2024
1 parent dad2369 commit 350ce9f
Show file tree
Hide file tree
Showing 7 changed files with 278 additions and 16 deletions.
81 changes: 80 additions & 1 deletion src/Aspire.Hosting.MongoDB/MongoDBBuilderExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Text;
using Aspire.Hosting.ApplicationModel;
using Aspire.Hosting.MongoDB;
using Aspire.Hosting.Utils;
Expand Down Expand Up @@ -53,6 +54,74 @@ public static IResourceBuilder<MongoDBServerResource> AddMongoDB(this IDistribut
.WithHealthCheck(healthCheckKey);
}

/// <summary>
/// Adds a replica set to the MongoDB server resource.
/// </summary>
/// <param name="builder">The MongoDB server resource.</param>
/// <param name="replicaSet">The name of the replica set. If not provided, defaults to <c>rs0</c>.</param>
/// <returns>A reference to the <see cref="IResourceBuilder{T}"/>.</returns>
public static IResourceBuilder<MongoDBServerResource> WithReplicaSet(this IResourceBuilder<MongoDBServerResource> builder, string? replicaSet = null)
{
if (builder.Resource.TryGetLastAnnotation<MongoDbReplicaSetAnnotation>(out _))
{
throw new InvalidOperationException("A replica set has already been added to the MongoDB server resource.");
}

replicaSet ??= "rs0";

SetPortAndTargetToBeSame(builder);
SetRsInitCallOnStartup(builder);

return builder
.WithAnnotation(new MongoDbReplicaSetAnnotation(replicaSet))
.WithArgs(ctx =>
{
ctx.Args.Add("--replSet");
ctx.Args.Add(replicaSet);

var port = builder.Resource.PrimaryEndpoint.TargetPort;

if (port != DefaultContainerPort)
{
ctx.Args.Add("--port");
ctx.Args.Add($"{port}");
}
});

static void SetPortAndTargetToBeSame(IResourceBuilder<MongoDBServerResource> builder)
{
foreach (var endpoint in builder.Resource.Annotations.OfType<EndpointAnnotation>())
{
if (endpoint.Name == MongoDBServerResource.PrimaryEndpointName)
{
// In the case of replica sets, the port and target port should be the same and is not proxied
endpoint.IsProxied = false;

if (endpoint.Port is null)
{
endpoint.Port = endpoint.TargetPort;
}
else
{
endpoint.TargetPort = endpoint.Port;
}
break;
}
}
}

static void SetRsInitCallOnStartup(IResourceBuilder<ContainerResource> builder)
{
// Need to run 'rs.initiate()' on the db startup up to initialize the replica set
const string content = $$"""echo "rs.initiate()" | mongosh""";

var tmpPath = Path.GetTempFileName();
File.WriteAllText(tmpPath, content);

builder.WithBindMount(tmpPath, "/docker-entrypoint-initdb.d/replicaset_init.sh");
}
}

/// <summary>
/// Adds a MongoDB database to the application model.
/// </summary>
Expand Down Expand Up @@ -163,9 +232,19 @@ public static IResourceBuilder<MongoDBServerResource> WithInitBindMount(this IRe

private static void ConfigureMongoExpressContainer(EnvironmentCallbackContext context, MongoDBServerResource resource)
{
var sb = new StringBuilder($"mongodb://{resource.Name}:{resource.PrimaryEndpoint.TargetPort}/?directConnection=true");

if (resource.TryGetLastAnnotation<MongoDbReplicaSetAnnotation>(out var replica))
{
sb.Append('&');
sb.Append(MongoDbReplicaSetAnnotation.QueryName);
sb.Append('=');
sb.Append(replica.ReplicaSetName);
}

// Mongo Exporess assumes Mongo is being accessed over a default Aspire container network and hardcodes the resource address
// This will need to be refactored once updated service discovery APIs are available
context.EnvironmentVariables.Add("ME_CONFIG_MONGODB_URL", $"mongodb://{resource.Name}:{resource.PrimaryEndpoint.TargetPort}/?directConnection=true");
context.EnvironmentVariables.Add("ME_CONFIG_MONGODB_URL", sb.ToString());
context.EnvironmentVariables.Add("ME_CONFIG_BASICAUTH", "false");
}
}
12 changes: 11 additions & 1 deletion src/Aspire.Hosting.MongoDB/MongoDBDatabaseResource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,17 @@ public MongoDBDatabaseResource(string name, string databaseName, MongoDBServerRe
/// Gets the connection string expression for the MongoDB database.
/// </summary>
public ReferenceExpression ConnectionStringExpression
=> ReferenceExpression.Create($"{Parent}/{DatabaseName}");
{
get
{
var builder = new ReferenceExpression.ExpressionInterpolatedStringHandler(10, 2);

Parent.AppendConnectionString(builder);
Parent.AppendSuffix(builder, DatabaseName);

return ReferenceExpression.Create(builder);
}
}

/// <summary>
/// Gets the parent MongoDB container resource.
Expand Down
58 changes: 55 additions & 3 deletions src/Aspire.Hosting.MongoDB/MongoDBServerResource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,61 @@ public class MongoDBServerResource(string name) : ContainerResource(name), IReso
/// <summary>
/// Gets the connection string for the MongoDB server.
/// </summary>
public ReferenceExpression ConnectionStringExpression =>
ReferenceExpression.Create(
$"mongodb://{PrimaryEndpoint.Property(EndpointProperty.Host)}:{PrimaryEndpoint.Property(EndpointProperty.Port)}");
public ReferenceExpression ConnectionStringExpression
{
get
{
const string MongoScheme = "mongodb://";

var builder = new ReferenceExpression.ExpressionInterpolatedStringHandler(MongoScheme.Length, 2);

AppendConnectionString(builder);
AppendSuffix(builder);

return ReferenceExpression.Create(builder);
}
}

internal void AppendConnectionString(in ReferenceExpression.ExpressionInterpolatedStringHandler builder)
{
builder.AppendLiteral("mongodb://");
builder.AppendFormatted(PrimaryEndpoint.Property(EndpointProperty.Host));
builder.AppendLiteral(":");
builder.AppendFormatted(PrimaryEndpoint.Property(EndpointProperty.Port));
}

/// <summary>
/// Handles adding the rest of the connection string.
///
/// - If a database name is provided, it will be appended to the connection string.
/// - If a replica set name is provided, it will be appended to the connection string.
/// - If no database but a replica set is provided, a '/' must be inserted before the '?'
/// </summary>
internal bool AppendSuffix(in ReferenceExpression.ExpressionInterpolatedStringHandler builder, string? dbName = null)
{
if (dbName is { })
{
builder.AppendLiteral("/");
builder.AppendFormatted(dbName);
}

if (Annotations.OfType<MongoDbReplicaSetAnnotation>().FirstOrDefault() is { ReplicaSetName: { } replicaSetName })
{
if (dbName is null)
{
builder.AppendLiteral("/");
}

builder.AppendLiteral("?");
builder.AppendLiteral(MongoDbReplicaSetAnnotation.QueryName);
builder.AppendLiteral("=");
builder.AppendLiteral(replicaSetName);

return true;
}

return false;
}

private readonly Dictionary<string, string> _databases = new Dictionary<string, string>(StringComparers.ResourceName);

Expand Down
9 changes: 9 additions & 0 deletions src/Aspire.Hosting.MongoDB/MongoDbReplicaSetAnnotation.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

namespace Aspire.Hosting.ApplicationModel;

internal sealed record MongoDbReplicaSetAnnotation(string ReplicaSetName) : IResourceAnnotation
{
internal const string QueryName = "replicaSet";
}
2 changes: 1 addition & 1 deletion src/Aspire.Hosting.MongoDB/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
#nullable enable

static Aspire.Hosting.MongoDBBuilderExtensions.WithReplicaSet(this Aspire.Hosting.ApplicationModel.IResourceBuilder<Aspire.Hosting.ApplicationModel.MongoDBServerResource!>! builder, string? replicaSet = null) -> Aspire.Hosting.ApplicationModel.IResourceBuilder<Aspire.Hosting.ApplicationModel.MongoDBServerResource!>!
63 changes: 57 additions & 6 deletions tests/Aspire.Hosting.MongoDB.Tests/AddMongoDBTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,33 @@ public async Task MongoDBCreatesConnectionString()
Assert.Equal("mongodb://localhost:27017", await serverResource.GetConnectionStringAsync());
Assert.Equal("mongodb://{mongodb.bindings.tcp.host}:{mongodb.bindings.tcp.port}", serverResource.ConnectionStringExpression.ValueExpression);
Assert.Equal("mongodb://localhost:27017/mydatabase", connectionString);
Assert.Equal("{mongodb.connectionString}/mydatabase", connectionStringResource.ConnectionStringExpression.ValueExpression);
Assert.Equal("mongodb://{mongodb.bindings.tcp.host}:{mongodb.bindings.tcp.port}/mydatabase", connectionStringResource.ConnectionStringExpression.ValueExpression);
}

[Fact]
public async Task MongoDBCreatesConnectionStringWithReplicaSet()
{
var appBuilder = DistributedApplication.CreateBuilder();
appBuilder
.AddMongoDB("mongodb")
.WithEndpoint("tcp", e => e.AllocatedEndpoint = new AllocatedEndpoint(e, "localhost", 27017))
.WithReplicaSet("myreplset")
.AddDatabase("mydatabase");

using var app = appBuilder.Build();

var appModel = app.Services.GetRequiredService<DistributedApplicationModel>();

var dbResource = Assert.Single(appModel.Resources.OfType<MongoDBDatabaseResource>());
var serverResource = dbResource.Parent as IResourceWithConnectionString;
var connectionStringResource = dbResource as IResourceWithConnectionString;
Assert.NotNull(connectionStringResource);
var connectionString = await connectionStringResource.GetConnectionStringAsync();

Assert.Equal("mongodb://localhost:27017/?replicaSet=myreplset", await serverResource.GetConnectionStringAsync());
Assert.Equal("mongodb://{mongodb.bindings.tcp.host}:{mongodb.bindings.tcp.port}/?replicaSet=myreplset", serverResource.ConnectionStringExpression.ValueExpression);
Assert.Equal("mongodb://localhost:27017/mydatabase?replicaSet=myreplset", connectionString);
Assert.Equal("mongodb://{mongodb.bindings.tcp.host}:{mongodb.bindings.tcp.port}/mydatabase?replicaSet=myreplset", connectionStringResource.ConnectionStringExpression.ValueExpression);
}

[Fact]
Expand Down Expand Up @@ -161,6 +187,31 @@ public async Task WithMongoExpressUsesContainerHost()
});
}

[Fact]
public async Task WithMongoExpressWithReplicaSet()
{
using var builder = TestDistributedApplicationBuilder.Create();
builder.AddMongoDB("mongo")
.WithReplicaSet("myreplicaset")
.WithMongoExpress();

var mongoExpress = Assert.Single(builder.Resources.OfType<MongoExpressContainerResource>());

var env = await EnvironmentVariableEvaluator.GetEnvironmentVariablesAsync(mongoExpress, DistributedApplicationOperation.Run, TestServiceProvider.Instance);

Assert.Collection(env,
e =>
{
Assert.Equal("ME_CONFIG_MONGODB_URL", e.Key);
Assert.Equal($"mongodb://mongo:27017/?directConnection=true&replicaSet=myreplicaset", e.Value);
},
e =>
{
Assert.Equal("ME_CONFIG_BASICAUTH", e.Key);
Assert.Equal("false", e.Value);
});
}

[Fact]
public void WithMongoExpressOnMultipleResources()
{
Expand Down Expand Up @@ -201,7 +252,7 @@ public async Task VerifyManifest()
expectedManifest = """
{
"type": "value.v0",
"connectionString": "{mongo.connectionString}/mydb"
"connectionString": "mongodb://{mongo.bindings.tcp.host}:{mongo.bindings.tcp.port}/mydb"
}
""";
Assert.Equal(expectedManifest, dbManifest.ToString());
Expand Down Expand Up @@ -243,8 +294,8 @@ public void CanAddDatabasesWithDifferentNamesOnSingleServer()
Assert.Equal("customers1", db1.Resource.DatabaseName);
Assert.Equal("customers2", db2.Resource.DatabaseName);

Assert.Equal("{mongo1.connectionString}/customers1", db1.Resource.ConnectionStringExpression.ValueExpression);
Assert.Equal("{mongo1.connectionString}/customers2", db2.Resource.ConnectionStringExpression.ValueExpression);
Assert.Equal("mongodb://{mongo1.bindings.tcp.host}:{mongo1.bindings.tcp.port}/customers1", db1.Resource.ConnectionStringExpression.ValueExpression);
Assert.Equal("mongodb://{mongo1.bindings.tcp.host}:{mongo1.bindings.tcp.port}/customers2", db2.Resource.ConnectionStringExpression.ValueExpression);
}

[Fact]
Expand All @@ -261,7 +312,7 @@ public void CanAddDatabasesWithTheSameNameOnMultipleServers()
Assert.Equal("imports", db1.Resource.DatabaseName);
Assert.Equal("imports", db2.Resource.DatabaseName);

Assert.Equal("{mongo1.connectionString}/imports", db1.Resource.ConnectionStringExpression.ValueExpression);
Assert.Equal("{mongo2.connectionString}/imports", db2.Resource.ConnectionStringExpression.ValueExpression);
Assert.Equal("mongodb://{mongo1.bindings.tcp.host}:{mongo1.bindings.tcp.port}/imports", db1.Resource.ConnectionStringExpression.ValueExpression);
Assert.Equal("mongodb://{mongo2.bindings.tcp.host}:{mongo2.bindings.tcp.port}/imports", db2.Resource.ConnectionStringExpression.ValueExpression);
}
}
69 changes: 65 additions & 4 deletions tests/Aspire.Hosting.MongoDB.Tests/MongoDbFunctionalTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@
// The .NET Foundation licenses this file to you under the MIT license.

using Aspire.Components.Common.Tests;
using Aspire.Hosting.ApplicationModel;
using Aspire.Hosting.Utils;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Hosting;
using MongoDB.Bson.Serialization.Attributes;
using MongoDB.Bson;
using MongoDB.Bson.Serialization.Attributes;
using MongoDB.Driver;
using Polly;
using Xunit;
using Xunit.Abstractions;
using Polly;
using Aspire.Hosting.ApplicationModel;
using Microsoft.Extensions.Diagnostics.HealthChecks;

namespace Aspire.Hosting.MongoDB.Tests;

Expand Down Expand Up @@ -102,6 +102,67 @@ await pipeline.ExecuteAsync(async token =>
}, cts.Token);
}

[Fact]
[RequiresDocker]
public async Task VerifyMongoDBResourceReplicaSet()
{
var cts = new CancellationTokenSource(TimeSpan.FromMinutes(3));
var pipeline = new ResiliencePipelineBuilder()
.AddRetry(new() { MaxRetryAttempts = 10, Delay = TimeSpan.FromSeconds(1) })
.Build();

using var builder = TestDistributedApplicationBuilder.CreateWithTestContainerRegistry(testOutputHelper);

var mongodb = builder.AddMongoDB("mongodb")
.WithReplicaSet();
var db = mongodb.AddDatabase("testdb");
using var app = builder.Build();

await app.StartAsync();

var hb = Host.CreateApplicationBuilder();

hb.Configuration[$"ConnectionStrings:{db.Resource.Name}"] = await db.Resource.ConnectionStringExpression.GetValueAsync(default);

hb.AddMongoDBClient(db.Resource.Name);

using var host = hb.Build();

await host.StartAsync();

await pipeline.ExecuteAsync(async token =>
{
var mongoDatabase = host.Services.GetRequiredService<IMongoDatabase>();

var collection = mongoDatabase.GetCollection<Movie>(CollectionName);
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Movie>>()
.Match(x => x.OperationType == ChangeStreamOperationType.Insert);

var changeStreamOptions = new ChangeStreamOptions
{
FullDocument = ChangeStreamFullDocumentOption.UpdateLookup
};

// This API requires replica sets
using var cursor = await collection.WatchAsync(pipeline, changeStreamOptions, cts.Token);
var tcs = new TaskCompletionSource<Movie>();

var name = s_movies[0].Name + "Updated";
_ = cursor.ForEachAsync(cursor =>
{
Assert.Equal(ChangeStreamOperationType.Insert, cursor.OperationType);
Assert.NotNull(cursor.FullDocument);
tcs.SetResult(cursor.FullDocument);
}, cts.Token);

await collection.InsertOneAsync(new Movie { Name = name }, cancellationToken: token);

var updated = await tcs.Task;

Assert.Equal(name, updated.Name);
}, cts.Token);
}

[Theory]
[InlineData(true)]
[InlineData(false)]
Expand Down

0 comments on commit 350ce9f

Please sign in to comment.