Utilitaires Celery

Le module Celery fournit une base de tâche tenant-aware et la détection automatique de disponibilité. Celery est optionnel — le système fonctionne sans lui (fallback synchrone).

Module : src.share_kernel.infrastructure.celery_utils

Détection de disponibilité

src.share_kernel.infrastructure.celery_utils.celery_is_available() bool

Vérifie si Celery est disponible et configuré. Le résultat est mis en cache.

Respect du paramètre MULTITENANT['ASYNC_BACKEND'] :

  • 'sync' : retourne toujours False

  • 'celery' : retourne True ou lève ConfigurationError

  • 'auto' : auto-détecte, retourne False si Celery n’est pas importable

src.share_kernel.infrastructure.celery_utils.reset_celery_detection() None

Réinitialise le cache de détection. Pour les tests uniquement.

Tâche Celery Tenant-Aware

src.share_kernel.infrastructure.celery_utils.get_tenant_aware_task_base() type | None

Retourne la classe de base TenantAwareTask si Celery est disponible.

TenantAwareTask propage automatiquement le contexte tenant :

  1. Côté producteur (apply_async) : injecte tenant_id dans les headers de tâche

  2. Côté worker (__call__) : restaure le contexte tenant depuis les headers

from src.share_kernel.infrastructure.celery_utils import get_tenant_aware_task_base

TenantAwareTask = get_tenant_aware_task_base()

if TenantAwareTask:
    @app.task(base=TenantAwareTask)
    def process_invoice(invoice_id):
        # Le contexte tenant est automatiquement restauré
        tenant = get_current_tenant()
        invoice = Invoice.objects.get(id=invoice_id)
        ...

Dispatch d’événements

src.share_kernel.infrastructure.celery_utils.celery_dispatch_event(event_type: str, event_data: dict) bool

Dispatche un événement domaine comme tâche Celery si disponible.

Returns:

True si dispatché via Celery, False si fallback nécessaire.

Utilisé internement par le EventBus pour le dispatch asynchrone.

Exécution avec fallback

src.share_kernel.infrastructure.celery_utils.run_async_or_sync(task_func, *args, **kwargs) Any

Exécute une tâche Celery de manière asynchrone si possible, sinon de manière synchrone.

Tous les appels ``.delay()`` devraient passer par cette fonction pour un comportement de fallback cohérent.

from src.share_kernel.infrastructure.celery_utils import run_async_or_sync
from src.module.tenant.tasks import provision_tenant_schema

# Async si Celery disponible, sync sinon
run_async_or_sync(provision_tenant_schema, tenant_slug="acme-corp")

Gère les erreurs de dispatch (OperationalError, broker down) en exécutant la tâche de manière synchrone comme fallback.

Configuration Celery

Module : updo.celery

updo.celery.setup_celery_app(app_name: str = 'updo') Celery | None[source]

Crée et configure l’application Celery.

Auto-découvre les tâches dans :

  • src.share_kernel.infrastructure

  • src.module.tenant

  • src.module.authorization

Returns:

L’app Celery configurée, ou None si Celery n’est pas installé.

# updo/celery.py — déjà configuré
from updo.celery import setup_celery_app
app = setup_celery_app()

Tâches Celery de l’infrastructure

Module : src.share_kernel.infrastructure.tasks

src.share_kernel.infrastructure.tasks.dispatch_async_event(event_type: str, event_data: dict) None

Désérialise et dispatche un événement domaine aux handlers asynchrones. Enregistrée comme tâche Celery shared_task si Celery est disponible.

src.share_kernel.infrastructure.tasks.process_audit_log(event_data: dict) None

Persiste un événement d’audit sans bloquer la requête HTTP. Enregistrée comme tâche Celery si disponible.