feat: add workflow snapshot codec registry and strict sanitization
This commit is contained in:
@@ -0,0 +1,24 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
from app_runtime.workflow.persistence.entity_codec import EntityCodec
|
||||
|
||||
|
||||
class CodecRegistry:
|
||||
def __init__(self, codecs: list[EntityCodec] | None = None) -> None:
|
||||
self._codecs = list(codecs or [])
|
||||
self._by_type_id = {codec.type_id: codec for codec in self._codecs}
|
||||
|
||||
def register(self, codec: EntityCodec) -> None:
|
||||
self._codecs.append(codec)
|
||||
self._by_type_id[codec.type_id] = codec
|
||||
|
||||
def find_for_value(self, value: Any) -> EntityCodec | None:
|
||||
for codec in self._codecs:
|
||||
if codec.can_encode(value):
|
||||
return codec
|
||||
return None
|
||||
|
||||
def find_by_type_id(self, type_id: str) -> EntityCodec | None:
|
||||
return self._by_type_id.get(type_id)
|
||||
Reference in New Issue
Block a user