From 1bc218a651635e3893b4b7176e904f2c4f1c5ac5 Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Sun, 16 Feb 2025 17:51:34 +0100 Subject: [PATCH 1/4] Fail primary term and generation listeners on a closed shard If a shard has been closed, we should quickly bail out and fail all waiting primary term and generation listeners. Otherwise, the engine implementation may try to successfully to complete the provided listeners and perform operations on an already closed shard and cause some unexpected errors. --- .../elasticsearch/index/shard/IndexShard.java | 12 +++++++----- .../index/shard/IndexShardTests.java | 16 ++++++++++++++++ 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index d56d7471d498e..c103cecfa6d77 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -4494,11 +4494,13 @@ public void waitForEngineOrClosedShard(ActionListener listener) { * Registers a listener for an event when the shard advances to the provided primary term and segment generation */ public void waitForPrimaryTermAndGeneration(long primaryTerm, long segmentGeneration, ActionListener listener) { - waitForEngineOrClosedShard( - listener.delegateFailureAndWrap( - (l, ignored) -> getEngine().addPrimaryTermAndGenerationListener(primaryTerm, segmentGeneration, l) - ) - ); + waitForEngineOrClosedShard(listener.delegateFailureAndWrap((l, ignored) -> { + if (state == IndexShardState.CLOSED) { + l.onFailure(new IndexShardClosedException(shardId)); + } else { + getEngine().addPrimaryTermAndGenerationListener(primaryTerm, segmentGeneration, l); + } + })); } /** diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index c07b396626c45..489e9194b5760 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -3334,6 +3334,22 @@ public void testWaitForClosedListener() throws IOException { assertThat("listener should have been called", called.get(), equalTo(true)); } + public void testWaitForPrimaryTermAndGenerationFailsForClosedShard() throws IOException { + Settings settings = indexSettings(IndexVersion.current(), 1, 1).build(); + IndexMetadata metadata = IndexMetadata.builder("test").putMapping(""" + { "properties": { "foo": { "type": "text"}}}""").settings(settings).primaryTerm(0, 1).build(); + IndexShard primary = newShard(new ShardId(metadata.getIndex(), 0), true, "n1", metadata, null); + + var exception = new AtomicReference(); + ActionListener listener = ActionListener.wrap(l -> { assert false : l; }, e -> exception.set(e)); + primary.waitForPrimaryTermAndGeneration(0L, 0L, listener); + + assertNull("waitForPrimaryTermAndGeneration should be waiting", exception.get()); + closeShards(primary); + // Should bail out earlier without calling the engine + assertThat(exception.get(), instanceOf(IndexShardClosedException.class)); + } + public void testRecoverFromLocalShard() throws IOException { Settings settings = indexSettings(IndexVersion.current(), 1, 1).build(); IndexMetadata metadata = IndexMetadata.builder("source") From c337e2808b46702512d2faa80aa55772cd8468c5 Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Mon, 17 Feb 2025 21:05:32 +0100 Subject: [PATCH 2/4] Update javadoc for IndexShard#waitForPrimaryTermAndGeneration with IndexShardClosedException --- .../main/java/org/elasticsearch/index/shard/IndexShard.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index c103cecfa6d77..7fecc53826ff1 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -4491,7 +4491,8 @@ public void waitForEngineOrClosedShard(ActionListener listener) { } /** - * Registers a listener for an event when the shard advances to the provided primary term and segment generation + * Registers a listener for an event when the shard advances to the provided primary term and segment generation. + * Completes the listener with a {@link IndexShardClosedException} if the shard is closed. */ public void waitForPrimaryTermAndGeneration(long primaryTerm, long segmentGeneration, ActionListener listener) { waitForEngineOrClosedShard(listener.delegateFailureAndWrap((l, ignored) -> { From ae938832e2712884b290146a9e594f246516db62 Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Wed, 19 Feb 2025 10:30:03 +0100 Subject: [PATCH 3/4] Use future for getting an exception --- .../org/elasticsearch/index/shard/IndexShardTests.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 489e9194b5760..096bb55bcc323 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -27,6 +27,7 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.Constants; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; @@ -3340,14 +3341,13 @@ public void testWaitForPrimaryTermAndGenerationFailsForClosedShard() throws IOEx { "properties": { "foo": { "type": "text"}}}""").settings(settings).primaryTerm(0, 1).build(); IndexShard primary = newShard(new ShardId(metadata.getIndex(), 0), true, "n1", metadata, null); - var exception = new AtomicReference(); - ActionListener listener = ActionListener.wrap(l -> { assert false : l; }, e -> exception.set(e)); - primary.waitForPrimaryTermAndGeneration(0L, 0L, listener); + var future = new PlainActionFuture(); + primary.waitForPrimaryTermAndGeneration(0L, 0L, future); - assertNull("waitForPrimaryTermAndGeneration should be waiting", exception.get()); + assertFalse("waitForPrimaryTermAndGeneration should be waiting", future.isDone()); closeShards(primary); // Should bail out earlier without calling the engine - assertThat(exception.get(), instanceOf(IndexShardClosedException.class)); + assertNotNull(ExceptionsHelper.unwrap(expectThrows(Exception.class, future::get), IndexShardClosedException.class)); } public void testRecoverFromLocalShard() throws IOException { From 5dd1c9a904d23080efc2e64e499f7f45ec446a13 Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Wed, 19 Feb 2025 10:31:12 +0100 Subject: [PATCH 4/4] Rename to initializingShard --- .../java/org/elasticsearch/index/shard/IndexShardTests.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 096bb55bcc323..975565b73a0d6 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -3339,13 +3339,13 @@ public void testWaitForPrimaryTermAndGenerationFailsForClosedShard() throws IOEx Settings settings = indexSettings(IndexVersion.current(), 1, 1).build(); IndexMetadata metadata = IndexMetadata.builder("test").putMapping(""" { "properties": { "foo": { "type": "text"}}}""").settings(settings).primaryTerm(0, 1).build(); - IndexShard primary = newShard(new ShardId(metadata.getIndex(), 0), true, "n1", metadata, null); + IndexShard initializingShard = newShard(new ShardId(metadata.getIndex(), 0), true, "n1", metadata, null); var future = new PlainActionFuture(); - primary.waitForPrimaryTermAndGeneration(0L, 0L, future); + initializingShard.waitForPrimaryTermAndGeneration(0L, 0L, future); assertFalse("waitForPrimaryTermAndGeneration should be waiting", future.isDone()); - closeShards(primary); + closeShards(initializingShard); // Should bail out earlier without calling the engine assertNotNull(ExceptionsHelper.unwrap(expectThrows(Exception.class, future::get), IndexShardClosedException.class)); }