From db73b391de9c1ad2a19c4abdc36dd5d600abe261 Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Fri, 7 Feb 2025 11:42:37 +0100 Subject: [PATCH 01/19] Support returning latest hollow generation real-time GET requests See ES-10571 --- .../action/get/TransportGetFromTranslogAction.java | 7 ++++++- .../org/elasticsearch/index/engine/ReadOnlyEngine.java | 4 ++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java index 3cbd7497dcf39..a33377ee88927 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java @@ -27,6 +27,7 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.InternalEngine; +import org.elasticsearch.index.engine.ReadOnlyEngine; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; @@ -81,7 +82,11 @@ protected void doExecute(Task task, Request request, ActionListener li if (engine == null) { throw new AlreadyClosedException("engine closed"); } - segmentGeneration = ((InternalEngine) engine).getLastUnsafeSegmentGenerationForGets(); + if (engine instanceof InternalEngine internalEngine) { + segmentGeneration = internalEngine.getLastUnsafeSegmentGenerationForGets(); + } else if (engine instanceof ReadOnlyEngine readOnlyEngine) { + segmentGeneration = readOnlyEngine.getLastUnsafeSegmentGenerationForGets(); + } } return new Response(result, indexShard.getOperationPrimaryTerm(), segmentGeneration); }); diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 63a4696ddb08e..6dabdec5fc944 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -636,4 +636,8 @@ public String getSearcherId() { public final String getCommitId() { return commitId; } + + public long getLastUnsafeSegmentGenerationForGets() { + return -1; + } } From 0bced2024acd93e0573e0ea4718573c1cae9248b Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Sat, 8 Feb 2025 13:39:33 +0100 Subject: [PATCH 02/19] Also support multi-get requests --- .../get/TransportShardMultiGetFomTranslogAction.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java index e953ff527f637..4c29cb76e2748 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.InternalEngine; +import org.elasticsearch.index.engine.ReadOnlyEngine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; @@ -102,7 +103,11 @@ protected void doExecute(Task task, Request request, ActionListener li if (engine == null) { throw new AlreadyClosedException("engine closed"); } - segmentGeneration = ((InternalEngine) engine).getLastUnsafeSegmentGenerationForGets(); + if (engine instanceof InternalEngine internalEngine) { + segmentGeneration = internalEngine.getLastUnsafeSegmentGenerationForGets(); + } else if (engine instanceof ReadOnlyEngine readOnlyEngine) { + segmentGeneration = readOnlyEngine.getLastUnsafeSegmentGenerationForGets(); + } } return new Response(multiGetShardResponse, indexShard.getOperationPrimaryTerm(), segmentGeneration); }); From 3237dfa2afcf7117d5523a3e5a371831983e9080 Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Sun, 9 Feb 2025 16:48:22 +0100 Subject: [PATCH 03/19] Make TransportGetFromTranslogAction and TransportShardMultiGetFomTranslogAction extendable --- .../action/get/MultiGetShardRequest.java | 8 ++ .../action/get/MultiGetShardResponse.java | 2 +- .../get/TransportGetFromTranslogAction.java | 47 ++++----- ...ansportShardMultiGetFomTranslogAction.java | 99 ++++++++++--------- 4 files changed, 87 insertions(+), 69 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/get/MultiGetShardRequest.java b/server/src/main/java/org/elasticsearch/action/get/MultiGetShardRequest.java index 77f7b5087c96e..b54e951a28f7b 100644 --- a/server/src/main/java/org/elasticsearch/action/get/MultiGetShardRequest.java +++ b/server/src/main/java/org/elasticsearch/action/get/MultiGetShardRequest.java @@ -174,6 +174,14 @@ void add(int location, MultiGetRequest.Item item) { this.items.add(item); } + public List locations() { + return locations; + } + + public List items() { + return items; + } + @Override public String[] indices() { String[] indices = new String[items.size()]; diff --git a/server/src/main/java/org/elasticsearch/action/get/MultiGetShardResponse.java b/server/src/main/java/org/elasticsearch/action/get/MultiGetShardResponse.java index 6bc536fe643ae..eb1f0a25c2bb2 100644 --- a/server/src/main/java/org/elasticsearch/action/get/MultiGetShardResponse.java +++ b/server/src/main/java/org/elasticsearch/action/get/MultiGetShardResponse.java @@ -24,7 +24,7 @@ public class MultiGetShardResponse extends ActionResponse { final List responses; final List failures; - MultiGetShardResponse() { + public MultiGetShardResponse() { locations = new ArrayList<>(); responses = new ArrayList<>(); failures = new ArrayList<>(); diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java index a33377ee88927..08a5171610d70 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java @@ -27,7 +27,6 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.InternalEngine; -import org.elasticsearch.index.engine.ReadOnlyEngine; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; @@ -59,39 +58,43 @@ public TransportGetFromTranslogAction(TransportService transportService, Indices @Override protected void doExecute(Task task, Request request, ActionListener listener) { - final GetRequest getRequest = request.getRequest(); - final ShardId shardId = request.shardId(); - IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); - IndexShard indexShard = indexService.getShard(shardId.id()); - assert indexShard.routingEntry().isPromotableToPrimary() : "not an indexing shard" + indexShard.routingEntry(); - assert getRequest.realtime(); + IndexShard indexShard = getIndexShard(indicesService, request); ActionListener.completeWith(listener, () -> { - var result = indexShard.getService() - .getFromTranslog( - getRequest.id(), - getRequest.storedFields(), - getRequest.realtime(), - getRequest.version(), - getRequest.versionType(), - getRequest.fetchSourceContext(), - getRequest.isForceSyntheticSource() - ); + var result = getResult(indexShard, request.getRequest()); long segmentGeneration = -1; if (result == null) { Engine engine = indexShard.getEngineOrNull(); if (engine == null) { throw new AlreadyClosedException("engine closed"); } - if (engine instanceof InternalEngine internalEngine) { - segmentGeneration = internalEngine.getLastUnsafeSegmentGenerationForGets(); - } else if (engine instanceof ReadOnlyEngine readOnlyEngine) { - segmentGeneration = readOnlyEngine.getLastUnsafeSegmentGenerationForGets(); - } + segmentGeneration = ((InternalEngine) engine).getLastUnsafeSegmentGenerationForGets(); } return new Response(result, indexShard.getOperationPrimaryTerm(), segmentGeneration); }); } + public static IndexShard getIndexShard(IndicesService indicesService, Request request) { + final ShardId shardId = request.shardId(); + IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + IndexShard indexShard = indexService.getShard(shardId.id()); + assert indexShard.routingEntry().isPromotableToPrimary() : "not an indexing shard" + indexShard.routingEntry(); + assert request.getRequest().realtime(); + return indexShard; + } + + public static GetResult getResult(IndexShard indexShard, GetRequest getRequest) throws IOException { + return indexShard.getService() + .getFromTranslog( + getRequest.id(), + getRequest.storedFields(), + getRequest.realtime(), + getRequest.version(), + getRequest.versionType(), + getRequest.fetchSourceContext(), + getRequest.isForceSyntheticSource() + ); + } + public static class Request extends ActionRequest implements IndicesRequest { private final GetRequest getRequest; diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java index 4c29cb76e2748..d552fc480d0d5 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java @@ -21,10 +21,10 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.Tuple; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.InternalEngine; -import org.elasticsearch.index.engine.ReadOnlyEngine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; @@ -58,61 +58,68 @@ protected TransportShardMultiGetFomTranslogAction( @Override protected void doExecute(Task task, Request request, ActionListener listener) { var multiGetShardRequest = request.getMultiGetShardRequest(); - var shardId = request.getShardId(); - IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); - IndexShard indexShard = indexService.getShard(shardId.id()); - assert indexShard.routingEntry().isPromotableToPrimary() : "not an indexing shard" + indexShard.routingEntry(); - assert multiGetShardRequest.realtime(); + IndexShard indexShard = getIndexShard(indicesService, request); ActionListener.completeWith(listener, () -> { - var multiGetShardResponse = new MultiGetShardResponse(); - var someItemsNotFoundInTranslog = false; - for (int i = 0; i < multiGetShardRequest.locations.size(); i++) { - var item = multiGetShardRequest.items.get(i); - try { - var result = indexShard.getService() - .getFromTranslog( - item.id(), - item.storedFields(), - multiGetShardRequest.realtime(), - item.version(), - item.versionType(), - item.fetchSourceContext(), - multiGetShardRequest.isForceSyntheticSource() - ); - GetResponse getResponse = null; - if (result == null) { - someItemsNotFoundInTranslog = true; - } else { - getResponse = new GetResponse(result); - } - multiGetShardResponse.add(multiGetShardRequest.locations.get(i), getResponse); - } catch (RuntimeException | IOException e) { - if (TransportActions.isShardNotAvailableException(e)) { - throw e; - } - logger.debug("failed to execute multi_get_from_translog for {}[id={}]: {}", shardId, item.id(), e); - multiGetShardResponse.add( - multiGetShardRequest.locations.get(i), - new MultiGetResponse.Failure(multiGetShardRequest.index(), item.id(), e) - ); - } - } + Tuple multiGetShardResponse = getResponse(multiGetShardRequest, indexShard); long segmentGeneration = -1; - if (someItemsNotFoundInTranslog) { + if (multiGetShardResponse.v2()) { Engine engine = indexShard.getEngineOrNull(); if (engine == null) { throw new AlreadyClosedException("engine closed"); } - if (engine instanceof InternalEngine internalEngine) { - segmentGeneration = internalEngine.getLastUnsafeSegmentGenerationForGets(); - } else if (engine instanceof ReadOnlyEngine readOnlyEngine) { - segmentGeneration = readOnlyEngine.getLastUnsafeSegmentGenerationForGets(); - } + segmentGeneration = ((InternalEngine) engine).getLastUnsafeSegmentGenerationForGets(); } - return new Response(multiGetShardResponse, indexShard.getOperationPrimaryTerm(), segmentGeneration); + return new Response(multiGetShardResponse.v1(), indexShard.getOperationPrimaryTerm(), segmentGeneration); }); } + public static IndexShard getIndexShard(IndicesService indicesService, Request request) { + var shardId = request.getShardId(); + IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + IndexShard indexShard = indexService.getShard(shardId.id()); + assert indexShard.routingEntry().isPromotableToPrimary() : "not an indexing shard" + indexShard.routingEntry(); + assert request.getMultiGetShardRequest().realtime(); + return indexShard; + } + + public static Tuple getResponse(MultiGetShardRequest multiGetShardRequest, IndexShard indexShard) + throws IOException { + var multiGetShardResponse = new MultiGetShardResponse(); + var someItemsNotFoundInTranslog = false; + for (int i = 0; i < multiGetShardRequest.locations.size(); i++) { + var item = multiGetShardRequest.items.get(i); + try { + var result = indexShard.getService() + .getFromTranslog( + item.id(), + item.storedFields(), + multiGetShardRequest.realtime(), + item.version(), + item.versionType(), + item.fetchSourceContext(), + multiGetShardRequest.isForceSyntheticSource() + ); + GetResponse getResponse = null; + if (result == null) { + someItemsNotFoundInTranslog = true; + } else { + getResponse = new GetResponse(result); + } + multiGetShardResponse.add(multiGetShardRequest.locations.get(i), getResponse); + } catch (RuntimeException | IOException e) { + if (TransportActions.isShardNotAvailableException(e)) { + throw e; + } + logger.debug("failed to execute multi_get_from_translog for {}[id={}]: {}", multiGetShardRequest.shardId(), item.id(), e); + multiGetShardResponse.add( + multiGetShardRequest.locations.get(i), + new MultiGetResponse.Failure(multiGetShardRequest.index(), item.id(), e) + ); + } + } + return Tuple.tuple(multiGetShardResponse, someItemsNotFoundInTranslog); + } + public static class Request extends ActionRequest { private final MultiGetShardRequest multiGetShardRequest; From 47b508375c75806e551f6f5fb84c25a90f591ceb Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Sun, 9 Feb 2025 16:48:29 +0100 Subject: [PATCH 04/19] Revert "Support returning latest hollow generation real-time GET requests" This reverts commit db73b391de9c1ad2a19c4abdc36dd5d600abe261. --- .../java/org/elasticsearch/index/engine/ReadOnlyEngine.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 6dabdec5fc944..63a4696ddb08e 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -636,8 +636,4 @@ public String getSearcherId() { public final String getCommitId() { return commitId; } - - public long getLastUnsafeSegmentGenerationForGets() { - return -1; - } } From f207d156f030b0903cab8c93b772c547b4ec39a8 Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Sun, 9 Feb 2025 22:19:27 +0100 Subject: [PATCH 05/19] Bring back original TransportGetFromTranslogAction and TransportShardMultiGetFomTranslogAction --- .../get/TransportGetFromTranslogAction.java | 40 ++++---- ...ansportShardMultiGetFomTranslogAction.java | 92 ++++++++----------- 2 files changed, 56 insertions(+), 76 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java index 08a5171610d70..3cbd7497dcf39 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java @@ -58,9 +58,23 @@ public TransportGetFromTranslogAction(TransportService transportService, Indices @Override protected void doExecute(Task task, Request request, ActionListener listener) { - IndexShard indexShard = getIndexShard(indicesService, request); + final GetRequest getRequest = request.getRequest(); + final ShardId shardId = request.shardId(); + IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + IndexShard indexShard = indexService.getShard(shardId.id()); + assert indexShard.routingEntry().isPromotableToPrimary() : "not an indexing shard" + indexShard.routingEntry(); + assert getRequest.realtime(); ActionListener.completeWith(listener, () -> { - var result = getResult(indexShard, request.getRequest()); + var result = indexShard.getService() + .getFromTranslog( + getRequest.id(), + getRequest.storedFields(), + getRequest.realtime(), + getRequest.version(), + getRequest.versionType(), + getRequest.fetchSourceContext(), + getRequest.isForceSyntheticSource() + ); long segmentGeneration = -1; if (result == null) { Engine engine = indexShard.getEngineOrNull(); @@ -73,28 +87,6 @@ protected void doExecute(Task task, Request request, ActionListener li }); } - public static IndexShard getIndexShard(IndicesService indicesService, Request request) { - final ShardId shardId = request.shardId(); - IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); - IndexShard indexShard = indexService.getShard(shardId.id()); - assert indexShard.routingEntry().isPromotableToPrimary() : "not an indexing shard" + indexShard.routingEntry(); - assert request.getRequest().realtime(); - return indexShard; - } - - public static GetResult getResult(IndexShard indexShard, GetRequest getRequest) throws IOException { - return indexShard.getService() - .getFromTranslog( - getRequest.id(), - getRequest.storedFields(), - getRequest.realtime(), - getRequest.version(), - getRequest.versionType(), - getRequest.fetchSourceContext(), - getRequest.isForceSyntheticSource() - ); - } - public static class Request extends ActionRequest implements IndicesRequest { private final GetRequest getRequest; diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java index d552fc480d0d5..e953ff527f637 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java @@ -21,7 +21,6 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.core.Tuple; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.InternalEngine; @@ -58,68 +57,57 @@ protected TransportShardMultiGetFomTranslogAction( @Override protected void doExecute(Task task, Request request, ActionListener listener) { var multiGetShardRequest = request.getMultiGetShardRequest(); - IndexShard indexShard = getIndexShard(indicesService, request); + var shardId = request.getShardId(); + IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + IndexShard indexShard = indexService.getShard(shardId.id()); + assert indexShard.routingEntry().isPromotableToPrimary() : "not an indexing shard" + indexShard.routingEntry(); + assert multiGetShardRequest.realtime(); ActionListener.completeWith(listener, () -> { - Tuple multiGetShardResponse = getResponse(multiGetShardRequest, indexShard); + var multiGetShardResponse = new MultiGetShardResponse(); + var someItemsNotFoundInTranslog = false; + for (int i = 0; i < multiGetShardRequest.locations.size(); i++) { + var item = multiGetShardRequest.items.get(i); + try { + var result = indexShard.getService() + .getFromTranslog( + item.id(), + item.storedFields(), + multiGetShardRequest.realtime(), + item.version(), + item.versionType(), + item.fetchSourceContext(), + multiGetShardRequest.isForceSyntheticSource() + ); + GetResponse getResponse = null; + if (result == null) { + someItemsNotFoundInTranslog = true; + } else { + getResponse = new GetResponse(result); + } + multiGetShardResponse.add(multiGetShardRequest.locations.get(i), getResponse); + } catch (RuntimeException | IOException e) { + if (TransportActions.isShardNotAvailableException(e)) { + throw e; + } + logger.debug("failed to execute multi_get_from_translog for {}[id={}]: {}", shardId, item.id(), e); + multiGetShardResponse.add( + multiGetShardRequest.locations.get(i), + new MultiGetResponse.Failure(multiGetShardRequest.index(), item.id(), e) + ); + } + } long segmentGeneration = -1; - if (multiGetShardResponse.v2()) { + if (someItemsNotFoundInTranslog) { Engine engine = indexShard.getEngineOrNull(); if (engine == null) { throw new AlreadyClosedException("engine closed"); } segmentGeneration = ((InternalEngine) engine).getLastUnsafeSegmentGenerationForGets(); } - return new Response(multiGetShardResponse.v1(), indexShard.getOperationPrimaryTerm(), segmentGeneration); + return new Response(multiGetShardResponse, indexShard.getOperationPrimaryTerm(), segmentGeneration); }); } - public static IndexShard getIndexShard(IndicesService indicesService, Request request) { - var shardId = request.getShardId(); - IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); - IndexShard indexShard = indexService.getShard(shardId.id()); - assert indexShard.routingEntry().isPromotableToPrimary() : "not an indexing shard" + indexShard.routingEntry(); - assert request.getMultiGetShardRequest().realtime(); - return indexShard; - } - - public static Tuple getResponse(MultiGetShardRequest multiGetShardRequest, IndexShard indexShard) - throws IOException { - var multiGetShardResponse = new MultiGetShardResponse(); - var someItemsNotFoundInTranslog = false; - for (int i = 0; i < multiGetShardRequest.locations.size(); i++) { - var item = multiGetShardRequest.items.get(i); - try { - var result = indexShard.getService() - .getFromTranslog( - item.id(), - item.storedFields(), - multiGetShardRequest.realtime(), - item.version(), - item.versionType(), - item.fetchSourceContext(), - multiGetShardRequest.isForceSyntheticSource() - ); - GetResponse getResponse = null; - if (result == null) { - someItemsNotFoundInTranslog = true; - } else { - getResponse = new GetResponse(result); - } - multiGetShardResponse.add(multiGetShardRequest.locations.get(i), getResponse); - } catch (RuntimeException | IOException e) { - if (TransportActions.isShardNotAvailableException(e)) { - throw e; - } - logger.debug("failed to execute multi_get_from_translog for {}[id={}]: {}", multiGetShardRequest.shardId(), item.id(), e); - multiGetShardResponse.add( - multiGetShardRequest.locations.get(i), - new MultiGetResponse.Failure(multiGetShardRequest.index(), item.id(), e) - ); - } - } - return Tuple.tuple(multiGetShardResponse, someItemsNotFoundInTranslog); - } - public static class Request extends ActionRequest { private final MultiGetShardRequest multiGetShardRequest; From 40301f27a2e8bd4299e4ee3d6acffb646d76365b Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Mon, 10 Feb 2025 10:31:31 +0100 Subject: [PATCH 06/19] Retry real-time gets for AlreadyClosedException --- .../org/elasticsearch/action/get/TransportGetAction.java | 5 ++++- .../action/get/TransportShardMultiGetAction.java | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java index 83189198cd4d4..17d170b819c4c 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; @@ -230,7 +231,9 @@ private void getFromTranslog( final var retryingListener = listener.delegateResponse((l, e) -> { final var cause = ExceptionsHelper.unwrapCause(e); logger.debug("get_from_translog failed", cause); - if (cause instanceof ShardNotFoundException || cause instanceof IndexNotFoundException) { + if (cause instanceof ShardNotFoundException + || cause instanceof IndexNotFoundException + || cause instanceof AlreadyClosedException) { logger.debug("retrying get_from_translog"); observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java index 2850ee675a1af..0aaf27993c8d8 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; @@ -211,7 +212,9 @@ private void shardMultiGetFromTranslog( final var retryingListener = listener.delegateResponse((l, e) -> { final var cause = ExceptionsHelper.unwrapCause(e); logger.debug("mget_from_translog[shard] failed", cause); - if (cause instanceof ShardNotFoundException || cause instanceof IndexNotFoundException) { + if (cause instanceof ShardNotFoundException + || cause instanceof IndexNotFoundException + || cause instanceof AlreadyClosedException) { logger.debug("retrying mget_from_translog[shard]"); observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override From ff0c1915b71b2fd5335296ace4035bc0150275ba Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Mon, 10 Feb 2025 15:21:28 +0100 Subject: [PATCH 07/19] Correctly check index service in case index is not a different node --- .../java/org/elasticsearch/action/get/TransportGetAction.java | 4 +++- .../action/get/TransportShardMultiGetAction.java | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java index 17d170b819c4c..21f1b3b1cec11 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java @@ -166,7 +166,9 @@ protected Executor getExecutor(GetRequest request, ShardId shardId) { final ClusterState clusterState = clusterService.state(); if (clusterState.metadata().getIndexSafe(shardId.getIndex()).isSystem()) { return threadPool.executor(executorSelector.executorForGet(shardId.getIndexName())); - } else if (indicesService.indexServiceSafe(shardId.getIndex()).getIndexSettings().isSearchThrottled()) { + } + var indexService = indicesService.indexService(shardId.getIndex()); + if (indexService != null && indexService.getIndexSettings().isSearchThrottled()) { return threadPool.executor(ThreadPool.Names.SEARCH_THROTTLED); } else { return super.getExecutor(request, shardId); diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java index 0aaf27993c8d8..98efe4b392ea6 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java @@ -155,7 +155,9 @@ protected Executor getExecutor(MultiGetShardRequest request, ShardId shardId) { final ClusterState clusterState = clusterService.state(); if (clusterState.metadata().index(shardId.getIndex()).isSystem()) { return threadPool.executor(executorSelector.executorForGet(shardId.getIndexName())); - } else if (indicesService.indexServiceSafe(shardId.getIndex()).getIndexSettings().isSearchThrottled()) { + } + var indexService = indicesService.indexService(shardId.getIndex()); + if (indexService != null && indexService.getIndexSettings().isSearchThrottled()) { return threadPool.executor(ThreadPool.Names.SEARCH_THROTTLED); } else { return super.getExecutor(request, shardId); From e8b47d25ab50fc3b98826f33f35d23ff27b175ef Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Mon, 10 Feb 2025 16:33:36 +0100 Subject: [PATCH 08/19] Remove access to multi get request items --- .../org/elasticsearch/action/get/MultiGetShardRequest.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/get/MultiGetShardRequest.java b/server/src/main/java/org/elasticsearch/action/get/MultiGetShardRequest.java index b54e951a28f7b..8dafac98a5cfb 100644 --- a/server/src/main/java/org/elasticsearch/action/get/MultiGetShardRequest.java +++ b/server/src/main/java/org/elasticsearch/action/get/MultiGetShardRequest.java @@ -178,10 +178,6 @@ public List locations() { return locations; } - public List items() { - return items; - } - @Override public String[] indices() { String[] indices = new String[items.size()]; From 195f5fb614483afd6463346229eb9dd222598c10 Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Fri, 7 Feb 2025 11:42:37 +0100 Subject: [PATCH 09/19] Support returning latest hollow generation real-time GET requests See ES-10571 --- .../action/get/TransportGetFromTranslogAction.java | 7 ++++++- .../org/elasticsearch/index/engine/ReadOnlyEngine.java | 4 ++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java index 3cbd7497dcf39..a33377ee88927 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java @@ -27,6 +27,7 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.InternalEngine; +import org.elasticsearch.index.engine.ReadOnlyEngine; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; @@ -81,7 +82,11 @@ protected void doExecute(Task task, Request request, ActionListener li if (engine == null) { throw new AlreadyClosedException("engine closed"); } - segmentGeneration = ((InternalEngine) engine).getLastUnsafeSegmentGenerationForGets(); + if (engine instanceof InternalEngine internalEngine) { + segmentGeneration = internalEngine.getLastUnsafeSegmentGenerationForGets(); + } else if (engine instanceof ReadOnlyEngine readOnlyEngine) { + segmentGeneration = readOnlyEngine.getLastUnsafeSegmentGenerationForGets(); + } } return new Response(result, indexShard.getOperationPrimaryTerm(), segmentGeneration); }); diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 63a4696ddb08e..6dabdec5fc944 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -636,4 +636,8 @@ public String getSearcherId() { public final String getCommitId() { return commitId; } + + public long getLastUnsafeSegmentGenerationForGets() { + return -1; + } } From 0ca0c78e8a51512db8f3b32005c29781bbf4ffa1 Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Mon, 10 Feb 2025 18:34:18 +0100 Subject: [PATCH 10/19] Throw UnsupportedOperationExceptions for `getLastUnsafeSegmentGenerationForGets` --- .../src/main/java/org/elasticsearch/index/engine/Engine.java | 4 ++++ .../java/org/elasticsearch/index/engine/ReadOnlyEngine.java | 4 ---- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 36fd18144ad6e..0589741a70281 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -2346,4 +2346,8 @@ public record FlushResult(boolean flushPerformed, long generation) { public void prepareForEngineReset() throws IOException { throw new UnsupportedOperationException("does not support engine reset"); } + + public long getLastUnsafeSegmentGenerationForGets() { + throw new UnsupportedOperationException("Doesn't support getting the latest segment generation"); + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 6dabdec5fc944..63a4696ddb08e 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -636,8 +636,4 @@ public String getSearcherId() { public final String getCommitId() { return commitId; } - - public long getLastUnsafeSegmentGenerationForGets() { - return -1; - } } From 87240c5626495cad4de76bf8461e74e9c902565e Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Sat, 8 Feb 2025 13:39:33 +0100 Subject: [PATCH 11/19] Also support multi-get requests --- .../get/TransportShardMultiGetFomTranslogAction.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java index e953ff527f637..4c29cb76e2748 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.InternalEngine; +import org.elasticsearch.index.engine.ReadOnlyEngine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; @@ -102,7 +103,11 @@ protected void doExecute(Task task, Request request, ActionListener li if (engine == null) { throw new AlreadyClosedException("engine closed"); } - segmentGeneration = ((InternalEngine) engine).getLastUnsafeSegmentGenerationForGets(); + if (engine instanceof InternalEngine internalEngine) { + segmentGeneration = internalEngine.getLastUnsafeSegmentGenerationForGets(); + } else if (engine instanceof ReadOnlyEngine readOnlyEngine) { + segmentGeneration = readOnlyEngine.getLastUnsafeSegmentGenerationForGets(); + } } return new Response(multiGetShardResponse, indexShard.getOperationPrimaryTerm(), segmentGeneration); }); From 25e5e9d98e66fb3f67b71d145f0cd2b03ee065da Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Mon, 10 Feb 2025 22:06:22 +0100 Subject: [PATCH 12/19] Call getLastUnsafeSegmentGenerationForGets directly on Engine --- .../action/get/TransportGetFromTranslogAction.java | 7 +------ .../get/TransportShardMultiGetFomTranslogAction.java | 8 +------- 2 files changed, 2 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java index a33377ee88927..6fc1ff5300101 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java @@ -27,7 +27,6 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.InternalEngine; -import org.elasticsearch.index.engine.ReadOnlyEngine; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; @@ -82,11 +81,7 @@ protected void doExecute(Task task, Request request, ActionListener li if (engine == null) { throw new AlreadyClosedException("engine closed"); } - if (engine instanceof InternalEngine internalEngine) { - segmentGeneration = internalEngine.getLastUnsafeSegmentGenerationForGets(); - } else if (engine instanceof ReadOnlyEngine readOnlyEngine) { - segmentGeneration = readOnlyEngine.getLastUnsafeSegmentGenerationForGets(); - } + segmentGeneration = engine.getLastUnsafeSegmentGenerationForGets(); } return new Response(result, indexShard.getOperationPrimaryTerm(), segmentGeneration); }); diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java index 4c29cb76e2748..ec0b5c6cf143f 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java @@ -23,8 +23,6 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.engine.InternalEngine; -import org.elasticsearch.index.engine.ReadOnlyEngine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; @@ -103,11 +101,7 @@ protected void doExecute(Task task, Request request, ActionListener li if (engine == null) { throw new AlreadyClosedException("engine closed"); } - if (engine instanceof InternalEngine internalEngine) { - segmentGeneration = internalEngine.getLastUnsafeSegmentGenerationForGets(); - } else if (engine instanceof ReadOnlyEngine readOnlyEngine) { - segmentGeneration = readOnlyEngine.getLastUnsafeSegmentGenerationForGets(); - } + segmentGeneration = engine.getLastUnsafeSegmentGenerationForGets(); } return new Response(multiGetShardResponse, indexShard.getOperationPrimaryTerm(), segmentGeneration); }); From ed8c90206a9b1e57a5b65f861577350fbb5b4c45 Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Mon, 10 Feb 2025 22:07:35 +0100 Subject: [PATCH 13/19] Revert "Make TransportGetFromTranslogAction and TransportShardMultiGetFomTranslogAction extendable" This reverts commit 3237dfa2 --- .../org/elasticsearch/action/get/MultiGetShardRequest.java | 4 ---- .../org/elasticsearch/action/get/MultiGetShardResponse.java | 2 +- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/get/MultiGetShardRequest.java b/server/src/main/java/org/elasticsearch/action/get/MultiGetShardRequest.java index 8dafac98a5cfb..77f7b5087c96e 100644 --- a/server/src/main/java/org/elasticsearch/action/get/MultiGetShardRequest.java +++ b/server/src/main/java/org/elasticsearch/action/get/MultiGetShardRequest.java @@ -174,10 +174,6 @@ void add(int location, MultiGetRequest.Item item) { this.items.add(item); } - public List locations() { - return locations; - } - @Override public String[] indices() { String[] indices = new String[items.size()]; diff --git a/server/src/main/java/org/elasticsearch/action/get/MultiGetShardResponse.java b/server/src/main/java/org/elasticsearch/action/get/MultiGetShardResponse.java index eb1f0a25c2bb2..6bc536fe643ae 100644 --- a/server/src/main/java/org/elasticsearch/action/get/MultiGetShardResponse.java +++ b/server/src/main/java/org/elasticsearch/action/get/MultiGetShardResponse.java @@ -24,7 +24,7 @@ public class MultiGetShardResponse extends ActionResponse { final List responses; final List failures; - public MultiGetShardResponse() { + MultiGetShardResponse() { locations = new ArrayList<>(); responses = new ArrayList<>(); failures = new ArrayList<>(); From 4806d1133d08ddb708b03937de139a2f03aa33ed Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Tue, 11 Feb 2025 07:54:17 +0100 Subject: [PATCH 14/19] Retry getTranslog calls if we encountered `AlreadyClosedException` and didn't get a cluster update --- .../org/elasticsearch/action/get/TransportGetAction.java | 7 ++++++- .../action/get/TransportShardMultiGetAction.java | 7 ++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java index 21f1b3b1cec11..0c14a228549d4 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java @@ -250,7 +250,12 @@ public void onClusterServiceClose() { @Override public void onTimeout(TimeValue timeout) { - l.onFailure(new ElasticsearchException("Timed out retrying get_from_translog", cause)); + if (cause instanceof AlreadyClosedException) { + // Do an additional retry just in case AlreadyClosedException didn't generate a cluster update + getFromTranslog(request, indexShard, state, observer, l); + } else { + l.onFailure(new ElasticsearchException("Timed out retrying get_from_translog", cause)); + } } }); } else { diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java index 98efe4b392ea6..f35d737941d2a 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java @@ -231,7 +231,12 @@ public void onClusterServiceClose() { @Override public void onTimeout(TimeValue timeout) { - l.onFailure(new ElasticsearchException("Timed out retrying mget_from_translog[shard]", cause)); + if (cause instanceof AlreadyClosedException) { + // Do an additional retry just in case AlreadyClosedException didn't generate a cluster update + shardMultiGetFromTranslog(request, indexShard, state, observer, l); + } else { + l.onFailure(new ElasticsearchException("Timed out retrying mget_from_translog[shard]", cause)); + } } }); } else { From b965926df341c7d9a40e2f626cdc0a9b7e946f2c Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Tue, 11 Feb 2025 10:41:31 +0100 Subject: [PATCH 15/19] Revert "Correctly check index service in case index is not a different node" This reverts commit ff0c1915b71b2fd5335296ace4035bc0150275ba. --- .../java/org/elasticsearch/action/get/TransportGetAction.java | 4 +--- .../action/get/TransportShardMultiGetAction.java | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java index 0c14a228549d4..e41d7e31f8693 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java @@ -166,9 +166,7 @@ protected Executor getExecutor(GetRequest request, ShardId shardId) { final ClusterState clusterState = clusterService.state(); if (clusterState.metadata().getIndexSafe(shardId.getIndex()).isSystem()) { return threadPool.executor(executorSelector.executorForGet(shardId.getIndexName())); - } - var indexService = indicesService.indexService(shardId.getIndex()); - if (indexService != null && indexService.getIndexSettings().isSearchThrottled()) { + } else if (indicesService.indexServiceSafe(shardId.getIndex()).getIndexSettings().isSearchThrottled()) { return threadPool.executor(ThreadPool.Names.SEARCH_THROTTLED); } else { return super.getExecutor(request, shardId); diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java index f35d737941d2a..3905ec2b354c2 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java @@ -155,9 +155,7 @@ protected Executor getExecutor(MultiGetShardRequest request, ShardId shardId) { final ClusterState clusterState = clusterService.state(); if (clusterState.metadata().index(shardId.getIndex()).isSystem()) { return threadPool.executor(executorSelector.executorForGet(shardId.getIndexName())); - } - var indexService = indicesService.indexService(shardId.getIndex()); - if (indexService != null && indexService.getIndexSettings().isSearchThrottled()) { + } else if (indicesService.indexServiceSafe(shardId.getIndex()).getIndexSettings().isSearchThrottled()) { return threadPool.executor(ThreadPool.Names.SEARCH_THROTTLED); } else { return super.getExecutor(request, shardId); From 847bf5c91efb851155788bd5b605917dd0049cfd Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Mon, 10 Feb 2025 15:21:28 +0100 Subject: [PATCH 16/19] Correctly check index service in case index is not a different node --- .../java/org/elasticsearch/action/get/TransportGetAction.java | 4 +++- .../action/get/TransportShardMultiGetAction.java | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java index e41d7e31f8693..0c14a228549d4 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java @@ -166,7 +166,9 @@ protected Executor getExecutor(GetRequest request, ShardId shardId) { final ClusterState clusterState = clusterService.state(); if (clusterState.metadata().getIndexSafe(shardId.getIndex()).isSystem()) { return threadPool.executor(executorSelector.executorForGet(shardId.getIndexName())); - } else if (indicesService.indexServiceSafe(shardId.getIndex()).getIndexSettings().isSearchThrottled()) { + } + var indexService = indicesService.indexService(shardId.getIndex()); + if (indexService != null && indexService.getIndexSettings().isSearchThrottled()) { return threadPool.executor(ThreadPool.Names.SEARCH_THROTTLED); } else { return super.getExecutor(request, shardId); diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java index c5fc71878e10e..9b208df0e9774 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java @@ -155,7 +155,9 @@ protected Executor getExecutor(MultiGetShardRequest request, ShardId shardId) { final ClusterState clusterState = clusterService.state(); if (clusterState.metadata().index(shardId.getIndex()).isSystem()) { return threadPool.executor(executorSelector.executorForGet(shardId.getIndexName())); - } else if (indicesService.indexServiceSafe(shardId.getIndex()).getIndexSettings().isSearchThrottled()) { + } + var indexService = indicesService.indexService(shardId.getIndex()); + if (indexService != null && indexService.getIndexSettings().isSearchThrottled()) { return threadPool.executor(ThreadPool.Names.SEARCH_THROTTLED); } else { return super.getExecutor(request, shardId); From 4bdcafbb90dd6012310fa818f684148144af0452 Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Tue, 11 Feb 2025 12:27:56 +0100 Subject: [PATCH 17/19] Use try versions of get*Translog calls so we don't retry forever --- .../java/org/elasticsearch/action/get/TransportGetAction.java | 2 +- .../elasticsearch/action/get/TransportShardMultiGetAction.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java index 0c14a228549d4..277f2fb1664f1 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java @@ -252,7 +252,7 @@ public void onClusterServiceClose() { public void onTimeout(TimeValue timeout) { if (cause instanceof AlreadyClosedException) { // Do an additional retry just in case AlreadyClosedException didn't generate a cluster update - getFromTranslog(request, indexShard, state, observer, l); + tryGetFromTranslog(request, indexShard, node, l); } else { l.onFailure(new ElasticsearchException("Timed out retrying get_from_translog", cause)); } diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java index 9b208df0e9774..572cc677ebab6 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java @@ -233,7 +233,7 @@ public void onClusterServiceClose() { public void onTimeout(TimeValue timeout) { if (cause instanceof AlreadyClosedException) { // Do an additional retry just in case AlreadyClosedException didn't generate a cluster update - shardMultiGetFromTranslog(request, indexShard, state, observer, l); + tryShardMultiGetFromTranslog(request, indexShard, node, l); } else { l.onFailure(new ElasticsearchException("Timed out retrying mget_from_translog[shard]", cause)); } From 01b81bb6a71f1a26caf1a7c973e1f4ed22538b2d Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Tue, 11 Feb 2025 14:45:28 +0100 Subject: [PATCH 18/19] Add TODOs for ES-10826 --- .../java/org/elasticsearch/action/get/TransportGetAction.java | 2 ++ .../elasticsearch/action/get/TransportShardMultiGetAction.java | 2 ++ 2 files changed, 4 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java index 277f2fb1664f1..8b4631b41828f 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java @@ -236,6 +236,7 @@ private void getFromTranslog( if (cause instanceof ShardNotFoundException || cause instanceof IndexNotFoundException || cause instanceof AlreadyClosedException) { + // TODO AlreadyClosedException the engine reset should be fixed by ES-10826 logger.debug("retrying get_from_translog"); observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override @@ -250,6 +251,7 @@ public void onClusterServiceClose() { @Override public void onTimeout(TimeValue timeout) { + // TODO AlreadyClosedException the engine reset should be fixed by ES-10826 if (cause instanceof AlreadyClosedException) { // Do an additional retry just in case AlreadyClosedException didn't generate a cluster update tryGetFromTranslog(request, indexShard, node, l); diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java index 572cc677ebab6..52c72ac981fea 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java @@ -217,6 +217,7 @@ private void shardMultiGetFromTranslog( if (cause instanceof ShardNotFoundException || cause instanceof IndexNotFoundException || cause instanceof AlreadyClosedException) { + // TODO AlreadyClosedException the engine reset should be fixed by ES-10826 logger.debug("retrying mget_from_translog[shard]"); observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override @@ -231,6 +232,7 @@ public void onClusterServiceClose() { @Override public void onTimeout(TimeValue timeout) { + // TODO AlreadyClosedException the engine reset should be fixed by ES-10826 if (cause instanceof AlreadyClosedException) { // Do an additional retry just in case AlreadyClosedException didn't generate a cluster update tryShardMultiGetFromTranslog(request, indexShard, node, l); From 8d4ab8e4bbf99c16816e6f4068f1153c85528941 Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Tue, 11 Feb 2025 16:24:29 +0100 Subject: [PATCH 19/19] Revert "Correctly check index service in case index is not a different node" This reverts commit 847bf5c91efb851155788bd5b605917dd0049cfd. --- .../java/org/elasticsearch/action/get/TransportGetAction.java | 4 +--- .../action/get/TransportShardMultiGetAction.java | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java index 8b4631b41828f..96317069b40f1 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java @@ -166,9 +166,7 @@ protected Executor getExecutor(GetRequest request, ShardId shardId) { final ClusterState clusterState = clusterService.state(); if (clusterState.metadata().getIndexSafe(shardId.getIndex()).isSystem()) { return threadPool.executor(executorSelector.executorForGet(shardId.getIndexName())); - } - var indexService = indicesService.indexService(shardId.getIndex()); - if (indexService != null && indexService.getIndexSettings().isSearchThrottled()) { + } else if (indicesService.indexServiceSafe(shardId.getIndex()).getIndexSettings().isSearchThrottled()) { return threadPool.executor(ThreadPool.Names.SEARCH_THROTTLED); } else { return super.getExecutor(request, shardId); diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java index 52c72ac981fea..3a66db14decdb 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java @@ -155,9 +155,7 @@ protected Executor getExecutor(MultiGetShardRequest request, ShardId shardId) { final ClusterState clusterState = clusterService.state(); if (clusterState.metadata().index(shardId.getIndex()).isSystem()) { return threadPool.executor(executorSelector.executorForGet(shardId.getIndexName())); - } - var indexService = indicesService.indexService(shardId.getIndex()); - if (indexService != null && indexService.getIndexSettings().isSearchThrottled()) { + } else if (indicesService.indexServiceSafe(shardId.getIndex()).getIndexSettings().isSearchThrottled()) { return threadPool.executor(ThreadPool.Names.SEARCH_THROTTLED); } else { return super.getExecutor(request, shardId);