From 8104fc4f5ee88c60e9caf91b1ea04f6b8e8ed271 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Thu, 14 Dec 2023 09:56:11 -0800 Subject: [PATCH] Refactor more, add comments --- .../GrainReferenceActivator.cs | 41 +++++++++++++------ 1 file changed, 29 insertions(+), 12 deletions(-) diff --git a/src/Orleans.Core/GrainReferences/GrainReferenceActivator.cs b/src/Orleans.Core/GrainReferences/GrainReferenceActivator.cs index f566118935..e7f28e2db6 100644 --- a/src/Orleans.Core/GrainReferences/GrainReferenceActivator.cs +++ b/src/Orleans.Core/GrainReferences/GrainReferenceActivator.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Linq; using System.Reflection; @@ -11,6 +12,7 @@ using Orleans.CodeGeneration; using Orleans.Metadata; using Orleans.Runtime; +using Orleans.Runtime.Internal; using Orleans.Runtime.Versions; using Orleans.Serialization; using Orleans.Serialization.Cloning; @@ -184,10 +186,17 @@ public GrainReferenceActivator(Type referenceType, GrainReferenceShared shared) public GrainReference CreateReference(GrainId grainId) => _create(_shared, grainId.Key); } - private sealed class StubGrainReferenceRuntime(IRuntimeClient runtimeClient, GrainFactory grainFactory, TimeProvider timeProvider) : IGrainReferenceRuntime + private sealed class StubGrainReferenceRuntime( + IRuntimeClient runtimeClient, + GrainFactory grainFactory, + GrainInterfaceTypeToGrainTypeResolver interfaceTypeToGrainTypeResolver, + IClusterManifestProvider clusterManifestProvider, + TimeProvider timeProvider) : IGrainReferenceRuntime { private readonly IRuntimeClient _runtimeClient = runtimeClient; private readonly GrainFactory _grainFactory = grainFactory; + private readonly GrainInterfaceTypeToGrainTypeResolver _interfaceTypeToGrainTypeResolver = interfaceTypeToGrainTypeResolver; + private readonly IClusterManifestProvider _clusterManifestProvider = clusterManifestProvider; private readonly TimeProvider _timeProvider = timeProvider; public object Cast(IAddressable grain, Type grainInterface) => _runtimeClient.GrainReferenceRuntime.Cast(grain, grainInterface); @@ -196,31 +205,39 @@ private sealed class StubGrainReferenceRuntime(IRuntimeClient runtimeClient, Gra public async ValueTask InvokeMethodAsync(GrainReference reference, IInvokable request, InvokeMethodOptions options) { - await ResolveGrainType(reference, request); + await ResolveGrainTypeAsync(reference, request); return await reference.Shared.Runtime.InvokeMethodAsync(reference, request, options); } public async ValueTask InvokeMethodAsync(GrainReference reference, IInvokable request, InvokeMethodOptions options) { - await ResolveGrainType(reference, request); + await ResolveGrainTypeAsync(reference, request); await reference.Shared.Runtime.InvokeMethodAsync(reference, request, options); } - private async Task ResolveGrainType(GrainReference reference, IInvokable request) + private async Task ResolveGrainTypeAsync(GrainReference reference, IInvokable request) { _ = GrainTypePrefix.TryGetCrainClassPrefix(reference.GrainId.Type, out var grainClassPrefix); var timeout = request.GetDefaultResponseTimeout() ?? _runtimeClient.GetResponseTimeout(); - var cancellation = new CancellationTokenSource(timeout, _timeProvider); - GrainReference grain; - try - { - grain = (GrainReference)await _grainFactory.GetGrainAsync(reference.InterfaceType, reference.GrainId.Key, grainClassPrefix, cancellation.Token); - } - finally + using var cancellation = new CancellationTokenSource(timeout, _timeProvider); + var clusterManifestUpdates = _clusterManifestProvider.Updates.WithCancellation(cancellation.Token).GetAsyncEnumerator(); + GrainType grainType; + while (!_interfaceTypeToGrainTypeResolver.TryGetGrainType(reference.InterfaceType, grainClassPrefix, out grainType)) { - cancellation.Dispose(); + // Wait for the next cluster manifest update. + // The cluster manifest is updated when silos join or leave the cluster. + // Each time the cluster manifest is updated, the grain type resolution cache is invalidated, and will be + // recomputed on the subsequent call to TryGetGrainType. + if (!await clusterManifestUpdates.MoveNextAsync()) + { + // If execution has reached this point, the host is either shutting down or the response timeout has elapsed. + throw new OperationCanceledException(cancellation.Token); + } } + // Use the newly resolved grain type to replace the grain reference's runtime with the runtime corresponding to the resolved grain reference. + var grainId = GrainId.Create(grainType, reference.GrainId.Key); + var grain = (GrainReference)_grainFactory.GetGrain(grainId, reference.InterfaceType); reference.Shared = grain.Shared; } }