Skip to content

Commit

Permalink
Support real-time gets on hollow shards (#122012)
Browse files Browse the repository at this point in the history
* Support returning `getLastUnsafeSegmentGenerationForGets` for any `Engine`, not only `InternalEngine`
* Retry real-time gets on `AlreadyClosedException` in case a shard's engine gets swapped.

See ES-10571
  • Loading branch information
arteam authored Feb 11, 2025
1 parent 2b9d7f6 commit ecda919
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
Expand Down Expand Up @@ -230,7 +231,10 @@ private void getFromTranslog(
final var retryingListener = listener.delegateResponse((l, e) -> {
final var cause = ExceptionsHelper.unwrapCause(e);
logger.debug("get_from_translog failed", cause);
if (cause instanceof ShardNotFoundException || cause instanceof IndexNotFoundException) {
if (cause instanceof ShardNotFoundException
|| cause instanceof IndexNotFoundException
|| cause instanceof AlreadyClosedException) {
// TODO AlreadyClosedException the engine reset should be fixed by ES-10826
logger.debug("retrying get_from_translog");
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
Expand All @@ -245,7 +249,13 @@ public void onClusterServiceClose() {

@Override
public void onTimeout(TimeValue timeout) {
l.onFailure(new ElasticsearchException("Timed out retrying get_from_translog", cause));
// TODO AlreadyClosedException the engine reset should be fixed by ES-10826
if (cause instanceof AlreadyClosedException) {
// Do an additional retry just in case AlreadyClosedException didn't generate a cluster update
tryGetFromTranslog(request, indexShard, node, l);
} else {
l.onFailure(new ElasticsearchException("Timed out retrying get_from_translog", cause));
}
}
});
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
if (engine == null) {
throw new AlreadyClosedException("engine closed");
}
segmentGeneration = ((InternalEngine) engine).getLastUnsafeSegmentGenerationForGets();
segmentGeneration = engine.getLastUnsafeSegmentGenerationForGets();
}
return new Response(result, indexShard.getOperationPrimaryTerm(), segmentGeneration);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
Expand Down Expand Up @@ -211,7 +212,10 @@ private void shardMultiGetFromTranslog(
final var retryingListener = listener.delegateResponse((l, e) -> {
final var cause = ExceptionsHelper.unwrapCause(e);
logger.debug("mget_from_translog[shard] failed", cause);
if (cause instanceof ShardNotFoundException || cause instanceof IndexNotFoundException) {
if (cause instanceof ShardNotFoundException
|| cause instanceof IndexNotFoundException
|| cause instanceof AlreadyClosedException) {
// TODO AlreadyClosedException the engine reset should be fixed by ES-10826
logger.debug("retrying mget_from_translog[shard]");
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
Expand All @@ -226,7 +230,13 @@ public void onClusterServiceClose() {

@Override
public void onTimeout(TimeValue timeout) {
l.onFailure(new ElasticsearchException("Timed out retrying mget_from_translog[shard]", cause));
// TODO AlreadyClosedException the engine reset should be fixed by ES-10826
if (cause instanceof AlreadyClosedException) {
// Do an additional retry just in case AlreadyClosedException didn't generate a cluster update
tryShardMultiGetFromTranslog(request, indexShard, node, l);
} else {
l.onFailure(new ElasticsearchException("Timed out retrying mget_from_translog[shard]", cause));
}
}
});
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
Expand Down Expand Up @@ -102,7 +101,7 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
if (engine == null) {
throw new AlreadyClosedException("engine closed");
}
segmentGeneration = ((InternalEngine) engine).getLastUnsafeSegmentGenerationForGets();
segmentGeneration = engine.getLastUnsafeSegmentGenerationForGets();
}
return new Response(multiGetShardResponse, indexShard.getOperationPrimaryTerm(), segmentGeneration);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2346,4 +2346,8 @@ public record FlushResult(boolean flushPerformed, long generation) {
public void prepareForEngineReset() throws IOException {
throw new UnsupportedOperationException("does not support engine reset");
}

public long getLastUnsafeSegmentGenerationForGets() {
throw new UnsupportedOperationException("Doesn't support getting the latest segment generation");
}
}

0 comments on commit ecda919

Please sign in to comment.