Refactor merge scheduling code to allow overrides (#114547)
This code refactors how the merge scheduler is configured to allow
different engine implementations to configure different merge schedulers.
Tim-Brooks authored Oct 14, 2024
1 parent 35fd893 commit a8de554
Showing 5 changed files with 226 additions and 120 deletions.
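The extension point this commit introduces is easiest to see from the caller's side. Below is a minimal sketch of a hypothetical InternalEngine subclass using the new createMergeScheduler() override shown in the diff; the subclass name is invented, and the (ShardId, IndexSettings) constructor of ElasticsearchConcurrentMergeScheduler is assumed from the constructor body visible below.

```java
// Illustrative only; not part of this commit. Same package as InternalEngine
// so the protected/package-visible types are accessible.
package org.elasticsearch.index.engine;

import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;

public class CustomMergeSchedulingEngine extends InternalEngine {

    public CustomMergeSchedulingEngine(EngineConfig engineConfig) {
        super(engineConfig);
    }

    @Override
    protected ElasticsearchMergeScheduler createMergeScheduler(ShardId shardId, IndexSettings indexSettings) {
        // Any ElasticsearchMergeScheduler works here; the engine only needs
        // stats(), onGoingMerges(), refreshConfig(), and the wrapped Lucene
        // MergeScheduler returned by getMergeScheduler().
        return new ElasticsearchConcurrentMergeScheduler(shardId, indexSettings);
    }
}
```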
ElasticsearchConcurrentMergeScheduler.java
@@ -15,11 +15,7 @@
 import org.apache.lucene.index.MergeScheduler;
 import org.apache.lucene.util.SameThreadExecutorService;
 import org.elasticsearch.common.logging.Loggers;
-import org.elasticsearch.common.metrics.CounterMetric;
-import org.elasticsearch.common.metrics.MeanMetric;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.IndexSettings;
@@ -29,32 +25,20 @@
 import org.elasticsearch.index.shard.ShardId;

 import java.io.IOException;
-import java.util.Collections;
-import java.util.Locale;
 import java.util.Set;
 import java.util.concurrent.Executor;

 /**
  * An extension to the {@link ConcurrentMergeScheduler} that provides tracking on merge times, total
  * and current merges.
  */
-class ElasticsearchConcurrentMergeScheduler extends ConcurrentMergeScheduler {
+public class ElasticsearchConcurrentMergeScheduler extends ConcurrentMergeScheduler implements ElasticsearchMergeScheduler {

     protected final Logger logger;
     private final Settings indexSettings;
     private final ShardId shardId;

-    private final MeanMetric totalMerges = new MeanMetric();
-    private final CounterMetric totalMergesNumDocs = new CounterMetric();
-    private final CounterMetric totalMergesSizeInBytes = new CounterMetric();
-    private final CounterMetric currentMerges = new CounterMetric();
-    private final CounterMetric currentMergesNumDocs = new CounterMetric();
-    private final CounterMetric currentMergesSizeInBytes = new CounterMetric();
-    private final CounterMetric totalMergeStoppedTime = new CounterMetric();
-    private final CounterMetric totalMergeThrottledTime = new CounterMetric();
-
-    private final Set<OnGoingMerge> onGoingMerges = ConcurrentCollections.newConcurrentSet();
-    private final Set<OnGoingMerge> readOnlyOnGoingMerges = Collections.unmodifiableSet(onGoingMerges);
+    private final MergeTracking mergeTracking;
     private final MergeSchedulerConfig config;
     private final SameThreadExecutorService sameThreadExecutorService = new SameThreadExecutorService();

@@ -63,11 +47,16 @@ class ElasticsearchConcurrentMergeScheduler extends ConcurrentMergeScheduler {
         this.shardId = shardId;
         this.indexSettings = indexSettings.getSettings();
         this.logger = Loggers.getLogger(getClass(), shardId);
+        this.mergeTracking = new MergeTracking(
+            logger,
+            () -> indexSettings.getMergeSchedulerConfig().isAutoThrottle() ? getIORateLimitMBPerSec() : Double.POSITIVE_INFINITY
+        );
         refreshConfig();
     }

+    @Override
     public Set<OnGoingMerge> onGoingMerges() {
-        return readOnlyOnGoingMerges;
+        return mergeTracking.onGoingMerges();
     }

     /** We're currently only interested in messages with this prefix. */
@@ -104,74 +93,21 @@ protected void message(String message) {
         super.message(message);
     }

-    private static String getSegmentName(MergePolicy.OneMerge merge) {
-        return merge.getMergeInfo() != null ? merge.getMergeInfo().info.name : "_na_";
-    }
-
     @Override
     protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge) throws IOException {
-        int totalNumDocs = merge.totalNumDocs();
-        long totalSizeInBytes = merge.totalBytesSize();
         long timeNS = System.nanoTime();
-        currentMerges.inc();
-        currentMergesNumDocs.inc(totalNumDocs);
-        currentMergesSizeInBytes.inc(totalSizeInBytes);
-
         OnGoingMerge onGoingMerge = new OnGoingMerge(merge);
-        onGoingMerges.add(onGoingMerge);
-
-        if (logger.isTraceEnabled()) {
-            logger.trace(
-                "merge [{}] starting..., merging [{}] segments, [{}] docs, [{}] size, into [{}] estimated_size",
-                getSegmentName(merge),
-                merge.segments.size(),
-                totalNumDocs,
-                ByteSizeValue.ofBytes(totalSizeInBytes),
-                ByteSizeValue.ofBytes(merge.estimatedMergeBytes)
-            );
-        }
+        mergeTracking.mergeStarted(onGoingMerge);
         try {
             beforeMerge(onGoingMerge);
             super.doMerge(mergeSource, merge);
         } finally {
             long tookMS = TimeValue.nsecToMSec(System.nanoTime() - timeNS);
+            mergeTracking.mergeFinished(merge, onGoingMerge, tookMS);

-            onGoingMerges.remove(onGoingMerge);
             afterMerge(onGoingMerge);
-
-            currentMerges.dec();
-            currentMergesNumDocs.dec(totalNumDocs);
-            currentMergesSizeInBytes.dec(totalSizeInBytes);
-
-            totalMergesNumDocs.inc(totalNumDocs);
-            totalMergesSizeInBytes.inc(totalSizeInBytes);
-            totalMerges.inc(tookMS);
-            long stoppedMS = TimeValue.nsecToMSec(
-                merge.getMergeProgress().getPauseTimes().get(MergePolicy.OneMergeProgress.PauseReason.STOPPED)
-            );
-            long throttledMS = TimeValue.nsecToMSec(
-                merge.getMergeProgress().getPauseTimes().get(MergePolicy.OneMergeProgress.PauseReason.PAUSED)
-            );
-            totalMergeStoppedTime.inc(stoppedMS);
-            totalMergeThrottledTime.inc(throttledMS);
-
-            String message = String.format(
-                Locale.ROOT,
-                "merge segment [%s] done: took [%s], [%,.1f MB], [%,d docs], [%s stopped], [%s throttled]",
-                getSegmentName(merge),
-                TimeValue.timeValueMillis(tookMS),
-                totalSizeInBytes / 1024f / 1024f,
-                totalNumDocs,
-                TimeValue.timeValueMillis(stoppedMS),
-                TimeValue.timeValueMillis(throttledMS)
-            );
-
-            if (tookMS > 20000) { // if more than 20 seconds, DEBUG log it
-                logger.debug("{}", message);
-            } else if (logger.isTraceEnabled()) {
-                logger.trace("{}", message);
-            }
         }
     }

     /**
@@ -206,24 +142,13 @@ protected MergeThread getMergeThread(MergeSource mergeSource, MergePolicy.OneMerge merge) {
         return thread;
     }

-    MergeStats stats() {
-        final MergeStats mergeStats = new MergeStats();
-        mergeStats.add(
-            totalMerges.count(),
-            totalMerges.sum(),
-            totalMergesNumDocs.count(),
-            totalMergesSizeInBytes.count(),
-            currentMerges.count(),
-            currentMergesNumDocs.count(),
-            currentMergesSizeInBytes.count(),
-            totalMergeStoppedTime.count(),
-            totalMergeThrottledTime.count(),
-            config.isAutoThrottle() ? getIORateLimitMBPerSec() : Double.POSITIVE_INFINITY
-        );
-        return mergeStats;
+    @Override
+    public MergeStats stats() {
+        return mergeTracking.stats();
     }

-    void refreshConfig() {
+    @Override
+    public void refreshConfig() {
         if (this.getMaxMergeCount() != config.getMaxMergeCount() || this.getMaxThreadCount() != config.getMaxThreadCount()) {
             this.setMaxMergesAndThreads(config.getMaxMergeCount(), config.getMaxThreadCount());
         }
@@ -234,4 +159,9 @@ void refreshConfig() {
             disableAutoIOThrottle();
         }
     }
+
+    @Override
+    public MergeScheduler getMergeScheduler() {
+        return this;
+    }
 }
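The metrics and the ongoing-merge set removed above move into the new MergeTracking helper, which is one of the five changed files but is not rendered on this page. The following is a rough sketch of its shape, inferred only from the call sites visible in this diff (a constructor taking a logger and a rate-limit supplier, plus mergeStarted, mergeFinished, onGoingMerges, and stats); the real class also carries the doc-count, byte-size, and stopped/throttled-time counters deleted here.

```java
// Sketch only; inferred from call sites in this commit, not the actual file.
package org.elasticsearch.index.merge;

import java.util.Collections;
import java.util.Set;
import java.util.function.DoubleSupplier;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.MergePolicy;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;

public class MergeTracking {
    private final Logger logger;
    private final DoubleSupplier mbPerSecAutoThrottle;

    private final MeanMetric totalMerges = new MeanMetric();
    private final CounterMetric currentMerges = new CounterMetric();
    private final Set<OnGoingMerge> onGoingMerges = ConcurrentCollections.newConcurrentSet();
    private final Set<OnGoingMerge> readOnlyOnGoingMerges = Collections.unmodifiableSet(onGoingMerges);

    public MergeTracking(Logger logger, DoubleSupplier mbPerSecAutoThrottle) {
        this.logger = logger;
        this.mbPerSecAutoThrottle = mbPerSecAutoThrottle;
    }

    public Set<OnGoingMerge> onGoingMerges() {
        return readOnlyOnGoingMerges;
    }

    public void mergeStarted(OnGoingMerge merge) {
        currentMerges.inc();
        onGoingMerges.add(merge);
    }

    public void mergeFinished(MergePolicy.OneMerge merge, OnGoingMerge onGoingMerge, long tookMS) {
        onGoingMerges.remove(onGoingMerge);
        currentMerges.dec();
        totalMerges.inc(tookMS);
        logger.trace("merge done, took [{}] ms", tookMS);
    }

    public MergeStats stats() {
        final MergeStats mergeStats = new MergeStats();
        // The real class also feeds doc counts, sizes, and stopped/throttled
        // time into MergeStats.add(...); zeros stand in for them here.
        mergeStats.add(totalMerges.count(), totalMerges.sum(), 0, 0, currentMerges.count(), 0, 0, 0, 0, mbPerSecAutoThrottle.getAsDouble());
        return mergeStats;
    }
}
```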
ElasticsearchMergeScheduler.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.index.engine;
+
+import org.apache.lucene.index.MergeScheduler;
+import org.elasticsearch.index.merge.MergeStats;
+import org.elasticsearch.index.merge.OnGoingMerge;
+
+import java.util.Set;
+
+public interface ElasticsearchMergeScheduler {
+
+    Set<OnGoingMerge> onGoingMerges();
+
+    MergeStats stats();
+
+    void refreshConfig();
+
+    MergeScheduler getMergeScheduler();
+}
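Because the interface is intentionally small, an engine can wrap any Lucene MergeScheduler behind it. Here is a hypothetical implementation backed by Lucene's single-threaded SerialMergeScheduler (illustrative only, not part of this commit, and with no stats tracking):

```java
// Hypothetical; the class name is invented for this example.
package org.elasticsearch.index.engine;

import java.util.Set;

import org.apache.lucene.index.MergeScheduler;
import org.apache.lucene.index.SerialMergeScheduler;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;

public class SerialElasticsearchMergeScheduler implements ElasticsearchMergeScheduler {

    private final SerialMergeScheduler delegate = new SerialMergeScheduler();

    @Override
    public Set<OnGoingMerge> onGoingMerges() {
        return Set.of(); // serial merges run inline; nothing to report in this sketch
    }

    @Override
    public MergeStats stats() {
        return new MergeStats(); // no tracking in this minimal sketch
    }

    @Override
    public void refreshConfig() {
        // SerialMergeScheduler has no thread/merge-count settings to refresh
    }

    @Override
    public MergeScheduler getMergeScheduler() {
        return delegate;
    }
}
```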
InternalEngine.java
@@ -20,6 +20,7 @@
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.LiveIndexWriterConfig;
 import org.apache.lucene.index.MergePolicy;
+import org.apache.lucene.index.MergeScheduler;
 import org.apache.lucene.index.SegmentCommitInfo;
 import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
@@ -139,7 +140,7 @@ public class InternalEngine extends Engine {
     private volatile long lastDeleteVersionPruneTimeMSec;

     private final Translog translog;
-    private final ElasticsearchConcurrentMergeScheduler mergeScheduler;
+    private final ElasticsearchMergeScheduler mergeScheduler;

     private final IndexWriter indexWriter;
@@ -248,11 +249,12 @@ public InternalEngine(EngineConfig engineConfig) {
         Translog translog = null;
         ExternalReaderManager externalReaderManager = null;
         ElasticsearchReaderManager internalReaderManager = null;
-        EngineMergeScheduler scheduler = null;
+        MergeScheduler scheduler = null;
         boolean success = false;
         try {
             this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().relativeTimeInMillis();
-            mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
+            mergeScheduler = createMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
+            scheduler = mergeScheduler.getMergeScheduler();
             throttle = new IndexThrottle();
             try {
                 store.trimUnsafeCommits(config().getTranslogConfig().getTranslogPath());
@@ -383,7 +385,7 @@ private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException {

     @Nullable
     private CombinedDeletionPolicy.CommitsListener newCommitsListener() {
-        Engine.IndexCommitListener listener = engineConfig.getIndexCommitListener();
+        IndexCommitListener listener = engineConfig.getIndexCommitListener();
         if (listener != null) {
             final IndexCommitListener wrappedListener = Assertions.ENABLED ? assertingCommitsOrderListener(listener) : listener;
             return new CombinedDeletionPolicy.CommitsListener() {
@@ -824,7 +826,7 @@ private GetResult getFromTranslog(
             config(),
             translogInMemorySegmentsCount::incrementAndGet
         );
-        final Engine.Searcher searcher = new Engine.Searcher(
+        final Searcher searcher = new Searcher(
             "realtime_get",
             ElasticsearchDirectoryReader.wrap(inMemoryReader, shardId),
             config().getSimilarity(),
@@ -841,7 +843,7 @@ public GetResult get(
         Get get,
         MappingLookup mappingLookup,
         DocumentParser documentParser,
-        Function<Engine.Searcher, Engine.Searcher> searcherWrapper
+        Function<Searcher, Searcher> searcherWrapper
     ) {
         try (var ignored = acquireEnsureOpenRef()) {
             if (get.realtime()) {
@@ -875,7 +877,7 @@ protected GetResult realtimeGetUnderLock(
         Get get,
         MappingLookup mappingLookup,
         DocumentParser documentParser,
-        Function<Engine.Searcher, Engine.Searcher> searcherWrapper,
+        Function<Searcher, Searcher> searcherWrapper,
         boolean getFromSearcher
     ) {
         assert isDrainedForClose() == false;
@@ -1098,7 +1100,7 @@ protected boolean assertPrimaryCanOptimizeAddDocument(final Index index) {
         return true;
     }

-    private boolean assertIncomingSequenceNumber(final Engine.Operation.Origin origin, final long seqNo) {
+    private boolean assertIncomingSequenceNumber(final Operation.Origin origin, final long seqNo) {
         if (origin == Operation.Origin.PRIMARY) {
             assert assertPrimaryIncomingSequenceNumber(origin, seqNo);
         } else {
@@ -1108,7 +1110,7 @@ private boolean assertIncomingSequenceNumber(final Engine.Operation.Origin origin, final long seqNo) {
         return true;
     }

-    protected boolean assertPrimaryIncomingSequenceNumber(final Engine.Operation.Origin origin, final long seqNo) {
+    protected boolean assertPrimaryIncomingSequenceNumber(final Operation.Origin origin, final long seqNo) {
         // sequence number should not be set when operation origin is primary
         assert seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO
             : "primary operations must never have an assigned sequence number but was [" + seqNo + "]";
@@ -2700,7 +2702,7 @@ private IndexWriterConfig getIndexWriterConfig() {
         iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND);
         iwc.setIndexDeletionPolicy(combinedDeletionPolicy);
         iwc.setInfoStream(TESTS_VERBOSE ? InfoStream.getDefault() : new LoggerInfoStream(logger));
-        iwc.setMergeScheduler(mergeScheduler);
+        iwc.setMergeScheduler(mergeScheduler.getMergeScheduler());
         // Give us the opportunity to upgrade old segments while performing
         // background merges
         MergePolicy mergePolicy = config().getMergePolicy();
@@ -2753,7 +2755,7 @@ private IndexWriterConfig getIndexWriterConfig() {

     /** A listener that warms the segments if needed when acquiring a new reader */
     static final class RefreshWarmerListener implements BiConsumer<ElasticsearchDirectoryReader, ElasticsearchDirectoryReader> {
-        private final Engine.Warmer warmer;
+        private final Warmer warmer;
         private final Logger logger;
         private final AtomicBoolean isEngineClosed;

@@ -2817,6 +2819,10 @@ LiveIndexWriterConfig getCurrentIndexWriterConfig() {
         return indexWriter.getConfig();
     }

+    protected ElasticsearchMergeScheduler createMergeScheduler(ShardId shardId, IndexSettings indexSettings) {
+        return new EngineMergeScheduler(shardId, indexSettings);
+    }
+
     private final class EngineMergeScheduler extends ElasticsearchConcurrentMergeScheduler {
         private final AtomicInteger numMergesInFlight = new AtomicInteger(0);
         private final AtomicBoolean isThrottling = new AtomicBoolean();
@@ -2827,7 +2833,7 @@ private final class EngineMergeScheduler extends ElasticsearchConcurrentMergeScheduler {

         @Override
         public synchronized void beforeMerge(OnGoingMerge merge) {
-            int maxNumMerges = mergeScheduler.getMaxMergeCount();
+            int maxNumMerges = getMaxMergeCount();
             if (numMergesInFlight.incrementAndGet() > maxNumMerges) {
                 if (isThrottling.getAndSet(true) == false) {
                     logger.info("now throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
@@ -2838,7 +2844,7 @@ public synchronized void beforeMerge(OnGoingMerge merge) {

         @Override
         public synchronized void afterMerge(OnGoingMerge merge) {
-            int maxNumMerges = mergeScheduler.getMaxMergeCount();
+            int maxNumMerges = getMaxMergeCount();
             if (numMergesInFlight.decrementAndGet() < maxNumMerges) {
                 if (isThrottling.getAndSet(false)) {
                     logger.info("stop throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
@@ -2876,25 +2882,29 @@ protected void doRun() {

         @Override
         protected void handleMergeException(final Throwable exc) {
-            engineConfig.getThreadPool().generic().execute(new AbstractRunnable() {
-                @Override
-                public void onFailure(Exception e) {
-                    logger.debug("merge failure action rejected", e);
-                }
-
-                @Override
-                protected void doRun() throws Exception {
-                    /*
-                     * We do this on another thread rather than the merge thread that we are initially called on so that we have complete
-                     * confidence that the call stack does not contain catch statements that would cause the error that might be thrown
-                     * here from being caught and never reaching the uncaught exception handler.
-                     */
-                    failEngine("merge failed", new MergePolicy.MergeException(exc));
-                }
-            });
+            mergeException(exc);
         }
     }

+    protected void mergeException(final Throwable exc) {
+        engineConfig.getThreadPool().generic().execute(new AbstractRunnable() {
+            @Override
+            public void onFailure(Exception e) {
+                logger.debug("merge failure action rejected", e);
+            }
+
+            @Override
+            protected void doRun() throws Exception {
+                /*
+                 * We do this on another thread rather than the merge thread that we are initially called on so that we have complete
+                 * confidence that the call stack does not contain catch statements that would cause the error that might be thrown
+                 * here from being caught and never reaching the uncaught exception handler.
+                 */
+                failEngine("merge failed", new MergePolicy.MergeException(exc));
+            }
+        });
+    }
+
     /**
      * Commits the specified index writer.
      *
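Hoisting the body of handleMergeException into the new protected InternalEngine#mergeException(Throwable) method gives subclasses a hook over merge-failure handling without re-implementing the thread-pool dispatch. A hypothetical override (the subclass and its counter field are invented for this example):

```java
// Illustrative only; not part of this commit.
package org.elasticsearch.index.engine;

import org.elasticsearch.common.metrics.CounterMetric;

public class ObservableMergeFailureEngine extends InternalEngine {

    // invented metric, purely for the example
    private final CounterMetric mergeFailureCounter = new CounterMetric();

    public ObservableMergeFailureEngine(EngineConfig engineConfig) {
        super(engineConfig);
    }

    @Override
    protected void mergeException(Throwable exc) {
        mergeFailureCounter.inc(); // record the failure for telemetry
        super.mergeException(exc); // still fail the engine on the generic pool
    }
}
```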
(diffs for the remaining changed files not shown)
