From 0beb131395fb6cf24befaa9f81234337cdb3ed1a Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Thu, 11 Jul 2024 21:32:43 -0700 Subject: [PATCH] AdaptiveDirectoryCacheMaintainer: wait for all operations to complete during shutdown --- .../AdaptiveDirectoryCacheMaintainer.cs | 56 +++++++++++-------- 1 file changed, 33 insertions(+), 23 deletions(-) diff --git a/src/Orleans.Runtime/GrainDirectory/AdaptiveDirectoryCacheMaintainer.cs b/src/Orleans.Runtime/GrainDirectory/AdaptiveDirectoryCacheMaintainer.cs index 2efaa85eaf..d380e315ec 100644 --- a/src/Orleans.Runtime/GrainDirectory/AdaptiveDirectoryCacheMaintainer.cs +++ b/src/Orleans.Runtime/GrainDirectory/AdaptiveDirectoryCacheMaintainer.cs @@ -145,44 +145,54 @@ private async Task Run() refreshedCount); // Send batch requests - SendBatchCacheRefreshRequests(fetchInBatchList); + await SendBatchCacheRefreshRequests(fetchInBatchList, cancellationToken); ProduceStats(); } - catch (Exception ex) when (!cancellationToken.IsCancellationRequested) + catch (Exception ex) { - Log.LogError(ex, $"Error in {nameof(AdaptiveDirectoryCacheMaintainer)}."); + if (!cancellationToken.IsCancellationRequested) + { + Log.LogError(ex, $"Error in {nameof(AdaptiveDirectoryCacheMaintainer)}."); + } + else + { + Log.LogDebug(ex, $"Error in {nameof(AdaptiveDirectoryCacheMaintainer)}."); + } } } } - private void SendBatchCacheRefreshRequests(Dictionary> refreshRequests) + private async Task SendBatchCacheRefreshRequests(Dictionary> refreshRequests, CancellationToken cancellationToken) { - foreach (var kv in refreshRequests) + var tasks = new List(refreshRequests.Count); + foreach (var (silo, grainIds) in refreshRequests) { - var cachedGrainAndETagList = BuildGrainAndETagList(kv.Value); - - var silo = kv.Key; - - DirectoryInstruments.ValidationsCacheSent.Add(1); - // Send all of the items in one large request - var validator = this.grainFactory.GetSystemTarget(Constants.DirectoryCacheValidatorType, silo); - - router.CacheValidator.QueueTask(async () => - { - var response = await validator.LookUpMany(cachedGrainAndETagList); - ProcessCacheRefreshResponse(silo, response); - }).Ignore(); - - if (Log.IsEnabled(LogLevel.Trace)) Log.LogTrace("Silo {SiloAddress} is sending request to silo {OwnerSilo} with {Count} entries", router.MyAddress, silo, cachedGrainAndETagList.Count); + tasks.Add(RefreshSiloDirectoryCache(silo, grainIds, cancellationToken)); } + + await Task.WhenAll(tasks).WaitAsync(cancellationToken); } - private void ProcessCacheRefreshResponse( + private async Task RefreshSiloDirectoryCache( SiloAddress silo, - List refreshResponse) + List grainIds, + CancellationToken cancellationToken) { - if (Log.IsEnabled(LogLevel.Trace)) Log.LogTrace("Silo {SiloAddress} received ProcessCacheRefreshResponse. #Response entries {Count}.", router.MyAddress, refreshResponse.Count); + if (cancellationToken.IsCancellationRequested) + { + return; + } + + var cachedGrainAndETagList = BuildGrainAndETagList(grainIds); + DirectoryInstruments.ValidationsCacheSent.Add(1); + + // Send all of the items in one large request + var validator = this.grainFactory.GetSystemTarget(Constants.DirectoryCacheValidatorType, silo); + + if (Log.IsEnabled(LogLevel.Trace)) Log.LogTrace("Silo {SiloAddress} is sending refresh request to silo {OwnerSilo} with {Count} entries", router.MyAddress, silo, cachedGrainAndETagList.Count); + var refreshResponse = await validator.LookUpMany(cachedGrainAndETagList); + if (Log.IsEnabled(LogLevel.Trace)) Log.LogTrace("Silo {SiloAddress} received refresh response. #Response entries {Count}.", router.MyAddress, refreshResponse.Count); int otherSiloCount = 0, updatedCount = 0, unchangedCount = 0;