Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix simultaneously creating a snapshot and updating the repository can potentially trigger an infinite loop #17532

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix exists queries on nested flat_object fields throws exception ([#16803](https://github.com/opensearch-project/OpenSearch/pull/16803))
- Add highlighting for wildcard search on `match_only_text` field ([#17101](https://github.com/opensearch-project/OpenSearch/pull/17101))
- Fix illegal argument exception when creating a PIT ([#16781](https://github.com/opensearch-project/OpenSearch/pull/16781))
- Fix simultaneously creating a snapshot and updating the repository can potentially trigger an infinite loop ([#17532](https://github.com/opensearch-project/OpenSearch/pull/17532))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@

package org.opensearch.repositories;

import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.plugins.Plugin;
Expand All @@ -45,6 +47,8 @@
import java.util.Collection;
import java.util.Collections;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
Expand Down Expand Up @@ -122,4 +126,66 @@ public void testSystemRepositoryCantBeCreated() {

assertThrows(RepositoryException.class, () -> createRepository(repositoryName, FsRepository.TYPE, repoSettings));
}

public void testCreatSnapAndUpdateReposityCauseInfiniteLoop() throws InterruptedException {
// create index
internalCluster();
String indexName = "test-index";
createIndex(indexName, Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, 0).put(SETTING_NUMBER_OF_SHARDS, 1).build());
index(indexName, "_doc", "1", Collections.singletonMap("user", generateRandomStringArray(1, 10, false, false)));
flush(indexName);

// create repository
final String repositoryName = "test-repo";
Settings.Builder repoSettings = Settings.builder()
.put("location", randomRepoPath())
.put("max_snapshot_bytes_per_sec", "10mb")
.put("max_restore_bytes_per_sec", "10mb");
OpenSearchIntegTestCase.putRepositoryWithNoSettingOverrides(
client().admin().cluster(),
repositoryName,
FsRepository.TYPE,
true,
repoSettings
);

Thread thread = new Thread(() -> {
String snapshotName = "test-snapshot";
logger.info("--> starting snapshot");
CreateSnapshotResponse createSnapshotResponse = client().admin()
.cluster()
.prepareCreateSnapshot(repositoryName, snapshotName)
.setWaitForCompletion(true)
.setIndices(indexName)
.get();
logger.info("--> finishing snapshot");
});
thread.start();

logger.info("--> begin to reset repository");
repoSettings = Settings.builder().put("location", randomRepoPath()).put("max_snapshot_bytes_per_sec", "300mb");
OpenSearchIntegTestCase.putRepositoryWithNoSettingOverrides(
client().admin().cluster(),
repositoryName,
FsRepository.TYPE,
true,
repoSettings
);
logger.info("--> finish to reset repository");

GetRepositoriesRequest getRepositoriesRequest = new GetRepositoriesRequest(new String[] { repositoryName });
try {
GetRepositoriesResponse getRepositoriesResponse = client().admin().cluster().getRepositories(getRepositoriesRequest).get();
assertThat(getRepositoriesResponse.repositories(), hasSize(1));
RepositoryMetadata repositoryMetadata = getRepositoriesResponse.repositories().get(0);
assertThat(repositoryMetadata.type(), equalTo(FsRepository.TYPE));
assertThat(repositoryMetadata.settings().get("max_snapshot_bytes_per_sec"), equalTo("300mb"));
assertThat(repositoryMetadata.settings().hasValue("max_restore_bytes_per_sec"), equalTo(false));
} catch (Exception e) {
throw new RuntimeException(e);
}
logger.info("--> finish to get response about repository");
thread.join();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* Repository that is filtered
Expand Down Expand Up @@ -284,13 +285,24 @@
in.updateState(state);
}

@Deprecated
@Override
public void executeConsistentStateUpdate(
Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask,
String source,
Consumer<Exception> onFailure
) {
in.executeConsistentStateUpdate(createUpdateTask, source, onFailure);
executeConsistentStateUpdate(createUpdateTask, source, () -> this, onFailure);
}

Check warning on line 296 in server/src/main/java/org/opensearch/repositories/FilterRepository.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/FilterRepository.java#L295-L296

Added lines #L295 - L296 were not covered by tests

@Override
public void executeConsistentStateUpdate(
Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask,
String source,
Supplier<Repository> currentRepositeSupplier,
Consumer<Exception> onFailure
) {
in.executeConsistentStateUpdate(createUpdateTask, source, currentRepositeSupplier, onFailure);

Check warning on line 305 in server/src/main/java/org/opensearch/repositories/FilterRepository.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/FilterRepository.java#L305

Added line #L305 was not covered by tests
}

@Override
Expand Down Expand Up @@ -345,4 +357,9 @@
public void close() {
in.close();
}

@Override
public boolean isOpen() {
return in.isOpen();

Check warning on line 363 in server/src/main/java/org/opensearch/repositories/FilterRepository.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/FilterRepository.java#L363

Added line #L363 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* An interface for interacting with a repository in snapshot and restore.
Expand Down Expand Up @@ -542,6 +543,14 @@ default IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotInfo snapshotInf
* @param source the source of the cluster state update task
* @param onFailure error handler invoked on failure to get a consistent view of the current {@link RepositoryData}
*/
void executeConsistentStateUpdate(
Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask,
String source,
Supplier<Repository> currentRepositeSupplier,
Consumer<Exception> onFailure
);

@Deprecated
void executeConsistentStateUpdate(
Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask,
String source,
Expand Down Expand Up @@ -611,4 +620,7 @@ default void reload(RepositoryMetadata repositoryMetadata) {}
* Validate the repository metadata
*/
default void validateMetadata(RepositoryMetadata repositoryMetadata) {}

boolean isOpen();

}
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,8 @@
*/
protected volatile int bufferSize;

private volatile boolean closed;

/**
* Constructs new BlobStoreRepository
* @param repositoryMetadata The metadata for this repository including name and settings
Expand Down Expand Up @@ -630,21 +632,44 @@
}
if (store != null) {
try {
closed = true;
store.close();
} catch (Exception t) {
logger.warn("cannot close blob store", t);
}
}
}

@Deprecated
@Override
public void executeConsistentStateUpdate(
Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask,
String source,
Consumer<Exception> onFailure
) {
executeConsistentStateUpdate(createUpdateTask, source, () -> this, onFailure);
}

Check warning on line 651 in server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java#L650-L651

Added lines #L650 - L651 were not covered by tests

@Override
public void executeConsistentStateUpdate(
Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask,
String source,
Supplier<Repository> currentRepositeSupplier,
Consumer<Exception> onFailure
) {
final RepositoryMetadata repositoryMetadataStart = metadata;
getRepositoryData(ActionListener.wrap(repositoryData -> {
Repository currentRepository = this;
final RepositoryMetadata repositoryMetadataStart;
if (currentRepository != currentRepositeSupplier.get()) {
if (this.isOpen()) {
throw new IllegalStateException("the repository should be closed");

Check warning on line 664 in server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java#L664

Added line #L664 was not covered by tests
}
currentRepository = currentRepositeSupplier.get();
repositoryMetadataStart = currentRepository.getMetadata();

Check warning on line 667 in server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java#L666-L667

Added lines #L666 - L667 were not covered by tests
} else {
repositoryMetadataStart = metadata;
}

currentRepository.getRepositoryData(ActionListener.wrap(repositoryData -> {
final ClusterStateUpdateTask updateTask = createUpdateTask.apply(repositoryData);
clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask(updateTask.priority()) {

Expand Down Expand Up @@ -679,7 +704,7 @@
if (executedTask) {
updateTask.clusterStateProcessed(source, oldState, newState);
} else {
executeConsistentStateUpdate(createUpdateTask, source, onFailure);
executeConsistentStateUpdate(createUpdateTask, source, currentRepositeSupplier, onFailure);
}
}

Expand Down Expand Up @@ -4690,6 +4715,11 @@
}
}

@Override
public boolean isOpen() {
return closed == false;
}

private static void failStoreIfCorrupted(Store store, Exception e) {
if (Lucene.isCorruptionException(e)) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@
public TimeValue timeout() {
return request.clusterManagerNodeTimeout();
}
}, "create_snapshot [" + snapshotName + ']', listener::onFailure);
}, "create_snapshot [" + snapshotName + ']', () -> repositoriesService.repository(request.repository()), listener::onFailure);
}

/**
Expand Down Expand Up @@ -640,7 +640,7 @@
return request.clusterManagerNodeTimeout();
}

}, "create_snapshot [" + snapshotName + ']', listener::onFailure);
}, "create_snapshot [" + snapshotName + ']', () -> repositoriesService.repository(repositoryName), listener::onFailure);
}

private void cleanOrphanTimestamp(String repoName, RepositoryData repositoryData) {
Expand Down Expand Up @@ -1062,7 +1062,11 @@
public TimeValue timeout() {
return request.clusterManagerNodeTimeout();
}
}, "clone_snapshot_v2 [" + request.source() + "][" + snapshotName + ']', listener::onFailure);
},
"clone_snapshot_v2 [" + request.source() + "][" + snapshotName + ']',
() -> repositoriesService.repository(repositoryName),
listener::onFailure

Check warning on line 1068 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L1066-L1068

Added lines #L1066 - L1068 were not covered by tests
);
}

// TODO: It is worth revisiting the design choice of creating a placeholder entry in snapshots-in-progress here once we have a cache
Expand Down Expand Up @@ -1148,14 +1152,18 @@
public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) {
logger.info("snapshot clone [{}] started", snapshot);
addListener(snapshot, ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure));
startCloning(repository, newEntry);
startCloning(repository, repositoryName, newEntry);

Check warning on line 1155 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L1155

Added line #L1155 was not covered by tests
}

@Override
public TimeValue timeout() {
return request.clusterManagerNodeTimeout();
}
}, "clone_snapshot [" + request.source() + "][" + snapshotName + ']', listener::onFailure);
},
"clone_snapshot [" + request.source() + "][" + snapshotName + ']',
() -> repositoriesService.repository(repositoryName),
listener::onFailure

Check warning on line 1165 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L1163-L1165

Added lines #L1163 - L1165 were not covered by tests
);
}

private static void ensureNoCleanupInProgress(ClusterState currentState, String repositoryName, String snapshotName) {
Expand Down Expand Up @@ -1189,7 +1197,7 @@
* @param repository repository to run operation on
* @param cloneEntry clone operation in the cluster state
*/
private void startCloning(Repository repository, SnapshotsInProgress.Entry cloneEntry) {
private void startCloning(Repository repository, String repositoryName, SnapshotsInProgress.Entry cloneEntry) {
final List<IndexId> indices = cloneEntry.indices();
final SnapshotId sourceSnapshot = cloneEntry.source();
final Snapshot targetSnapshot = cloneEntry.snapshot();
Expand Down Expand Up @@ -1310,7 +1318,7 @@
logger.warn("Did not find expected entry [{}] in the cluster state", cloneEntry);
}
}
}, "start snapshot clone", onFailure), onFailure);
}, "start snapshot clone", () -> repositoriesService.repository(repositoryName), onFailure), onFailure);

Check warning on line 1321 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L1321

Added line #L1321 was not covered by tests
}

private final Set<RepositoryShardId> currentlyCloning = Collections.synchronizedSet(new HashSet<>());
Expand Down Expand Up @@ -2639,7 +2647,7 @@
public TimeValue timeout() {
return request.clusterManagerNodeTimeout();
}
}, "delete snapshot", listener::onFailure);
}, "delete snapshot", () -> repositoriesService.repository(repoName), listener::onFailure);
}

private static List<SnapshotId> matchingSnapshotIds(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import static org.opensearch.repositories.blobstore.BlobStoreRepository.SYSTEM_REPOSITORY_SETTING;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -811,13 +812,22 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, In
@Override
public void updateState(final ClusterState state) {}

@Deprecated
@Override
public void executeConsistentStateUpdate(
Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask,
String source,
Consumer<Exception> onFailure
) {}

@Override
public void executeConsistentStateUpdate(
Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask,
String source,
Supplier<Repository> currentRepositeSupplier,
Consumer<Exception> onFailure
) {}

@Override
public void cloneShardSnapshot(
SnapshotId source,
Expand All @@ -841,6 +851,11 @@ public void cloneRemoteStoreIndexShardSnapshot(

}

@Override
public boolean isOpen() {
return isClosed == false;
}

@Override
public Lifecycle.State lifecycleState() {
return null;
Expand Down
Loading
Loading