101 lines
2.8 KiB
Python
101 lines
2.8 KiB
Python
import asyncio
|
|
import json
|
|
|
|
from config_manager.v2.management import ManagementServer
|
|
|
|
|
|
def test_health_mapping_ok_to_200():
    """An "ok" health report must yield response code 200 with status "ok"."""

    async def report_ok():
        # Stub health provider that always reports a healthy state.
        return {"status": "ok"}

    async def check() -> None:
        management = ManagementServer(
            host="127.0.0.1",
            port=8000,
            timeout=0.2,
            health_provider=report_ok,
        )
        # start() is never called here; only the response-building step
        # is exercised directly.
        response = await management._build_health_response()
        status_code, body = response
        assert status_code == 200
        assert body["status"] == "ok"

    asyncio.run(check())
|
|
|
|
|
|
def test_health_mapping_unhealthy_to_503():
    """An "unhealthy" health report must yield response code 503."""

    async def report_unhealthy():
        # Stub health provider describing a failed worker.
        return {"status": "unhealthy", "detail": "worker failed"}

    async def check() -> None:
        management = ManagementServer(
            host="127.0.0.1",
            port=8000,
            timeout=0.2,
            health_provider=report_unhealthy,
        )
        # start() is never called here; only the response-building step
        # is exercised directly.
        response = await management._build_health_response()
        status_code, body = response
        assert status_code == 503
        assert body["status"] == "unhealthy"

    asyncio.run(check())
|
|
|
|
|
|
def test_action_routes_call_callbacks():
    """Check that the start/stop action routes invoke their callbacks.

    The server is started with ``port=0`` and the bound port is read back
    from ``server.port``. ``/actions/start`` and ``/actions/stop`` are then
    requested over a raw TCP connection, and the test verifies the response
    codes, the JSON payloads and the order in which the callbacks ran.
    """
    events: list[str] = []

    async def provider():
        # Health provider stub; required by the constructor call below.
        return {"status": "ok"}

    async def on_start() -> str:
        events.append("start")
        return "start accepted"

    async def on_stop() -> str:
        events.append("stop")
        return "stop accepted"

    async def request(port: int, path: str) -> tuple[int, dict[str, str]]:
        """Send a minimal HTTP/1.1 GET and return ``(status_code, json_body)``."""
        reader, writer = await asyncio.open_connection("127.0.0.1", port)
        try:
            writer.write(
                f"GET {path} HTTP/1.1\r\nHost: 127.0.0.1\r\nConnection: close\r\n\r\n".encode("utf-8")
            )
            await writer.drain()
            # read() without a size waits for EOF; the request asks for
            # "Connection: close", so the server is expected to close the
            # stream once the response has been sent.
            raw = await reader.read()
        finally:
            # Always release the socket, even if the exchange fails midway,
            # so a failing request does not leak an open connection.
            writer.close()
            await writer.wait_closed()

        # Split the head from the body; the status code is the second
        # space-separated token of the status line.
        header, body = raw.split(b"\r\n\r\n", maxsplit=1)
        status_code = int(header.split(b" ")[1])
        payload = json.loads(body.decode("utf-8"))
        return status_code, payload

    async def scenario() -> None:
        server = ManagementServer(
            host="127.0.0.1",
            port=0,
            timeout=0.2,
            health_provider=provider,
            on_start=on_start,
            on_stop=on_stop,
        )
        await server.start()
        try:
            port = server.port
            assert port > 0

            start_code, start_payload = await request(port, "/actions/start")
            stop_code, stop_payload = await request(port, "/actions/stop")
        finally:
            # Stop the server whether or not the requests succeeded.
            await server.stop()

        assert start_code == 200
        assert start_payload["status"] == "ok"
        assert start_payload["detail"] == "start accepted"

        assert stop_code == 200
        assert stop_payload["status"] == "ok"
        assert stop_payload["detail"] == "stop accepted"
        assert events == ["start", "stop"]

    asyncio.run(scenario())
|