工作流引擎批量处理

批量

批处理是将工作负载从当前执行中卸载以在后台处理的概念。这允许在大量实例上异步运行流程引擎命令而不会阻塞。它还将单独的命令调用彼此分离。

例如,可以使用批处理执行流程实例迁移命令 。这允许异步迁移流程实例。在同步流程实例迁移中,所有迁移都在单个事务中执行。首先,这要求所有人都成功提交事务。对于大量流程实例,事务也可能变得太大,甚至无法提交给数据库。随着批量迁移,这两个特征都改变了。批处理以较小的块执行迁移,每个块都使用一个事务。

好处:

1、异步(非阻塞)执行

2、执行可以利用多个线程和作业执行器

3、解耦执行,即每个批处理执行作业都使用其自己的事务

缺点:

1、手动轮询以完成批次

2、与流程引擎执行的其他作业竞争

3、一个子集已经执行时,批处理可能会部分失败,例如,某些流程实例已迁移,而其他实例失败

从技术上讲,批处理代表一组作业,这些作业在流程引擎的上下文中执行命令。

批处理利用流程引擎的作业执行器执行批处理作业。单个批处理包含三种作业类型:

1、种子作业:创建完成批次所需的所有批次执行作业

2、执行作业:批处理命令的实际执行,例如流程实例迁移

3、监视作业:种子作业完成后,它监视批处理执行和完成的进度

API

以下概述了批处理的Java API。

创建批次

通过异步执行流程引擎命令来创建批处理。

当前支持的批处理类型:

1、流程实例迁移

2、取消正在运行的流程实例

3、删除历史流程实例

4、更新流程实例挂起状态

5、设置与流程实例关联的作业的重试

6、流程实例修改

7、流程实例重启

8、设置外部任务重试

9、为历史流程实例设置删除时间

10、为历史决策实例设置删除时间

11、将移除时间设置为历史批次

Java API可用于创建批处理命令,有关具体用法示例,请参阅特定命令。

查询批次

您可以按ID和类型查询正在运行的批处理,例如查询所有正在运行的流程实例迁移批处理。

List<Batch> migrationBatches = processEngine.getManagementService()

  .createBatchQuery()

  .type(Batch.TYPE_PROCESS_INSTANCE_MIGRATION)

  .list();

批次统计

您可以使用管理服务查询批次的统计信息。批处理统计信息将包含有关剩余,已完成和失败的批处理执行作业的信息。

List<BatchStatistics> migrationBatches = processEngine.getManagementService()

  .createBatchStatisticsQuery()

  .type(Batch.TYPE_PROCESS_INSTANCE_MIGRATION)

  .list();

批次历史

对于历史记录级别, FULL将创建历史记录批处理条目。您可以使用历史记录服务查询它。

HistoricBatch historicBatch = processEngine.getHistoryService()

  .createHistoricBatchQuery()

  .batchId(batch.getId())

  .singleResult();

历史记录还包含种子,监视和执行作业的作业日志条目。您可以通过特定的作业定义ID查询相应的作业日志条目。

HistoricBatch historicBatch = ...

List<HistoricJobLog> batchExecutionJobLogs = processEngine.getHistoryService()

  .createHistoricJobLogQuery()

  .jobDefinitionId(historicBatch.getBatchJobDefinitionId())

  .orderByTimestamp()

  .list();

您可以为完成的历史批处理操作的历史记录清理进行配置。

暂停批次

要暂停批处理和所有相应作业的执行,可以使用管理服务暂停批处理。

processEngine.getManagementService()

  .suspendBatchById("myBatch");

然后,也可以使用管理服务来激活暂停的批次。

processEngine.getManagementService()

  .activateBatchById("myBatch");

删除批次

可以使用管理服务删除正在运行的批处理。

// Delete a batch preserving the history of the batch

processEngine.getManagementService()

  .deleteBatch("myBatch", false);// Delete a batch include history of the batch

processEngine.getManagementService()

  .deleteBatch("myBatch", true);

可以使用历史记录服务删除历史记录批处理。

processEngine.getHistoryService()

  .deleteHistoricBatch("myBatch");

对于仍在执行作业的正在运行的批处理,建议在删除该批处理之前将其挂起。有关详细信息,请参见“ 暂停批处理”部分。

