# File: plba/tests/test_workflow_repository.py (Python, 42 lines, 1.3 KiB)

from app_runtime.workflow.persistence.workflow_repository import WorkflowRepository
class StubConnectionFactory:
    """Minimal stand-in handed to ``WorkflowRepository`` in these tests.

    It opens no connections; the only behaviour it provides is ``dumps``.
    NOTE(review): whether ``_build_run_payload`` actually calls ``dumps`` is
    not visible from this file -- confirm against the repository code.
    """

    @staticmethod
    def dumps(snapshot: object) -> str:
        """Return ``str(snapshot)`` as a trivial serialisation of *snapshot*."""
        return str(snapshot)
def test_build_run_payload_prefers_active_task_trace_id() -> None:
    """Check that runtime ``trace_id`` wins over ``email_trace_id``.

    The snapshot carries both ``trace_id`` and ``email_trace_id`` in its
    runtime state; the value at index 8 of the built payload must be the
    ``trace_id`` one.
    """
    repository = WorkflowRepository(StubConnectionFactory())
    snapshot = {
        "payload": {"inbox_message": {"external_message_id": "msg-1", "id": 7}},
        "state": {
            "runtime": {
                "trace_id": "task-trace-1",
                "email_trace_id": "message-trace-1",
                "queue_task_id": 13,
            }
        },
    }

    payload = repository._build_run_payload("estimate", snapshot)

    # The payload is indexed positionally; each expected value below is the
    # matching input supplied above.
    assert payload[0] == "estimate"  # first argument passed to the builder
    assert payload[2] == "msg-1"  # inbox_message.external_message_id
    assert payload[3] == 13  # runtime.queue_task_id
    assert payload[4] == 7  # inbox_message.id
    assert payload[8] == "task-trace-1"  # runtime.trace_id, not email_trace_id
def test_build_run_payload_falls_back_to_legacy_email_trace_id() -> None:
    """Check the fallback to ``email_trace_id`` when ``trace_id`` is absent.

    The runtime state here contains only ``email_trace_id``; the value at
    index 8 of the built payload must equal it.
    """
    repository = WorkflowRepository(StubConnectionFactory())
    snapshot = {
        "payload": {"inbox_message": {"external_message_id": "msg-2", "id": 8}},
        "state": {"runtime": {"email_trace_id": "message-trace-2"}},
    }

    payload = repository._build_run_payload("estimate", snapshot)

    assert payload[8] == "message-trace-2"