activi源码走读
activi源码走读
架构概述
Activiti工作流引擎架构大致分为6层。从上到下依次为工作流引擎层、部署层、业务接口 层、命令拦截层、命令层和行为层。
工作流引擎层: 主要指ProcessEngine接口,这是Activiti所有接口的总入口。
部署层: 包括DeploymentBuilder和BpmnModel等与流程部署相关的类。理论上,部署层并不属于 Activiti引擎架构的分层体系。将其单独拿出来作为一层,只是为了突出其重要性。流程运转基于流 程定义,而流程定义解析就是流程的开始。从流程模型转换为流程定义、将其解析为简单Java对象
(Plain Ordinary Java Object,POJO),都是基于部署层实现的。
业务接口层: 面向业务提供各种服务接口,如RuntimeService、TaskService等。
命令拦截层:采用责任链模式,通过拦截器层为命令的执行创造条件,如开启事务、创建CommandContext
上下文、记录日志等。
命令层: Activiti的业务处理层。Activiti的整体编码模式采用的是命令模式,将业务逻辑封装为一个个Command接口实现类。这样,新增一个业务功能时只需新增一个Command实现。
行为层:包括各种FlowNodeActivityBehavior和ActivitiEventListener,这些类负责执行和监听Activiti
流程具体的流转动作。
activiti设计模式
命令模式
Activiti源码主要应用了命令模式、责任链模式和命令链模式。
命令模式属于行为型模式。可以将一个请求或者操作(包含接受者信息)封装到命令对象中,然后将该命令对象交由执行者执行,执行者无须关心命令的接收人或者命令的具体内容,因为这些信息均已被封装到命令对象中。命令模式中涉及的角色及其作用分别介绍如下。
Command(抽象命令类):抽象命令对象,可以根据不同的命令类型,写出不同的实现类。
ConcreteCommand(具体命令类):抽象命令对象的具体实现。
Invoker(调用者):请求的发送者,通过命令对象来执行请求。一个调用者并不需要在设计时确定其接收者,因此它只与抽象命令之间存在关联。程序运行时,会调用命令对象的execute()方法,间接调用接收者的相关操作。
Receiver(接收者):接收者执行与请求相关的操作,是真正执行命令的对象,实现对请求的业务
处理。
Client(客户端):在客户类中需要创建调用者对象、具体命令类对象,创建具体命令对象时需要指
定对应的接收者。发送者和接收者之间不存在关联关系,均通过命令对象来调用。
Activiti中的每一个数据库的增、删、改、查操作,均为将一个命令的实现交给Activiti的命令执行者执行。
Activiti使用一个CommandContext类作为命令接收者,该类维护一系列Manager 对象,这些Manager对象类似J2EE中的数据访问对象(Data Access Object,DAO)。
在Activiti中构建的命令模式的类,主要包括以下5个部分。
- Command: 抽象命令接口。该接口定义了一个execute()抽象方法,调用该方法时,需要传入参数CommandContext。
- CommandContext:命令上下文。CommandContext实例从Context获取,以栈的形式存储在使用本地线程的变量中(ThreadLocal)。
- CommandExecutor:命令执行者。该接口提供了两种方法执行命令,可以同时传入命令配置参数 CommandConfig和Command,也可以只传入Command。
- ServiceImpl:Activiti的服务类,如TaskServiceImpl等,均继承ServiceImpl类。该类持有CommandExecutor 对象,在该服务实现中,构造各个Command的实现类传递给CommandExecutor执行。这个类是命令的 发送者,对应标准命令模式定义中的Client。
- CommandInterceptor:命令拦截器,有多个实现类。它被CommandExecutor实现类调用,是最终的命令执行者,同时也是串接命令模式和责任链模式的衔接点。
责任链模式
Activiti使用了一系列命令拦截器(CommandInterceptor),这些命令拦截器扮演着命令执行者的角色。
责任链模式也是行为型模式。该设计模式让多个对象都有机会处理请求,从而避免了请求发送者与请求接收者耦合的情况。这些请求接收者将组成一条链,并沿着 这条链传递请求,直到有一个对象处理请求为止。责任链模式有以下参与者。
Handler(请求处理者接口):定义一个处理请求的接口,包含抽象处理方法和一个后继链。
ConcreteHandler(请求处理者实现):请求处理接口的实现,它可以判断是否能够处理本次请求,如
果可以处理请求就进行处理,否则就将该请求转发给它的后继者。
Client(客户端):组装责任链,并向链头的具体对象提交请求。它不关心处理细节和请求的传递
过程。
Activiti责任链模式下的类主要有以下几种。
CommandInterceptor:命令拦截器接口。它是采用命令模式实现的拦截器,作为责任链的“链节点”的定义,可以执行命令,也可以获取和设置下一个“链节点”。
ProcessEngineConfigurationImpl:维护整条责任链的类,在该工作流引擎抽象实现类中,实现了对整
条责任链的维护。
CommandInvoker:CommandInterceptor责任链“链节点”实现类之一,负责责任链末尾节点的命令
执行。
命令链模式
Activiti命令链模式实质上就是命令模式和责任链模式 结合的产物。
ProcessEngineConfigurationImpl:
public void initCommandExecutor() {
if (commandExecutor == null) {
CommandInterceptor first = initInterceptorChain(commandInterceptors);
commandExecutor = new CommandExecutorImpl(getDefaultCommandConfig(), first);
}
}
public CommandInterceptor initInterceptorChain(List<CommandInterceptor> chain) {
if (chain == null || chain.isEmpty()) {
throw new ActivitiException("invalid command interceptor chain configuration: " + chain);
}
for (int i = 0; i < chain.size() - 1; i++) {
chain.get(i).setNext(chain.get(i + 1));
}
return chain.get(0);
}
核心代码走读
流程模型:
• 是通过流程设计器(比如 Web 模型编辑器)绘制的流程图。
• 主要用来设计和预览流程,但尚未实际部署。
• 通常以 JSON 格式 存储在 ACT_RE_MODEL 表的 META_INFO 字段中,用于描述设计阶段的流程结构。
流程定义:
• 是将流程模型部署到引擎后的产物,真正可以在引擎中执行。
• 通常以 XML 格式(符合 BPMN 2.0 标准)存储。
• XML 文件的内容描述了流程的详细定义(任务、网关、事件等),存储在 ACT_GE_BYTEARRAY 表中,供引擎加载和运行。
流程模型部署过程可以划分为以下3个步骤。
第一步,获取ACT_RE_MODEL表中的流程模型数据,将其转换为BpmnModel对象。由于不同流程设计 器的保存格式也不一样,所以这部分逻辑主要由用户自己实现。
第二步,调用Activiti提供的BpmnXMLConverter类。该类支持BpmnModel对象和符合BPMN 2.0规范的 XML互相转换。
第三步,使用Activiti提供的RepositoryService接口,执行流程模型部署操作。
// 创建部署构造器
DeploymentBuilder deploymentBuilder = repositoryService.createDeployment();
// 定义部署对象的ID、name、key等属性 deploymentBuilder.processDefinitionId(processDefinitionId)
.name(processModel.getName()).key(processModel.getKey()).tenantId(tenantFlag);
// 设置XML输入流
deploymentBuilder.addInputStream(processModel.getKey().replaceAll(" ", "") + ".bpmn",
new ByteArrayInputStream(modelXML));
// 设置表单、决策表等,代码略
// 执行流程模型部署操作
Deployment deployment = deploymentBuilder.deploy();
DeploymentManager:
public void deploy(DeploymentEntity deployment, Map<String, Object> deploymentSettings) {
for (Deployer deployer : deployers) {
deployer.deploy(deployment, deploymentSettings);
}
}
流程模型部署
BpmnDeployer:
public void deploy(DeploymentEntity deployment, Map<String, Object> deploymentSettings) {
//ParsedDeployment用于描述与流程定义相关联的部署信息、BPMN资源和模型等
ParsedDeployment parsedDeployment = parsedDeploymentBuilderFactory
.getBuilderForDeploymentAndSettings(deployment, deploymentSettings).build();
//校验是否有重复的流程定义key
bpmnDeploymentHelper.verifyProcessDefinitionsDoNotShareKeys(
parsedDeployment.getAllProcessDefinitions());
//将Deployment对象中的部分数据赋给流程定义
bpmnDeploymentHelper.copyDeploymentValuesToProcessDefinitions(
parsedDeployment.getDeployment(),
parsedDeployment.getAllProcessDefinitions());
//设置流程定义中的资源名称
bpmnDeploymentHelper.setResourceNamesOnProcessDefinitions(parsedDeployment);
//创建新的流程图
createAndPersistNewDiagramsIfNeeded(parsedDeployment);
//设置流程定义的DiagramResourceName属性
setProcessDefinitionDiagramNames(parsedDeployment);
if (deployment.isNew()) {
Map<ProcessDefinitionEntity, ProcessDefinitionEntity>
mapOfNewProcessDefinitionToPreviousVersion =
getPreviousVersionsOfProcessDefinitions(parsedDeployment);
//设置流程定义的版本号和ID
setProcessDefinitionVersionsAndIds(parsedDeployment, mapOfNewProcessDefinitionToPreviousVersion);
//持久化流程定义到数据库并且添加有关联的用户信息
persistProcessDefinitionsAndAuthorizations(parsedDeployment);
//更新TimerJob和EventSubscription
updateTimersAndEvents(parsedDeployment,mapOfNewProcessDefinitionToPreviousVersion);
//派发流程定义对象初始化事件
dispatchProcessDefinitionEntityInitializedEvent(parsedDeployment);
} else {
//非新建部署对象保持之前保存的版本号
makeProcessDefinitionsConsistentWithPersistedVersions(parsedDeployment);
}
//将流程定义数据保存到缓存中
cachingAndArtifactsManager.updateCachingAndArtifacts(parsedDeployment);
}
流程定义解析
流程模型部署后,流程定义作为扩展名为.bpmn20.xml或者.bpmn.xml的XML文件,保存在ACT_GE_BYTEARRAY资源表中。流程执行过程十分依赖流程定义信息,因此,需要解析流程定义,将之转换为工作流引擎易于使用的对象,使得工作流引擎可以在创建、流转流程的过程中,方便地按照流程定义进行正确的处理。
Activiti解析流程定义其实也是在流程部署过程中进行的。BpmnDeployer类的deploy()方法,执行流程定义解析的代码如下:
ParsedDeployment parsedDeployment = parsedDeploymentBuilderFactory
.getBuilderForDeploymentAndSettings(deployment, deploymentSettings).build();
ParsedDeploymentBuilder:
public ParsedDeployment build() {
List<ProcessDefinitionEntity> processDefinitions = new ArrayList<ProcessDefinitionEntity>();
Map<ProcessDefinitionEntity, BpmnParse> processDefinitionsToBpmnParseMap
= new LinkedHashMap<ProcessDefinitionEntity, BpmnParse>();
Map<ProcessDefinitionEntity, ResourceEntity> processDefinitionsToResourceMap
= new LinkedHashMap<ProcessDefinitionEntity, ResourceEntity>();
for (ResourceEntity resource : deployment.getResources().values()) {
if (isBpmnResource(resource.getName())) {
log.debug("Processing BPMN resource {}", resource.getName());
BpmnParse parse = createBpmnParseFromResource(resource);
for (ProcessDefinitionEntity processDefinition : parse.getProcessDefinitions()) {
processDefinitions.add(processDefinition);
processDefinitionsToBpmnParseMap.put(processDefinition, parse);
processDefinitionsToResourceMap.put(processDefinition, resource);
}
}
}
return new ParsedDeployment(deployment, processDefinitions,
processDefinitionsToBpmnParseMap, processDefinitionsToResourceMap);
}
protected BpmnParse createBpmnParseFromResource(ResourceEntity resource) {
String resourceName = resource.getName();
ByteArrayInputStream inputStream = new ByteArrayInputStream(resource.getBytes());
BpmnParse bpmnParse = bpmnParser.createParse()
.sourceInputStream(inputStream)
.setSourceSystemId(resourceName)
.deployment(deployment)
.name(resourceName);
if (deploymentSettings != null) {
// Schema validation if needed
if (deploymentSettings.containsKey(DeploymentSettings.IS_BPMN20_XSD_VALIDATION_ENABLED)) {
bpmnParse.setValidateSchema((Boolean) deploymentSettings.get(DeploymentSettings.IS_BPMN20_XSD_VALIDATION_ENABLED));
}
// Process validation if needed
if (deploymentSettings.containsKey(DeploymentSettings.IS_PROCESS_VALIDATION_ENABLED)) {
bpmnParse.setValidateProcess((Boolean) deploymentSettings.get(DeploymentSettings.IS_PROCESS_VALIDATION_ENABLED));
}
} else {
// On redeploy, we assume it is validated at the first deploy
bpmnParse.setValidateSchema(false);
bpmnParse.setValidateProcess(false);
}
bpmnParse.execute();
return bpmnParse;
}
public BpmnParse execute() {
try {
ProcessEngineConfigurationImpl processEngineConfiguration = Context.getProcessEngineConfiguration();
BpmnXMLConverter converter = new BpmnXMLConverter();
boolean enableSafeBpmnXml = false;
String encoding = null;
if (processEngineConfiguration != null) {
enableSafeBpmnXml = processEngineConfiguration.isEnableSafeBpmnXml();
encoding = processEngineConfiguration.getXmlEncoding();
}
if (encoding != null) {
bpmnModel = converter.convertToBpmnModel(streamSource, validateSchema,
enableSafeBpmnXml, encoding); } else {
} ...
bpmnModel = converter.convertToBpmnModel(streamSource, validateSchema, enableSafeBpmnXml);
} catch (Exception e) {
if (e instanceof ActivitiException) {
... }
return this;
}
流程启动
我们调用RuntimeService接口的createProcessInstanceBuilder()方法创建一个ProcessInstanceBuilder对象, 然后调用ProcessInstanceBuilder的start()方法来启动流程实例(省略部分代码):
// 创建流程实例构造器
ProcessInstanceBuilder processInstanceBuilder = runtimeService.createProcessInstanceBuilder();
...// 设置流程实例属性
// 启动流程实例
ProcessInstance instance = processInstanceBuilder.start();
展开ProcessInstanceBuilder的start()方法源码,可知它调用RuntimeService接口的startProcessInstance()方法 启动流程实例:
public class ProcessInstanceBuilderImpl implements ProcessInstanceBuilder {
public ProcessInstance start() {
return runtimeService.startProcessInstance(this);
} }
RuntimeServiceImpl:
public ProcessInstance startProcessInstance(ProcessInstanceBuilderImpl processInstanceBuilder) {
if (processInstanceBuilder.getProcessDefinitionId() != null || processInstanceBuilder.getProcessDefinitionKey() != null) {
return commandExecutor.execute(new StartProcessInstanceCmd<ProcessInstance>(processInstanceBuilder));
} else if (processInstanceBuilder.getMessageName() != null) {
return commandExecutor.execute(new StartProcessInstanceByMessageCmd(processInstanceBuilder));
} else {
throw new ActivitiIllegalArgumentException("No processDefinitionId, processDefinitionKey nor messageName provided");
}
}
在以上代码中,如果设置了ProcessDefinitionId或ProcessDefinitionKey属性,RuntimeService会调用Start ProcessInstanceCmd命令启动流程;如果设置了MessageName属性,RuntimeService会调用StartProcessInstance ByMessageCmd命令启动流程,否则将抛出异常。
下面将以StartProcessInstanceCmd命令为例,讲解基于流程定义启动流程的过程。
public class StartProcessInstanceCmd<T> implements Command<ProcessInstance>, Serializable {
public ProcessInstance execute(CommandContext commandContext) {
DeploymentManager deploymentCache =
commandContext.getProcessEngineConfiguration().getDeploymentManager();
// 1.获取流程定义
ProcessDefinition processDefinition = null;
if (processDefinitionId != null) {
...//基于流程定义ID获取流程定义
} else if (processDefinitionKey != null &&
(tenantId == null || ProcessEngineConfiguration.NO_TENANT_ID.equals(tenantId))) {
...//基于流程定义key获取流程定义
} else if (processDefinitionKey != null &&
tenantId != null && !ProcessEngineConfiguration.NO_TENANT_ID.equals(tenantId)) {
...//基于流程定义key和租户ID获取流程定义
} else {
throw new ActivitiIllegalArgumentException(
"processDefinitionKey and processDefinitionId are null");
}
processInstanceHelper =
commandContext.getProcessEngineConfiguration().getProcessInstanceHelper();
// 2.开始创建流程实例
ProcessInstance processInstance = createAndStartProcessInstance(processDefinition,
businessKey, processInstanceName, variables, transientVariables);
return processInstance;
}
protected ProcessInstance createAndStartProcessInstance(ProcessDefinition processDefinition, String businessKey, String processInstanceName, Map<String, Object> variables, Map<String, Object> transientVariables) {
return processInstanceHelper.createAndStartProcessInstance(processDefinition, businessKey, processInstanceName, variables, transientVariables);
}
}
先根据参数查找相应的流程定义,然 后调用createAndStartProcessInstance()方法,通过调用ProcessInstanceHelper类创建并发起流程实例。
public ProcessInstance createAndStartProcessInstanceWithInitialFlowElement(ProcessDefinition processDefinition,
String businessKey, String processInstanceName, FlowElement initialFlowElement,
Process process, Map<String, Object> variables, Map<String, Object> transientVariables, boolean startProcessInstance) {
CommandContext commandContext = Context.getCommandContext();
// 创建流程实例
String initiatorVariableName = null;
if (initialFlowElement instanceof StartEvent) {
initiatorVariableName = ((StartEvent) initialFlowElement).getInitiator();
}
ExecutionEntity processInstance = commandContext.getExecutionEntityManager()
.createProcessInstanceExecution(processDefinition, businessKey, processDefinition.getTenantId(), initiatorVariableName);
commandContext.getHistoryManager().recordProcessInstanceStart(processInstance, initialFlowElement);
processInstance.setVariables(processDataObjects(process.getDataObjects()));
//设置流程变量
// Set the variables passed into the start command
if (variables != null) {
for (String varName : variables.keySet()) {
processInstance.setVariable(varName, variables.get(varName));
}
}
if (transientVariables != null) {
for (String varName : transientVariables.keySet()) {
processInstance.setTransientVariable(varName, transientVariables.get(varName));
}
}
//设置流程实例名称
if (processInstanceName != null) {
processInstance.setName(processInstanceName);
commandContext.getHistoryManager().recordProcessInstanceNameChange(processInstance.getId(), processInstanceName);
}
//发布全局事件
if (Context.getProcessEngineConfiguration().getEventDispatcher().isEnabled()) {
Context.getProcessEngineConfiguration().getEventDispatcher()
.dispatchEvent(ActivitiEventBuilder.createEntityWithVariablesEvent(ActivitiEventType.ENTITY_INITIALIZED, processInstance, variables, false));
}
//创建第一个执行实例
ExecutionEntity execution = commandContext.getExecutionEntityManager().createChildExecution(processInstance);
//设置当前节点为开始节点
execution.setCurrentFlowElement(initialFlowElement);
//判断是否启动流程
if (startProcessInstance) {
startProcessInstance(processInstance, commandContext, variables);
}
//返回流程实例
return processInstance;
}
public void startProcessInstance(ExecutionEntity processInstance, CommandContext commandContext, Map<String, Object> variables) {
Process process = ProcessDefinitionUtil.getProcess(processInstance.getProcessDefinitionId());
// Event sub process handling
List<MessageEventSubscriptionEntity> messageEventSubscriptions = new LinkedList<>();
for (FlowElement flowElement : process.getFlowElements()) {
if (flowElement instanceof EventSubProcess) {
EventSubProcess eventSubProcess = (EventSubProcess) flowElement;
for (FlowElement subElement : eventSubProcess.getFlowElements()) {
if (subElement instanceof StartEvent) {
StartEvent startEvent = (StartEvent) subElement;
if (CollectionUtil.isNotEmpty(startEvent.getEventDefinitions())) {
EventDefinition eventDefinition = startEvent.getEventDefinitions().get(0);
if (eventDefinition instanceof MessageEventDefinition) {
MessageEventDefinition messageEventDefinition = (MessageEventDefinition) eventDefinition;
BpmnModel bpmnModel = ProcessDefinitionUtil.getBpmnModel(processInstance.getProcessDefinitionId());
if (bpmnModel.containsMessageId(messageEventDefinition.getMessageRef())) {
messageEventDefinition.setMessageRef(bpmnModel.getMessage(messageEventDefinition.getMessageRef()).getName());
}
ExecutionEntity messageExecution = commandContext.getExecutionEntityManager().createChildExecution(processInstance);
messageExecution.setCurrentFlowElement(startEvent);
messageExecution.setEventScope(true);
messageEventSubscriptions
.add(commandContext.getEventSubscriptionEntityManager().insertMessageEvent(messageEventDefinition.getMessageRef(), messageExecution));
}
}
}
}
}
}
//获取第一个子执行实例
ExecutionEntity execution = processInstance.getExecutions().get(0); // There will always be one child execution created
//继续流转流程
commandContext.getAgenda().planContinueProcessOperation(execution);
//发布流程启动全局事件
if (Context.getProcessEngineConfiguration().getEventDispatcher().isEnabled()) {
ActivitiEventDispatcher eventDispatcher = Context.getProcessEngineConfiguration().getEventDispatcher();
eventDispatcher.dispatchEvent(ActivitiEventBuilder.createProcessStartedEvent(execution, variables, false));
for (MessageEventSubscriptionEntity messageEventSubscription : messageEventSubscriptions) {
commandContext.getProcessEngineConfiguration().getEventDispatcher()
.dispatchEvent(ActivitiEventBuilder.createMessageEvent(ActivitiEventType.ACTIVITY_MESSAGE_WAITING, messageEventSubscription.getActivityId(),
messageEventSubscription.getEventName(), null, messageEventSubscription.getExecution().getId(),
messageEventSubscription.getProcessInstanceId(), messageEventSubscription.getProcessDefinitionId()));
}
}
}
节点流转
流程启动后,Activiti工作流引擎将进行节点流转。Activiti流转流程主要依赖ActivitiEngineAgenda接口实 现,该接口主要包含以下方法:
public interface ActivitiEngineAgenda extends Agenda {
// 计划继续流程操作
void planContinueProcessOperation(ExecutionEntity execution);
// 计划继续流程同步操作
void planContinueProcessSynchronousOperation(ExecutionEntity execution);
// 计划继续流程补偿操作
void planContinueProcessInCompensation(ExecutionEntity execution);
// 计划继续多实例操作
void planContinueMultiInstanceOperation(ExecutionEntity execution);
// 计划外出顺序流操作
void planTakeOutgoingSequenceFlowsOperation(ExecutionEntity execution, boolean evaluateConditions);
// 计划结束执行操作
void planEndExecutionOperation(ExecutionEntity execution);
// 计划触发器执行操作
void planTriggerExecutionOperation(ExecutionEntity execution);
// 计划销毁当前作用域操作
void planDestroyScopeOperation(ExecutionEntity execution);
// 计划执行闲置行为操作
void planExecuteInactiveBehaviorsOperation();
}
ActivitiEngineAgenda接口的默认实现类是DefaultActivitiEngineAgenda,该类持有两个变量。其代码如下:
protected LinkedList<Runnable> operations = new LinkedList<Runnable>();
protected CommandContext commandContext;
其中operations是一个链表,在流程流转过程中,不同阶段会通过ActivitiEngineAgenda接口将不同的
operation加入链表。过程如下:
public void planContinueProcessOperation(ExecutionEntity execution) {
planOperation(new ContinueProcessOperation(commandContext, execution)); }
public void planOperation(Runnable operation) { operations.add(operation);
if (operation instanceof AbstractOperation) {
ExecutionEntity execution = ((AbstractOperation) operation).getExecution();
if (execution != null) {
commandContext.addInvolvedExecution(execution);
}
}
logger.debug("Operation {} added to agenda", operation.getClass());
}
加入operations链表中的operation将在CommandInvoker中被弹出执行,这也是ActivitiEngineAgenda接口 方法都以plan开头的原因。
接下来,让我们看一下CommandInvoker如何执行这些operation:
public class CommandInvoker extends AbstractCommandInterceptor {
public <T> T execute(final CommandConfig config, final Command<T> command) {
final CommandContext commandContext = Context.getCommandContext();
commandContext.getAgenda().planOperation(new Runnable() {
@Override
public void run() {
commandContext.setResult(command.execute(commandContext));
}
});
//执行Operations
executeOperations(commandContext);
if (commandContext.hasInvolvedExecutions()) {
Context.getAgenda().planExecuteInactiveBehaviorsOperation();
executeOperations(commandContext);
}
return (T) commandContext.getResult();
}
//执行所有操作
protected void executeOperations(final CommandContext commandContext) {
while (!commandContext.getAgenda().isEmpty()) {
//取出Operation
Runnable runnable = commandContext.getAgenda().getNextOperation();
executeOperation(runnable);
} }
//执行单个操作
public void executeOperation(Runnable runnable) {
if (runnable instanceof AbstractOperation) {
AbstractOperation operation = (AbstractOperation) runnable;
if (operation.getExecution() == null || !operation.getExecution().isEnded()) {
runnable.run();
}
} else {
runnable.run();
}
} }
CommandInvoker会遍历所有的operation,调用相应命令的run()方法进行执行。
我们已经将execution的当前节点设置为了 startEvent,并调用了planContinueProcessOperation()方法压入了ContinueProcessOperation。
接下来,CommandInvoker会调用ContinueProcessOperation的run()方法,继续推进流程流转(省略部分 代码):
public class ContinueProcessOperation extends AbstractOperation {
@Override
public void run() {
//获取当前节点(注意,当前节点是开始节点)
FlowElement currentFlowElement = getCurrentFlowElement(execution); ...
continueThroughFlowNode((FlowNode) currentFlowElement);
...
}
//通过流节点
protected void continueThroughFlowNode(FlowNode flowNode) {
...
executeSynchronous(flowNode);
...
}
//同步执行
protected void executeSynchronous(FlowNode flowNode) {
...
//获取当前节点的behavior
ActivityBehavior activityBehavior = (ActivityBehavior) flowNode.getBehavior(); if (activityBehavior != null) {
executeActivityBehavior(activityBehavior, flowNode); }
... }
//执行behavior
protected void executeActivityBehavior(ActivityBehavior activityBehavior,
FlowNode flowNode) {
...
//调用behavior的execute()方法
activityBehavior.execute(execution);
...
} }
当前节点是一个空开始事件节点,对应的behavior是NoneStartEventActivityBehavior,这个behavior中什
么也没有,直接继承父类方法,在父类FlowNodeActivityBehavior中调用leave()方法。其代码如下:
public class NoneStartEventActivityBehavior extends FlowNodeActivityBehavior {
private static final long serialVersionUID = 1L;
// 这个开始节点不需要做什么,执行父类的方法
}
public abstract class FlowNodeActivityBehavior implements TriggerableActivityBehavior { private static final long serialVersionUID = 1L;
protected BpmnActivityBehavior bpmnActivityBehavior = new BpmnActivityBehavior(); public void execute(DelegateExecution execution) {
leave(execution); }
//离开节点
public void leave(DelegateExecution execution) {
bpmnActivityBehavior.performDefaultOutgoingBehavior((ExecutionEntity) execution);
}
}
FlowNodeActivityBehavior的leave()方法调用了bpmnActivityBehavior对象的performDefaultOutgoingBehavior() 方法,执行节点外出顺序流的behavior。接下来,让我们进入BpmnActivityBehavior源码,看一下它执行外出顺序 流的相关操作。其源码如下:
public class BpmnActivityBehavior implements Serializable {
public void performDefaultOutgoingBehavior(ExecutionEntity activityExecution) {
performOutgoingBehavior(activityExecution, true, false); }
protected void performOutgoingBehavior(ExecutionEntity execution, boolean checkConditions, boolean throwExceptionIfExecutionStuck) {
//将执行外出顺序流操作放入执行计划
Context.getAgenda().planTakeOutgoingSequenceFlowsOperation(execution, true);
} }
它将执行外出顺序流的操作TakeOutgoingSequenceFlowsOperation放入执行计划。接下来,CommandInvoker 会调用TakeOutgoingSequenceFlowsOperation的run()方法,将当前节点变更为顺序流1。相关代码如下(省略部 分代码):
public class TakeOutgoingSequenceFlowsOperation extends AbstractOperation {
@Override
public void run() {
//获取当前节点(注意,当前节点还是开始节点)
FlowElement currentFlowElement = getCurrentFlowElement(execution); ...
if (currentFlowElement instanceof FlowNode) {
handleFlowNode((FlowNode) currentFlowElement);
} else if (currentFlowElement instanceof SequenceFlow) {
handleSequenceFlow();
}
}
//处理流节点
protected void handleFlowNode(FlowNode flowNode) {
handleActivityEnd(flowNode);
if (flowNode.getParentContainer() != null
&& flowNode.getParentContainer() instanceof AdhocSubProcess) {
handleAdhocSubProcess(flowNode);
} else {
leaveFlowNode(flowNode);
}
}
//离开流节点
protected void leaveFlowNode(FlowNode flowNode) {
...//获取“开始”节点外出顺序流
SequenceFlow sequenceFlow = outgoingSequenceFlows.get(0); execution.setCurrentFlowElement(sequenceFlow); execution.setActive(true);
//此时将执行实例的当前节点变更为“顺序流1”
outgoingExecutions.add((ExecutionEntity) execution);
...
// 将继续流程操作放入执行计划
for (ExecutionEntity outgoingExecution : outgoingExecutions) {
Context.getAgenda().planContinueProcessOperation(outgoingExecution);
} }
}
TakeOutgoingSequenceFlowsOperation执行完相关操作后,再次调用planContinueProcessOperation()方法, 将顺序流/后续操作放入执行计划。
接下来,CommandInvoker将第二次执行ContinueProcessOperation操作,实现顺序流1到“申请”节点的流转,即将流程的当前节点设置为“申请”节点。相关代码如下(省略部分代码):
public class ContinueProcessOperation extends AbstractOperation { @Override
public void run() {
//获取当前节点(注意,当前节点是顺序流1)
FlowElement currentFlowElement = getCurrentFlowElement(execution); ...
continueThroughSequenceFlow((SequenceFlow) currentFlowElement); ...
}
//通过顺序流
protected void continueThroughSequenceFlow(SequenceFlow sequenceFlow) {
...
//获取顺序流的目标节点,并将目标节点设置为执行实例的当前节点
FlowElement targetFlowElement = sequenceFlow.getTargetFlowElement(); execution.setCurrentFlowElement(targetFlowElement); //将继续流程操作放入执行计划
Context.getAgenda().planContinueProcessOperation(execution);
} }
修改完当前节点后,CommandInvoker会第三次执行ContinueProcessOperation操作,这次当前节点是
FlowNode,因此会执行continueThroughFlowNode()方法。相关代码如下(省略部分代码):
public class ContinueProcessOperation extends AbstractOperation { @Override
public void run() {
//获取当前节点(注意,当前节点是“申请”节点)
FlowElement currentFlowElement = getCurrentFlowElement(execution);
...
continueThroughFlowNode((FlowNode) currentFlowElement); ...
}
//通过流节点
protected void continueThroughFlowNode(FlowNode flowNode) {
... executeSynchronous(flowNode); ...
}
//同步执行
protected void executeSynchronous(FlowNode flowNode) {
... //获取当前节点的behavior
ActivityBehavior activityBehavior = (ActivityBehavior) flowNode.getBehavior();
if (activityBehavior != null) {
executeActivityBehavior(activityBehavior, flowNode); }
... }
//执行behavior
protected void executeActivityBehavior(ActivityBehavior activityBehavior,
FlowNode flowNode) {
...
//调用behavior的execute()方法
activityBehavior.execute(execution);
...
} }
由于当前节点是用户节点,对应的behavior是UserTaskActivityBehavior,所以会调用该behavior的execute() 方法创建相应的用户任务。
此时,流程从开始节点流转到了“申请”用户节点.
网关控制
当通过“审批”用户节点后,流程会继续往下流转,这时会经过一个排他网关,Activiti如何处理网关节点呢?
我们将从调用Taskservice类的complete()方法完成“审批”用户任务这个动作入口开始,了解Activiti流转到网关并执行网关处理相关逻辑的过程:
public class TaskServiceImpl extends ServiceImpl implements TaskService{ public void complete(String taskId, Map<String, Object> variables) {
commandExecutor.execute(new CompleteTaskCmd(taskId, variables)); }
}
接下来,让我们进入CompleteTaskCmd源码,看一下“审批”任务完成时,流程是如何流转到网关节点 的(省略部分代码):
public class CompleteTaskCmd extends AbstractCompleteTaskCmd {
protected Void execute(CommandContext commandContext, TaskEntity task) {
...
executeTaskComplete(commandContext, task, variables, localScope);
return null;
}
//执行完成任务相关操作
protected void executeTaskComplete(CommandContext commandContext,
TaskEntity taskEntity,
Map<String, Object> variables,
boolean localScope) {
...
if (taskEntity.getExecutionId() != null) {
ExecutionEntity executionEntity = commandContext.getExecutionEntityManager() .findById(taskEntity.getExecutionId());
//将触发执行实例操作放入执行计划
Context.getAgenda().planTriggerExecutionOperation(executionEntity);
} }
}
CompleteTaskCmd在任务完成后,将TriggerExecutionOperation放入执行计划,通过TriggerExecutionOperation 来触发执行实例继续流转(省略部分代码):
public class TriggerExecutionOperation extends AbstractOperation {
@Override
public void run() {
//获取当前节点(注意,当前节点是“审批”用户节点)
FlowElement currentFlowElement = getCurrentFlowElement(execution); if (currentFlowElement instanceof FlowNode) {
ActivityBehavior activityBehavior =
(ActivityBehavior) ((FlowNode) currentFlowElement).getBehavior();
if (activityBehavior instanceof TriggerableActivityBehavior) {
...
//调用activityBehavior的trigger()方法
((TriggerableActivityBehavior) activityBehavior).trigger(execution, null, null); ...
}
... }
... }
}
当前节点“审批”是用户任务(UserTask),对应的behavior是UserTaskActivityBehavior。因此,会调用 UserTaskActivityBehavior的trigger()方法(省略部分代码):
public class UserTaskActivityBehavior extends TaskActivityBehavior {
public void trigger(DelegateExecution execution, String signalName, Object signalData) { ...
//离开执行计划
leave(execution); }
}
此处省略了UserTaskActivityBehavior的具体操作,仅关注流程的流转。UserTaskActivityBehavior执行操作后,调用父类FlowNodeActivityBehavior的leave()方法(省略部分代码):
public abstract class FlowNodeActivityBehavior implements TriggerableActivityBehavior { public void leave(DelegateExecution execution) {
bpmnActivityBehavior.performDefaultOutgoingBehavior((ExecutionEntity) execution); }
}
以上代码段在执行leave()方法时,调用BpmnActivityBehavior的performDefaultOutgoingBehavior()方法, 将执行外出顺序流操作放入执行计划。
public class BpmnActivityBehavior implements Serializable {
public void performDefaultOutgoingBehavior(ExecutionEntity activityExecution) {
performOutgoingBehavior(activityExecution, true, false); }
protected void performOutgoingBehavior(ExecutionEntity execution, boolean checkConditions, boolean throwExceptionIfExecutionStuck) {
//将执行外出顺序流操作放入执行计划
Context.getAgenda().planTakeOutgoingSequenceFlowsOperation(execution, true);
}
}
当CommandInvoker执行TakeOutgoingSequenceFlowsOperation时,当前节点还是“审批”用户节点,因此会执行leaveFlowNode()方法,将当前节点流转为“审批”用户节点之后的外出顺序流3(省略部分代码):
public class TakeOutgoingSequenceFlowsOperation extends AbstractOperation { @Override
public void run() {
//获取当前节点(注意,当前节点还是“审批”用户节点)
FlowElement currentFlowElement = getCurrentFlowElement(execution); ...
if (currentFlowElement instanceof FlowNode) {
handleFlowNode((FlowNode) currentFlowElement);
} else if (currentFlowElement instanceof SequenceFlow) {
handleSequenceFlow();
}
}
//处理流节点
protected void handleFlowNode(FlowNode flowNode) {
handleActivityEnd(flowNode);
if (flowNode.getParentContainer() != null
&& flowNode.getParentContainer() instanceof AdhocSubProcess) {
handleAdhocSubProcess(flowNode);
} else {
leaveFlowNode(flowNode); }
}
//离开流节点
protected void leaveFlowNode(FlowNode flowNode) {
...//获取“审批”用户节点外出顺序流
SequenceFlow sequenceFlow = outgoingSequenceFlows.get(0); execution.setCurrentFlowElement(sequenceFlow); execution.setActive(true);
//此时执行实例的当前节点变更为顺序流3
outgoingExecutions.add((ExecutionEntity) execution);
...
// 将继续流程操作放入执行计划
for (ExecutionEntity outgoingExecution : outgoingExecutions) {
Context.getAgenda().planContinueProcessOperation(outgoingExecution);
} }
}
TakeOutgoingSequenceFlowsOperation变更当前节点为“顺序流3”后,将继续流程操作放入执行计划链 表,CommandInvoker会调用ContinueProcessOperation的run()方法流转流程。当前节点是顺序流,所以会调用 continueThroughSequenceFlow()方法(省略部分代码):
public class ContinueProcessOperation extends AbstractOperation { @Override
public void run() {
//获取当前节点(注意,当前节点是顺序流3)
FlowElement currentFlowElement = getCurrentFlowElement(execution);
...
continueThroughSequenceFlow((SequenceFlow) currentFlowElement); ...
}
//通过顺序流
protected void continueThroughSequenceFlow(SequenceFlow sequenceFlow) {
...
//获取顺序流的目标节点,并将目标节点设置为执行实例的当前节点
FlowElement targetFlowElement = sequenceFlow.getTargetFlowElement(); execution.setCurrentFlowElement(targetFlowElement);
//将执行实例放入执行计划
Context.getAgenda().planContinueProcessOperation(execution);
} }
上面这段代码已将流程的当前节点变更为“网关”节点,并再次将执行实例放入执行计划,因此, CommandInvoker会再次调用ContinueProcessOperation的run()方法流转流程。当前节点是网关节点,所以执行 continueThroughFlowNode()方法(省略部分代码):
public class ContinueProcessOperation extends AbstractOperation { @Override
public void run() {
//获取当前节点(注意,当前节点是网关)
FlowElement currentFlowElement = getCurrentFlowElement(execution); ...
continueThroughFlowNode((FlowNode) currentFlowElement);
...
}
//通过流节点
protected void continueThroughFlowNode(FlowNode flowNode) {
... executeSynchronous(flowNode); ...
}
//同步执行
protected void executeSynchronous(FlowNode flowNode) {
...
//获取当前节点的behavior
ActivityBehavior activityBehavior = (ActivityBehavior) flowNode.getBehavior(); if (activityBehavior != null) {
executeActivityBehavior(activityBehavior, flowNode); }
... }
//执行behavior
protected void executeActivityBehavior(ActivityBehavior activityBehavior, FlowNode flowNode) {
...
//调用behavior的execute()方法 activityBehavior.execute(execution); ...
} }
当前节点是排他网关节点,对应的behavior是ExclusiveGatewayActivityBehavior,接下来正式进入排他网 关的behavior执行代码。代码如下(省略部分代码):
public class ExclusiveGatewayActivityBehavior extends GatewayActivityBehavior {
//排他网关的execute()方法,继承父类的execute()方法,默认执行leave()操作
@Override
public void leave(DelegateExecution execution) {
ExclusiveGateway exclusiveGateway =
(ExclusiveGateway) execution.getCurrentFlowElement();
...
SequenceFlow outgoingSequenceFlow = null;
SequenceFlow defaultSequenceFlow = null;
String defaultSequenceFlowId = exclusiveGateway.getDefaultFlow();
//判断要流转哪条外出顺序流
Iterator<SequenceFlow> sequenceFlowIterator =
exclusiveGateway.getOutgoingFlows().iterator();
while (outgoingSequenceFlow == null && sequenceFlowIterator.hasNext()) {
SequenceFlow sequenceFlow = sequenceFlowIterator.next();
String skipExpressionString = sequenceFlow.getSkipExpression();
if (!SkipExpressionUtil
.isSkipExpressionEnabled(execution, skipExpressionString)) { //判断分支条件是否为true
boolean conditionEvaluatesToTrue =
ConditionUtil.hasTrueCondition(sequenceFlow, execution);
if (conditionEvaluatesToTrue && (defaultSequenceFlowId == null ||
!defaultSequenceFlowId.equals(sequenceFlow.getId()))) {
outgoingSequenceFlow = sequenceFlow;
}
} else if (SkipExpressionUtil
.shouldSkipFlowElement(Context.getCommandContext(),
execution, skipExpressionString)) {
outgoingSequenceFlow = sequenceFlow;
}
//查找默认外出顺序流
if (defaultSequenceFlowId != null
&& defaultSequenceFlowId.equals(sequenceFlow.getId())) {
defaultSequenceFlow = sequenceFlow;
}
}
//记录网关节点的结束时间 Context.getCommandContext().getHistoryManager()
.recordActivityEnd((ExecutionEntity) execution, null);
//离开网关
if (outgoingSequenceFlow != null) {
//将找到的符合条件的外出顺序流设置为当前节点
execution.setCurrentFlowElement(outgoingSequenceFlow);
} else {
if (defaultSequenceFlow != null) { //将默认外出顺序流设置为当前节点 execution.setCurrentFlowElement(defaultSequenceFlow);
} else {
//没有找到可以执行的外出顺序流,报错
throw new NoOutgoingException("No outgoing sequence flow of the exclusive gateway
'" + exclusiveGateway.getId() + "' could be selected for continuing the process"); }
} //当前节点变更为网关外出顺序流 //离开执行计划 super.leave(execution);
} }
public abstract class FlowNodeActivityBehavior implements TriggerableActivityBehavior {
public void execute(DelegateExecution execution) { leave(execution);
}
public void leave(DelegateExecution execution) { bpmnActivityBehavior.performDefaultOutgoingBehavior(
(ExecutionEntity) execution);
} }
public class BpmnActivityBehavior implements Serializable {
public void performDefaultOutgoingBehavior(ExecutionEntity activityExecution) { performOutgoingBehavior(activityExecution, true, false);
}
protected void performOutgoingBehavior(ExecutionEntity execution, boolean checkConditions, boolean throwExceptionIfExecutionStuck) {
//将执行外出顺序流操作放入执行计划
Context.getAgenda().planTakeOutgoingSequenceFlowsOperation(execution, true);
} }
流程结束
首先,CommandInvoker会从执行计划中获取并执行TakeOutgoingSequenceFlowsOperation(省略部分代码):
public class TakeOutgoingSequenceFlowsOperation extends AbstractOperation { @Override
public void run() {
FlowElement currentFlowElement = getCurrentFlowElement(execution); //获取当前节点(注意,当前节点是排他网关节点后的外出顺序流5)
...
if (currentFlowElement instanceof FlowNode) {
handleFlowNode((FlowNode) currentFlowElement);
} else if (currentFlowElement instanceof SequenceFlow) {
handleSequenceFlow(); }
}
//处理流节点
protected void handleSequenceFlow() { commandContext.getHistoryManager().recordActivityEnd(execution, null);
// 将继续流程操作放入执行计划
Context.getAgenda().planContinueProcessOperation(execution);
} }
TakeOutgoingSequenceFlowsOperation再次将ContinueProcessOperation放入执行计划。CommandInvoker再次 调用ContinueProcessOperation的run()方法,由于当前节点是顺序流节点,所以调用continueThroughSequenceFlow() 方法,将顺序流5的“结束”目标节点变更为当前节点(省略部分代码):
public class ContinueProcessOperation extends AbstractOperation { @Override
public void run() { //获取当前节点(注意,当前节点是排他网关节点后的外出顺序流5)
FlowElement currentFlowElement = getCurrentFlowElement(execution); ...
continueThroughSequenceFlow((SequenceFlow) currentFlowElement); ...
}
//通过顺序流
protected void continueThroughSequenceFlow(SequenceFlow sequenceFlow) {
...
//获取顺序流的目标节点,并将目标节点变更为执行实例的当前节点
FlowElement targetFlowElement = sequenceFlow.getTargetFlowElement(); execution.setCurrentFlowElement(targetFlowElement); //将执行实例放入执行计划 Context.getAgenda().planContinueProcessOperation(execution);
} }
设置当前节点后,程序会再次执行ContinueProcessOperation的run()方法,此时当前节点变更为结束节点。
结束节点是FlowNode,因此会执行continueThroughFlowNode()方法(省略部分代码):
public class ContinueProcessOperation extends AbstractOperation { @Override
public void run() {
//获取当前节点(注意,当前节点是结束节点)
FlowElement currentFlowElement = getCurrentFlowElement(execution); ...
continueThroughFlowNode((FlowNode) currentFlowElement);
...
}
//通过流节点
protected void continueThroughFlowNode(FlowNode flowNode) {
... executeSynchronous(flowNode); ...
}
//同步执行
protected void executeSynchronous(FlowNode flowNode) { ...
//获取当前节点的behavior
ActivityBehavior activityBehavior = (ActivityBehavior) flowNode.getBehavior(); if (activityBehavior != null) {
executeActivityBehavior(activityBehavior, flowNode); }
... }
//执行behavior
protected void executeActivityBehavior(ActivityBehavior activityBehavior,
FlowNode flowNode) {
...
//调用behavior的execute()方法
activityBehavior.execute(execution);
...
}
当前节点是结束节点,对应的behavior是NoneEndEventActivityBehavior,因此接下来会调用NoneEnd EventActivityBehavior的execute()方法。 NoneEndEventActivityBehavior不做任何操作,只将TakeOutgoingSequenceFlowsOperation放入执行计划。 当CommandInvoker再次执行TakeOutgoingSequenceFlowsOperation时,由于结束节点没有外出顺序流,所以 将结束流程操作EndExecutionOperation放入执行计划(省略部分代码):
public class TakeOutgoingSequenceFlowsOperation extends AbstractOperation { @Override
public void run() {
//获取当前节点(注意,当前节点是结束节点)
FlowElement currentFlowElement = getCurrentFlowElement(execution); ...
if (currentFlowElement instanceof FlowNode) {
handleFlowNode((FlowNode) currentFlowElement);
} else if (currentFlowElement instanceof SequenceFlow) {
handleSequenceFlow();
}
}
//处理流节点
protected void handleFlowNode(FlowNode flowNode) {
handleActivityEnd(flowNode);
if (flowNode.getParentContainer() != null
&& flowNode.getParentContainer() instanceof AdhocSubProcess) {
handleAdhocSubProcess(flowNode);
} else {
leaveFlowNode(flowNode); }
}
//离开流节点
protected void leaveFlowNode(FlowNode flowNode) {
...
//结束节点没有外出顺序流
if (outgoingSequenceFlows.size() == 0) {
if (flowNode.getOutgoingFlows() == null || flowNode.getOutgoingFlows().size() == 0) { //将结束执行实例操作放入执行计划 Context.getAgenda().planEndExecutionOperation(execution);
} ...
} ...
} }
最后,CommandInvoker执行EndExecutionOperation操作结束流程的执行实例时,还会执行流程结束监听 器,业务上可以在流程结束监听器中处理与流程结束相关的业务操作,如给申请人发通知告知流程已完成等 (省略部分代码):
public class EndExecutionOperation extends AbstractOperation { @Override
public void run() {
if (execution.isProcessInstanceType()) {
handleProcessInstanceExecution(execution); } else {
handleRegularExecution();
}
}
//处理流程实例的执行实例
protected void handleProcessInstanceExecution(
ExecutionEntity processInstanceExecution) {
ExecutionEntityManager executionEntityManager =
commandContext.getExecutionEntityManager();
String processInstanceId = processInstanceExecution.getId();
ExecutionEntity superExecution = processInstanceExecution.getSuperExecution(); SubProcessActivityBehavior subProcessActivityBehavior = null;
//存在父级执行实例
if (superExecution != null) {
...//执行父级执行实例当前节点的behavior的completing()方法 //获取流程实例当前活动的执行实例数量
int activeExecutions = getNumberOfActiveChildExecutionsForProcessInstance(
executionEntityManager, processInstanceId);
if (activeExecutions == 0) {
//删除流程实例的执行实例 executionEntityManager.deleteProcessInstanceExecutionEntity(
processInstanceId, execution.getCurrentFlowElement() != null ?
execution.getCurrentFlowElement().getId() : null, null, false, false); } else {
logger.debug("Active executions found. Process instance {} will not be ended.", processInstanceId);
}
Process process = ProcessDefinitionUtil.getProcess(
processInstanceExecution.getProcessDefinitionId());
//执行流程结束监听器
if (CollectionUtil.isNotEmpty(process.getExecutionListeners())) {
executeExecutionListeners(process, processInstanceExecution, ExecutionListener.EVENTNAME_END);
}
//存在父级执行实例
if (superExecution != null) {
...//执行父级执行实例当前节点的behavior的completed()方法 }
} }
乐观锁
Activiti基于数据库乐观锁机制解决并发问题。在Activiti数据表中每张表都存在一个字段REV_,该字段 用于实现乐观锁。下面以多线程同时操作ACT_RU_TASK表中的同一个Task为例,详细说明Activiti乐观锁的 实现过程。假设两个线程A和B同时对一个taskId进行complete操作,则有如下过程:
(1)A线程拿到Task相关数据,此时字段REV_的值为1。_
(2)B线程也拿到Task相关数据,此时字段REV_的值也为1。 _
_(3)A线程完成该任务,更新ID为taskId,并将REV_的值更新为2。
(4)这时B线程也完成该任务,但是更新时,已无法找到ID为taskId且REV_值为1的记录了。此时,数
据库update操作返回受影响的行数为0,这时Activiti会抛出一个异常ActivitiOptimisticLockingException。
(5)B线程执行的事务回滚。