Unexpected behavior when retrying a CallActivity depending on the asyncComplete-flag #4014

Open
rssap opened this issue Jan 17, 2025 · 1 comment


rssap commented Jan 17, 2025

I am observing different results when retrying a CallActivity with multi-instance behavior, depending on the value of completeAsync.

This issue is similar to this other topic, which focuses on inconsistencies in the event context when a failed CallActivity is retried. I decided to create a dedicated issue because my problem is only mentioned there in a comment on their linked PR.

Describe the bug
If the CallActivity (with multi-instance behavior) is configured with completeAsync=true and retried after an error, only one subflow completes successfully; the other subflows get canceled.

Unit Test:
Here is a commit with a complete test setup. For completeness of this post, I will outline the scenario here as well.

The parent process contains a CallActivity with multi-instance behavior, which starts 3 subflows. The CallActivity is configured with either completeAsync=true or completeAsync=false. Additionally, the CallActivity contains a VariableAggregation whose sole purpose is to provoke an error inside ParallelMultiInstanceBehavior::leave. The subflow contains only a start event and an end event.

The test with completeAsync=true fails (only one of the subflows completes successfully, the other two subflows get canceled), but the test with completeAsync=false passes.

    @Test
    @Deployment(resources = {
            "org/flowable/engine/test/api/event/CallActivityTest.testCallActivityWithMultiInstanceBehaviorAndAsyncCompleteRetry.bpmn20.xml",
            "org/flowable/engine/test/api/event/CallActivityTest.testCallActivityWithMultiInstanceBehaviorAndAsyncCompleteRetry_subflow.bpmn20.xml",
    })
    public void testCallActivityWithMultiInstanceBehaviorAndAsyncCompleteRetry() throws Exception {
        testCallActivityWithMultiInstanceBehaviorRetryAfterFailure();
    }
    
    @Test
    @Deployment(resources = {
            "org/flowable/engine/test/api/event/CallActivityTest.testCallActivityWithMultiInstanceBehaviorAndSyncCompleteRetry.bpmn20.xml",
            "org/flowable/engine/test/api/event/CallActivityTest.testCallActivityWithMultiInstanceBehaviorAndAsyncCompleteRetry_subflow.bpmn20.xml",
    })
    public void testCallActivityWithMultiInstanceBehaviorAndSyncCompleteRetry() throws Exception {
        testCallActivityWithMultiInstanceBehaviorRetryAfterFailure();
    }
    
    public void testCallActivityWithMultiInstanceBehaviorRetryAfterFailure() throws Exception {
        // Set number of retries to 1. In case of an error, the Job becomes a DeadLetterJob
        processEngineConfiguration.getJobServiceConfiguration().setAsyncExecutorNumberOfRetries(1);

        ProcessInstance processInstance = runtimeService.startProcessInstanceByKey("callActivity");

        waitForJobExecutorToProcessAllJobsAndExecutableTimerJobs(20000L, 200L);

        // Check erroneous state:
        // The CallActivity is configured with VariableAggregation. However, it is intentionally referencing a non-existing variable. This will cause the CallActivity to fail.
        assertThat(processEngine.getManagementService().createDeadLetterJobQuery().withException().count()).isEqualTo(3L);

        // Set the missing process variable. After it is set, the process should complete successfully.
        runtimeService.setVariable(processInstance.getId(), "nonExistingVariable", "target");

        // Retry failed jobs
        managementService.createDeadLetterJobQuery().list().forEach(job -> managementService.moveDeadLetterJobToExecutableJob(job.getId(), 3));
        waitForJobExecutorToProcessAllJobsAndExecutableTimerJobs(20000L, 200L);
        
        // Check that the parent process and its subflows completed
        assertProcessEnded(processInstance.getId());
    }

Expected behavior
I expect the test to pass both with completeAsync=true and with completeAsync=false.

Error Analysis: I will now explain what goes wrong when completeAsync is true:

When the exception is thrown due to the VariableAggregation, the execution of the current job is set to “inactive” (JobRetryCmd::execute). In this test, the execution of the current job is the CallActivity execution (in the attached image, these are the executions with the IDs 17, 18, and 19). This differs from the test where completeAsync is false: there, the executions of the jobs refer to the EndEvents of the subflows (see IDs 23, 29, and 35 in the attached image). After the retry (i.e. after moving the DeadLetterJobs), all three CallActivity executions are executed again. However, they remain “inactive” (i.e. the column IS_ACTIVE in the DB still has the value false).

Every time an instance of the CallActivity finishes, the number of “completed” instances is compared to the total number of instances (this happens in ParallelMultiInstanceBehavior::internalLeave). If nrOfCompletedInstances >= nrOfInstances || isCompletionConditionSatisfied evaluates to true, all remaining instances of the CallActivity get cancelled. Here is the problem: when the first instance reaches this point after the retry, the value of nrOfCompletedInstances is 4 (which is wrong) while the value of nrOfInstances is 3 (which is correct). Hence, the other two instances get cancelled.
The value of nrOfCompletedInstances is calculated by counting the CallActivity executions where IS_ACTIVE==false (i.e. 17, 18, and 19) and adding 1 for the current instance.
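
The arithmetic described above can be reproduced with a small standalone model. This is only an illustrative sketch, not Flowable's actual implementation: the class name CompletionCheckSketch and its methods are hypothetical, and each execution is reduced to its IS_ACTIVE flag.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of the completion check; not Flowable code.
public class CompletionCheckSketch {

    // Counts the executions whose active flag is false and adds 1 for the
    // instance that is currently leaving, as described in the analysis.
    public static int nrOfCompletedInstances(List<Boolean> activeFlags) {
        int inactive = (int) activeFlags.stream().filter(active -> !active).count();
        return inactive + 1;
    }

    // True when the remaining instances would be cancelled
    // (the completion condition is left out of this sketch).
    public static boolean cancelsRemaining(List<Boolean> activeFlags, int nrOfInstances) {
        return nrOfCompletedInstances(activeFlags) >= nrOfInstances;
    }

    public static void main(String[] args) {
        // State after the retry: executions 17, 18, and 19 are still flagged inactive.
        List<Boolean> afterRetry = Arrays.asList(false, false, false);
        System.out.println(nrOfCompletedInstances(afterRetry)); // 4
        System.out.println(cancelsRemaining(afterRetry, 3));    // true

        // State if the executions had been reactivated before the retry.
        List<Boolean> reactivated = Arrays.asList(true, true, true);
        System.out.println(nrOfCompletedInstances(reactivated)); // 1
        System.out.println(cancelsRemaining(reactivated, 3));    // false
    }
}
```

With the stale flags, the first instance to finish already satisfies the check, which matches the observed cancellation of the other two instances.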

Note:
I originally posted this on the Flowable Forum. Since I did not receive any feedback there, I guess that this is a better place to raise the problem.

Additional context

(Two images attached to the original issue; not reproduced here.)


rssap commented Feb 17, 2025

We were able to mitigate the problem by extending the MoveDeadLetterJobToExecutableJobCmd in our code and overriding the execute method.

    import org.flowable.common.engine.impl.interceptor.CommandContext;
    import org.flowable.engine.impl.persistence.entity.ExecutionEntity;
    import org.flowable.engine.impl.util.CommandContextUtil;
    import org.flowable.job.api.Job;
    import org.flowable.job.service.JobServiceConfiguration;
    import org.flowable.job.service.impl.cmd.MoveDeadLetterJobToExecutableJobCmd;

    public class CustomMoveDeadLetterJobToExecutableJobCmd extends MoveDeadLetterJobToExecutableJobCmd {

      public CustomMoveDeadLetterJobToExecutableJobCmd(String jobId, int retries, JobServiceConfiguration jobServiceConfiguration) {
        super(jobId, retries, jobServiceConfiguration);
      }

      @Override
      public Job execute(CommandContext commandContext) {
        Job job = super.execute(commandContext);

        // Reactivate the execution that the failed job belongs to
        if (job.getExecutionId() != null) {
          ExecutionEntity executionEntity = CommandContextUtil.getExecutionEntityManager(commandContext).findById(job.getExecutionId());
          if (executionEntity != null) {
            executionEntity.setActive(true);
          }
        }
        return job;
      }
    }

This might serve as inspiration for fixing the problem in the Flowable code. However, our workaround uses classes from flowable-engine, which is not a dependency of flowable-job-service (where MoveDeadLetterJobToExecutableJobCmd is defined). Therefore, the code cannot simply be copied into MoveDeadLetterJobToExecutableJobCmd.
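
One generic way to work around a module boundary like this is a callback interface owned by the lower-level module and implemented by the higher-level one. The following is only a sketch of that pattern under stated assumptions: every name in it (RetryHookSketch, JobRetryListener, MoveJobCommand, ExecutionStore) is hypothetical, and it does not claim to reflect how Flowable is structured or which extension points it already offers.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of decoupling via a callback; not Flowable code.
public class RetryHookSketch {

    // Would live in the lower-level module; it knows nothing about executions.
    public interface JobRetryListener {
        void onJobMovedToExecutable(String executionId);
    }

    // Stand-in for the command in the lower-level module.
    public static class MoveJobCommand {
        private final JobRetryListener listener;

        public MoveJobCommand(JobRetryListener listener) {
            this.listener = listener;
        }

        public void execute(String executionId) {
            // Moving the job itself is omitted; only the notification is shown.
            if (executionId != null) {
                listener.onJobMovedToExecutable(executionId);
            }
        }
    }

    // Stand-in for the higher-level module that owns the execution state.
    public static class ExecutionStore implements JobRetryListener {
        public final Map<String, Boolean> activeByExecutionId = new HashMap<>();

        @Override
        public void onJobMovedToExecutable(String executionId) {
            // Reactivate the execution only if it is known to this store.
            activeByExecutionId.computeIfPresent(executionId, (id, active) -> Boolean.TRUE);
        }
    }

    public static void main(String[] args) {
        ExecutionStore store = new ExecutionStore();
        store.activeByExecutionId.put("17", false);
        new MoveJobCommand(store).execute("17");
        System.out.println(store.activeByExecutionId.get("17")); // true
    }
}
```

The command depends only on the interface, so the reactivation logic can stay in the module that owns the executions.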
