Bus d’Événements Domaine

Le bus d’événements permet le découplage entre les bounded contexts via un système de publication/souscription double-mode (synchrone + asynchrone).

Module : src.share_kernel.infrastructure.event_bus

EventBus

class src.share_kernel.infrastructure.event_bus.EventBus

Bus d’événements domaine supportant les handlers synchrones et asynchrones.

  • Handlers synchrones : exécutés immédiatement dans la même transaction. Utilisés pour les invariants transactionnels (ex: mise à jour de cache, validation cross-agrégat).

  • Handlers asynchrones : dispatchés via transaction.on_commit() + Celery. Utilisés pour les effets secondaires non-critiques (notifications, audit).

subscribe_sync(event_type: str, handler: EventHandler) None

Enregistre un handler synchrone pour un type d’événement.

Parameters:
  • event_type – Le event_type de l’événement (ex: "tenant.created")

  • handler – Callable acceptant un DomainEvent

subscribe_async(event_type: str, handler: EventHandler) None

Enregistre un handler asynchrone pour un type d’événement.

Parameters:
  • event_type – Le event_type de l’événement

  • handler – Callable acceptant un DomainEvent

publish(event: DomainEvent) None

Publie un événement domaine à tous les handlers enregistrés.

  1. Les handlers synchrones s’exécutent immédiatement. Si un handler lève une exception, elle est propagée.

  2. Les handlers asynchrones sont planifiés via transaction.on_commit() pour s’assurer qu’ils ne se déclenchent qu’après le commit de la transaction.

clear() None

Supprime tous les handlers enregistrés. Pour les tests uniquement.

Fonctions de commodité

src.share_kernel.infrastructure.event_bus.publish_event(event: DomainEvent) None

Publie un événement sur le bus global singleton.

from src.share_kernel.infrastructure.event_bus import publish_event
from src.module.tenant.domain.events import TenantCreated

publish_event(TenantCreated(
    tenant_id=str(tenant.id),
    tenant_slug=tenant.slug,
    schema_name=tenant.schema_name,
))
src.share_kernel.infrastructure.event_bus.subscribe_sync(event_type: str, handler: EventHandler) None

Raccourci pour event_bus.subscribe_sync().

src.share_kernel.infrastructure.event_bus.subscribe_async(event_type: str, handler: EventHandler) None

Raccourci pour event_bus.subscribe_async().

Dispatch asynchrone

Quand des handlers asynchrones sont enregistrés, le bus procède ainsi :

  1. L’événement est sérialisé via event.to_dict()

  2. transaction.on_commit() planifie le dispatch

  3. Si Celery est disponible, l’événement est envoyé comme tâche Celery

  4. Sinon, les handlers sont exécutés de manière synchrone (fallback)

publish_event(TenantCreated)
    │
    ├── Sync handlers → exécution immédiate
    │
    └── Async handlers → transaction.on_commit()
            │
            ├── Celery disponible → dispatch_async_event.delay()
            │                           │
            │                           └── Worker Celery → handler(event)
            │
            └── Celery indisponible → handler(event) [sync fallback]

Exemple complet

from src.share_kernel.infrastructure.event_bus import (
    event_bus,
    subscribe_sync,
    subscribe_async,
    publish_event,
)
from src.share_kernel.domain.domain_event import DomainEvent, register_event
from dataclasses import dataclass

# 1. Définir un événement
@register_event
@dataclass(frozen=True)
class InvoiceApproved(DomainEvent):
    event_type = "invoice.approved"
    invoice_id: str = ""
    total: float = 0.0

# 2. Enregistrer un handler synchrone (même transaction)
def update_accounting_cache(event):
    print(f"Cache mis à jour pour facture {event.invoice_id}")

subscribe_sync("invoice.approved", update_accounting_cache)

# 3. Enregistrer un handler asynchrone (après commit)
def send_approval_email(event):
    print(f"Email envoyé pour facture {event.invoice_id}")

subscribe_async("invoice.approved", send_approval_email)

# 4. Publier l'événement
publish_event(InvoiceApproved(
    invoice_id="inv-001",
    total=1500.00,
    tenant_id="acme-tenant-id",
))