
Fail primary term and generation listeners on a closed shard #122713

Merged
@@ -4491,14 +4491,17 @@ public void waitForEngineOrClosedShard(ActionListener<Void> listener) {
 }

 /**
- * Registers a listener for an event when the shard advances to the provided primary term and segment generation
+ * Registers a listener for an event when the shard advances to the provided primary term and segment generation.
+ * Completes the listener with a {@link IndexShardClosedException} if the shard is closed.
  */
 public void waitForPrimaryTermAndGeneration(long primaryTerm, long segmentGeneration, ActionListener<Long> listener) {
-    waitForEngineOrClosedShard(
-        listener.delegateFailureAndWrap(
-            (l, ignored) -> getEngine().addPrimaryTermAndGenerationListener(primaryTerm, segmentGeneration, l)
-        )
-    );
+    waitForEngineOrClosedShard(listener.delegateFailureAndWrap((l, ignored) -> {
+        if (state == IndexShardState.CLOSED) {
+            l.onFailure(new IndexShardClosedException(shardId));
Contributor:

Hmm, I am a bit concerned we might be throwing now in cases where we did not expect. Specifically, TransportUnpromotableShardRefreshAction#unpromotableShardOperation() seems to ignore some errors (e.g., the shard not being there) and return success. Can you confirm (ideally through a test) whether the unpromotable refresh was also ignoring whether a shard was closed? If so, we may need to catch the new IndexShardClosedException in the unpromotable refresh to ignore it. Or only throw it / break early in the real-time gets and leave unpromotable refreshes as they were.
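
If the unpromotable refresh does ignore closed shards today, here is a minimal sketch of the guard I mean (the listener wiring and names are assumptions, not the actual action code):

```java
// Hypothetical: treat a closed shard as a successful no-op for the
// unpromotable refresh only, preserving its current lenient behavior.
shard.waitForPrimaryTermAndGeneration(primaryTerm, segmentGeneration, ActionListener.wrap(
    ignored -> listener.onResponse(ActionResponse.Empty.INSTANCE),
    e -> {
        if (e instanceof IndexShardClosedException) {
            // Shard already gone: nothing left to refresh, report success.
            listener.onResponse(ActionResponse.Empty.INSTANCE);
        } else {
            listener.onFailure(e);
        }
    }
));
```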

Contributor:

The original waitForPrimaryTermAndGeneration API seems trappy? I.e., it can return even if the generation never made it to the node.

Contributor:

Btw, I still wonder if the shard was relocated before it had the chance to start, or what's going on in the failing test; I didn't have the time to check.

Contributor:

Maybe @arteam can answer your question by checking the original test failure's sequence of events.

As to the trappiness of the API, we can correct it (this is something this PR tries to do), but we should ensure we don't change behavior and create bugs. That's why I'd like us to ensure that, if the unpromotable refresh ignored the shard being closed, we also ignore it with this PR. @arteam can you check?

Contributor Author:

Yes, it's a good point to check TransportUnpromotableShardRefreshAction for regressions, in case it has special handling for AlreadyClosedException. Let me check it.

Contributor Author:

> Btw, I still wonder if the shard was relocated before it had the chance to start, or what's going on in the failing test; I didn't have the time to check.

No, it was during the closure of a shard. I believe the shard got closed before the search shard got initialized.

  2> WARNING: Uncaught exception in thread: Thread[#1665,elasticsearch[node_t5][clusterApplierService#updateTask][T#1],5,TGRP-StatelessHollowIndexShardsIT]
  2> java.lang.AssertionError: On the cluster applier thread you must use ClusterChangedEvent#state() and ClusterChangedEvent#previousState() instead of ClusterApplierService#state(). It is almost certainly a bug to read the latest-applied state from within a cluster applier since the new state has been committed at this point but is not yet applied.
  2> 	at __randomizedtesting.SeedInfo.seed([57CD61946EF05833]:0)
  2> 	at org.elasticsearch.cluster.service.ClusterApplierService.assertNotCalledFromClusterStateApplier(ClusterApplierService.java:401)
  2> 	at org.elasticsearch.cluster.service.ClusterApplierService.state(ClusterApplierService.java:207)
  2> 	at org.elasticsearch.cluster.service.ClusterService.state(ClusterService.java:129)
  2> 	at org.elasticsearch.action.get.TransportGetAction.getExecutor(TransportGetAction.java:166)
  2> 	at org.elasticsearch.action.get.TransportGetAction.getExecutor(TransportGetAction.java:56)
  2> 	at org.elasticsearch.action.support.single.shard.TransportSingleShardAction.asyncShardOperation(TransportSingleShardAction.java:113)
  2> 	at org.elasticsearch.action.get.TransportGetAction.lambda$tryGetFromTranslog$6(TransportGetAction.java:293)
  2> 	at org.elasticsearch.action.ActionListenerImplementations$ResponseWrappingActionListener.onResponse(ActionListenerImplementations.java:247)
  2> 	at org.elasticsearch.action.support.ContextPreservingActionListener.onResponse(ContextPreservingActionListener.java:33)
  2> 	at co.elastic.elasticsearch.stateless.engine.SearchEngine.addOrExecuteSegmentGenerationListener(SearchEngine.java:883)
  2> 	at co.elastic.elasticsearch.stateless.engine.SearchEngine.addPrimaryTermAndGenerationListener(SearchEngine.java:855)
  2> 	at org.elasticsearch.index.shard.IndexShard.lambda$waitForPrimaryTermAndGeneration$53(IndexShard.java:4499)
  2> 	at org.elasticsearch.action.ActionListenerImplementations$ResponseWrappingActionListener.onResponse(ActionListenerImplementations.java:247)
  2> 	at org.elasticsearch.action.support.SubscribableListener$SuccessResult.complete(SubscribableListener.java:387)
  2> 	at org.elasticsearch.action.support.SubscribableListener.tryComplete(SubscribableListener.java:307)
  2> 	at org.elasticsearch.action.support.SubscribableListener.setResult(SubscribableListener.java:336)
  2> 	at org.elasticsearch.action.support.SubscribableListener.onResponse(SubscribableListener.java:250)
  2> 	at org.elasticsearch.index.shard.IndexShard.checkAndCallWaitForEngineOrClosedShardListeners(IndexShard.java:4482)
  2> 	at org.elasticsearch.index.shard.IndexShard.close(IndexShard.java:1784)
  2> 	at org.elasticsearch.index.IndexService.lambda$closeShard$16(IndexService.java:672)
  2> 	at org.elasticsearch.action.ActionListener.run(ActionListener.java:452)
  2> 	at org.elasticsearch.index.IndexService.closeShard(IndexService.java:650)
  2> 	at org.elasticsearch.index.IndexService.removeShard(IndexService.java:610)
  2> 	at org.elasticsearch.indices.cluster.IndicesClusterStateService.removeIndicesAndShards(IndicesClusterStateService.java:516)
  2> 	at org.elasticsearch.indices.cluster.IndicesClusterStateService.doApplyClusterState(IndicesClusterStateService.java:316)
  2> 	at org.elasticsearch.indices.cluster.IndicesClusterStateService.applyClusterState(IndicesClusterStateService.java:274)
  2> 	at org.elasticsearch.cluster.service.ClusterApplierService.callClusterStateAppliers(ClusterApplierService.java:570)
  2> 	at org.elasticsearch.cluster.service.ClusterApplierService.callClusterStateAppliers(ClusterApplierService.java:556)
  2> 	at org.elasticsearch.cluster.service.ClusterApplierService.applyChanges(ClusterApplierService.java:529)
  2> 	at org.elasticsearch.cluster.service.ClusterApplierService.runTask(ClusterApplierService.java:458)
  2> 	at org.elasticsearch.cluster.service.ClusterApplierService$UpdateTask.run(ClusterApplierService.java:157)
  2> 	at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:977)
  2> 	at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:218)
  2> 	at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:184)
  2> 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
  2> 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
  2> 	at java.base/java.lang.Thread.run(Thread.java:1575)

Contributor Author:

@kingherc TransportUnpromotableShardRefreshAction doesn't seem to have any special handling for closing shards, only special handling for ACKing refreshes when a shard hasn't been recovered yet, from #110221.

Contributor:

@arteam in that case I think we should add a test and maybe fix it in TransportUnpromotableShardRefreshAction?

Contributor Author:

@fcofdez SearchShardRecoveryIT#testRefreshOfRecoveringSearchShardAndDeleteIndex verifies that an exception is thrown in TransportUnpromotableShardRefreshAction if a shard gets closed during recovery. We are just going to get an IndexShardClosedException instead of an AlreadyClosedException, because we never reach the engine.
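
If that test keeps passing, the only change on that path should be the exception type it expects, roughly (a hypothetical sketch; `refreshFuture` is an assumed name):

```java
// Hypothetical test adjustment: same failure path, new exception type.
expectThrows(IndexShardClosedException.class, refreshFuture::actionGet);
```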

Contributor:

Sounds good to me 👍

+        } else {
+            getEngine().addPrimaryTermAndGenerationListener(primaryTerm, segmentGeneration, l);
+        }
+    }));
 }
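
For context, the new contract seen from a caller's side, as a minimal sketch (the handler names are illustrative, not from the PR):

```java
// Hypothetical caller: the listener now fails fast with
// IndexShardClosedException instead of hanging on a closed shard.
shard.waitForPrimaryTermAndGeneration(primaryTerm, generation, ActionListener.wrap(
    gen -> onGenerationVisible(gen), // the shard reached the requested term and generation
    e -> onWaitFailed(e)             // includes IndexShardClosedException if the shard closed first
));
```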

@@ -3334,6 +3334,22 @@ public void testWaitForClosedListener() throws IOException {
         assertThat("listener should have been called", called.get(), equalTo(true));
     }

+    public void testWaitForPrimaryTermAndGenerationFailsForClosedShard() throws IOException {
+        Settings settings = indexSettings(IndexVersion.current(), 1, 1).build();
+        IndexMetadata metadata = IndexMetadata.builder("test").putMapping("""
+            { "properties": { "foo": { "type": "text"}}}""").settings(settings).primaryTerm(0, 1).build();
+        IndexShard primary = newShard(new ShardId(metadata.getIndex(), 0), true, "n1", metadata, null);
Contributor:

nit: maybe rename to initializingShard?

Contributor Author:

Done in 5dd1c9a


+        var exception = new AtomicReference<Exception>();
+        ActionListener<Long> listener = ActionListener.wrap(l -> { assert false : l; }, e -> exception.set(e));
Contributor:

I think that we can use a future here instead?
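
Something like this, presumably (a sketch; the exact assertions are assumptions):

```java
// Hypothetical rewrite using PlainActionFuture, which doubles as the listener.
PlainActionFuture<Long> future = new PlainActionFuture<>();
primary.waitForPrimaryTermAndGeneration(0L, 0L, future);
assertFalse("should still be waiting", future.isDone());
closeShards(primary);
expectThrows(IndexShardClosedException.class, future::actionGet);
```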

Contributor Author:

Done in ae93883

+        primary.waitForPrimaryTermAndGeneration(0L, 0L, listener);
+
+        assertNull("waitForPrimaryTermAndGeneration should be waiting", exception.get());
+        closeShards(primary);
+        // Should bail out earlier without calling the engine
+        assertThat(exception.get(), instanceOf(IndexShardClosedException.class));
+    }

     public void testRecoverFromLocalShard() throws IOException {
         Settings settings = indexSettings(IndexVersion.current(), 1, 1).build();
         IndexMetadata metadata = IndexMetadata.builder("source")