Source code for src.module.authorization.infrastructure.decorators

"""ABAC decorator for views and service methods."""

from __future__ import annotations

from collections.abc import Callable
from functools import wraps
from typing import Any

from rest_framework.exceptions import PermissionDenied

from src.share_kernel.domain.exceptions import ApprovalRequiredError


def _resolve_resource_id(
    explicit_id: str | None,
    call_kwargs: dict[str, Any],
) -> str | None:
    """Return the resource identifier to evaluate as a string, or ``None``.

    Candidates are tried in order: the identifier given to the decorator,
    then the ``pk`` and ``id`` keyword arguments of the decorated call.
    ``None`` and the empty string count as "not provided"; every other
    value is used, including a numeric ``0``, which a plain truthiness
    check would silently discard.
    """
    candidates = (explicit_id, call_kwargs.get("pk"), call_kwargs.get("id"))
    for candidate in candidates:
        if candidate is not None and candidate != "":
            return str(candidate)
    return None


def abac_required(
    action: str,
    resource_type: str,
    resource_id: str | None = None,
) -> Callable[..., Any]:
    """Decorator that enforces ABAC evaluation before executing a view or function.

    Usage::

        @abac_required(action='export', resource_type='Report')
        def export_report(request, pk):
            ...

    If the PDP returns deny, raises PermissionDenied.
    If requires_approval, raises ApprovalRequiredError (HTTP 202).

    Args:
        action: Action name passed to the ABAC evaluation.
        resource_type: Resource type name passed to the ABAC evaluation.
        resource_id: Fixed resource identifier. When omitted, the ``pk`` and
            then the ``id`` keyword argument of the decorated call is used.

    Returns:
        A decorator. The wrapped callable must receive the request as its
        first positional argument; the user is read from ``request.user``.

    Raises:
        PermissionDenied: The request has no authenticated user, or the
            evaluation result's decision is ``"deny"``.
        ApprovalRequiredError: The evaluation result requires approval and
            the ``APPROVAL_ENABLED`` setting is on; an approval request is
            created first and its id is attached to the error.

    Notes:
        The decorated callable runs without ABAC evaluation for superusers
        and when the ``ABAC_ENABLED`` setting is falsy. When approval is
        required but ``APPROVAL_ENABLED`` is falsy, the callable runs as if
        access had been allowed.
    """

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @wraps(func)
        def wrapper(request: Any, *args: Any, **kwargs: Any) -> Any:
            # Project imports stay function-local, as in the original module.
            from src.module.authorization.infrastructure.permissions import evaluate_abac

            user = getattr(request, "user", None)
            if user is None or not getattr(user, "is_authenticated", False):
                raise PermissionDenied("Authentication required.")

            # Superusers skip policy evaluation entirely.
            if getattr(user, "is_superuser", False):
                return func(request, *args, **kwargs)

            from src.share_kernel.settings import get_setting

            if not get_setting("ABAC_ENABLED"):
                return func(request, *args, **kwargs)

            # Resolve once so the evaluation and the approval request always
            # refer to the same resource.
            resolved_id = _resolve_resource_id(resource_id, kwargs)

            result = evaluate_abac(
                user=user,
                action=action,
                resource_type=resource_type,
                resource_id=resolved_id,
                request=request,
            )

            if result.decision == "deny":
                raise PermissionDenied(result.reason)

            if result.requires_approval:
                # With the approval workflow switched off there is nothing to
                # wait for, so the call proceeds.
                if not get_setting("APPROVAL_ENABLED"):
                    return func(request, *args, **kwargs)

                from src.module.authorization.application.approval_service import approval_service
                from src.share_kernel.infrastructure.context import get_current_tenant

                tenant = get_current_tenant()
                approval_req = approval_service.create_approval_request(
                    policy_id=result.matching_policy_id or "",
                    policy_name=result.matching_policy_name or "",
                    requester_id=str(user.id),
                    tenant_id=str(tenant.id) if tenant else "",
                    action=action,
                    resource_type=resource_type,
                    resource_id=resolved_id,
                    request_data={},
                    approval_config=result.approval_config or {},
                )
                raise ApprovalRequiredError(
                    approval_request_id=str(approval_req.id),
                    message=f"Action '{action}' on '{resource_type}' requires approval.",
                )

            return func(request, *args, **kwargs)

        return wrapper

    return decorator