Source code for src.share_kernel.domain.domain_event

"""Base domain event infrastructure.

Domain events are immutable records of significant state changes. They carry all
necessary data (no mutable object references) and are JSON-serializable for
transit via Celery/Redis.
"""

from __future__ import annotations

import uuid
from dataclasses import asdict, dataclass, field
from datetime import UTC, datetime
from typing import Any, ClassVar


[docs] @dataclass(frozen=True) class DomainEvent: """Base class for all domain events. Every event carries: - event_id: unique identifier for idempotency - correlation_id: links the originating HTTP request to downstream Celery tasks - occurred_at: UTC timestamp of when the event was emitted - tenant_id: tenant context (if applicable) """ event_id: str = field(default_factory=lambda: str(uuid.uuid4())) correlation_id: str = field(default_factory=lambda: str(uuid.uuid4())) occurred_at: str = field(default_factory=lambda: datetime.now(UTC).isoformat()) tenant_id: str | None = None # Subclasses set this to a human-readable event name event_type: ClassVar[str] = "domain_event"
[docs] def to_dict(self) -> dict[str, Any]: """Serialize the event to a JSON-compatible dictionary.""" data = asdict(self) data["event_type"] = self.__class__.event_type return data
[docs] @classmethod def from_dict(cls, data: dict[str, Any]) -> DomainEvent: """Deserialize an event from a dictionary. Looks up the registered event class by event_type and instantiates it. """ event_type = data.pop("event_type", None) event_cls = _EVENT_REGISTRY.get(event_type, cls) # Filter data to only include fields the target class expects valid_fields = {f.name for f in event_cls.__dataclass_fields__.values()} filtered = {k: v for k, v in data.items() if k in valid_fields} return event_cls(**filtered)
# Global registry mapping event_type strings to event classes _EVENT_REGISTRY: dict[str, type[DomainEvent]] = {}
def register_event(event_cls: type[DomainEvent]) -> type[DomainEvent]:
    """Class decorator that records an event class in the module registry.

    The class is stored under its own ``event_type`` string; registering
    another class with the same ``event_type`` replaces the earlier entry.

    Args:
        event_cls: The event class being decorated.

    Returns:
        ``event_cls`` itself, unchanged, so the decorated name stays bound to
        the original class.
    """
    registry_key = event_cls.event_type
    _EVENT_REGISTRY.update({registry_key: event_cls})
    return event_cls
def get_event_class(event_type: str) -> type[DomainEvent] | None:
    """Return the event class registered under ``event_type``.

    Args:
        event_type: The event name to look up in the module registry.

    Returns:
        The registered class, or ``None`` when nothing is registered under
        that name.
    """
    try:
        return _EVENT_REGISTRY[event_type]
    except KeyError:
        # Unknown event types are reported as None rather than raised.
        return None