NodeExecutor
Node 由存储了 Node 所处的位置等信息,在 Node Data 中,存储了
- Handle(处理 Node 的连接信息)、
- Body(处理 Node 中需要用户计算的信息)、
- Result(处理并存储 Node 需要的运算以及结果)、
- Message(Node 生命周期中发生的信息)
每个 Node Result 对应有一个执行函数 Executor,由 Result key 指定,在 NodeExecutorRegistry() 对象中注册
Node 的生命周期
一个 Node 从点击开始运行时开始创建其生命周期,到任务结束时销毁。
Node Result 中的事件处理函数
NodeExecutor 是所有 Executor 必须继承的基类,定义了 Executor 中的常用方法。
class NodeExecutor(ABC):
def __init__(self, node: WorkflowNode):
self.node = node
self.node_uuid = str(node.uuid)
@sync_to_async
def get_creator(self) -> User:
return self.node.workflow.creator
async def get_bohrium_access_key(self) -> str:
creator = await self.get_creator()
access_token = await filter_bohrium_access_token(creator)
if access_token is None:
raise ValueError("Bohrium access token is None")
return access_token
@sync_to_async
def get_workflow(self, node: WorkflowNode) -> Workflow:
return node.workflow
@sync_to_async # 必须使用 sync_to_async 装饰器,不能直接使用 async def
def get_workflow_uuid(self, node: WorkflowNode) -> str:
return str(node.workflow.uuid)
async def get_body_source(self, key: str) -> str | None:
body = await sync_to_async(self.node.node_data.body.get)(key=key)
return body.source
async def get_body_source_from_results(self, result: WorkflowNodeResult, key: str) -> str:
body = await sync_to_async(result.bodies.get)(key=key)
return body.source
async def generate_file_path(self) -> str:
workflow_uuid = await self.get_workflow_uuid(self.node)
return os.path.join(settings.WORKFLOW_ROOT, workflow_uuid, self.node_uuid)
async def create_dir_path(self) -> str:
dir_path = await self.generate_file_path()
os.makedirs(dir_path, exist_ok=True)
return dir_path
async def write(self, file_path: str, content: str) -> None:
with open(file_path, "w") as f:
f.write(content)
async def read(self, file_path: str) -> str:
with open(file_path, "r") as f:
return f.read()
async def save_result(self, result: WorkflowNodeResult) -> None:
await sync_to_async(result.save)()
@abstractmethod
async def execute(self, result: WorkflowNodeResult) -> str:
pass