42 lines
1.0 KiB
Python
42 lines
1.0 KiB
Python
import asyncio
|
|
|
|
from config_manager.v2.health import HealthServer
|
|
|
|
|
|
def test_health_mapping_ok_to_200():
|
|
async def provider():
|
|
return {"status": "ok"}
|
|
|
|
async def scenario() -> None:
|
|
server = HealthServer(
|
|
host="127.0.0.1",
|
|
port=8000,
|
|
path="/health",
|
|
timeout=0.2,
|
|
health_provider=provider,
|
|
)
|
|
code, payload = await server._build_health_response()
|
|
assert code == 200
|
|
assert payload["status"] == "ok"
|
|
|
|
asyncio.run(scenario())
|
|
|
|
|
|
def test_health_mapping_unhealthy_to_503():
|
|
async def provider():
|
|
return {"status": "unhealthy", "detail": "worker failed"}
|
|
|
|
async def scenario() -> None:
|
|
server = HealthServer(
|
|
host="127.0.0.1",
|
|
port=8000,
|
|
path="/health",
|
|
timeout=0.2,
|
|
health_provider=provider,
|
|
)
|
|
code, payload = await server._build_health_response()
|
|
assert code == 503
|
|
assert payload["status"] == "unhealthy"
|
|
|
|
asyncio.run(scenario())
|