Skip to content

Commit

Permalink
Fail primary term and generation listeners on a closed shard (#122713)
Browse files Browse the repository at this point in the history
If a shard has been closed, we should quickly bail out and fail
all waiting primary term and generation listeners. Otherwise,
the engine implementation may try to successfully complete the
provided listeners and perform operations on an already closed
shard and cause some unexpected errors.
  • Loading branch information
arteam authored Feb 19, 2025
1 parent f220aba commit 8e34393
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4491,14 +4491,17 @@ public void waitForEngineOrClosedShard(ActionListener<Void> listener) {
}

/**
* Registers a listener for an event when the shard advances to the provided primary term and segment generation
* Registers a listener for an event when the shard advances to the provided primary term and segment generation.
* Completes the listener with an {@link IndexShardClosedException} if the shard is closed.
*/
public void waitForPrimaryTermAndGeneration(long primaryTerm, long segmentGeneration, ActionListener<Long> listener) {
// NOTE(review): this is a diff hunk rendered without +/- markers, so the previous and the
// replacement body appear back to back. Only one of them exists in the file at any revision.
//
// Previous body (the hunk's line counts imply these five lines were removed): once
// waitForEngineOrClosedShard completes, register the listener with the engine unconditionally.
waitForEngineOrClosedShard(
listener.delegateFailureAndWrap(
(l, ignored) -> getEngine().addPrimaryTermAndGenerationListener(primaryTerm, segmentGeneration, l)
)
);
// Replacement body (added lines): same wait, but the shard state is checked before the engine is used.
waitForEngineOrClosedShard(listener.delegateFailureAndWrap((l, ignored) -> {
if (state == IndexShardState.CLOSED) {
// Shard is closed: fail the listener right away instead of calling into the engine.
l.onFailure(new IndexShardClosedException(shardId));
} else {
// Shard is not closed: hand the listener over to the engine for the requested term and generation.
getEngine().addPrimaryTermAndGenerationListener(primaryTerm, segmentGeneration, l);
}
}));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.Constants;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
Expand Down Expand Up @@ -3334,6 +3335,21 @@ public void testWaitForClosedListener() throws IOException {
assertThat("listener should have been called", called.get(), equalTo(true));
}

/**
* Verifies that a listener registered through {@code waitForPrimaryTermAndGeneration} is failed with an
* {@link IndexShardClosedException} (possibly wrapped) when the shard is closed while the listener is pending.
*/
public void testWaitForPrimaryTermAndGenerationFailsForClosedShard() throws IOException {
// Index metadata for a "test" index with a single text field; primary term of shard 0 is set to 1.
final Settings testSettings = indexSettings(IndexVersion.current(), 1, 1).build();
final IndexMetadata indexMetadata = IndexMetadata.builder("test").putMapping("""
{ "properties": { "foo": { "type": "text"}}}""").settings(testSettings).primaryTerm(0, 1).build();

// The shard is only created here, never recovered or started.
// NOTE(review): the boolean argument is presumably the "primary" flag; confirm against newShard.
final ShardId shardId = new ShardId(indexMetadata.getIndex(), 0);
final IndexShard shard = newShard(shardId, true, "n1", indexMetadata, null);

final PlainActionFuture<Long> pendingListener = new PlainActionFuture<>();
shard.waitForPrimaryTermAndGeneration(0L, 0L, pendingListener);
assertFalse("waitForPrimaryTermAndGeneration should be waiting", pendingListener.isDone());

// Closing the shard must complete the pending listener exceptionally, without going through the engine.
closeShards(shard);

final Exception failure = expectThrows(Exception.class, pendingListener::get);
assertNotNull(ExceptionsHelper.unwrap(failure, IndexShardClosedException.class));
}

public void testRecoverFromLocalShard() throws IOException {
Settings settings = indexSettings(IndexVersion.current(), 1, 1).build();
IndexMetadata metadata = IndexMetadata.builder("source")
Expand Down

0 comments on commit 8e34393

Please sign in to comment.