# Source code for updo.celery

"""Celery application configuration for django-multitenant.

Auto-discovers tasks from all bounded contexts.
Consumer projects should import this in their celery.py or configure
autodiscover_tasks to include updo apps.
"""

from __future__ import annotations

import logging

logger = logging.getLogger(__name__)


[docs] def setup_celery_app(app_name: str = "updo") -> object | None: """Create and configure a Celery app for the library. Call this from your project's celery.py:: from updo.celery import setup_celery_app mt_celery = setup_celery_app() Or simply autodiscover tasks:: app.autodiscover_tasks([ 'src.share_kernel.infrastructure', 'src.module.tenant', 'src.module.authorization', ]) Returns: The configured Celery app, or None if Celery is not available. """ try: from celery import Celery app = Celery(app_name) app.config_from_object("django.conf:settings", namespace="CELERY") app.autodiscover_tasks([ "src.share_kernel.infrastructure", "src.module.tenant", "src.module.authorization", ]) logger.info("Celery app configured for django-multitenant") return app except ImportError: logger.info("Celery not installed, async features disabled") return None