Source code for src.module.authorization.infrastructure.condition_engines.cedar_engine

"""Cedar condition engine — evaluates ABAC conditions via cedarpy.

Uses the Cedar policy language for expressive, composable authorization policies.
Install with: pip install cedarpy

Condition format (native Cedar)::

    conditions = {
        "cedar_policies": '''
            permit(
                principal,
                action == Action::"edit",
                resource
            )
            when { resource.owner == principal };
        ''',
        "cedar_entities": [  # optional extra entities
            {
                "uid": {"__entity": {"type": "Role", "id": "admin"}},
                "attrs": {},
                "parents": [],
            }
        ],
    }

Attributes mapping:
- subject_attributes → Principal entity (type "User", or "<resource_type>User" when a resource_type other than "Resource" is given; id from user_id)
- resource_attributes → Resource entity (type from resource_type or "Resource")
- environment_attributes → Context record
"""

from __future__ import annotations

import logging
from typing import Any

from src.share_kernel.domain.interfaces import ConditionEngine

logger = logging.getLogger(__name__)


class CedarConditionEngine(ConditionEngine):
    """Cedar-based condition evaluation engine.

    Delegates condition evaluation to the cedarpy library for production-grade
    policy evaluation using the Cedar policy language.

    The engine fails closed: when cedarpy is missing, when the conditions are
    malformed, or when evaluation raises, the request is denied.
    """

    # Attribute keys that identify the request's entities; everything else in
    # ``attributes`` is forwarded to Cedar as context.
    _RESERVED_KEYS = frozenset({"user_id", "action", "resource_type", "resource_id"})

    # Attribute keys copied onto the principal entity when present.
    _PRINCIPAL_ATTR_KEYS = (
        "email",
        "roles",
        "department",
        "membership_status",
        "is_superuser",
        "is_active",
    )

    # Attribute keys copied onto the resource entity when present.
    _RESOURCE_ATTR_KEYS = ("status", "owner_id", "is_owner", "created_at", "updated_at")

    def __init__(self) -> None:
        """Record whether cedarpy can be imported; warn once if it cannot."""
        try:
            import cedarpy  # noqa: F401
        except ImportError:
            logger.warning(
                "cedarpy is not installed. CedarConditionEngine will deny all requests. "
                "Install with: pip install cedarpy"
            )
            self._cedar_available = False
        else:
            self._cedar_available = True

    def evaluate(self, conditions: dict[str, Any], attributes: dict[str, Any]) -> bool:
        """Evaluate conditions using Cedar policy language.

        Args:
            conditions: Dict containing ``cedar_policies`` (str) with Cedar policy
                text, and optionally ``cedar_entities`` (list) with additional
                entity definitions.
            attributes: Flat dict of attribute values. Expected keys vary by context:
                - For subject conditions: user_id, email, roles, etc.
                - For resource conditions: resource_type, status, owner_id, etc.
                - For environment conditions: current_hour, ip_address, etc.

        Returns:
            ``True`` when ``conditions`` is empty or Cedar allows the request;
            ``False`` when cedarpy is unavailable, the conditions are malformed,
            or Cedar denies or fails to evaluate the request.
        """
        if not self._cedar_available:
            logger.error("CedarConditionEngine: cedarpy not installed, denying request")
            return False

        if not conditions:
            return True

        policies = conditions.get("cedar_policies")
        if not policies or not isinstance(policies, str):
            logger.warning("CedarConditionEngine: no cedar_policies in conditions, denying")
            return False

        extra_entities = conditions.get("cedar_entities")
        # Fail closed on a malformed entity payload instead of letting
        # list.extend() raise (e.g. TypeError for an int) out of evaluate().
        if extra_entities and not isinstance(extra_entities, (list, tuple)):
            logger.warning("CedarConditionEngine: cedar_entities is not a list, denying")
            return False

        request = self._build_request(attributes)
        entities = self._build_entities(attributes, extra_entities)
        return self._authorize(policies, request, entities)

    @staticmethod
    def _principal_type(resource_type: str) -> str:
        """Return the principal entity type used for the given resource type.

        The default resource type ``"Resource"`` maps to ``"User"``; any other
        resource type maps to ``"<resource_type>User"``.
        """
        return "User" if resource_type == "Resource" else f"{resource_type}User"

    @staticmethod
    def _entity_uid(entity_type: str, entity_id: str) -> str:
        """Format a Cedar entity UID string (``Type::"id"``).

        Backslashes and double quotes in ``entity_id`` are escaped so that the
        id cannot terminate the quoted literal early and so that the parsed id
        equals the raw id placed in the entity list.
        """
        escaped_id = entity_id.replace("\\", "\\\\").replace('"', '\\"')
        return f'{entity_type}::"{escaped_id}"'

    def _build_request(self, attributes: dict[str, Any]) -> dict[str, Any]:
        """Build a Cedar authorization request from attributes.

        Returns:
            Dict with ``principal``, ``action`` and ``resource`` entity UID
            strings plus a ``context`` record holding every non-reserved
            attribute.
        """
        user_id = str(attributes.get("user_id", "anonymous"))
        action = str(attributes.get("action", "access"))
        resource_type = str(attributes.get("resource_type", "Resource"))
        resource_id = str(attributes.get("resource_id", "unknown"))

        # Build context from all attributes that are not used for entity refs.
        context: dict[str, Any] = {
            key: self._cedar_safe_value(value)
            for key, value in attributes.items()
            if key not in self._RESERVED_KEYS
        }

        return {
            "principal": self._entity_uid(self._principal_type(resource_type), user_id),
            "action": self._entity_uid("Action", action),
            "resource": self._entity_uid(resource_type, resource_id),
            "context": context,
        }

    def _build_entities(
        self,
        attributes: dict[str, Any],
        extra_entities: list[dict[str, Any]] | None = None,
    ) -> list[dict[str, Any]]:
        """Build Cedar entity list from attributes and optional extras.

        The list always starts with the principal entity followed by the
        resource entity; ``extra_entities`` (if any) are appended unchanged.
        """
        user_id = str(attributes.get("user_id", "anonymous"))
        resource_type = str(attributes.get("resource_type", "Resource"))
        resource_id = str(attributes.get("resource_id", "unknown"))

        principal_attrs: dict[str, Any] = {
            key: self._cedar_safe_value(attributes[key])
            for key in self._PRINCIPAL_ATTR_KEYS
            if key in attributes
        }
        resource_attrs: dict[str, Any] = {
            key: self._cedar_safe_value(attributes[key])
            for key in self._RESOURCE_ATTR_KEYS
            if key in attributes
        }

        entities: list[dict[str, Any]] = [
            {
                # Must use the same type as the request's principal UID.
                "uid": {
                    "__entity": {
                        "type": self._principal_type(resource_type),
                        "id": user_id,
                    }
                },
                "attrs": principal_attrs,
                "parents": [],
            },
            {
                "uid": {"__entity": {"type": resource_type, "id": resource_id}},
                "attrs": resource_attrs,
                "parents": [],
            },
        ]

        if extra_entities:
            entities.extend(extra_entities)

        return entities

    def _cedar_safe_value(self, value: Any) -> Any:
        """Convert a Python value to a Cedar-compatible value.

        Strings, ints and bools pass through; lists and tuples become lists and
        dicts become records (string keys), both converted recursively. Any
        other value (including ``float`` and ``None``) is converted with
        ``str()``.
        """
        if isinstance(value, (bool, int, str)):
            return value
        if isinstance(value, (list, tuple)):
            return [self._cedar_safe_value(item) for item in value]
        if isinstance(value, dict):
            # Coerce keys to str so nested mappings always form a valid record.
            return {str(key): self._cedar_safe_value(item) for key, item in value.items()}
        return str(value)

    def _authorize(
        self,
        policies: str,
        request: dict[str, Any],
        entities: list[dict[str, Any]],
    ) -> bool:
        """Run Cedar authorization.

        Returns:
            The ``allowed`` flag of the cedarpy result, or ``False`` if the
            call raises (the exception is logged, not propagated).
        """
        import cedarpy

        try:
            result = cedarpy.is_authorized(request, policies, entities)
            return result.allowed  # type: ignore[no-any-return]
        except Exception:
            # Authorization boundary: any evaluation failure means deny.
            logger.exception("CedarConditionEngine: evaluation failed")
            return False

    def validate_conditions(self, conditions: dict[str, Any]) -> list[str]:
        """Validate Cedar conditions structure.

        Returns:
            A list of human-readable error messages; empty when no problem was
            found. Structural errors on ``cedar_policies`` short-circuit the
            remaining checks.
        """
        errors: list[str] = []

        if not isinstance(conditions, dict):
            errors.append("Conditions must be a dict")
            return errors

        policies = conditions.get("cedar_policies")
        if not policies:
            errors.append("Missing required key: cedar_policies")
            return errors

        if not isinstance(policies, str):
            errors.append("cedar_policies must be a string containing Cedar policy text")
            return errors

        if self._cedar_available:
            # NOTE(review): confirm the installed cedarpy version exposes
            # validate_policies; if it does not, the resulting AttributeError is
            # reported below as a parse error for every policy.
            try:
                import cedarpy

                result = cedarpy.validate_policies(policies)
                policy_errors = getattr(result, "errors", None)
                if policy_errors:
                    errors.extend(
                        f"Cedar policy validation error: {err}" for err in policy_errors
                    )
            except Exception as exc:
                errors.append(f"Cedar policy parse error: {exc}")

        extra = conditions.get("cedar_entities")
        if extra is not None and not isinstance(extra, list):
            errors.append("cedar_entities must be a list of entity dicts")

        return errors