activi源码走读

fxz大约 49 分钟

activi源码走读

架构概述

Activiti工作流引擎架构大致分为6层。从上到下依次为工作流引擎层、部署层、业务接口 层、命令拦截层、命令层和行为层。

  • 工作流引擎层: 主要指ProcessEngine接口,这是Activiti所有接口的总入口。

  • 部署层: 包括DeploymentBuilder和BpmnModel等与流程部署相关的类。理论上,部署层并不属于 Activiti引擎架构的分层体系。将其单独拿出来作为一层,只是为了突出其重要性。流程运转基于流 程定义,而流程定义解析就是流程的开始。从流程模型转换为流程定义、将其解析为简单Java对象

    (Plain Ordinary Java Object,POJO),都是基于部署层实现的。

  • 业务接口层: 面向业务提供各种服务接口,如RuntimeService、TaskService等。

  • 命令拦截层:采用责任链模式,通过拦截器层为命令的执行创造条件,如开启事务、创建CommandContext

    上下文、记录日志等。

  • 命令层: Activiti的业务处理层。Activiti的整体编码模式采用的是命令模式,将业务逻辑封装为一个个Command接口实现类。这样,新增一个业务功能时只需新增一个Command实现。

  • 行为层:包括各种FlowNodeActivityBehavior和ActivitiEventListener,这些类负责执行和监听Activiti

    流程具体的流转动作。

activiti设计模式

命令模式

Activiti源码主要应用了命令模式、责任链模式和命令链模式。

命令模式属于行为型模式。可以将一个请求或者操作(包含接受者信息)封装到命令对象中,然后将该命令对象交由执行者执行,执行者无须关心命令的接收人或者命令的具体内容,因为这些信息均已被封装到命令对象中。命令模式中涉及的角色及其作用分别介绍如下。

  • Command(抽象命令类):抽象命令对象,可以根据不同的命令类型,写出不同的实现类。

  • ConcreteCommand(具体命令类):抽象命令对象的具体实现。

  • Invoker(调用者):请求的发送者,通过命令对象来执行请求。一个调用者并不需要在设计时确定其接收者,因此它只与抽象命令之间存在关联。程序运行时,会调用命令对象的execute()方法,间接调用接收者的相关操作。

  • Receiver(接收者):接收者执行与请求相关的操作,是真正执行命令的对象,实现对请求的业务

    处理。

  • Client(客户端):在客户类中需要创建调用者对象、具体命令类对象,创建具体命令对象时需要指

    定对应的接收者。发送者和接收者之间不存在关联关系,均通过命令对象来调用。

Activiti中的每一个数据库的增、删、改、查操作,均为将一个命令的实现交给Activiti的命令执行者执行。

Activiti使用一个CommandContext类作为命令接收者,该类维护一系列Manager 对象,这些Manager对象类似J2EE中的数据访问对象(Data Access Object,DAO)。

在Activiti中构建的命令模式的类,主要包括以下5个部分。

  1. Command: 抽象命令接口。该接口定义了一个execute()抽象方法,调用该方法时,需要传入参数CommandContext。
  2. CommandContext:命令上下文。CommandContext实例从Context获取,以栈的形式存储在使用本地线程的变量中(ThreadLocal)。
  3. CommandExecutor:命令执行者。该接口提供了两种方法执行命令,可以同时传入命令配置参数 CommandConfig和Command,也可以只传入Command。
  4. ServiceImpl:Activiti的服务类,如TaskServiceImpl等,均继承ServiceImpl类。该类持有CommandExecutor 对象,在该服务实现中,构造各个Command的实现类传递给CommandExecutor执行。这个类是命令的 发送者,对应标准命令模式定义中的Client。
  5. CommandInterceptor:命令拦截器,有多个实现类。它被CommandExecutor实现类调用,是最终的命令执行者,同时也是串接命令模式和责任链模式的衔接点。

责任链模式

Activiti使用了一系列命令拦截器(CommandInterceptor),这些命令拦截器扮演着命令执行者的角色。

责任链模式也是行为型模式。该设计模式让多个对象都有机会处理请求,从而避免了请求发送者与请求接收者耦合的情况。这些请求接收者将组成一条链,并沿着 这条链传递请求,直到有一个对象处理请求为止。责任链模式有以下参与者。

  • Handler(请求处理者接口):定义一个处理请求的接口,包含抽象处理方法和一个后继链。

  • ConcreteHandler(请求处理者实现):请求处理接口的实现,它可以判断是否能够处理本次请求,如

    果可以处理请求就进行处理,否则就将该请求转发给它的后继者。

  • Client(客户端):组装责任链,并向链头的具体对象提交请求。它不关心处理细节和请求的传递

    过程。

Activiti责任链模式下的类主要有以下几种。

  • CommandInterceptor:命令拦截器接口。它是采用命令模式实现的拦截器,作为责任链的“链节点”的定义,可以执行命令,也可以获取和设置下一个“链节点”。

  • ProcessEngineConfigurationImpl:维护整条责任链的类,在该工作流引擎抽象实现类中,实现了对整

    条责任链的维护。

  • CommandInvoker:CommandInterceptor责任链“链节点”实现类之一,负责责任链末尾节点的命令

    执行。

命令链模式

Activiti命令链模式实质上就是命令模式和责任链模式 结合的产物。

ProcessEngineConfigurationImpl:

public void initCommandExecutor() {
    if (commandExecutor == null) {
      CommandInterceptor first = initInterceptorChain(commandInterceptors);
      commandExecutor = new CommandExecutorImpl(getDefaultCommandConfig(), first);
    }
}

public CommandInterceptor initInterceptorChain(List<CommandInterceptor> chain) {
    if (chain == null || chain.isEmpty()) {
      throw new ActivitiException("invalid command interceptor chain configuration: " + chain);
    }
    for (int i = 0; i < chain.size() - 1; i++) {
      chain.get(i).setNext(chain.get(i + 1));
    }
    return chain.get(0);
}

核心代码走读

流程模型

​ • 是通过流程设计器(比如 Web 模型编辑器)绘制的流程图。

​ • 主要用来设计和预览流程,但尚未实际部署。

​ • 通常以 JSON 格式 存储在 ACT_RE_MODEL 表的 META_INFO 字段中,用于描述设计阶段的流程结构。

流程定义

​ • 是将流程模型部署到引擎后的产物,真正可以在引擎中执行。

​ • 通常以 XML 格式(符合 BPMN 2.0 标准)存储。

​ • XML 文件的内容描述了流程的详细定义(任务、网关、事件等),存储在 ACT_GE_BYTEARRAY 表中,供引擎加载和运行。

