Bus d’Événements Domaine¶
Le bus d’événements permet le découplage entre les bounded contexts via un système de publication/souscription double-mode (synchrone + asynchrone).
Module : src.share_kernel.infrastructure.event_bus
Fonctions de commodité¶
Publie un événement sur le bus global singleton.
from src.share_kernel.infrastructure.event_bus import publish_event from src.module.tenant.domain.events import TenantCreated publish_event(TenantCreated( tenant_id=str(tenant.id), tenant_slug=tenant.slug, schema_name=tenant.schema_name, ))
Raccourci pour
event_bus.subscribe_sync().
Raccourci pour
event_bus.subscribe_async().
Dispatch asynchrone¶
Quand des handlers asynchrones sont enregistrés, le bus procède ainsi :
L’événement est sérialisé via
event.to_dict()transaction.on_commit()planifie le dispatchSi Celery est disponible, l’événement est envoyé comme tâche Celery
Sinon, les handlers sont exécutés de manière synchrone (fallback)
publish_event(TenantCreated)
│
├── Sync handlers → exécution immédiate
│
└── Async handlers → transaction.on_commit()
│
├── Celery disponible → dispatch_async_event.delay()
│ │
│ └── Worker Celery → handler(event)
│
└── Celery indisponible → handler(event) [sync fallback]
Exemple complet¶
from src.share_kernel.infrastructure.event_bus import (
event_bus,
subscribe_sync,
subscribe_async,
publish_event,
)
from src.share_kernel.domain.domain_event import DomainEvent, register_event
from dataclasses import dataclass
# 1. Définir un événement
@register_event
@dataclass(frozen=True)
class InvoiceApproved(DomainEvent):
event_type = "invoice.approved"
invoice_id: str = ""
total: float = 0.0
# 2. Enregistrer un handler synchrone (même transaction)
def update_accounting_cache(event):
print(f"Cache mis à jour pour facture {event.invoice_id}")
subscribe_sync("invoice.approved", update_accounting_cache)
# 3. Enregistrer un handler asynchrone (après commit)
def send_approval_email(event):
print(f"Email envoyé pour facture {event.invoice_id}")
subscribe_async("invoice.approved", send_approval_email)
# 4. Publier l'événement
publish_event(InvoiceApproved(
invoice_id="inv-001",
total=1500.00,
tenant_id="acme-tenant-id",
))