Переделал воркфлоу

This commit is contained in:
2026-03-06 23:42:14 +03:00
parent 4fa2b5ad5d
commit 73ac540a5d
2 changed files with 21 additions and 15 deletions

View File

@@ -7,9 +7,8 @@ from app_runtime.workflow.contracts.step import WorkflowStep
@dataclass(slots=True)
class WorkflowNode:
name: str
step: WorkflowStep
transitions: dict[str, str] = field(default_factory=dict)
transitions: dict[str, str | None] = field(default_factory=dict)
@dataclass(slots=True)

View File

@@ -29,37 +29,44 @@ class WorkflowEngine:
current_name = self._workflow.definition.start_at
while current_name is not None:
node = self._workflow.definition.nodes[current_name]
self._logger.info("Workflow run %s: step '%s' started.", run_id, node.name)
self._hooks.on_step_started(context, node.name)
self._persistence.start_step(run_id, node.name, context.snapshot())
self._traces.step(node.name)
self._traces.info(f"Step '{node.name}' started.", status="started")
context.state["runtime"]["current_node"] = current_name
self._logger.info("Workflow run %s: step '%s' started.", run_id, current_name)
self._hooks.on_step_started(context, current_name)
self._persistence.start_step(run_id, current_name, context.snapshot())
self._traces.step(current_name)
self._traces.info(f"Step '{current_name}' started.", status="started")
try:
result = node.step.run(context)
except Exception as error:
self._persistence.fail_step(run_id, node.name, context.snapshot(), error)
self._persistence.fail_step(run_id, current_name, context.snapshot(), error)
self._persistence.fail_run(run_id, context.snapshot())
self._traces.error(
f"Step '{node.name}' failed: {error}",
f"Step '{current_name}' failed: {error}",
status="failed",
attrs={"exception_type": type(error).__name__},
)
self._logger.exception("Workflow run %s: step '%s' failed.", run_id, node.name)
self._hooks.on_step_failed(context, node.name)
self._logger.exception("Workflow run %s: step '%s' failed.", run_id, current_name)
self._hooks.on_step_failed(context, current_name)
raise
context.state.update(result.updates)
self._persistence.complete_step(run_id, node.name, result.status, result.transition, context.snapshot())
self._persistence.complete_step(
run_id,
current_name,
result.status,
result.transition,
context.snapshot(),
)
self._traces.info(
f"Step '{node.name}' completed with transition '{result.transition}'.",
f"Step '{current_name}' completed with transition '{result.transition}'.",
status=result.status,
)
self._logger.info(
"Workflow run %s: step '%s' completed with transition '%s'.",
run_id,
node.name,
current_name,
result.transition,
)
self._hooks.on_step_finished(context, node.name)
self._hooks.on_step_finished(context, current_name)
current_name = self._transition_resolver.resolve(node, result)
self._persistence.complete_run(run_id, context.snapshot())
self._traces.step("workflow")