"""Cedar condition engine — evaluates ABAC conditions via cedarpy.
Uses the Cedar policy language for expressive, composable authorization policies.
Install with: pip install cedarpy
Condition format (native Cedar)::
conditions = {
"cedar_policies": '''
permit(
principal,
action == Action::"edit",
resource
)
when { resource.owner == principal };
''',
"cedar_entities": [ # optional extra entities
{
"uid": {"__entity": {"type": "Role", "id": "admin"}},
"attrs": {},
"parents": [],
}
],
}
Attributes mapping:
- subject_attributes → Principal entity (type "User", id from user_id)
- resource_attributes → Resource entity (type from resource_type or "Resource")
- environment_attributes → Context record
"""
from __future__ import annotations
import logging
from typing import Any
from src.share_kernel.domain.interfaces import ConditionEngine
logger = logging.getLogger(__name__)
[docs]
class CedarConditionEngine(ConditionEngine):
"""Cedar-based condition evaluation engine.
Delegates condition evaluation to the cedarpy library for production-grade
policy evaluation using the Cedar policy language.
"""
def __init__(self) -> None:
try:
import cedarpy # noqa: F401
self._cedar_available = True
except ImportError:
logger.warning(
"cedarpy is not installed. CedarConditionEngine will deny all requests. "
"Install with: pip install cedarpy"
)
self._cedar_available = False
def evaluate(self, conditions: dict[str, Any], attributes: dict[str, Any]) -> bool:
"""Evaluate conditions using Cedar policy language.
Args:
conditions: Dict containing ``cedar_policies`` (str) with Cedar policy text,
and optionally ``cedar_entities`` (list) with additional entity definitions.
attributes: Flat dict of attribute values. Expected keys vary by context:
- For subject conditions: user_id, email, roles, etc.
- For resource conditions: resource_type, status, owner_id, etc.
- For environment conditions: current_hour, ip_address, etc.
"""
if not self._cedar_available:
logger.error("CedarConditionEngine: cedarpy not installed, denying request")
return False
if not conditions:
return True
policies = conditions.get("cedar_policies")
if not policies or not isinstance(policies, str):
logger.warning("CedarConditionEngine: no cedar_policies in conditions, denying")
return False
request = self._build_request(attributes)
entities = self._build_entities(attributes, conditions.get("cedar_entities"))
return self._authorize(policies, request, entities)
def _build_request(self, attributes: dict[str, Any]) -> dict[str, Any]:
"""Build a Cedar authorization request from attributes."""
user_id = str(attributes.get("user_id", "anonymous"))
action = str(attributes.get("action", "access"))
resource_type = str(attributes.get("resource_type", "Resource"))
resource_id = str(attributes.get("resource_id", "unknown"))
# Build context from all attributes that are not used for entity refs
reserved = {"user_id", "action", "resource_type", "resource_id"}
context: dict[str, Any] = {}
for key, value in attributes.items():
if key not in reserved:
context[key] = self._cedar_safe_value(value)
return {
"principal": f'{resource_type}User::"{user_id}"'
if resource_type != "Resource"
else f'User::"{user_id}"',
"action": f'Action::"{action}"',
"resource": f'{resource_type}::"{resource_id}"',
"context": context,
}
def _build_entities(
self,
attributes: dict[str, Any],
extra_entities: list[dict[str, Any]] | None = None,
) -> list[dict[str, Any]]:
"""Build Cedar entity list from attributes and optional extras."""
entities: list[dict[str, Any]] = []
# Build principal entity
user_id = str(attributes.get("user_id", "anonymous"))
resource_type = str(attributes.get("resource_type", "Resource"))
principal_type = f"{resource_type}User" if resource_type != "Resource" else "User"
principal_attrs: dict[str, Any] = {}
for key in ("email", "roles", "department", "membership_status", "is_superuser", "is_active"):
if key in attributes:
principal_attrs[key] = self._cedar_safe_value(attributes[key])
entities.append(
{
"uid": {"__entity": {"type": principal_type, "id": user_id}},
"attrs": principal_attrs,
"parents": [],
}
)
# Build resource entity
resource_id = str(attributes.get("resource_id", "unknown"))
resource_attrs: dict[str, Any] = {}
for key in ("status", "owner_id", "is_owner", "created_at", "updated_at"):
if key in attributes:
resource_attrs[key] = self._cedar_safe_value(attributes[key])
entities.append(
{
"uid": {"__entity": {"type": resource_type, "id": resource_id}},
"attrs": resource_attrs,
"parents": [],
}
)
if extra_entities:
entities.extend(extra_entities)
return entities
def _cedar_safe_value(self, value: Any) -> Any:
"""Convert a Python value to a Cedar-compatible value."""
if isinstance(value, (str, int, bool)):
return value
if isinstance(value, (list, tuple)):
return [self._cedar_safe_value(v) for v in value]
if isinstance(value, dict):
return {k: self._cedar_safe_value(v) for k, v in value.items()}
return str(value)
def _authorize(
self,
policies: str,
request: dict[str, Any],
entities: list[dict[str, Any]],
) -> bool:
"""Run Cedar authorization."""
import cedarpy
try:
result = cedarpy.is_authorized(request, policies, entities)
return result.allowed # type: ignore[no-any-return]
except Exception:
logger.exception("CedarConditionEngine: evaluation failed")
return False
def validate_conditions(self, conditions: dict[str, Any]) -> list[str]:
"""Validate Cedar conditions structure."""
errors: list[str] = []
if not isinstance(conditions, dict):
errors.append("Conditions must be a dict")
return errors
policies = conditions.get("cedar_policies")
if not policies:
errors.append("Missing required key: cedar_policies")
return errors
if not isinstance(policies, str):
errors.append("cedar_policies must be a string containing Cedar policy text")
return errors
if self._cedar_available:
try:
import cedarpy
result = cedarpy.validate_policies(policies)
if hasattr(result, "errors") and result.errors:
for err in result.errors:
errors.append(f"Cedar policy validation error: {err}")
except Exception as exc:
errors.append(f"Cedar policy parse error: {exc}")
extra = conditions.get("cedar_entities")
if extra is not None and not isinstance(extra, list):
errors.append("cedar_entities must be a list of entity dicts")
return errors