运行逻辑

这篇文档旨在解释,当我们开始运行一个工作流后,实际发生了什么。

首先,程序会将你的 Workflow 实例化为一个 Workflow Execute 对象:

class WorkflowExecuter:

    def __init__(self, workflow_instance: Workflow):
        self.node_executor_registry = NodeExecutorRegistry()
        self.workflow_instance = workflow_instance
        self.node_dependencies: dict = {}

    ...

接下来,开始执行该对象的 execute 函数,这个函数完成两件事:

class WorkflowExecuter:

    ...

    async def execute(self) -> bool:
        try:
            # 构建这个节点及其所有子节点的依赖关系
            nodes = await self.get_nodes()
            for node in nodes:
                connected_results = await filter_target_handles(node, connected=True)
                if all(connected_results):
                    await self.build_node_dependencies_async(node)

            no_dependency_nodes = [node for node, dependencies in self.node_dependencies.items()]
            await asyncio.gather(*(self.execute_node(node) for node in no_dependency_nodes))

            return True

        except Exception as e:
            return False

        finally:
            await sync_to_async(self.workflow_instance.save)()
    
    ...

Step 1: 建立节点运行依赖关系

程序首先获取所有 target handle 已被连接的 Node,构建这类 Node 及其子 Node 的依赖关系字典。

例如对于一个一般 VASP 工作流,将有 INCAR, POSCAR, KPOINTS 及 VASP 求解器 4 个计算节点,VASP 求解器节点将接受其余三个节点的输入。 则形成的依赖关系为:

{
    "<WorkflowNode: VASP>": {
        "<WorkflowNode: INCAR>", 
        "<WorkflowNode: KPOINTS>", 
        "<WorkflowNode: POSCAR>"
        },
    "<WorkflowNode: KPOINTS>": set(),
    "<WorkflowNode: POSCAR>": set(),
    "<WorkflowNode: INCAR>": set(),
}

接下来第二步,程序将根据构建的依赖关系字典异步并行的开始执行所有 Node。

Step 2: 异步并行运行节点

class WorkflowExecuter:

    ...

    async def execute_node(self, node: WorkflowNode):
        
        ...

        try:
            ...

            if not any(connected_target_results) and not any(connected_source_results):
                node.status = "skipped"

            elif all(connected_target_results) or any(connected_source_results):
                results = await self.get_all_results(node)
                status = await self.execute_results_script(node, results)
                if "failed" in status:
                    node.status = "failed"
                else:
                    node.status = "success"

            else:
                node.status = "failed"

        except Exception as e:
            raise Exception(error_message)

运行一个节点时,经历以下步骤:

  1. 任务开始时,向客户端发送任务开始消息 -> running
  2. 提取 Node 中的所有 Handle,获取其连接状态,并根据连接状态决定是否需要执行节点中的 Compiles,
    1. 如果节点没有连接任意 handle,就将这个 Node status 标记为跳过 -> skipped
    2. 如果 Node 满足以下任一情况,就执行 node 中的所有 results -> running
      • source handle 中任意一个被连接;
      • target handle 全部被连接;
    3. 如果 Node 中有部分 Target handle 未连接 -> failed
  1. 当 Node 满足 2.2 中的条件后,获取 Node 中所有需要运行的内容 Compiles,并行执行这个节点中的所有 Compiles。
    • 当有任意一个 Compile 发生错误时,返回 -> failed
    • 全部完成时,返回 -> success
  1. 当有错误发生时,execute_node 函数将捕获错误,并将错误信息返回给客户端。

Was this page helpful?