diff --git a/src/app_runtime/workflow/contracts/workflow.py b/src/app_runtime/workflow/contracts/workflow.py index 92e2b58..19340c0 100644 --- a/src/app_runtime/workflow/contracts/workflow.py +++ b/src/app_runtime/workflow/contracts/workflow.py @@ -7,9 +7,8 @@ from app_runtime.workflow.contracts.step import WorkflowStep @dataclass(slots=True) class WorkflowNode: - name: str step: WorkflowStep - transitions: dict[str, str] = field(default_factory=dict) + transitions: dict[str, str | None] = field(default_factory=dict) @dataclass(slots=True) diff --git a/src/app_runtime/workflow/engine/workflow_engine.py b/src/app_runtime/workflow/engine/workflow_engine.py index 5383f74..8a6e82e 100644 --- a/src/app_runtime/workflow/engine/workflow_engine.py +++ b/src/app_runtime/workflow/engine/workflow_engine.py @@ -29,37 +29,44 @@ class WorkflowEngine: current_name = self._workflow.definition.start_at while current_name is not None: node = self._workflow.definition.nodes[current_name] - self._logger.info("Workflow run %s: step '%s' started.", run_id, node.name) - self._hooks.on_step_started(context, node.name) - self._persistence.start_step(run_id, node.name, context.snapshot()) - self._traces.step(node.name) - self._traces.info(f"Step '{node.name}' started.", status="started") + context.state["runtime"]["current_node"] = current_name + self._logger.info("Workflow run %s: step '%s' started.", run_id, current_name) + self._hooks.on_step_started(context, current_name) + self._persistence.start_step(run_id, current_name, context.snapshot()) + self._traces.step(current_name) + self._traces.info(f"Step '{current_name}' started.", status="started") try: result = node.step.run(context) except Exception as error: - self._persistence.fail_step(run_id, node.name, context.snapshot(), error) + self._persistence.fail_step(run_id, current_name, context.snapshot(), error) self._persistence.fail_run(run_id, context.snapshot()) self._traces.error( - f"Step '{node.name}' failed: {error}", + f"Step '{current_name}' failed: {error}", status="failed", attrs={"exception_type": type(error).__name__}, ) - self._logger.exception("Workflow run %s: step '%s' failed.", run_id, node.name) - self._hooks.on_step_failed(context, node.name) + self._logger.exception("Workflow run %s: step '%s' failed.", run_id, current_name) + self._hooks.on_step_failed(context, current_name) raise context.state.update(result.updates) - self._persistence.complete_step(run_id, node.name, result.status, result.transition, context.snapshot()) + self._persistence.complete_step( + run_id, + current_name, + result.status, + result.transition, + context.snapshot(), + ) self._traces.info( - f"Step '{node.name}' completed with transition '{result.transition}'.", + f"Step '{current_name}' completed with transition '{result.transition}'.", status=result.status, ) self._logger.info( "Workflow run %s: step '%s' completed with transition '%s'.", run_id, - node.name, + current_name, result.transition, ) - self._hooks.on_step_finished(context, node.name) + self._hooks.on_step_finished(context, current_name) current_name = self._transition_resolver.resolve(node, result) self._persistence.complete_run(run_id, context.snapshot()) self._traces.step("workflow")