From 73ac540a5d1b3e2458566b18e0f84bd517849532 Mon Sep 17 00:00:00 2001 From: zosimovaa Date: Fri, 6 Mar 2026 23:42:14 +0300 Subject: [PATCH] =?UTF-8?q?=D0=9F=D0=B5=D1=80=D0=B5=D0=B4=D0=B5=D0=BB?= =?UTF-8?q?=D0=B0=D0=BB=20=D0=B2=D0=BE=D1=80=D0=BA=D1=84=D0=BB=D0=BE=D1=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../workflow/contracts/workflow.py | 3 +- .../workflow/engine/workflow_engine.py | 33 +++++++++++-------- 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/src/app_runtime/workflow/contracts/workflow.py b/src/app_runtime/workflow/contracts/workflow.py index 92e2b58..19340c0 100644 --- a/src/app_runtime/workflow/contracts/workflow.py +++ b/src/app_runtime/workflow/contracts/workflow.py @@ -7,9 +7,8 @@ from app_runtime.workflow.contracts.step import WorkflowStep @dataclass(slots=True) class WorkflowNode: - name: str step: WorkflowStep - transitions: dict[str, str] = field(default_factory=dict) + transitions: dict[str, str | None] = field(default_factory=dict) @dataclass(slots=True) diff --git a/src/app_runtime/workflow/engine/workflow_engine.py b/src/app_runtime/workflow/engine/workflow_engine.py index 5383f74..8a6e82e 100644 --- a/src/app_runtime/workflow/engine/workflow_engine.py +++ b/src/app_runtime/workflow/engine/workflow_engine.py @@ -29,37 +29,44 @@ class WorkflowEngine: current_name = self._workflow.definition.start_at while current_name is not None: node = self._workflow.definition.nodes[current_name] - self._logger.info("Workflow run %s: step '%s' started.", run_id, node.name) - self._hooks.on_step_started(context, node.name) - self._persistence.start_step(run_id, node.name, context.snapshot()) - self._traces.step(node.name) - self._traces.info(f"Step '{node.name}' started.", status="started") + context.state["runtime"]["current_node"] = current_name + self._logger.info("Workflow run %s: step '%s' started.", run_id, current_name) + self._hooks.on_step_started(context, current_name) + self._persistence.start_step(run_id, current_name, context.snapshot()) + self._traces.step(current_name) + self._traces.info(f"Step '{current_name}' started.", status="started") try: result = node.step.run(context) except Exception as error: - self._persistence.fail_step(run_id, node.name, context.snapshot(), error) + self._persistence.fail_step(run_id, current_name, context.snapshot(), error) self._persistence.fail_run(run_id, context.snapshot()) self._traces.error( - f"Step '{node.name}' failed: {error}", + f"Step '{current_name}' failed: {error}", status="failed", attrs={"exception_type": type(error).__name__}, ) - self._logger.exception("Workflow run %s: step '%s' failed.", run_id, node.name) - self._hooks.on_step_failed(context, node.name) + self._logger.exception("Workflow run %s: step '%s' failed.", run_id, current_name) + self._hooks.on_step_failed(context, current_name) raise context.state.update(result.updates) - self._persistence.complete_step(run_id, node.name, result.status, result.transition, context.snapshot()) + self._persistence.complete_step( + run_id, + current_name, + result.status, + result.transition, + context.snapshot(), + ) self._traces.info( - f"Step '{node.name}' completed with transition '{result.transition}'.", + f"Step '{current_name}' completed with transition '{result.transition}'.", status=result.status, ) self._logger.info( "Workflow run %s: step '%s' completed with transition '%s'.", run_id, - node.name, + current_name, result.transition, ) - self._hooks.on_step_finished(context, node.name) + self._hooks.on_step_finished(context, current_name) current_name = self._transition_resolver.resolve(node, result) self._persistence.complete_run(run_id, context.snapshot()) self._traces.step("workflow")