流程模型部署过程可以划分为以下3个步骤。

第一步,获取ACT_RE_MODEL表中的流程模型数据,将其转换为BpmnModel对象。由于不同流程设计 器的保存格式也不一样,所以这部分逻辑主要由用户自己实现。

第二步,调用Activiti提供的BpmnXMLConverter类。该类支持BpmnModel对象和符合BPMN 2.0规范的 XML互相转换。

第三步,使用Activiti提供的RepositoryService接口,执行流程模型部署操作。

// 创建部署构造器
DeploymentBuilder deploymentBuilder = repositoryService.createDeployment(); 
// 定义部署对象的ID、name、key等属性 deploymentBuilder.processDefinitionId(processDefinitionId)
.name(processModel.getName()).key(processModel.getKey()).tenantId(tenantFlag); 
// 设置XML输入流
deploymentBuilder.addInputStream(processModel.getKey().replaceAll(" ", "") + ".bpmn",
    new ByteArrayInputStream(modelXML));
// 设置表单、决策表等,代码略
// 执行流程模型部署操作
Deployment deployment = deploymentBuilder.deploy();
DeploymentManagerpublic void deploy(DeploymentEntity deployment, Map<String, Object> deploymentSettings) {
  for (Deployer deployer : deployers) {
    deployer.deploy(deployment, deploymentSettings);
  }
}

流程模型部署

BpmnDeployer:

public void deploy(DeploymentEntity deployment, Map<String, Object> deploymentSettings) {
//ParsedDeployment用于描述与流程定义相关联的部署信息、BPMN资源和模型等
  ParsedDeployment parsedDeployment = parsedDeploymentBuilderFactory
        .getBuilderForDeploymentAndSettings(deployment, deploymentSettings).build();

  //校验是否有重复的流程定义key 
  bpmnDeploymentHelper.verifyProcessDefinitionsDoNotShareKeys(
parsedDeployment.getAllProcessDefinitions()); 
  
  //将Deployment对象中的部分数据赋给流程定义 
  bpmnDeploymentHelper.copyDeploymentValuesToProcessDefinitions(
        parsedDeployment.getDeployment(),
    
parsedDeployment.getAllProcessDefinitions());
  
//设置流程定义中的资源名称 
  bpmnDeploymentHelper.setResourceNamesOnProcessDefinitions(parsedDeployment);
  
  //创建新的流程图
createAndPersistNewDiagramsIfNeeded(parsedDeployment); 
  
  //设置流程定义的DiagramResourceName属性
  setProcessDefinitionDiagramNames(parsedDeployment);
    if (deployment.isNew()) {
        Map<ProcessDefinitionEntity, ProcessDefinitionEntity>
            mapOfNewProcessDefinitionToPreviousVersion =
getPreviousVersionsOfProcessDefinitions(parsedDeployment); 
      
      //设置流程定义的版本号和ID
			setProcessDefinitionVersionsAndIds(parsedDeployment, mapOfNewProcessDefinitionToPreviousVersion); 
      
      //持久化流程定义到数据库并且添加有关联的用户信息 
      persistProcessDefinitionsAndAuthorizations(parsedDeployment);
      //更新TimerJob和EventSubscription
      updateTimersAndEvents(parsedDeployment,mapOfNewProcessDefinitionToPreviousVersion);
			//派发流程定义对象初始化事件
      dispatchProcessDefinitionEntityInitializedEvent(parsedDeployment);
    } else {
				//非新建部署对象保持之前保存的版本号
        makeProcessDefinitionsConsistentWithPersistedVersions(parsedDeployment);
    }
  
	//将流程定义数据保存到缓存中
  cachingAndArtifactsManager.updateCachingAndArtifacts(parsedDeployment); 
}

流程定义解析

流程模型部署后,流程定义作为扩展名为.bpmn20.xml或者.bpmn.xml的XML文件,保存在ACT_GE_BYTEARRAY资源表中。流程执行过程十分依赖流程定义信息,因此,需要解析流程定义,将之转换为工作流引擎易于使用的对象,使得工作流引擎可以在创建、流转流程的过程中,方便地按照流程定义进行正确的处理。

Activiti解析流程定义其实也是在流程部署过程中进行的。BpmnDeployer类的deploy()方法,执行流程定义解析的代码如下:

ParsedDeployment parsedDeployment = parsedDeploymentBuilderFactory
    .getBuilderForDeploymentAndSettings(deployment, deploymentSettings).build();
ParsedDeploymentBuilder:

public ParsedDeployment build() {
  List<ProcessDefinitionEntity> processDefinitions = new ArrayList<ProcessDefinitionEntity>();
  Map<ProcessDefinitionEntity, BpmnParse> processDefinitionsToBpmnParseMap 
    = new LinkedHashMap<ProcessDefinitionEntity, BpmnParse>();
  Map<ProcessDefinitionEntity, ResourceEntity> processDefinitionsToResourceMap 
    = new LinkedHashMap<ProcessDefinitionEntity, ResourceEntity>();

  for (ResourceEntity resource : deployment.getResources().values()) {
    if (isBpmnResource(resource.getName())) {
      log.debug("Processing BPMN resource {}", resource.getName());
      BpmnParse parse = createBpmnParseFromResource(resource);
      for (ProcessDefinitionEntity processDefinition : parse.getProcessDefinitions()) {
        processDefinitions.add(processDefinition);
        processDefinitionsToBpmnParseMap.put(processDefinition, parse);
        processDefinitionsToResourceMap.put(processDefinition, resource);
      }
    }
  }

  return new ParsedDeployment(deployment, processDefinitions, 
      processDefinitionsToBpmnParseMap, processDefinitionsToResourceMap);
}


