运行逻辑
这篇文档旨在解释,当我们开始运行一个工作流后,实际发生了什么。
TODO:!!! Source Handle 可以允许连接多个 Target Handle(共用数据源),但是 Target Handle 不允许连接多个 Source Handle(避免冲突)
首先,程序会将你的 Workflow 实例化为一个 Workflow Execute 对象:
class WorkflowExecuter:
def __init__(self, workflow_instance: Workflow):
self.node_executor_registry = NodeExecutorRegistry()
self.workflow_instance = workflow_instance
self.node_dependencies: dict = {}
...
接下来,开始执行该对象的 execute 函数,这个函数完成两件事:
class WorkflowExecuter:
...
async def execute(self) -> bool:
try:
# 构建这个节点及其所有子节点的依赖关系
nodes = await self.get_nodes()
for node in nodes:
connected_results = await filter_target_handles(node, connected=True)
if all(connected_results):
await self.build_node_dependencies_async(node)
no_dependency_nodes = [node for node, dependencies in self.node_dependencies.items()]
await asyncio.gather(*(self.execute_node(node) for node in no_dependency_nodes))
return True
except Exception as e:
return False
finally:
await sync_to_async(self.workflow_instance.save)()
...
Step 1: 建立节点运行依赖关系
程序首先获取所有 target handle 已被连接的 Node,构建这类 Node 及其子 Node 的依赖关系字典。
例如对于一个一般 VASP 工作流,将有 INCAR, POSCAR, KPOINTS 及 VASP 求解器 4 个计算节点,VASP 求解器节点将接受其余三个节点的输入。 则形成的依赖关系为:
{
"<WorkflowNode: VASP>": {
"<WorkflowNode: INCAR>",
"<WorkflowNode: KPOINTS>",
"<WorkflowNode: POSCAR>"
},
"<WorkflowNode: KPOINTS>": set(),
"<WorkflowNode: POSCAR>": set(),
"<WorkflowNode: INCAR>": set(),
}
在这个字典中,所有连接 VASP 求解器的三个子节点都作为了 VASP 求解关系, 剩下三个没有 target handle 的节点(即不需要获取其它节点数据的 Node 作为一个空 set() 对象存在在字典中)。
接下来第二步,程序将根据构建的依赖关系字典异步并行的开始执行所有 Node。
Step 2: 异步并行运行节点
class WorkflowExecuter:
...
async def execute_node(self, node: WorkflowNode):
...
try:
...
if not any(connected_target_results) and not any(connected_source_results):
node.status = "skipped"
elif all(connected_target_results) or any(connected_source_results):
results = await self.get_all_results(node)
status = await self.execute_results_script(node, results)
if "failed" in status:
node.status = "failed"
else:
node.status = "success"
else:
node.status = "failed"
except Exception as e:
raise Exception(error_message)
运行一个节点时,经历以下步骤:
- 任务开始时,向客户端发送任务开始消息 ->
running
- 提取 Node 中的所有 Handle,获取其连接状态,并根据连接状态决定是否需要执行节点中的 Compiles,
- 如果节点没有连接任意 handle,就将这个 Node status 标记为跳过 ->
skipped
- 如果 Node 满足以下任一情况,就执行 node 中的所有 results ->
running
- source handle 中任意一个被连接;
- target handle 全部被连接;
- 如果 Node 中有部分 Target handle 未连接 ->
failed
- 如果节点没有连接任意 handle,就将这个 Node status 标记为跳过 ->
对于一个 Node 而言,无论有没有 target handle,以及无论有 source handle 没有被连接,都需要被执行。
这个逻辑要求 Node 的所有 Target 都需要被连接。
- 当 Node 满足 2.2 中的条件后,获取 Node 中所有需要运行的内容
Compiles
,并行执行这个节点中的所有 Compiles。- 当有任意一个 Compile 发生错误时,返回 ->
failed
- 全部完成时,返回 ->
success
- 当有任意一个 Compile 发生错误时,返回 ->
这个逻辑要求 Node 中的所有 Compile 必须全部为可运行的。
- 当有错误发生时,execute_node 函数将捕获错误,并将错误信息返回给客户端。