FLOWABLE的真正的并行服务任务执行(下)

FLOWABLE的真正的并行服务任务执行(下)

如您所见,该接口看起来与JavaDelegate非常相似。有一个execute方法带有DelegateExecution和一个新的AsyncTaskInvoker。AsyncTaskInvoker是一个新接口,可用于安排Flowable维护的公共共享线程池上的工作。但是,您可以根据需要使用自己的实现,甚至可以重用从所使用的库返回的CompletableFuture(例如,Elasticsearch的Java API,MongoDB等)。

除了execute方法之外,还有另一个方法(afterExecution)采用了DelegateExecution和执行数据。此方法是从与流程实例相同的线程和相同的事务中调用的,一旦昂贵的逻辑完成,应使用此方法在DelegateExecution上设置数据。不应在另一个线程上安排的工作中使用DelegateExecution;在安排工作之前,应检索所有需要的数据。一个示例实现可能如下所示:

public class LongRunningJavaDelegate implements FutureJavaDelegate {

    public CompletableFuture execute(DelegateExecution execution, AsyncTaskInvoker taskInvoker) {
        // This is running in the same transaction as the process instance and is still possible to set and extract data from the execution
        String input = (String) execution.getVariable("input");
        // The taskInvoker is a common invoker provided by Flowable that can be used to submit complex executions on a new thread.
        // However, you don't have to use it, you can use your own custom ExecutorService or return a CompletableFuture from your own services.
        return taskInvoker.submit(() -> {
            // This is running on a new thread. The execution shouldn't be used here.
            // There is also no transaction here. In case a new transaction is needed, then it should be managed by your own services
            // Perform some complex logic that takes some time, e.g. invoking an external service
            return "done";
        });
    }

    public void afterExecution(DelegateExecution execution, String executionData) {
        // This is running in the same transaction and thread as the process instance and data can be set on the execution
        execution.setVariable("longRunningResult", executionData);
    }
}

 

除了FutureJavaDelegate之外,还有两个其他接口可以使您更轻松地实现业务逻辑:

 

可以使用MapBasedFlowableFutureJavaDelegate以下方式创建上面的示例实现:

public class LongRunningJavaDelegate implements MapBasedFlowableFutureJavaDelegate {

    public Map execute(ReadOnlyDelegateExecution execution) {
        // The execution contains a read only snapshot of the delegate execution
        // This is running on a new thread. The execution shouldn't be used here.
        // There is also no transaction here. In case a new transaction is needed, then it should be managed by your own services
        // Perform some complex logic that takes some time, e.g. invoking an external service
        Map result = new HashMap<>();
        result.put("longRunningResult", "done");
        // All the values from the returned map will be set on the execution
        return result;
    }
}

除了通过使用委托支持真正的并行执行外,我们还通过返回CompletableFuture的表达式来支持服务任务的此类执行。如果服务任务的表达式结果返回CompletableFuture,我们将继续执行并行流,一旦将来完成,我们将继续该流的路径。

我们如何实现这一目标?

我们已经展示了使用Flowable实现真正的并行执行所需的操作。让我们快速看一下它是如何实现的,以及为什么我们以前没有这样做。

主要解决方案包括将委托类的执行分为不同的阶段。处理执行数据(例如变量)的阶段计划在调用线程上运行并参与现有事务。例如,这避免了连接多个异步路径的问题,该问题需要在同一实体上连接数据,如上所述,使用异步标志时。然后使用一个单独的线程池在一个阶段中计划将这些数据用作输入并生成输出数据的实际逻辑。最终,所有执行路径及其结果将合并到一个在事务上安全的阶段。

多年以来,我们就一直在使用此功能。到达此处花了一段时间的主要原因是,从技术上讲,它很难实现。原始的V5架构并不适合于此类事情,而执行它的方式意味着实现这一点极为困难。请注意,并非并非不可能(毕竟都是代码),但麻烦,并且从我们在V5架构上的经验来看,在所有用例中都易于出错。

但是,借助V6架构和我们最近进行的重构,这变得容易实现。所有的操作都在一个议程中计划(所有引擎,BPMN / CMMN / DMN /…都是这样工作的),这使我们可以计划一些特殊的操作,这些操作将在议程没有剩余时检查未来是否已经完成正常运作。这样,就可以继续执行已在议程上计划的其他并行流程。一旦没有其他正常操作,我们将执行一项操作,直到任何一个计划的期货完成(保持交易未完成)之前,该操作都将阻塞。这个未来的完成将允许在议程上计划新的正常操作,然后可以执行该操作。这将一直持续到我们进入等待状态,并且在此事务中不再执行任何其他类型的操作(包括期货)。

当然,这是其实现方式的简化。如果你有兴趣在低级别的细节那么我们建议你看看这个PR是添加了这个功能。

真正的并行HTTP任务

回到引言中的示例,我们使用了HTTP任务。在Flowable 6.6+中,默认情况下,不使用这种真正的并行方法执行HTTP任务。我们决定保持这种方式,以确保所有当前正在运行的进程的执行顺序保持不变,并为您提供控制权,以决定您希望它们在同一线程中执行还是在新线程中执行。为此,我们添加了新的BPMN和CMMN扩展属性flowable :parallelInSameTransaction,可以将其设置为true以使用新的并行方法执行HTTP Task。如果没有设置该属性,然后在全球defaultParallelInSameTransaction从HttpClientConfig在BPMN和CMMN发动机配置来决定它应该如何执行。

我们目前正在制定基准测试,以向您展示Flowable HTTP Tasks的真正并行执行与同步并行执行之间的区别。我们还在尝试使用HTTP NIO和Spring WebClient来实现更高的吞吐量。

 

相关教程