批次优先级

由于所有批处理作业都是使用作业执行程序执行的,因此可以使用 作业优先级功能来调整批处理作业的优先级。默认批处理作业优先级由流程引擎配置设置 batchJobPriority。

可以 使用管理服务来调整特定批处理作业定义甚至单个批处理作业的优先级。

Batch batch = ...;

String batchJobDefinitionId = batch.getBatchJobDefinitionId();

processEngine.getManagementService()

  .setOverridingJobPriorityForJobDefinition(batchJobDefinitionId, 100, true);

操作日志

请注意,用户操作日志仅用于批量创建本身,种子作业的执行以及执行操作的单个作业均由Job Executor执行,因此不被视为用户操作。

工作定义

种子工作

批处理最初会创建一个种子作业。该种子将被重复执行以创建所有批处理执行作业。例如,如果用户启动了1000个流程实例的流程实例迁移批处理。使用默认的流程引擎配置,种子作业将在每次调用时创建10个批处理执行作业。每个执行作业将迁移1个流程实例。总而言之,种子作业将被调用100次,直到它创建完成批处理所需的1000个执行作业。

可以在流程引擎配置上配置每个种子作业调用创建的作业数batchJobsPerSeed (默认值:100)和每个批处理执行作业的调用数 invocationsPerBatchJob(默认值:1)。

Java API可用于获取批处理种子作业的作业定义:

Batch batch = ...;

JobDefinition seedJobDefinition = processEngine.getManagementService()

  .createJobDefinitionQuery()

  .jobDefinitionId(batch.getSeedJobDefinitionId())

  .singleResult();

要暂停其他批处理执行作业的创建,可以使用管理服务暂停种子作业定义:

processEngine.getManagementService()

  .suspendJobByJobDefinitionId(seedJobDefinition.getId());

执行工作

批处理的执行分为几个执行作业。特定的作业数量取决于批处理的总作业和流程引擎配置(请参阅种子作业)。每个执行作业都会针对给定的调用次数执行实际的批处理命令,例如,迁移多个流程实例。执行作业将由作业执行者执行。它们的行为类似于其他作业,这意味着它们可能会失败,并且作业执行者将 重试失败的批处理执行作业。同样,将发生 失败的批处理执行作业,而没有重试的事件

Java API可用于获取批处理的执行作业的作业定义,例如,流程实例迁移批处理

Batch batch = ...;

JobDefinition executionJobDefinition = processEngine.getManagementService()

  .createJobDefinitionQuery()

  .jobDefinitionId(batch.getBatchJobDefinitionId())

  .singleResult();

要暂停其他批处理执行作业的执行,可以使用管理服务挂起批处理作业定义:

processEngine.getManagementService()

  .suspendJobByJobDefinitionId(executionJobDefinition.getId());

监控工作

种子作业创建了所有批次执行作业之后,将为该批次创建一个监视作业。该作业会定期轮询是否已完成批处理,即所有批处理执行作业都已完成。可以通过流程引擎配置的batchPollTime(默认值:30秒)属性来配置轮询间隔。

Java API可用于获取批处理的监视作业的作业定义:

Batch batch = ...;

JobDefinition monitorJobDefinition = processEngine.getManagementService()

  .createJobDefinitionQuery()

  .jobDefinitionId(batch.getMonitorJobDefinitionId())

  .singleResult();

为了防止完成批处理,即删除运行时数据,可以使用管理服务来挂起监视作业定义:

processEngine.getManagementService()

  .suspendJobByJobDefinitionId(monitorJobDefinition.getId());

 

 

 

批处理操作

以下操作可以异步执行

1、流程实例迁移

2、取消正在运行的流程实例

3、删除历史流程实例

4、更新流程实例的挂起状态

5、设置与流程实例关联的作业的重试

6、流程实例修改

7、流程实例重启

8、设置外部任务重试

9、为历史流程实例设置删除时间

10、为历史决策实例设置删除时间

11、将移除时间设置为历史批次

所有批处理操作都依赖于相应的方法,这些方法提供了对实体列表进行同步操作的可能性。请参考常规批处理文档以更好地了解创建过程。

