# Source code for src.share_kernel.domain.domain_event
"""Base domain event infrastructure.
Domain events are immutable records of significant state changes. They carry all
necessary data (no mutable object references) and are JSON-serializable for
transit via Celery/Redis.
"""
from __future__ import annotations

import uuid
from dataclasses import asdict, dataclass, field, fields
from datetime import UTC, datetime
from typing import Any, ClassVar
[docs]
@dataclass(frozen=True)
class DomainEvent:
"""Base class for all domain events.
Every event carries:
- event_id: unique identifier for idempotency
- correlation_id: links the originating HTTP request to downstream Celery tasks
- occurred_at: UTC timestamp of when the event was emitted
- tenant_id: tenant context (if applicable)
"""
event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
correlation_id: str = field(default_factory=lambda: str(uuid.uuid4()))
occurred_at: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
tenant_id: str | None = None
# Subclasses set this to a human-readable event name
event_type: ClassVar[str] = "domain_event"
[docs]
def to_dict(self) -> dict[str, Any]:
"""Serialize the event to a JSON-compatible dictionary."""
data = asdict(self)
data["event_type"] = self.__class__.event_type
return data
[docs]
@classmethod
def from_dict(cls, data: dict[str, Any]) -> DomainEvent:
"""Deserialize an event from a dictionary.
Looks up the registered event class by event_type and instantiates it.
"""
event_type = data.pop("event_type", None)
event_cls = _EVENT_REGISTRY.get(event_type, cls)
# Filter data to only include fields the target class expects
valid_fields = {f.name for f in event_cls.__dataclass_fields__.values()}
filtered = {k: v for k, v in data.items() if k in valid_fields}
return event_cls(**filtered)
# Global registry mapping event_type strings to event classes
_EVENT_REGISTRY: dict[str, type[DomainEvent]] = {}
[docs]
def register_event(event_cls: type[DomainEvent]) -> type[DomainEvent]:
    """Add ``event_cls`` to the global event registry (usable as a decorator).

    The class is stored under its own ``event_type`` attribute, replacing any
    entry already held for that name, and is returned unchanged so the
    decorated name still refers to the original class.
    """
    registry_key = event_cls.event_type
    _EVENT_REGISTRY[registry_key] = event_cls
    return event_cls
[docs]
def get_event_class(event_type: str) -> type[DomainEvent] | None:
    """Return the event class registered under ``event_type``.

    Yields ``None`` when the global registry has no entry for that name.
    """
    registered = _EVENT_REGISTRY.get(event_type, None)
    return registered