Source code for updo.celery
"""Celery application configuration for django-multitenant.
Auto-discovers tasks from all bounded contexts.
Consumer projects should import this in their celery.py or configure
autodiscover_tasks to include updo apps.
"""
from __future__ import annotations
import logging
logger = logging.getLogger(__name__)
[docs]
def setup_celery_app(app_name: str = "updo") -> object | None:
"""Create and configure a Celery app for the library.
Call this from your project's celery.py::
from updo.celery import setup_celery_app
mt_celery = setup_celery_app()
Or simply autodiscover tasks::
app.autodiscover_tasks([
'src.share_kernel.infrastructure',
'src.module.tenant',
'src.module.authorization',
])
Returns:
The configured Celery app, or None if Celery is not available.
"""
try:
from celery import Celery
app = Celery(app_name)
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks([
"src.share_kernel.infrastructure",
"src.module.tenant",
"src.module.authorization",
])
logger.info("Celery app configured for django-multitenant")
return app
except ImportError:
logger.info("Celery not installed, async features disabled")
return None