protected BpmnParse createBpmnParseFromResource(ResourceEntity resource) {
  String resourceName = resource.getName();
  ByteArrayInputStream inputStream = new ByteArrayInputStream(resource.getBytes());

  BpmnParse bpmnParse = bpmnParser.createParse()
      .sourceInputStream(inputStream)
      .setSourceSystemId(resourceName)
      .deployment(deployment)
      .name(resourceName);

  if (deploymentSettings != null) {

    // Schema validation if needed
    if (deploymentSettings.containsKey(DeploymentSettings.IS_BPMN20_XSD_VALIDATION_ENABLED)) {
      bpmnParse.setValidateSchema((Boolean) deploymentSettings.get(DeploymentSettings.IS_BPMN20_XSD_VALIDATION_ENABLED));
    }

    // Process validation if needed
    if (deploymentSettings.containsKey(DeploymentSettings.IS_PROCESS_VALIDATION_ENABLED)) {
      bpmnParse.setValidateProcess((Boolean) deploymentSettings.get(DeploymentSettings.IS_PROCESS_VALIDATION_ENABLED));
    }

  } else {
    // On redeploy, we assume it is validated at the first deploy
    bpmnParse.setValidateSchema(false);
    bpmnParse.setValidateProcess(false);
  }
  
  bpmnParse.execute();
  return bpmnParse;
}
public BpmnParse execute() {
  try {
ProcessEngineConfigurationImpl processEngineConfiguration = Context.getProcessEngineConfiguration();
        BpmnXMLConverter converter = new BpmnXMLConverter();
        boolean enableSafeBpmnXml = false;
        String encoding = null;
        if (processEngineConfiguration != null) {
            enableSafeBpmnXml = processEngineConfiguration.isEnableSafeBpmnXml();
            encoding = processEngineConfiguration.getXmlEncoding();
        }
if (encoding != null) {
bpmnModel = converter.convertToBpmnModel(streamSource, validateSchema,
enableSafeBpmnXml, encoding); } else {
} ...
bpmnModel = converter.convertToBpmnModel(streamSource, validateSchema, enableSafeBpmnXml);
    } catch (Exception e) {
        if (e instanceof ActivitiException) {
... }
    return this;
}

流程启动

我们调用RuntimeService接口的createProcessInstanceBuilder()方法创建一个ProcessInstanceBuilder对象, 然后调用ProcessInstanceBuilder的start()方法来启动流程实例(省略部分代码):

// 创建流程实例构造器
ProcessInstanceBuilder processInstanceBuilder = runtimeService.createProcessInstanceBuilder();

...// 设置流程实例属性
 // 启动流程实例
 ProcessInstance instance = processInstanceBuilder.start();

展开ProcessInstanceBuilder的start()方法源码,可知它调用RuntimeService接口的startProcessInstance()方法 启动流程实例:

public class ProcessInstanceBuilderImpl implements ProcessInstanceBuilder {
  public ProcessInstance start() {
        return runtimeService.startProcessInstance(this);
} }
RuntimeServiceImpl:

public ProcessInstance startProcessInstance(ProcessInstanceBuilderImpl processInstanceBuilder) {
  if (processInstanceBuilder.getProcessDefinitionId() != null || processInstanceBuilder.getProcessDefinitionKey() != null) {
    return commandExecutor.execute(new StartProcessInstanceCmd<ProcessInstance>(processInstanceBuilder));
  } else if (processInstanceBuilder.getMessageName() != null) {
    return commandExecutor.execute(new StartProcessInstanceByMessageCmd(processInstanceBuilder));
  } else {
    throw new ActivitiIllegalArgumentException("No processDefinitionId, processDefinitionKey nor messageName provided");
  }
  
}

在以上代码中,如果设置了ProcessDefinitionId或ProcessDefinitionKey属性,RuntimeService会调用Start ProcessInstanceCmd命令启动流程;如果设置了MessageName属性,RuntimeService会调用StartProcessInstance ByMessageCmd命令启动流程,否则将抛出异常。

下面将以StartProcessInstanceCmd命令为例,讲解基于流程定义启动流程的过程。


public class StartProcessInstanceCmd<T> implements Command<ProcessInstance>, Serializable {
    public ProcessInstance execute(CommandContext commandContext) {
        DeploymentManager deploymentCache =
                commandContext.getProcessEngineConfiguration().getDeploymentManager();
// 1.获取流程定义
        ProcessDefinition processDefinition = null;
        if (processDefinitionId != null) {
...//基于流程定义ID获取流程定义
        } else if (processDefinitionKey != null &&
                (tenantId == null || ProcessEngineConfiguration.NO_TENANT_ID.equals(tenantId))) {
...//基于流程定义key获取流程定义
        } else if (processDefinitionKey != null &&
                tenantId != null && !ProcessEngineConfiguration.NO_TENANT_ID.equals(tenantId)) {
...//基于流程定义key和租户ID获取流程定义
        } else {
            throw new ActivitiIllegalArgumentException(
                    "processDefinitionKey and processDefinitionId are null");
        }
        processInstanceHelper =
                commandContext.getProcessEngineConfiguration().getProcessInstanceHelper();
// 2.开始创建流程实例
        ProcessInstance processInstance = createAndStartProcessInstance(processDefinition,
                businessKey, processInstanceName, variables, transientVariables);
        return processInstance;
    }

    protected ProcessInstance createAndStartProcessInstance(ProcessDefinition processDefinition, String businessKey, String processInstanceName, Map<String, Object> variables, Map<String, Object> transientVariables) {
        return processInstanceHelper.createAndStartProcessInstance(processDefinition, businessKey, processInstanceName, variables, transientVariables);
    }
}

先根据参数查找相应的流程定义,然 后调用createAndStartProcessInstance()方法,通过调用ProcessInstanceHelper类创建并发起流程实例。

public ProcessInstance createAndStartProcessInstanceWithInitialFlowElement(ProcessDefinition processDefinition,
      String businessKey, String processInstanceName, FlowElement initialFlowElement,
      Process process, Map<String, Object> variables, Map<String, Object> transientVariables, boolean startProcessInstance) {

    CommandContext commandContext = Context.getCommandContext();

    // 创建流程实例
    String initiatorVariableName = null;
    if (initialFlowElement instanceof StartEvent) {
      initiatorVariableName = ((StartEvent) initialFlowElement).getInitiator();
    }

    ExecutionEntity processInstance = commandContext.getExecutionEntityManager()
    		.createProcessInstanceExecution(processDefinition, businessKey, processDefinition.getTenantId(), initiatorVariableName);

    commandContext.getHistoryManager().recordProcessInstanceStart(processInstance, initialFlowElement);

    processInstance.setVariables(processDataObjects(process.getDataObjects()));

    //设置流程变量
    // Set the variables passed into the start command
    if (variables != null) {
      for (String varName : variables.keySet()) {
        processInstance.setVariable(varName, variables.get(varName));
      }
    }
    if (transientVariables != null) {
      for (String varName : transientVariables.keySet()) {
        processInstance.setTransientVariable(varName, transientVariables.get(varName));
      }
    }

    //设置流程实例名称
    if (processInstanceName != null) {
      processInstance.setName(processInstanceName);
      commandContext.getHistoryManager().recordProcessInstanceNameChange(processInstance.getId(), processInstanceName);
    }

    //发布全局事件
    if (Context.getProcessEngineConfiguration().getEventDispatcher().isEnabled()) {
      Context.getProcessEngineConfiguration().getEventDispatcher()
      .dispatchEvent(ActivitiEventBuilder.createEntityWithVariablesEvent(ActivitiEventType.ENTITY_INITIALIZED, processInstance, variables, false));
    }

    //创建第一个执行实例
    ExecutionEntity execution = commandContext.getExecutionEntityManager().createChildExecution(processInstance);
    //设置当前节点为开始节点
    execution.setCurrentFlowElement(initialFlowElement);

    //判断是否启动流程
    if (startProcessInstance) {
        startProcessInstance(processInstance, commandContext, variables);
      }

    //返回流程实例
      return processInstance;
    }

    public void startProcessInstance(ExecutionEntity processInstance, CommandContext commandContext, Map<String, Object> variables) {

      Process process = ProcessDefinitionUtil.getProcess(processInstance.getProcessDefinitionId());


    // Event sub process handling
      List<MessageEventSubscriptionEntity> messageEventSubscriptions = new LinkedList<>();
    for (FlowElement flowElement : process.getFlowElements()) {
      if (flowElement instanceof EventSubProcess) {
        EventSubProcess eventSubProcess = (EventSubProcess) flowElement;
        for (FlowElement subElement : eventSubProcess.getFlowElements()) {
          if (subElement instanceof StartEvent) {
            StartEvent startEvent = (StartEvent) subElement;
            if (CollectionUtil.isNotEmpty(startEvent.getEventDefinitions())) {
              EventDefinition eventDefinition = startEvent.getEventDefinitions().get(0);
              if (eventDefinition instanceof MessageEventDefinition) {
                MessageEventDefinition messageEventDefinition = (MessageEventDefinition) eventDefinition;
                BpmnModel bpmnModel = ProcessDefinitionUtil.getBpmnModel(processInstance.getProcessDefinitionId());
                if (bpmnModel.containsMessageId(messageEventDefinition.getMessageRef())) {
                  messageEventDefinition.setMessageRef(bpmnModel.getMessage(messageEventDefinition.getMessageRef()).getName());
                }
                ExecutionEntity messageExecution = commandContext.getExecutionEntityManager().createChildExecution(processInstance);
                messageExecution.setCurrentFlowElement(startEvent);
                messageExecution.setEventScope(true);
                messageEventSubscriptions
                .add(commandContext.getEventSubscriptionEntityManager().insertMessageEvent(messageEventDefinition.getMessageRef(), messageExecution));
              }
            }
          }
        }
      }
    }

      //获取第一个子执行实例
    ExecutionEntity execution = processInstance.getExecutions().get(0); // There will always be one child execution created
      //继续流转流程
    commandContext.getAgenda().planContinueProcessOperation(execution);

      //发布流程启动全局事件
    if (Context.getProcessEngineConfiguration().getEventDispatcher().isEnabled()) {
    	ActivitiEventDispatcher eventDispatcher = Context.getProcessEngineConfiguration().getEventDispatcher();
        eventDispatcher.dispatchEvent(ActivitiEventBuilder.createProcessStartedEvent(execution, variables, false));
        
        for (MessageEventSubscriptionEntity messageEventSubscription : messageEventSubscriptions) {
            commandContext.getProcessEngineConfiguration().getEventDispatcher()
                    .dispatchEvent(ActivitiEventBuilder.createMessageEvent(ActivitiEventType.ACTIVITY_MESSAGE_WAITING, messageEventSubscription.getActivityId(),
                            messageEventSubscription.getEventName(), null, messageEventSubscription.getExecution().getId(),
                            messageEventSubscription.getProcessInstanceId(), messageEventSubscription.getProcessDefinitionId()));
          }
    }
  }

节点流转

流程启动后,Activiti工作流引擎将进行节点流转。Activiti流转流程主要依赖ActivitiEngineAgenda接口实 现,该接口主要包含以下方法:

public interface ActivitiEngineAgenda extends Agenda {

// 计划继续流程操作
 void planContinueProcessOperation(ExecutionEntity execution);
 // 计划继续流程同步操作
 void planContinueProcessSynchronousOperation(ExecutionEntity execution);
 // 计划继续流程补偿操作
 void planContinueProcessInCompensation(ExecutionEntity execution);
 // 计划继续多实例操作
 void planContinueMultiInstanceOperation(ExecutionEntity execution);
 // 计划外出顺序流操作
 void planTakeOutgoingSequenceFlowsOperation(ExecutionEntity execution, boolean evaluateConditions);
 // 计划结束执行操作
 void planEndExecutionOperation(ExecutionEntity execution);
 // 计划触发器执行操作
 void planTriggerExecutionOperation(ExecutionEntity execution); 
  // 计划销毁当前作用域操作
 void planDestroyScopeOperation(ExecutionEntity execution);
 // 计划执行闲置行为操作
 void planExecuteInactiveBehaviorsOperation();

}

ActivitiEngineAgenda接口的默认实现类是DefaultActivitiEngineAgenda,该类持有两个变量。其代码如下:

protected LinkedList<Runnable> operations = new LinkedList<Runnable>();
protected CommandContext commandContext;

其中operations是一个链表,在流程流转过程中,不同阶段会通过ActivitiEngineAgenda接口将不同的

operation加入链表。过程如下:

public void planContinueProcessOperation(ExecutionEntity execution) {

planOperation(new ContinueProcessOperation(commandContext, execution)); }

public void planOperation(Runnable operation) { operations.add(operation);   
if (operation instanceof AbstractOperation) {
        ExecutionEntity execution = ((AbstractOperation) operation).getExecution();
        if (execution != null) {
            commandContext.addInvolvedExecution(execution);
        }
}
 logger.debug("Operation {} added to agenda", operation.getClass());
}

加入operations链表中的operation将在CommandInvoker中被弹出执行,这也是ActivitiEngineAgenda接口 方法都以plan开头的原因。

接下来,让我们看一下CommandInvoker如何执行这些operation:

public class CommandInvoker extends AbstractCommandInterceptor {
 public <T> T execute(final CommandConfig config, final Command<T> command) {
        final CommandContext commandContext = Context.getCommandContext();
        commandContext.getAgenda().planOperation(new Runnable() {
            @Override
            public void run() {
                commandContext.setResult(command.execute(commandContext));
            }
            });

	//执行Operations 
	executeOperations(commandContext);
    if (commandContext.hasInvolvedExecutions()) {
            Context.getAgenda().planExecuteInactiveBehaviorsOperation();
            executeOperations(commandContext);
        }
        return (T) commandContext.getResult();
    }
	//执行所有操作
protected void executeOperations(final CommandContext commandContext) {
while (!commandContext.getAgenda().isEmpty()) { 
  					//取出Operation
            Runnable runnable = commandContext.getAgenda().getNextOperation();
            executeOperation(runnable);
} }
//执行单个操作
public void executeOperation(Runnable runnable) {
if (runnable instanceof AbstractOperation) {
AbstractOperation operation = (AbstractOperation) runnable;
if (operation.getExecution() == null || !operation.getExecution().isEnded()) {
                runnable.run();
}
} else {
            runnable.run();
        }
} }

CommandInvoker会遍历所有的operation,调用相应命令的run()方法进行执行。

我们已经将execution的当前节点设置为了 startEvent,并调用了planContinueProcessOperation()方法压入了ContinueProcessOperation。

接下来,CommandInvoker会调用ContinueProcessOperation的run()方法,继续推进流程流转(省略部分 代码):

public class ContinueProcessOperation extends AbstractOperation { 
 @Override
public void run() {
 //获取当前节点(注意,当前节点是开始节点)
 FlowElement currentFlowElement = getCurrentFlowElement(execution); ...
 continueThroughFlowNode((FlowNode) currentFlowElement);
 ...

}

//通过流节点
 protected void continueThroughFlowNode(FlowNode flowNode) {

	... 
  executeSynchronous(flowNode); 
   ...

}

//同步执行
 protected void executeSynchronous(FlowNode flowNode) {
...
 //获取当前节点的behavior
 ActivityBehavior activityBehavior = (ActivityBehavior) flowNode.getBehavior(); if (activityBehavior != null) {
executeActivityBehavior(activityBehavior, flowNode); }
... }

//执行behavior
 protected void executeActivityBehavior(ActivityBehavior activityBehavior,
        FlowNode flowNode) {
        ...
					//调用behavior的execute()方法 
          activityBehavior.execute(execution); 
   ...
} }


当前节点是一个空开始事件节点,对应的behavior是NoneStartEventActivityBehavior,这个behavior中什

么也没有,直接继承父类方法,在父类FlowNodeActivityBehavior中调用leave()方法。其代码如下:

public class NoneStartEventActivityBehavior extends FlowNodeActivityBehavior { 
  private static final long serialVersionUID = 1L;

// 这个开始节点不需要做什么,执行父类的方法 
}

public abstract class FlowNodeActivityBehavior implements TriggerableActivityBehavior { private static final long serialVersionUID = 1L;
 protected BpmnActivityBehavior bpmnActivityBehavior = new BpmnActivityBehavior(); public void execute(DelegateExecution execution) {

leave(execution); }

//离开节点
 public void leave(DelegateExecution execution) {
bpmnActivityBehavior.performDefaultOutgoingBehavior((ExecutionEntity) execution); 
 }
}


FlowNodeActivityBehavior的leave()方法调用了bpmnActivityBehavior对象的performDefaultOutgoingBehavior() 方法,执行节点外出顺序流的behavior。接下来,让我们进入BpmnActivityBehavior源码,看一下它执行外出顺序 流的相关操作。其源码如下:

 public class BpmnActivityBehavior implements Serializable {

public void performDefaultOutgoingBehavior(ExecutionEntity activityExecution) {

performOutgoingBehavior(activityExecution, true, false); }

protected void performOutgoingBehavior(ExecutionEntity execution, boolean checkConditions, boolean throwExceptionIfExecutionStuck) { 
  //将执行外出顺序流操作放入执行计划 
  Context.getAgenda().planTakeOutgoingSequenceFlowsOperation(execution, true);

} }

它将执行外出顺序流的操作TakeOutgoingSequenceFlowsOperation放入执行计划。接下来,CommandInvoker 会调用TakeOutgoingSequenceFlowsOperation的run()方法,将当前节点变更为顺序流1。相关代码如下(省略部 分代码):

public class TakeOutgoingSequenceFlowsOperation extends AbstractOperation { 
  @Override
public void run() {
 //获取当前节点(注意,当前节点还是开始节点)
 FlowElement currentFlowElement = getCurrentFlowElement(execution); ...
 if (currentFlowElement instanceof FlowNode) {
         handleFlowNode((FlowNode) currentFlowElement);
        } else if (currentFlowElement instanceof SequenceFlow) {
            handleSequenceFlow();
        }
}
  //处理流节点
 protected void handleFlowNode(FlowNode flowNode) {
        handleActivityEnd(flowNode);
        if (flowNode.getParentContainer() != null
            && flowNode.getParentContainer() instanceof AdhocSubProcess) {
            handleAdhocSubProcess(flowNode);
        } else {
            leaveFlowNode(flowNode);

}

   }
//离开流节点
protected void leaveFlowNode(FlowNode flowNode) {
...//获取“开始”节点外出顺序流
SequenceFlow sequenceFlow = outgoingSequenceFlows.get(0); execution.setCurrentFlowElement(sequenceFlow); execution.setActive(true); 
  //此时将执行实例的当前节点变更为“顺序流1” 
  outgoingExecutions.add((ExecutionEntity) execution);
...
// 将继续流程操作放入执行计划
for (ExecutionEntity outgoingExecution : outgoingExecutions) {
            Context.getAgenda().planContinueProcessOperation(outgoingExecution);
} }
}

TakeOutgoingSequenceFlowsOperation执行完相关操作后,再次调用planContinueProcessOperation()方法, 将顺序流/后续操作放入执行计划。

接下来,CommandInvoker将第二次执行ContinueProcessOperation操作,实现顺序流1到“申请”节点的流转,即将流程的当前节点设置为“申请”节点。相关代码如下(省略部分代码):

public class ContinueProcessOperation extends AbstractOperation { @Override

public void run() {
 //获取当前节点(注意,当前节点是顺序流1)
 FlowElement currentFlowElement = getCurrentFlowElement(execution); ...
 continueThroughSequenceFlow((SequenceFlow) currentFlowElement); ...

}

//通过顺序流
 protected void continueThroughSequenceFlow(SequenceFlow sequenceFlow) {

...
 //获取顺序流的目标节点,并将目标节点设置为执行实例的当前节点
 FlowElement targetFlowElement = sequenceFlow.getTargetFlowElement(); execution.setCurrentFlowElement(targetFlowElement); //将继续流程操作放入执行计划 
   Context.getAgenda().planContinueProcessOperation(execution);

} }


修改完当前节点后,CommandInvoker会第三次执行ContinueProcessOperation操作,这次当前节点是

FlowNode,因此会执行continueThroughFlowNode()方法。相关代码如下(省略部分代码):

public class ContinueProcessOperation extends AbstractOperation { @Override

public void run() {
 //获取当前节点(注意,当前节点是“申请”节点)
 FlowElement currentFlowElement = getCurrentFlowElement(execution);

...
 continueThroughFlowNode((FlowNode) currentFlowElement); ...

}

//通过流节点
 protected void continueThroughFlowNode(FlowNode flowNode) {

... executeSynchronous(flowNode); ...

}

//同步执行
 protected void executeSynchronous(FlowNode flowNode) {

... //获取当前节点的behavior
    ActivityBehavior activityBehavior = (ActivityBehavior) flowNode.getBehavior();
        if (activityBehavior != null) {
executeActivityBehavior(activityBehavior, flowNode); }
... }
//执行behavior
protected void executeActivityBehavior(ActivityBehavior activityBehavior,
FlowNode flowNode) {
...
//调用behavior的execute()方法 
  activityBehavior.execute(execution);
  ...
} }

由于当前节点是用户节点,对应的behavior是UserTaskActivityBehavior,所以会调用该behavior的execute() 方法创建相应的用户任务。

此时,流程从开始节点流转到了“申请”用户节点.

网关控制

当通过“审批”用户节点后,流程会继续往下流转,这时会经过一个排他网关,Activiti如何处理网关节点呢?

我们将从调用Taskservice类的complete()方法完成“审批”用户任务这个动作入口开始,了解Activiti流转到网关并执行网关处理相关逻辑的过程:

public class TaskServiceImpl extends ServiceImpl implements TaskService{ public void complete(String taskId, Map<String, Object> variables) {

commandExecutor.execute(new CompleteTaskCmd(taskId, variables)); }

}

接下来,让我们进入CompleteTaskCmd源码,看一下“审批”任务完成时,流程是如何流转到网关节点 的(省略部分代码):

public class CompleteTaskCmd extends AbstractCompleteTaskCmd {

protected Void execute(CommandContext commandContext, TaskEntity task) { 
...
executeTaskComplete(commandContext, task, variables, localScope);
       return null;
    }
  //执行完成任务相关操作
protected void executeTaskComplete(CommandContext commandContext,
        TaskEntity taskEntity,
        Map<String, Object> variables,
        boolean localScope) {
        ...
        if (taskEntity.getExecutionId() != null) {
ExecutionEntity executionEntity = commandContext.getExecutionEntityManager() .findById(taskEntity.getExecutionId());
//将触发执行实例操作放入执行计划
            Context.getAgenda().planTriggerExecutionOperation(executionEntity);
} }
}

CompleteTaskCmd在任务完成后,将TriggerExecutionOperation放入执行计划,通过TriggerExecutionOperation 来触发执行实例继续流转(省略部分代码):

public class TriggerExecutionOperation extends AbstractOperation {
@Override
public void run() {
//获取当前节点(注意,当前节点是“审批”用户节点)
FlowElement currentFlowElement = getCurrentFlowElement(execution); if (currentFlowElement instanceof FlowNode) {
ActivityBehavior activityBehavior =
(ActivityBehavior) ((FlowNode) currentFlowElement).getBehavior();
            if (activityBehavior instanceof TriggerableActivityBehavior) {
                ...
//调用activityBehavior的trigger()方法
((TriggerableActivityBehavior) activityBehavior).trigger(execution, null, null); ...
}
... }
... }
}

当前节点“审批”是用户任务(UserTask),对应的behavior是UserTaskActivityBehavior。因此,会调用 UserTaskActivityBehavior的trigger()方法(省略部分代码):

public class UserTaskActivityBehavior extends TaskActivityBehavior {
public void trigger(DelegateExecution execution, String signalName, Object signalData) { ...
//离开执行计划
leave(execution); }
}

此处省略了UserTaskActivityBehavior的具体操作,仅关注流程的流转。UserTaskActivityBehavior执行操作后,调用父类FlowNodeActivityBehavior的leave()方法(省略部分代码):

public abstract class FlowNodeActivityBehavior implements TriggerableActivityBehavior { public void leave(DelegateExecution execution) {
bpmnActivityBehavior.performDefaultOutgoingBehavior((ExecutionEntity) execution); }
}

以上代码段在执行leave()方法时,调用BpmnActivityBehavior的performDefaultOutgoingBehavior()方法, 将执行外出顺序流操作放入执行计划。

public class BpmnActivityBehavior implements Serializable {
public void performDefaultOutgoingBehavior(ExecutionEntity activityExecution) {
performOutgoingBehavior(activityExecution, true, false); }
protected void performOutgoingBehavior(ExecutionEntity execution, boolean checkConditions, boolean throwExceptionIfExecutionStuck) {
  //将执行外出顺序流操作放入执行计划 
	Context.getAgenda().planTakeOutgoingSequenceFlowsOperation(execution, true);
	}
}

当CommandInvoker执行TakeOutgoingSequenceFlowsOperation时,当前节点还是“审批”用户节点,因此会执行leaveFlowNode()方法,将当前节点流转为“审批”用户节点之后的外出顺序流3(省略部分代码):

public class TakeOutgoingSequenceFlowsOperation extends AbstractOperation { @Override
public void run() {
//获取当前节点(注意,当前节点还是“审批”用户节点)
FlowElement currentFlowElement = getCurrentFlowElement(execution); ...
if (currentFlowElement instanceof FlowNode) {
handleFlowNode((FlowNode) currentFlowElement);
} else if (currentFlowElement instanceof SequenceFlow) {
            handleSequenceFlow();
        }
}
//处理流节点
protected void handleFlowNode(FlowNode flowNode) {
        handleActivityEnd(flowNode);
        if (flowNode.getParentContainer() != null
            && flowNode.getParentContainer() instanceof AdhocSubProcess) {
            handleAdhocSubProcess(flowNode);
        } else {
leaveFlowNode(flowNode); }
}
//离开流节点
protected void leaveFlowNode(FlowNode flowNode) {
...//获取“审批”用户节点外出顺序流
SequenceFlow sequenceFlow = outgoingSequenceFlows.get(0); execution.setCurrentFlowElement(sequenceFlow); execution.setActive(true);
  //此时执行实例的当前节点变更为顺序流3 
  outgoingExecutions.add((ExecutionEntity) execution);
...
// 将继续流程操作放入执行计划
for (ExecutionEntity outgoingExecution : outgoingExecutions) {
            Context.getAgenda().planContinueProcessOperation(outgoingExecution);
} }
}

TakeOutgoingSequenceFlowsOperation变更当前节点为“顺序流3”后,将继续流程操作放入执行计划链 表,CommandInvoker会调用ContinueProcessOperation的run()方法流转流程。当前节点是顺序流,所以会调用 continueThroughSequenceFlow()方法(省略部分代码):

public class ContinueProcessOperation extends AbstractOperation { @Override
public void run() {
//获取当前节点(注意,当前节点是顺序流3)
FlowElement currentFlowElement = getCurrentFlowElement(execution);
...
continueThroughSequenceFlow((SequenceFlow) currentFlowElement); ...
}
//通过顺序流
protected void continueThroughSequenceFlow(SequenceFlow sequenceFlow) {
...
//获取顺序流的目标节点,并将目标节点设置为执行实例的当前节点
FlowElement targetFlowElement = sequenceFlow.getTargetFlowElement(); execution.setCurrentFlowElement(targetFlowElement);
  //将执行实例放入执行计划
  Context.getAgenda().planContinueProcessOperation(execution);
} }

上面这段代码已将流程的当前节点变更为“网关”节点,并再次将执行实例放入执行计划,因此, CommandInvoker会再次调用ContinueProcessOperation的run()方法流转流程。当前节点是网关节点,所以执行 continueThroughFlowNode()方法(省略部分代码):

public class ContinueProcessOperation extends AbstractOperation { @Override
public void run() {
//获取当前节点(注意,当前节点是网关)
FlowElement currentFlowElement = getCurrentFlowElement(execution); ...
continueThroughFlowNode((FlowNode) currentFlowElement);
...
}
//通过流节点
protected void continueThroughFlowNode(FlowNode flowNode) {
... executeSynchronous(flowNode); ...
}
//同步执行
protected void executeSynchronous(FlowNode flowNode) {
...
//获取当前节点的behavior
ActivityBehavior activityBehavior = (ActivityBehavior) flowNode.getBehavior(); if (activityBehavior != null) {
executeActivityBehavior(activityBehavior, flowNode); }
... }
//执行behavior
protected void executeActivityBehavior(ActivityBehavior activityBehavior, FlowNode flowNode) {
...
//调用behavior的execute()方法 activityBehavior.execute(execution); ...
} }

当前节点是排他网关节点,对应的behavior是ExclusiveGatewayActivityBehavior,接下来正式进入排他网 关的behavior执行代码。代码如下(省略部分代码):

public class ExclusiveGatewayActivityBehavior extends GatewayActivityBehavior {
//排他网关的execute()方法,继承父类的execute()方法,默认执行leave()操作
@Override
public void leave(DelegateExecution execution) {
        ExclusiveGateway exclusiveGateway =
            (ExclusiveGateway) execution.getCurrentFlowElement();
...
        SequenceFlow outgoingSequenceFlow = null;
        SequenceFlow defaultSequenceFlow = null;
        String defaultSequenceFlowId = exclusiveGateway.getDefaultFlow();
//判断要流转哪条外出顺序流
Iterator<SequenceFlow> sequenceFlowIterator =
exclusiveGateway.getOutgoingFlows().iterator();
while (outgoingSequenceFlow == null && sequenceFlowIterator.hasNext()) {
SequenceFlow sequenceFlow = sequenceFlowIterator.next();
  String skipExpressionString = sequenceFlow.getSkipExpression();
            if (!SkipExpressionUtil
.isSkipExpressionEnabled(execution, skipExpressionString)) { //判断分支条件是否为true
boolean conditionEvaluatesToTrue =
                    ConditionUtil.hasTrueCondition(sequenceFlow, execution);
                if (conditionEvaluatesToTrue && (defaultSequenceFlowId == null ||
!defaultSequenceFlowId.equals(sequenceFlow.getId()))) {
                    outgoingSequenceFlow = sequenceFlow;
                }
            } else if (SkipExpressionUtil
                .shouldSkipFlowElement(Context.getCommandContext(),
                execution, skipExpressionString)) {
                outgoingSequenceFlow = sequenceFlow;
}
//查找默认外出顺序流
if (defaultSequenceFlowId != null
                && defaultSequenceFlowId.equals(sequenceFlow.getId())) {
                defaultSequenceFlow = sequenceFlow;
            }
}
//记录网关节点的结束时间 Context.getCommandContext().getHistoryManager()
            .recordActivityEnd((ExecutionEntity) execution, null);
//离开网关
if (outgoingSequenceFlow != null) {
//将找到的符合条件的外出顺序流设置为当前节点
            execution.setCurrentFlowElement(outgoingSequenceFlow);
        } else {
if (defaultSequenceFlow != null) { //将默认外出顺序流设置为当前节点 execution.setCurrentFlowElement(defaultSequenceFlow);
} else {
//没有找到可以执行的外出顺序流,报错
throw new NoOutgoingException("No outgoing sequence flow of the exclusive gateway
'" + exclusiveGateway.getId() + "' could be selected for continuing the process"); }
} //当前节点变更为网关外出顺序流 //离开执行计划 super.leave(execution);
} }
public abstract class FlowNodeActivityBehavior implements TriggerableActivityBehavior {
public void execute(DelegateExecution execution) { leave(execution);
}
public void leave(DelegateExecution execution) { bpmnActivityBehavior.performDefaultOutgoingBehavior(
            (ExecutionEntity) execution);
} }
public class BpmnActivityBehavior implements Serializable {
public void performDefaultOutgoingBehavior(ExecutionEntity activityExecution) { performOutgoingBehavior(activityExecution, true, false);
}
protected void performOutgoingBehavior(ExecutionEntity execution, boolean checkConditions, boolean throwExceptionIfExecutionStuck) {
  //将执行外出顺序流操作放入执行计划
Context.getAgenda().planTakeOutgoingSequenceFlowsOperation(execution, true);
} }

流程结束

首先,CommandInvoker会从执行计划中获取并执行TakeOutgoingSequenceFlowsOperation(省略部分代码):

public class TakeOutgoingSequenceFlowsOperation extends AbstractOperation { @Override

public void run() {
 FlowElement currentFlowElement = getCurrentFlowElement(execution); //获取当前节点(注意,当前节点是排他网关节点后的外出顺序流5)
 ...
 if (currentFlowElement instanceof FlowNode) {
     handleFlowNode((FlowNode) currentFlowElement);
    } else if (currentFlowElement instanceof SequenceFlow) {
   handleSequenceFlow(); }

}
 //处理流节点
 protected void handleSequenceFlow() { commandContext.getHistoryManager().recordActivityEnd(execution, null);

// 将继续流程操作放入执行计划
    Context.getAgenda().planContinueProcessOperation(execution);
} }

TakeOutgoingSequenceFlowsOperation再次将ContinueProcessOperation放入执行计划。CommandInvoker再次 调用ContinueProcessOperation的run()方法,由于当前节点是顺序流节点,所以调用continueThroughSequenceFlow() 方法,将顺序流5的“结束”目标节点变更为当前节点(省略部分代码):

public class ContinueProcessOperation extends AbstractOperation { @Override

public void run() { //获取当前节点(注意,当前节点是排他网关节点后的外出顺序流5)
 FlowElement currentFlowElement = getCurrentFlowElement(execution); ...
 continueThroughSequenceFlow((SequenceFlow) currentFlowElement); ...

}

//通过顺序流
 protected void continueThroughSequenceFlow(SequenceFlow sequenceFlow) {

...
 //获取顺序流的目标节点,并将目标节点变更为执行实例的当前节点
 FlowElement targetFlowElement = sequenceFlow.getTargetFlowElement(); execution.setCurrentFlowElement(targetFlowElement); //将执行实例放入执行计划 Context.getAgenda().planContinueProcessOperation(execution);

} }

设置当前节点后,程序会再次执行ContinueProcessOperation的run()方法,此时当前节点变更为结束节点。

结束节点是FlowNode,因此会执行continueThroughFlowNode()方法(省略部分代码):

public class ContinueProcessOperation extends AbstractOperation { @Override

public void run() {
 //获取当前节点(注意,当前节点是结束节点)
 FlowElement currentFlowElement = getCurrentFlowElement(execution); ...
 continueThroughFlowNode((FlowNode) currentFlowElement);
...
}
//通过流节点
protected void continueThroughFlowNode(FlowNode flowNode) {
... executeSynchronous(flowNode); ...
}
//同步执行
protected void executeSynchronous(FlowNode flowNode) { ...
//获取当前节点的behavior
ActivityBehavior activityBehavior = (ActivityBehavior) flowNode.getBehavior(); if (activityBehavior != null) {
executeActivityBehavior(activityBehavior, flowNode); }
... }
//执行behavior
protected void executeActivityBehavior(ActivityBehavior activityBehavior,
FlowNode flowNode) {
...
//调用behavior的execute()方法 
  activityBehavior.execute(execution);
  ...
}

当前节点是结束节点,对应的behavior是NoneEndEventActivityBehavior,因此接下来会调用NoneEnd EventActivityBehavior的execute()方法。 NoneEndEventActivityBehavior不做任何操作,只将TakeOutgoingSequenceFlowsOperation放入执行计划。 当CommandInvoker再次执行TakeOutgoingSequenceFlowsOperation时,由于结束节点没有外出顺序流,所以 将结束流程操作EndExecutionOperation放入执行计划(省略部分代码):

public class TakeOutgoingSequenceFlowsOperation extends AbstractOperation { @Override
public void run() {
//获取当前节点(注意,当前节点是结束节点)
FlowElement currentFlowElement = getCurrentFlowElement(execution); ...
if (currentFlowElement instanceof FlowNode) {
handleFlowNode((FlowNode) currentFlowElement);
} else if (currentFlowElement instanceof SequenceFlow) {
            handleSequenceFlow();
        }
}
//处理流节点
protected void handleFlowNode(FlowNode flowNode) {
        handleActivityEnd(flowNode);
        if (flowNode.getParentContainer() != null
            && flowNode.getParentContainer() instanceof AdhocSubProcess) {
        handleAdhocSubProcess(flowNode);
        } else {
leaveFlowNode(flowNode); }
}
//离开流节点
protected void leaveFlowNode(FlowNode flowNode) {
...
//结束节点没有外出顺序流
if (outgoingSequenceFlows.size() == 0) {
if (flowNode.getOutgoingFlows() == null || flowNode.getOutgoingFlows().size() == 0) { //将结束执行实例操作放入执行计划 Context.getAgenda().planEndExecutionOperation(execution);
} ...
} ...
} }

最后,CommandInvoker执行EndExecutionOperation操作结束流程的执行实例时,还会执行流程结束监听 器,业务上可以在流程结束监听器中处理与流程结束相关的业务操作,如给申请人发通知告知流程已完成等 (省略部分代码):

public class EndExecutionOperation extends AbstractOperation { @Override
     public void run() {
         if (execution.isProcessInstanceType()) {
handleProcessInstanceExecution(execution); } else {
             handleRegularExecution();
         }
}
//处理流程实例的执行实例
protected void handleProcessInstanceExecution(
         ExecutionEntity processInstanceExecution) {
         ExecutionEntityManager executionEntityManager =
             commandContext.getExecutionEntityManager();
String processInstanceId = processInstanceExecution.getId();
ExecutionEntity superExecution = processInstanceExecution.getSuperExecution(); SubProcessActivityBehavior subProcessActivityBehavior = null;
//存在父级执行实例
if (superExecution != null) {
...//执行父级执行实例当前节点的behavior的completing()方法 //获取流程实例当前活动的执行实例数量
int activeExecutions = getNumberOfActiveChildExecutionsForProcessInstance(
             executionEntityManager, processInstanceId);
         if (activeExecutions == 0) {
//删除流程实例的执行实例 executionEntityManager.deleteProcessInstanceExecutionEntity(
                 processInstanceId, execution.getCurrentFlowElement() != null ?
execution.getCurrentFlowElement().getId() : null, null, false, false); } else {
logger.debug("Active executions found. Process instance {} will not be ended.", processInstanceId);
}
         Process process = ProcessDefinitionUtil.getProcess(
             processInstanceExecution.getProcessDefinitionId());
//执行流程结束监听器
if (CollectionUtil.isNotEmpty(process.getExecutionListeners())) {
executeExecutionListeners(process, processInstanceExecution, ExecutionListener.EVENTNAME_END);
}
//存在父级执行实例
if (superExecution != null) {
...//执行父级执行实例当前节点的behavior的completed()方法 }
} }

乐观锁

Activiti基于数据库乐观锁机制解决并发问题。在Activiti数据表中每张表都存在一个字段REV_,该字段 用于实现乐观锁。下面以多线程同时操作ACT_RU_TASK表中的同一个Task为例,详细说明Activiti乐观锁的 实现过程。假设两个线程A和B同时对一个taskId进行complete操作,则有如下过程:

(1)A线程拿到Task相关数据,此时字段REV_的值为1。_

(2)B线程也拿到Task相关数据,此时字段REV_的值也为1。 _

_(3)A线程完成该任务,更新ID为taskId,并将REV_的值更新为2。

(4)这时B线程也完成该任务,但是更新时,已无法找到ID为taskId且REV_值为1的记录了。此时,数

据库update操作返回受影响的行数为0,这时Activiti会抛出一个异常ActivitiOptimisticLockingException。

(5)B线程执行的事务回滚。