From e5d897edaee391d05a55e6ac8a420e3416fef6d9 Mon Sep 17 00:00:00 2001 From: vsinghal85 Date: Wed, 20 Nov 2024 01:27:07 +0530 Subject: [PATCH] [GOBBLIN-2173] Disallow adhoc flows sharing the same FlowId within epsilon (#4076) where epsilon is the multi-active execution/lease consolidation period --- .../restli/FlowConfigsV2ResourceHandler.java | 4 + .../api/TooSoonToRerunSameFlowException.java | 49 ++++++++++ .../DagManagementStateStore.java | 9 ++ .../InstrumentedLeaseArbiter.java | 5 + .../MultiActiveLeaseArbiter.java | 11 +++ .../MySqlDagManagementStateStore.java | 12 ++- .../MysqlMultiActiveLeaseArbiter.java | 6 ++ .../modules/orchestration/Orchestrator.java | 27 ++++++ .../MySqlDagManagementStateStoreTest.java | 26 ++++- .../MysqlMultiActiveLeaseArbiterTest.java | 45 ++++++++- .../orchestration/OrchestratorTest.java | 95 ++++++++++++++++--- 11 files changed, 272 insertions(+), 17 deletions(-) create mode 100644 gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TooSoonToRerunSameFlowException.java diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java index 927909e57dd..055724d83d6 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java @@ -60,6 +60,7 @@ import org.apache.gobblin.metrics.ServiceMetricNames; import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.api.FlowSpecSearchObject; +import org.apache.gobblin.runtime.api.TooSoonToRerunSameFlowException; import org.apache.gobblin.runtime.api.SpecNotFoundException; import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse; import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; @@ -256,6 +257,9 @@ public CreateKVResponse, FlowConfig> cr responseMap = this.flowCatalog.put(flowSpec, true); } catch (QuotaExceededException e) { throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage()); + } catch(TooSoonToRerunSameFlowException e) { + return new CreateKVResponse<>(new RestLiServiceException(HttpStatus.S_409_CONFLICT, + "FlowSpec with URI " + flowSpec.getUri() + " was previously launched within the lease consolidation period, no action will be taken")); } catch (Throwable e) { // TODO: Compilation errors should fall under throwable exceptions as well instead of checking for strings log.warn(String.format("Failed to add flow configuration %s.%s to catalog due to", flowConfig.getId().getFlowGroup(), flowConfig.getId().getFlowName()), e); diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TooSoonToRerunSameFlowException.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TooSoonToRerunSameFlowException.java new file mode 100644 index 00000000000..f718ec4a985 --- /dev/null +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TooSoonToRerunSameFlowException.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.runtime.api; + +import lombok.Getter; + + +/** + * An exception thrown when another {@link FlowSpec} with same flow name and flow group + * is submitted within lease consolidation time. + */ +public class TooSoonToRerunSameFlowException extends RuntimeException { + @Getter + private final FlowSpec flowSpec; + + /** + * Account for unwrapping within @{link FlowCatalog#updateOrAddSpecHelper}`s `CallbackResult` error handling for `SpecCatalogListener`s + * @return `TooSoonToRerunSameFlowException` wrapped in another `TooSoonToRerunSameFlowException + */ + public static TooSoonToRerunSameFlowException wrappedOnce(FlowSpec flowSpec) { + return new TooSoonToRerunSameFlowException(flowSpec, new TooSoonToRerunSameFlowException(flowSpec)); + } + + public TooSoonToRerunSameFlowException(FlowSpec flowSpec) { + super("Lease already occupied by another recent execution of this flow: " + flowSpec); + this.flowSpec = flowSpec; + } + + /** restricted-access ctor: use {@link #wrappedOnce(FlowSpec)} instead */ + private TooSoonToRerunSameFlowException(FlowSpec flowSpec, Throwable cause) { + super("Lease already occupied by another recent execution of this flow: " + flowSpec, cause); + this.flowSpec = flowSpec; + } +} diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java index fb7b23fdf0a..8059eab4e1c 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java @@ -103,6 +103,15 @@ public interface DagManagementStateStore { */ void updateDagNode(Dag.DagNode dagNode) throws IOException; + /** + * Returns true if a flow has been launched recently with same flow name and flow group. + * @param flowGroup flow group for the flow + * @param flowName flow name for the flow + * @param flowExecutionId flow execution for the flow + * @throws IOException + */ + boolean existsCurrentlyLaunchingExecOfSameFlow(String flowGroup, String flowName, long flowExecutionId) throws IOException; + /** * Returns the requested {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} and its {@link JobStatus}. * Both params are returned as optional and are empty if not present in the store. diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java index 9e1c270c493..746ab662377 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java @@ -90,6 +90,11 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams throw new RuntimeException(String.format("Unexpected LeaseAttemptStatus (%s) for %s", leaseAttemptStatus.getClass().getName(), leaseParams)); } + @Override + public boolean existsSimilarLeaseWithinConsolidationPeriod(DagActionStore.LeaseParams leaseParams) throws IOException { + return decoratedMultiActiveLeaseArbiter.existsSimilarLeaseWithinConsolidationPeriod(leaseParams); + } + @Override public boolean recordLeaseSuccess(LeaseAttemptStatus.LeaseObtainedStatus status) throws IOException { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java index c9a3b152bf8..f580e936a51 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java @@ -61,6 +61,17 @@ public interface MultiActiveLeaseArbiter { LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, boolean adoptConsensusFlowExecutionId) throws IOException; + /** + * This method checks if entry for same flow name and flow group exists within the lease consolidation period + * returns true if entry for the same flow exists within Lease Consolidation Period (aka. epsilon) + * else returns false + * @param leaseParams uniquely identifies the flow, the present action upon it, the time the action + * was triggered, and if the dag action event we're checking on is a reminder event + * @return true if lease for a recently launched flow already exists for the flow details in leaseParams + */ + boolean existsSimilarLeaseWithinConsolidationPeriod(DagActionStore.LeaseParams leaseParams) + throws IOException; + /** * This method is used to indicate the owner of the lease has successfully completed required actions while holding * the lease of the dag action event. It marks the lease as "no longer leasing", if the eventTimeMillis and diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java index 29e652cce8e..b14d6bc85c2 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java @@ -65,6 +65,7 @@ public class MySqlDagManagementStateStore implements DagManagementStateStore { // todo - these two stores should merge private DagStateStoreWithDagNodes dagStateStore; private DagStateStoreWithDagNodes failedDagStateStore; + private MultiActiveLeaseArbiter multiActiveLeaseArbiter; private final JobStatusRetriever jobStatusRetriever; private boolean dagStoresInitialized = false; private final UserQuotaManager quotaManager; @@ -79,13 +80,14 @@ public class MySqlDagManagementStateStore implements DagManagementStateStore { @Inject public MySqlDagManagementStateStore(Config config, FlowCatalog flowCatalog, UserQuotaManager userQuotaManager, - JobStatusRetriever jobStatusRetriever, DagActionStore dagActionStore) { + JobStatusRetriever jobStatusRetriever, DagActionStore dagActionStore, MultiActiveLeaseArbiter multiActiveLeaseArbiter) { this.quotaManager = userQuotaManager; this.config = config; this.flowCatalog = flowCatalog; this.jobStatusRetriever = jobStatusRetriever; this.dagManagerMetrics.activate(); this.dagActionStore = dagActionStore; + this.multiActiveLeaseArbiter = multiActiveLeaseArbiter; } // It should be called after topology spec map is set @@ -168,6 +170,14 @@ public synchronized void updateDagNode(Dag.DagNode dagNode) this.dagStateStore.updateDagNode(dagNode); } + @Override + public boolean existsCurrentlyLaunchingExecOfSameFlow(String flowGroup, String flowName, long flowExecutionId) throws IOException { + DagActionStore.DagAction dagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName, + flowExecutionId, DagActionStore.DagActionType.LAUNCH); + DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction, System.currentTimeMillis()); + return multiActiveLeaseArbiter.existsSimilarLeaseWithinConsolidationPeriod(leaseParams); + } + @Override public Optional> getDag(Dag.DagId dagId) throws IOException { return Optional.ofNullable(this.dagStateStore.getDag(dagId)); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java index 48112790481..fed800c8384 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java @@ -362,6 +362,12 @@ else if (leaseValidityStatus == 2) { } } + @Override + public boolean existsSimilarLeaseWithinConsolidationPeriod(DagActionStore.LeaseParams leaseParams) throws IOException { + Optional infoResult = getExistingEventInfo(leaseParams); + return infoResult.isPresent() ? infoResult.get().isWithinEpsilon() : false; + } + /** * Checks leaseArbiterTable for an existing entry for this dag action and event time */ diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java index ae053ab51b4..f0a9fdd43d9 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java @@ -53,6 +53,7 @@ import org.apache.gobblin.runtime.api.SpecCatalogListener; import org.apache.gobblin.runtime.api.SpecProducer; import org.apache.gobblin.runtime.api.TopologySpec; +import org.apache.gobblin.runtime.api.TooSoonToRerunSameFlowException; import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse; import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog; import org.apache.gobblin.service.modules.flow.FlowUtils; @@ -78,6 +79,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { protected final SpecCompiler specCompiler; protected final TopologyCatalog topologyCatalog; private final JobStatusRetriever jobStatusRetriever; + private final DagManagementStateStore dagManagementStateStore; protected final MetricContext metricContext; @@ -100,6 +102,7 @@ public Orchestrator(Config config, TopologyCatalog topologyCatalog, Optional(null); } + /* + enforces that a similar adhoc flow is not launching, + else throw {@link TooSoonToRerunSameFlowException} + */ + private void enforceNoRecentAdhocExecOfSameFlow(FlowSpec flowSpec) { + if (!flowSpec.isScheduled()) { + Config flowConfig = flowSpec.getConfig(); + String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY); + String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY); + + _log.info("Checking existing adhoc flow entry for " + flowGroup + "." + flowName); + try { + if (dagManagementStateStore.existsCurrentlyLaunchingExecOfSameFlow(flowGroup, flowName, FlowUtils.getOrCreateFlowExecutionId(flowSpec))) { + _log.warn("Another recent adhoc flow execution found for " + flowGroup + "." + flowName); + throw TooSoonToRerunSameFlowException.wrappedOnce(flowSpec); + } + } catch (IOException exception) { + _log.error("Unable to check whether similar flow exists " + flowGroup + "." + flowName); + throw new RuntimeException("Unable to check whether similar flow exists " + flowGroup + "." + flowName, exception); + } + } + } + public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) { onDeleteSpec(deletedSpecURI, deletedSpecVersion, new Properties()); } diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java index c14a7b62386..6dbbd0ba8b5 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Set; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -47,8 +48,7 @@ import org.apache.gobblin.service.monitoring.JobStatusRetriever; import org.apache.gobblin.util.CompletedFuture; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -59,6 +59,7 @@ public class MySqlDagManagementStateStoreTest { private ITestMetastoreDatabase testDb; + private static MultiActiveLeaseArbiter leaseArbiter; private MySqlDagManagementStateStore dagManagementStateStore; private static final String TEST_USER = "testUser"; public static final String TEST_PASSWORD = "testPassword"; @@ -68,6 +69,7 @@ public class MySqlDagManagementStateStoreTest { @BeforeClass public void setUp() throws Exception { // Setting up mock DB + this.leaseArbiter = mock(MultiActiveLeaseArbiter.class); this.testDb = TestMetastoreDatabaseFactory.get(); this.dagManagementStateStore = getDummyDMSS(this.testDb); } @@ -92,6 +94,22 @@ public static boolean compareLists(List list1, List list2) { return true; } + @Test + public void testExistsCurrentlyLaunchingSimilarFlowGivesTrue() throws Exception{ + Mockito.when(leaseArbiter.existsSimilarLeaseWithinConsolidationPeriod(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true); + String flowName = "testFlow"; + String flowGroup = "testGroup"; + Assert.assertTrue(dagManagementStateStore.existsCurrentlyLaunchingExecOfSameFlow(flowGroup, flowName, System.currentTimeMillis())); + } + + @Test + public void testExistsCurrentlyLaunchingSimilarFlowGivesFalse() throws Exception{ + Mockito.when(leaseArbiter.existsSimilarLeaseWithinConsolidationPeriod(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(false); + String flowName = "testFlow"; + String flowGroup = "testGroup"; + Assert.assertFalse(dagManagementStateStore.existsCurrentlyLaunchingExecOfSameFlow(flowGroup, flowName, System.currentTimeMillis())); + } + @Test public void testAddDag() throws Exception { Dag dag = DagTestUtils.buildDag("test", 12345L); @@ -150,9 +168,11 @@ public static MySqlDagManagementStateStore getDummyDMSS(ITestMetastoreDatabase t TopologySpec topologySpec = LaunchDagProcTest.buildNaiveTopologySpec(TEST_SPEC_EXECUTOR_URI); URI specExecURI = new URI(TEST_SPEC_EXECUTOR_URI); topologySpecMap.put(specExecURI, topologySpec); + MultiActiveLeaseArbiter multiActiveLeaseArbiter = Mockito.mock(MultiActiveLeaseArbiter.class); + leaseArbiter = multiActiveLeaseArbiter; MySqlDagManagementStateStore dagManagementStateStore = new MySqlDagManagementStateStore(config, null, null, jobStatusRetriever, - MysqlDagActionStoreTest.getTestDagActionStore(testMetastoreDatabase)); + MysqlDagActionStoreTest.getTestDagActionStore(testMetastoreDatabase), multiActiveLeaseArbiter); dagManagementStateStore.setTopologySpecMap(topologySpecMap); return dagManagementStateStore; } diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java index 9b132fe0d9b..9ba8f40c6b8 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java @@ -44,6 +44,7 @@ public class MysqlMultiActiveLeaseArbiterTest { private static final long EPSILON = 10000L; private static final long MORE_THAN_EPSILON = (long) (EPSILON * 1.1); + private static final long LESS_THAN_EPSILON = (long) (EPSILON * 0.90); // NOTE: `sleep`ing this long SIGNIFICANTLY slows tests, but we need a large enough value that exec. variability won't cause spurious failure private static final long LINGER = 20000L; private static final long MORE_THAN_LINGER = (long) (LINGER * 1.1); @@ -53,9 +54,12 @@ public class MysqlMultiActiveLeaseArbiterTest { private static final String CONSTANTS_TABLE = "constants_store"; private static final String flowGroup = "testFlowGroup"; private static final String flowGroup2 = "testFlowGroup2"; + private static final String flowGroup3 = "testFlowGroup3"; + private static final String flowGroup4 = "testFlowGroup4"; private static final String flowName = "testFlowName"; private static final String jobName = "testJobName"; - private static final long flowExecutionId = 12345677L; + private static final long flowExecutionId = 12345677213L; + private static final long flowExecutionId1 = 12345996546L; private static final long eventTimeMillis = 1710451837L; // Dag actions with the same flow info but different flow action types are considered unique private static final DagActionStore.DagAction launchDagAction = @@ -70,6 +74,18 @@ public class MysqlMultiActiveLeaseArbiterTest { new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.LAUNCH); private static final DagActionStore.LeaseParams launchLeaseParams2 = new DagActionStore.LeaseParams(launchDagAction2, false, eventTimeMillis); + private static final DagActionStore.LeaseParams + launchLeaseParams3 = new DagActionStore.LeaseParams(new DagActionStore.DagAction(flowGroup3, flowName, flowExecutionId, jobName, + DagActionStore.DagActionType.LAUNCH), false, eventTimeMillis); + private static final DagActionStore.LeaseParams + launchLeaseParams3_similar = new DagActionStore.LeaseParams(new DagActionStore.DagAction(flowGroup3, flowName, flowExecutionId1, jobName, + DagActionStore.DagActionType.LAUNCH), false, eventTimeMillis); + private static final DagActionStore.LeaseParams + launchLeaseParams4 = new DagActionStore.LeaseParams(new DagActionStore.DagAction(flowGroup4, flowName, flowExecutionId, jobName, + DagActionStore.DagActionType.LAUNCH), false, eventTimeMillis); + private static final DagActionStore.LeaseParams + launchLeaseParams4_similar = new DagActionStore.LeaseParams(new DagActionStore.DagAction(flowGroup4, flowName, flowExecutionId1, jobName, + DagActionStore.DagActionType.LAUNCH), false, eventTimeMillis); private static final Timestamp dummyTimestamp = new Timestamp(99999); private ITestMetastoreDatabase testDb; private MysqlMultiActiveLeaseArbiter mysqlMultiActiveLeaseArbiter; @@ -201,6 +217,33 @@ public void testAcquireLeaseSingleParticipant() throws Exception { <= sixthObtainedStatus.getLeaseAcquisitionTimestamp()); } + /* + test to verify if leasable entity is unavailable before epsilon time + to account for clock drift + */ + @Test + public void testExistsSimilarLeaseWithinConsolidationPeriod() throws Exception{ + LeaseAttemptStatus firstLaunchStatus = + mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams3, true); + Assert.assertTrue(firstLaunchStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus); + completeLeaseHelper(launchLeaseParams3); + Thread.sleep(LESS_THAN_EPSILON); + Assert.assertTrue(mysqlMultiActiveLeaseArbiter.existsSimilarLeaseWithinConsolidationPeriod(launchLeaseParams3_similar)); + } + + /* + test to verify if leasable entity exists post epsilon time + */ + @Test + public void testDoesNotExistsSimilarLeaseWithinConsolidationPeriod() throws Exception{ + LeaseAttemptStatus firstLaunchStatus = + mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams4, true); + Assert.assertTrue(firstLaunchStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus); + completeLeaseHelper(launchLeaseParams4); + Thread.sleep(MORE_THAN_EPSILON); + Assert.assertFalse(mysqlMultiActiveLeaseArbiter.existsSimilarLeaseWithinConsolidationPeriod(launchLeaseParams4_similar)); + } + /* Tests attemptLeaseIfNewRow() method to ensure a new row is inserted if no row matches the primary key in the table. If such a row does exist, the method should disregard the resulting SQL error and return 0 rows updated, indicating diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java index ee5f14cb873..acfa6c51cad 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java @@ -18,13 +18,20 @@ package org.apache.gobblin.service.modules.orchestration; import java.io.File; +import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.Collection; import java.util.Properties; import org.apache.commons.io.FileUtils; +import org.apache.gobblin.config.ConfigBuilder; +import org.apache.gobblin.runtime.api.TooSoonToRerunSameFlowException; +import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse; +import org.apache.gobblin.service.modules.flow.FlowUtils; +import org.apache.gobblin.service.modules.flow.SpecCompiler; import org.apache.hadoop.fs.Path; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -63,6 +70,7 @@ import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.PathUtils; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -85,7 +93,9 @@ public class OrchestratorTest { private FlowCatalog flowCatalog; private FlowSpec flowSpec; private ITestMetastoreDatabase testMetastoreDatabase; - private Orchestrator dagMgrNotFlowLaunchHandlerBasedOrchestrator; + private Orchestrator orchestrator; + private DagManagementStateStore dagManagementStateStore; + private SpecCompiler specCompiler; @BeforeClass public void setUpClass() throws Exception { @@ -107,7 +117,7 @@ public void setUp() throws Exception { flowProperties.put("specStore.fs.dir", FLOW_SPEC_STORE_DIR); this.serviceLauncher = new ServiceBasedAppLauncher(orchestratorProperties, "OrchestratorCatalogTest"); - + this.specCompiler = Mockito.mock(SpecCompiler.class); this.topologyCatalog = new TopologyCatalog(ConfigUtils.propertiesToConfig(topologyProperties), Optional.of(logger)); this.serviceLauncher.addService(topologyCatalog); @@ -116,20 +126,21 @@ public void setUp() throws Exception { this.flowCatalog = new FlowCatalog(ConfigUtils.propertiesToConfig(flowProperties), Optional.of(logger), Optional.absent(), true); this.serviceLauncher.addService(flowCatalog); - + MultiActiveLeaseArbiter leaseArbiter = Mockito.mock(MultiActiveLeaseArbiter.class); MySqlDagManagementStateStore dagManagementStateStore = spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase)); + this.dagManagementStateStore = dagManagementStateStore; SharedFlowMetricsSingleton sharedFlowMetricsSingleton = new SharedFlowMetricsSingleton(ConfigUtils.propertiesToConfig(orchestratorProperties)); FlowCompilationValidationHelper flowCompilationValidationHelper = new FlowCompilationValidationHelper(ConfigFactory.empty(), sharedFlowMetricsSingleton, mock(UserQuotaManager.class), dagManagementStateStore); - this.dagMgrNotFlowLaunchHandlerBasedOrchestrator = new Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties), + this.orchestrator = new Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties), this.topologyCatalog, Optional.of(logger), mock(FlowLaunchHandler.class), sharedFlowMetricsSingleton, dagManagementStateStore, flowCompilationValidationHelper, mock(JobStatusRetriever.class)); - this.topologyCatalog.addListener(dagMgrNotFlowLaunchHandlerBasedOrchestrator); - this.flowCatalog.addListener(dagMgrNotFlowLaunchHandlerBasedOrchestrator); + this.topologyCatalog.addListener(orchestrator); + this.flowCatalog.addListener(orchestrator); // Start application this.serviceLauncher.start(); // Create Spec to play with @@ -233,7 +244,7 @@ public URI computeTopologySpecURI(String parent, String current) { // TODO: this test doesn't exercise `Orchestrator` and so belongs elsewhere - move it, then rework into `@BeforeMethod` init (since others depend on this) @Test public void createTopologySpec() { - IdentityFlowToJobSpecCompiler specCompiler = (IdentityFlowToJobSpecCompiler) this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.getSpecCompiler(); + IdentityFlowToJobSpecCompiler specCompiler = (IdentityFlowToJobSpecCompiler) this.orchestrator.getSpecCompiler(); // List Current Specs Collection specs = topologyCatalog.getSpecs(); @@ -272,7 +283,7 @@ public void createFlowSpec() throws Throwable { // TODO: fix this lingering inter-test dep from when `@BeforeClass` init, which we've since replaced by `Mockito.verify`-friendly `@BeforeMethod` createTopologySpec(); // make 1 Topology with 1 SpecProducer available and responsible for our new FlowSpec - IdentityFlowToJobSpecCompiler specCompiler = (IdentityFlowToJobSpecCompiler) this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.getSpecCompiler(); + IdentityFlowToJobSpecCompiler specCompiler = (IdentityFlowToJobSpecCompiler) this.orchestrator.getSpecCompiler(); SpecExecutor sei = specCompiler.getTopologySpecMap().values().iterator().next().getSpecExecutor(); // List Current Specs @@ -311,12 +322,72 @@ public void createFlowSpec() throws Throwable { "SpecProducer should contain 0 Spec after addition"); } + /* + If another flow has already acquired lease for this flowspec details within + lease consolidation time, then we do not execute this flow, hence do not process and store the spec + and throw TooSoonToRerunSameFlowException + */ + @Test(expectedExceptions = TooSoonToRerunSameFlowException.class) + public void onAddSpecForAdhocFlowWhenSimilarExistingFlowIsCurrentlyLaunching() throws IOException { + ConfigBuilder configBuilder = ConfigBuilder.create() + .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup") + .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName") + .addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis()) + .addPrimitive("gobblin.flow.sourceIdentifier", "source") + .addPrimitive("gobblin.flow.destinationIdentifier", "destination"); + Config config = configBuilder.build(); + FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build(); + Mockito.when(dagManagementStateStore.existsCurrentlyLaunchingExecOfSameFlow("testGroup","testName", FlowUtils.getOrCreateFlowExecutionId(flowSpec))).thenReturn(true); + orchestrator.onAddSpec(flowSpec); + } + + /* + If no other flow has acquired lease within the epsilon time, then flow + compilation and addition to the store occurs normally + */ + @Test + public void onAddSpecForAdhocFlowWhenNoExistingFlowIsCurrentlyLaunching() throws IOException { + ConfigBuilder configBuilder = ConfigBuilder.create() + .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup") + .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName") + .addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis()) + .addPrimitive("gobblin.flow.sourceIdentifier", "source") + .addPrimitive("gobblin.flow.destinationIdentifier", "destination"); + Config config = configBuilder.build(); + FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build(); + Mockito.when(dagManagementStateStore.existsCurrentlyLaunchingExecOfSameFlow("testGroup","testName", FlowUtils.getOrCreateFlowExecutionId(flowSpec))).thenReturn(false); + AddSpecResponse addSpecResponse = orchestrator.onAddSpec(flowSpec); + Assert.assertNotNull(addSpecResponse); + } + + /* + For Scheduled flow lease acquirable check does not occur, + and flow compilation occurs successfully + */ + @Test + public void onAddSpecForScheduledFlow() throws IOException { + ConfigBuilder configBuilder = ConfigBuilder.create() + .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup") + .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName") + .addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, "0 1/0 * ? * *") + .addPrimitive("gobblin.flow.sourceIdentifier", "source") + .addPrimitive("gobblin.flow.destinationIdentifier", "destination"); + Config config = configBuilder.build(); + FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build(); + AddSpecResponse response = new AddSpecResponse<>(new Object()); + Mockito.when(specCompiler.onAddSpec(flowSpec)).thenReturn(response); + AddSpecResponse addSpecResponse = orchestrator.onAddSpec(flowSpec); + Assert.assertNotNull(addSpecResponse); + // Verifying that for scheduled flow existsCurrentlyLaunchingExecOfSameFlow is not called + Mockito.verify(dagManagementStateStore, Mockito.never()).existsCurrentlyLaunchingExecOfSameFlow(anyString(), anyString(), anyLong()); + } + @Test public void deleteFlowSpec() throws Throwable { // TODO: fix this lingering inter-test dep from when `@BeforeClass` init, which we've since replaced by `Mockito.verify`-friendly `@BeforeMethod` createFlowSpec(); // make 1 Flow available (for deletion herein) - IdentityFlowToJobSpecCompiler specCompiler = (IdentityFlowToJobSpecCompiler) this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.getSpecCompiler(); + IdentityFlowToJobSpecCompiler specCompiler = (IdentityFlowToJobSpecCompiler) this.orchestrator.getSpecCompiler(); SpecExecutor sei = specCompiler.getTopologySpecMap().values().iterator().next().getSpecExecutor(); // List Current Specs @@ -359,19 +430,19 @@ public void doNotRegisterMetricsAdhocFlows() throws Throwable { createTopologySpec(); // for flow compilation to pass FlowId flowId = GobblinServiceManagerTest.createFlowIdWithUniqueName(TEST_FLOW_GROUP_NAME); - MetricContext metricContext = this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.getSharedFlowMetricsSingleton().getMetricContext(); + MetricContext metricContext = this.orchestrator.getSharedFlowMetricsSingleton().getMetricContext(); String metricName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, flowId.getFlowGroup(), flowId.getFlowName(), ServiceMetricNames.COMPILED); this.topologyCatalog.getInitComplete().countDown(); // unblock orchestration FlowSpec adhocSpec = createBasicFlowSpecForFlowId(flowId); - this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.orchestrate(adhocSpec, new Properties(), 0, false); + this.orchestrator.orchestrate(adhocSpec, new Properties(), 0, false); Assert.assertNull(metricContext.getParent().get().getGauges().get(metricName)); Properties scheduledProps = new Properties(); scheduledProps.setProperty("job.schedule", "0/2 * * * * ?"); FlowSpec scheduledSpec = createBasicFlowSpecForFlowId(flowId, scheduledProps); - this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.orchestrate(scheduledSpec, new Properties(), 0, false); + this.orchestrator.orchestrate(scheduledSpec, new Properties(), 0, false); Assert.assertNotNull(metricContext.getParent().get().getGauges().get(metricName)); }