Fail primary term and generation listeners on a closed shard #122713
Conversation
If a shard has been closed, we should quickly bail out and fail all waiting primary term and generation listeners. Otherwise, the engine implementation may try to complete the provided listeners and perform operations on an already closed shard, causing unexpected errors.
Pinging @elastic/es-distributed-indexing (Team:Distributed Indexing)
waitForEngineOrClosedShard(listener.delegateFailureAndWrap((l, ignored) -> {
    if (state == IndexShardState.CLOSED) {
        l.onFailure(new IndexShardClosedException(shardId));
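For context, the surrounding method on IndexShard roughly takes the shape sketched below. Only the three quoted lines above come from the diff; the method signature and the else branch are assumptions reconstructed from the stack trace later in this conversation, not the actual change.

// Sketch, not the actual diff: everything outside the quoted fragment is assumed.
public void waitForPrimaryTermAndGeneration(long primaryTerm, long segmentGeneration, ActionListener<Long> listener) {
    waitForEngineOrClosedShard(listener.delegateFailureAndWrap((l, ignored) -> {
        if (state == IndexShardState.CLOSED) {
            // The shard is gone: fail the listener here instead of handing it to a closed engine.
            l.onFailure(new IndexShardClosedException(shardId));
        } else {
            // Assumed happy path: delegate to the engine once it is available.
            getEngine().addPrimaryTermAndGenerationListener(primaryTerm, segmentGeneration, l);
        }
    }));
}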
Hmm, I am a bit concerned we might be throwing now in cases where we did not expect. Specifically, TransportUnpromotableShardRefreshAction#unpromotableShardOperation() seems to ignore some errors (e.g., the shard not being there) and return success. Can you confirm (ideally through a test) whether the unpromotable refresh was also ignoring whether a shard was closed? If so, we may need to catch the new IndexShardClosedException in the unpromotable refresh to ignore it. Or only throw it / break early in the real-time gets and leave unpromotable refreshes as they were.
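As a rough illustration of catching the new exception in the unpromotable refresh path, the handling could look something like the sketch below. The listener wiring and the decision to report success are assumptions about TransportUnpromotableShardRefreshAction, not its actual code.

// Hypothetical sketch: treat a closed shard the same as "nothing to refresh"
// so the unpromotable refresh keeps returning success in that case.
indexShard.waitForPrimaryTermAndGeneration(primaryTerm, segmentGeneration, listener.delegateResponse((l, e) -> {
    if (ExceptionsHelper.unwrapCause(e) instanceof IndexShardClosedException) {
        l.onResponse(null); // assumed: ignore the failure and complete as before
    } else {
        l.onFailure(e);
    }
}));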
The original waitForPrimaryTermAndGeneration API seems trappy? I.e., it returns even if the generation never made it to the node.
btw, I still wonder if the shard was relocated before it had the chance to start or what's going on in the failing test, I didn't have the time to check.
Maybe @arteam can answer your question by checking the original test failure's sequence of events.
As to the trappiness of the API, we can correct it (this is something this PR tries to do) but we should ensure we don't change behavior and create bugs. That's why I'd like us to ensure that if the unpromotable refresh ignored the shard being closed, we also ignore it with this PR. @arteam can you check?
Yes, it's a good point to check for regressions in TransportUnpromotableShardRefreshAction if it has special handling for AlreadyClosedException. Let me check it.
btw, I still wonder if the shard was relocated before it had the chance to start or what's going on in the failing test, I didn't have the time to check.
No, it was during the closure of a shard. I believe the shard got closed before the search shard got initialized.
2> WARNING: Uncaught exception in thread: Thread[#1665,elasticsearch[node_t5][clusterApplierService#updateTask][T#1],5,TGRP-StatelessHollowIndexShardsIT]
2> java.lang.AssertionError: On the cluster applier thread you must use ClusterChangedEvent#state() and ClusterChangedEvent#previousState() instead of ClusterApplierService#state(). It is almost certainly a bug to read the latest-applied state from within a cluster applier since the new state has been committed at this point but is not yet applied.
2> at __randomizedtesting.SeedInfo.seed([57CD61946EF05833]:0)
2> at org.elasticsearch.cluster.service.ClusterApplierService.assertNotCalledFromClusterStateApplier(ClusterApplierService.java:401)
2> at org.elasticsearch.cluster.service.ClusterApplierService.state(ClusterApplierService.java:207)
2> at org.elasticsearch.cluster.service.ClusterService.state(ClusterService.java:129)
2> at org.elasticsearch.action.get.TransportGetAction.getExecutor(TransportGetAction.java:166)
2> at org.elasticsearch.action.get.TransportGetAction.getExecutor(TransportGetAction.java:56)
2> at org.elasticsearch.action.support.single.shard.TransportSingleShardAction.asyncShardOperation(TransportSingleShardAction.java:113)
2> at org.elasticsearch.action.get.TransportGetAction.lambda$tryGetFromTranslog$6(TransportGetAction.java:293)
2> at org.elasticsearch.action.ActionListenerImplementations$ResponseWrappingActionListener.onResponse(ActionListenerImplementations.java:247)
2> at org.elasticsearch.action.support.ContextPreservingActionListener.onResponse(ContextPreservingActionListener.java:33)
2> at co.elastic.elasticsearch.stateless.engine.SearchEngine.addOrExecuteSegmentGenerationListener(SearchEngine.java:883)
2> at co.elastic.elasticsearch.stateless.engine.SearchEngine.addPrimaryTermAndGenerationListener(SearchEngine.java:855)
2> at org.elasticsearch.index.shard.IndexShard.lambda$waitForPrimaryTermAndGeneration$53(IndexShard.java:4499)
2> at org.elasticsearch.action.ActionListenerImplementations$ResponseWrappingActionListener.onResponse(ActionListenerImplementations.java:247)
2> at org.elasticsearch.action.support.SubscribableListener$SuccessResult.complete(SubscribableListener.java:387)
2> at org.elasticsearch.action.support.SubscribableListener.tryComplete(SubscribableListener.java:307)
2> at org.elasticsearch.action.support.SubscribableListener.setResult(SubscribableListener.java:336)
2> at org.elasticsearch.action.support.SubscribableListener.onResponse(SubscribableListener.java:250)
2> at org.elasticsearch.index.shard.IndexShard.checkAndCallWaitForEngineOrClosedShardListeners(IndexShard.java:4482)
2> at org.elasticsearch.index.shard.IndexShard.close(IndexShard.java:1784)
2> at org.elasticsearch.index.IndexService.lambda$closeShard$16(IndexService.java:672)
2> at org.elasticsearch.action.ActionListener.run(ActionListener.java:452)
2> at org.elasticsearch.index.IndexService.closeShard(IndexService.java:650)
2> at org.elasticsearch.index.IndexService.removeShard(IndexService.java:610)
2> at org.elasticsearch.indices.cluster.IndicesClusterStateService.removeIndicesAndShards(IndicesClusterStateService.java:516)
2> at org.elasticsearch.indices.cluster.IndicesClusterStateService.doApplyClusterState(IndicesClusterStateService.java:316)
2> at org.elasticsearch.indices.cluster.IndicesClusterStateService.applyClusterState(IndicesClusterStateService.java:274)
2> at org.elasticsearch.cluster.service.ClusterApplierService.callClusterStateAppliers(ClusterApplierService.java:570)
2> at org.elasticsearch.cluster.service.ClusterApplierService.callClusterStateAppliers(ClusterApplierService.java:556)
2> at org.elasticsearch.cluster.service.ClusterApplierService.applyChanges(ClusterApplierService.java:529)
2> at org.elasticsearch.cluster.service.ClusterApplierService.runTask(ClusterApplierService.java:458)
2> at org.elasticsearch.cluster.service.ClusterApplierService$UpdateTask.run(ClusterApplierService.java:157)
2> at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:977)
2> at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:218)
2> at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:184)
2> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
2> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
2> at java.base/java.lang.Thread.run(Thread.java:1575)
@arteam in that case I think that we should add a test and maybe fix it in TransportUnpromotableShardRefreshAction?
@fcofdez SearchShardRecoveryIT#testRefreshOfRecoveringSearchShardAndDeleteIndex tests that we expect an exception to be thrown in TransportUnpromotableShardRefreshAction if a shard gets closed during the recovery. We are just going to get IndexShardClosedException instead of AlreadyClosedException, because we are never going to call the engine.
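If that test pins the exact exception type, the assertion likely only needs to accept the new type as well. A hypothetical shape is sketched below; the refreshFuture name and the matchers are assumptions, not code from the serverless test.

// Hypothetical assertion: accept either failure, since the refresh can now fail
// at the shard level (IndexShardClosedException) rather than the engine level
// (AlreadyClosedException).
Exception failure = expectThrows(Exception.class, refreshFuture::actionGet);
assertThat(
    ExceptionsHelper.unwrapCause(failure),
    anyOf(instanceOf(IndexShardClosedException.class), instanceOf(AlreadyClosedException.class))
);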
Sounds good to me 👍
Looks good, I left a couple of suggestions for the test.
IndexShard primary = newShard(new ShardId(metadata.getIndex(), 0), true, "n1", metadata, null);

var exception = new AtomicReference<Exception>();
ActionListener<Long> listener = ActionListener.wrap(l -> { assert false : l; }, e -> exception.set(e));
I think that we can use a future here instead?
Done in ae93883
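For reference, a future-based variant could look roughly like the sketch below. This is an assumption about the final shape (the actual change is in ae93883); in particular, the use of closeShards to trigger the failure and the generation value passed are guesses.

// Sketch: wait on a future and assert the failure type directly, instead of
// capturing the exception in an AtomicReference.
PlainActionFuture<Long> future = new PlainActionFuture<>();
primary.waitForPrimaryTermAndGeneration(primary.getOperationPrimaryTerm(), 1, future);
closeShards(primary); // closing the shard should fail the pending listener
expectThrows(IndexShardClosedException.class, future::actionGet);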
Settings settings = indexSettings(IndexVersion.current(), 1, 1).build();
IndexMetadata metadata = IndexMetadata.builder("test").putMapping("""
    { "properties": { "foo": { "type": "text"}}}""").settings(settings).primaryTerm(0, 1).build();
IndexShard primary = newShard(new ShardId(metadata.getIndex(), 0), true, "n1", metadata, null);
nit: maybe rename to initializingShard?
Done in 5dd1c9a
Before approving this, can we change the description and commit message from the linked PR to reflect the actual issue?
@fcofdez This is a new PR that's linked to the Serverless PR that unmutes the stress test and adjusts the …