NodeExecutor

Node 存储了自身所处的位置等信息;在 Node Data 中,存储了:

  • Handle(处理 Node 的连接信息)、
  • Body(处理 Node 中需要用户计算的信息)、
  • Result(处理并存储 Node 需要的运算以及结果)、
  • Message(Node 生命周期中发生的信息)

每个 Node Result 对应有一个执行函数 Executor,由 Result key 指定,在 NodeExecutorRegistry() 对象中注册

Node 的生命周期

一个 Node 的生命周期从用户点击“开始运行”时开始,到任务结束时销毁。

Node Result 中的事件处理函数

NodeExecutor 是所有 Executor 必须继承的基类,定义了 Executor 中的常用方法。

class NodeExecutor(ABC):
    """Base class that every node executor must inherit from.

    An executor is bound to a single workflow node and offers async helpers
    for reaching the node's workflow, reading body sources, and working with
    a per-node directory on disk. Subclasses implement :meth:`execute`.
    """

    def __init__(self, node: WorkflowNode):
        """Bind the executor to ``node`` and cache its UUID as a string."""
        self.node = node
        self.node_uuid = str(node.uuid)

    @sync_to_async
    def get_creator(self) -> User:
        """Return the creator of the workflow that owns this node."""
        return self.node.workflow.creator

    async def get_bohrium_access_key(self) -> str:
        """Return the Bohrium access token of the workflow creator.

        Raises:
            ValueError: If ``filter_bohrium_access_token`` yields ``None``
                for the creator.
        """
        creator = await self.get_creator()
        access_token = await filter_bohrium_access_token(creator)
        if access_token is None:
            raise ValueError("Bohrium access token is None")
        return access_token

    @sync_to_async
    def get_workflow(self, node: WorkflowNode) -> Workflow:
        """Return the workflow that ``node`` belongs to."""
        return node.workflow

    # Must be wrapped with sync_to_async; a plain ``async def`` cannot be
    # used here. NOTE(review): presumably because resolving ``node.workflow``
    # performs a synchronous database lookup -- confirm.
    @sync_to_async
    def get_workflow_uuid(self, node: WorkflowNode) -> str:
        """Return the UUID of ``node``'s workflow as a string."""
        return str(node.workflow.uuid)

    async def get_body_source(self, key: str) -> str | None:
        """Return the ``source`` of this node's body entry matching ``key``."""
        body = await sync_to_async(self.node.node_data.body.get)(key=key)
        return body.source

    async def get_body_source_from_results(self, result: WorkflowNodeResult, key: str) -> str:
        """Return the ``source`` of the body in ``result.bodies`` matching ``key``."""
        body = await sync_to_async(result.bodies.get)(key=key)
        return body.source

    async def generate_file_path(self) -> str:
        """Build ``<WORKFLOW_ROOT>/<workflow uuid>/<node uuid>`` without creating it."""
        workflow_uuid = await self.get_workflow_uuid(self.node)
        return os.path.join(settings.WORKFLOW_ROOT, workflow_uuid, self.node_uuid)

    async def create_dir_path(self) -> str:
        """Create the node's directory (if missing) and return its path."""
        dir_path = await self.generate_file_path()
        os.makedirs(dir_path, exist_ok=True)
        return dir_path

    async def write(self, file_path: str, content: str) -> None:
        """Write ``content`` to ``file_path`` as UTF-8 text, replacing any existing file."""
        # Explicit encoding: the locale default is not UTF-8 on every host, and
        # non-ASCII content would otherwise fail or be stored inconsistently.
        with open(file_path, "w", encoding="utf-8") as f:
            f.write(content)

    async def read(self, file_path: str) -> str:
        """Return the full text of ``file_path``, decoded as UTF-8."""
        # Same encoding as ``write`` so that content round-trips.
        with open(file_path, "r", encoding="utf-8") as f:
            return f.read()

    async def save_result(self, result: WorkflowNodeResult) -> None:
        """Persist ``result`` by calling its ``save`` method off the event loop."""
        await sync_to_async(result.save)()

    @abstractmethod
    async def execute(self, result: WorkflowNodeResult) -> str:
        """Run the executor for ``result``; must be implemented by subclasses.

        Returns a string whose meaning is defined by the concrete executor.
        """
        pass

Was this page helpful?