============================== Bus d'Événements Domaine ============================== Le bus d'événements permet le découplage entre les bounded contexts via un système de publication/souscription double-mode (synchrone + asynchrone). Module : ``src.share_kernel.infrastructure.event_bus`` EventBus ======== .. module:: src.share_kernel.infrastructure.event_bus :synopsis: Bus d'événements domaine double-mode .. class:: EventBus Bus d'événements domaine supportant les handlers synchrones et asynchrones. - **Handlers synchrones** : exécutés immédiatement dans la même transaction. Utilisés pour les invariants transactionnels (ex: mise à jour de cache, validation cross-agrégat). - **Handlers asynchrones** : dispatchés via ``transaction.on_commit()`` + Celery. Utilisés pour les effets secondaires non-critiques (notifications, audit). .. method:: subscribe_sync(event_type: str, handler: EventHandler) -> None Enregistre un handler synchrone pour un type d'événement. :param event_type: Le ``event_type`` de l'événement (ex: ``"tenant.created"``) :param handler: Callable acceptant un ``DomainEvent`` .. method:: subscribe_async(event_type: str, handler: EventHandler) -> None Enregistre un handler asynchrone pour un type d'événement. :param event_type: Le ``event_type`` de l'événement :param handler: Callable acceptant un ``DomainEvent`` .. method:: publish(event: DomainEvent) -> None Publie un événement domaine à tous les handlers enregistrés. 1. Les handlers synchrones s'exécutent immédiatement. Si un handler lève une exception, elle est propagée. 2. Les handlers asynchrones sont planifiés via ``transaction.on_commit()`` pour s'assurer qu'ils ne se déclenchent qu'après le commit de la transaction. .. method:: clear() -> None Supprime tous les handlers enregistrés. **Pour les tests uniquement.** Fonctions de commodité ====================== .. function:: publish_event(event: DomainEvent) -> None Publie un événement sur le bus global singleton. .. code-block:: python from src.share_kernel.infrastructure.event_bus import publish_event from src.module.tenant.domain.events import TenantCreated publish_event(TenantCreated( tenant_id=str(tenant.id), tenant_slug=tenant.slug, schema_name=tenant.schema_name, )) .. function:: subscribe_sync(event_type: str, handler: EventHandler) -> None Raccourci pour ``event_bus.subscribe_sync()``. .. function:: subscribe_async(event_type: str, handler: EventHandler) -> None Raccourci pour ``event_bus.subscribe_async()``. Dispatch asynchrone =================== Quand des handlers asynchrones sont enregistrés, le bus procède ainsi : 1. L'événement est sérialisé via ``event.to_dict()`` 2. ``transaction.on_commit()`` planifie le dispatch 3. Si Celery est disponible, l'événement est envoyé comme tâche Celery 4. Sinon, les handlers sont exécutés de manière synchrone (fallback) .. code-block:: text publish_event(TenantCreated) │ ├── Sync handlers → exécution immédiate │ └── Async handlers → transaction.on_commit() │ ├── Celery disponible → dispatch_async_event.delay() │ │ │ └── Worker Celery → handler(event) │ └── Celery indisponible → handler(event) [sync fallback] Exemple complet =============== .. code-block:: python from src.share_kernel.infrastructure.event_bus import ( event_bus, subscribe_sync, subscribe_async, publish_event, ) from src.share_kernel.domain.domain_event import DomainEvent, register_event from dataclasses import dataclass # 1. Définir un événement @register_event @dataclass(frozen=True) class InvoiceApproved(DomainEvent): event_type = "invoice.approved" invoice_id: str = "" total: float = 0.0 # 2. Enregistrer un handler synchrone (même transaction) def update_accounting_cache(event): print(f"Cache mis à jour pour facture {event.invoice_id}") subscribe_sync("invoice.approved", update_accounting_cache) # 3. Enregistrer un handler asynchrone (après commit) def send_approval_email(event): print(f"Email envoyé pour facture {event.invoice_id}") subscribe_async("invoice.approved", send_approval_email) # 4. Publier l'événement publish_event(InvoiceApproved( invoice_id="inv-001", total=1500.00, tenant_id="acme-tenant-id", ))