From e5313cdb55ad2b244d978f82d0694b4f08f2998b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Wed, 5 Feb 2025 16:50:53 +0100 Subject: [PATCH] Track queued merges in ElasticsearchMergeScheduler and InternalEngine This commit adds tracking for merges that are queued for future execution. Relates ES-10570 --- .../ElasticsearchConcurrentMergeScheduler.java | 5 +++++ .../engine/ElasticsearchMergeScheduler.java | 2 ++ .../index/engine/InternalEngine.java | 8 ++++++++ .../index/engine/MergeTracking.java | 16 ++++++++++++++++ 4 files changed, 31 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchConcurrentMergeScheduler.java b/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchConcurrentMergeScheduler.java index 90f8e6adab73d..3f4d853add963 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchConcurrentMergeScheduler.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchConcurrentMergeScheduler.java @@ -59,6 +59,11 @@ public Set onGoingMerges() { return mergeTracking.onGoingMerges(); } + @Override + public Set queuedMerges() { + return mergeTracking.queuedMerges(); + } + /** We're currently only interested in messages with this prefix. */ private static final String MERGE_THREAD_MESSAGE_PREFIX = "merge thread"; diff --git a/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchMergeScheduler.java b/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchMergeScheduler.java index ac72c7a21da75..69d9af3848325 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchMergeScheduler.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchMergeScheduler.java @@ -19,6 +19,8 @@ public interface ElasticsearchMergeScheduler { Set onGoingMerges(); + Set queuedMerges(); + MergeStats stats(); void refreshConfig(); diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 7f6fe40dbaaf0..9106b307bf60d 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -3006,6 +3006,14 @@ public MergeStats getMergeStats() { return mergeScheduler.stats(); } + public boolean hasQueuedOrOnGoingMerges() { + return hasQueuedMerges() || mergeScheduler.onGoingMerges().isEmpty() == false; + } + + public boolean hasQueuedMerges() { + return mergeScheduler.queuedMerges().isEmpty() == false; + } + protected LocalCheckpointTracker getLocalCheckpointTracker() { return localCheckpointTracker; } diff --git a/server/src/main/java/org/elasticsearch/index/engine/MergeTracking.java b/server/src/main/java/org/elasticsearch/index/engine/MergeTracking.java index 3f52b607cf356..3442afd994f29 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/MergeTracking.java +++ b/server/src/main/java/org/elasticsearch/index/engine/MergeTracking.java @@ -41,6 +41,9 @@ public class MergeTracking { private final Set onGoingMerges = ConcurrentCollections.newConcurrentSet(); private final Set readOnlyOnGoingMerges = Collections.unmodifiableSet(onGoingMerges); + private final Set queuedMerges = ConcurrentCollections.newConcurrentSet(); + private final Set readOnlyQueuedMerges = Collections.unmodifiableSet(queuedMerges); + public MergeTracking(Logger logger, DoubleSupplier mbPerSecAutoThrottle) { this.logger = logger; this.mbPerSecAutoThrottle = mbPerSecAutoThrottle; @@ -50,6 +53,18 @@ public Set onGoingMerges() { return readOnlyOnGoingMerges; } + public Set queuedMerges() { + return readOnlyQueuedMerges; + } + + public void markMergeQueued(OnGoingMerge merge) { + queuedMerges.add(merge); + } + + public void unmarkMergeQueued(OnGoingMerge merge) { + queuedMerges.remove(merge); + } + public void mergeStarted(OnGoingMerge onGoingMerge) { MergePolicy.OneMerge merge = onGoingMerge.getMerge(); int totalNumDocs = merge.totalNumDocs(); @@ -57,6 +72,7 @@ public void mergeStarted(OnGoingMerge onGoingMerge) { currentMerges.inc(); currentMergesNumDocs.inc(totalNumDocs); currentMergesSizeInBytes.inc(totalSizeInBytes); + unmarkMergeQueued(onGoingMerge); onGoingMerges.add(onGoingMerge); if (logger.isTraceEnabled()) {