Source code for src.module.authorization.infrastructure.decorators
"""ABAC decorator for views and service methods."""
from __future__ import annotations
from collections.abc import Callable
from functools import wraps
from typing import Any
from rest_framework.exceptions import PermissionDenied
from src.share_kernel.domain.exceptions import ApprovalRequiredError
[docs]
def abac_required(
action: str,
resource_type: str,
resource_id: str | None = None,
) -> Callable[..., Any]:
"""Decorator that enforces ABAC evaluation before executing a view or function.
Usage::
@abac_required(action='export', resource_type='Report')
def export_report(request, pk):
...
If the PDP returns deny, raises PermissionDenied.
If requires_approval, raises ApprovalRequiredError (HTTP 202).
"""
def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
@wraps(func)
def wrapper(request: Any, *args: Any, **kwargs: Any) -> Any:
from src.module.authorization.infrastructure.permissions import evaluate_abac
user = getattr(request, "user", None)
if user is None or not getattr(user, "is_authenticated", False):
raise PermissionDenied("Authentication required.")
if getattr(user, "is_superuser", False):
return func(request, *args, **kwargs)
from src.share_kernel.settings import get_setting
if not get_setting("ABAC_ENABLED"):
return func(request, *args, **kwargs)
# Determine resource_id from kwargs if not provided
rid = resource_id or kwargs.get("pk") or kwargs.get("id")
result = evaluate_abac(
user=user,
action=action,
resource_type=resource_type,
resource_id=str(rid) if rid else None,
request=request,
)
if result.decision == "deny":
raise PermissionDenied(result.reason)
if result.requires_approval:
if not get_setting("APPROVAL_ENABLED"):
return func(request, *args, **kwargs)
from src.module.authorization.application.approval_service import approval_service
from src.share_kernel.infrastructure.context import get_current_tenant
tenant = get_current_tenant()
approval_req = approval_service.create_approval_request(
policy_id=result.matching_policy_id or "",
policy_name=result.matching_policy_name or "",
requester_id=str(user.id),
tenant_id=str(tenant.id) if tenant else "",
action=action,
resource_type=resource_type,
resource_id=str(rid) if rid else None,
request_data={},
approval_config=result.approval_config or {},
)
raise ApprovalRequiredError(
approval_request_id=str(approval_req.id),
message=f"Action '{action}' on '{resource_type}' requires approval.",
)
return func(request, *args, **kwargs)
return wrapper
return decorator