可以基于特定实例的列表以及提供实例的结果列表的查询结果来执行异步操作。如果同时提供了实例列表和查询,则受影响实例的结果集将由这两个子集的并集组成。

取消正在运行的流程实例

可以使用以下Java API方法调用以异步方式取消正在运行的流程实例:

List<String> processInstanceIds = ...;

runtimeService.deleteProcessInstancesAsync(

        processInstanceIds, null, REASON);

删除历史流程实例

可以使用以下Java API方法调用来异步执行历史流程实例的删除:

List<String> historicProcessInstanceIds = ...;

historyService.deleteHistoricProcessInstancesAsync(

        historicProcessInstanceIds, TEST_REASON);

更新流程实例的挂起状态

使用以下Java API方法调用异步更新多个流程实例的挂起状态:

List<String> processInstanceIds = ...;

runtimeService.updateProcessInstanceSuspensionState().byProcessInstanceIds(

  processInstanceIds).suspendAsync();

设置与流程实例相关联的作业的重试

可以使用以下Java API方法调用来异步执行设置与流程实例相关联的作业的重试:

List<String> processInstanceIds = ...;int retries = ...;

managementService.setJobRetriesAsync(

        processInstanceIds, null, retries);

设置外部任务重试

可以使用以下Java API方法调用来异步执行设置外部任务的重试:

List<String> externalTaskIds = ...;

externalTaskService.setRetriesAsync(

        externalTaskIds, TEST_REASON);

设定移除时间

有时有必要推迟甚至阻止某些历史实例的删除。删除时间可以与历史过程,决策和批次异步设置。

可以选择以下模式:

1、绝对:将清除时间设置为任意日期

.absoluteRemovalTime(Date removalTime)

2、已清除:重置移除时间(以null-value 表示);没有删除时间的实例不会被清理

.clearedRemovalTime()

3、计算:根据工作流引擎的设置重新计算删除时间(基准时间+ TTL)

.calculatedRemovalTime()

历史过程和决策实例可以是层次结构的一部分。要为层次结构中的所有实例设置相同的删除时间,.hierarchical()需要调用该方法。

历史流程实例

HistoricProcessInstanceQuery query =

  historyService.createHistoricProcessInstanceQuery();

Batch batch = historyService.setRemovalTimeToHistoricProcessInstances()

  .absoluteRemovalTime(new Date()) // sets an absolute removal time

   // .clearedRemovalTime()        // resets the removal time to null

   // .calculatedRemovalTime()     // calculation based on the engine's configuration

  .byQuery(query)

  .byIds("693206dd-11e9-b7cb-be5e0f7575b7", "...")

   // .hierarchical()              // sets a removal time across the hierarchy

  .executeAsync();

历史决策实例

HistoricDecisionInstanceQuery query =

  historyService.createHistoricDecisionInstanceQuery();

Batch batch = historyService.setRemovalTimeToHistoricDecisionInstances()

  .absoluteRemovalTime(new Date()) // sets an absolute removal time

   // .clearedRemovalTime()        // resets the removal time to null

   // .calculatedRemovalTime()     // calculation based on the engine's configuration

  .byQuery(query)

  .byIds("693206dd-11e9-b7cb-be5e0f7575b7", "...")

   // .hierarchical()              // sets a removal time across the hierarchy

  .executeAsync();

已知限制

.hierarchical()决策实例批处理操作的标志仅设置决策层次结构内的删除时间。如果业务规则任务调用了决策,则不会更新调用流程实例(包括层次结构中存在的其他流程实例)。

要更新根流程实例(所有流程以及决策实例)的层次结构中的所有子实例,请对.hierarchical()启用了标记的流程实例使用批处理操作。

历史批次

HistoricBatchQuery query = historyService.createHistoricBatchQuery();

Batch batch = historyService.setRemovalTimeToHistoricBatches()

  .absoluteRemovalTime(new Date()) // sets an absolute removal time

   // .clearedRemovalTime()        // resets the removal time to null

   // .calculatedRemovalTime()     // calculation based on the engine's configuration

  .byQuery(query)

  .byIds("693206dd-11e9-b7cb-be5e0f7575b7", "...")

  .executeAsync();

 技术支持:盘古BPM工作流平台

相关教程