Source code for src.module.authorization.domain.services

"""Authorization Context domain services — Policy Decision Point (PDP).

The PDP is the core of ABAC evaluation. It is a pure domain service that
receives all dependencies via parameters.
"""

from __future__ import annotations

import logging

from src.module.authorization.domain.value_objects import (
    EvaluationRequest,
    EvaluationResult,
    PolicyData,
)
from src.share_kernel.domain.interfaces import ConditionEngine

logger = logging.getLogger(__name__)


[docs] class PolicyEvaluationService: """Policy Decision Point (PDP) — evaluates ABAC policies. This is a pure domain service. It receives policies and a condition engine as parameters and produces an EvaluationResult. """ def __init__(self, condition_engine: ConditionEngine) -> None: self.condition_engine = condition_engine
[docs] def evaluate( self, request: EvaluationRequest, policies: list[PolicyData], ) -> EvaluationResult: """Evaluate ABAC policies against the request context. Algorithm: 1. Filter policies by resource_type (exact match or wildcard '*') 2. Filter by action (exact match or wildcard '*') 3. Separate into object-level and type-level groups 4. Sort each group by priority (descending) 5. Evaluate object-level first (absolute priority) 6. Then type-level 7. First matching policy wins 8. No match = deny (deny by default) Returns: EvaluationResult with decision 'allow' or 'deny'. """ # Step 1+2: Filter applicable policies applicable = self._filter_applicable(request, policies) # Step 3: Separate into object-level and type-level object_level = [p for p in applicable if p.resource_id is not None and p.resource_id == request.resource_id] type_level = [p for p in applicable if p.resource_id is None] # Step 4: Sort by priority descending object_level.sort(key=lambda p: p.priority, reverse=True) type_level.sort(key=lambda p: p.priority, reverse=True) # Step 5: Evaluate object-level first result = self._evaluate_group(request, object_level) if result is not None: return result # Step 6: Evaluate type-level result = self._evaluate_group(request, type_level) if result is not None: return result # Step 7: No match = deny by default return EvaluationResult( decision="deny", reason="No matching policy found (deny by default)", )
def _filter_applicable( self, request: EvaluationRequest, policies: list[PolicyData], ) -> list[PolicyData]: """Filter policies by resource_type and action.""" result = [] for policy in policies: # Check resource_type match if policy.resource_type != "*" and policy.resource_type != request.resource_type: continue # Check action match if "*" not in policy.actions and request.action not in policy.actions: continue result.append(policy) return result def _evaluate_group( self, request: EvaluationRequest, policies: list[PolicyData], ) -> EvaluationResult | None: """Evaluate a group of policies in priority order. Returns on first match.""" for policy in policies: if self._policy_matches(request, policy): if policy.effect == "deny": return EvaluationResult( decision="deny", matching_policy_id=policy.id, matching_policy_name=policy.name, reason=f"Denied by policy: {policy.name}", ) else: # allow return EvaluationResult( decision="allow", matching_policy_id=policy.id, matching_policy_name=policy.name, requires_approval=policy.requires_approval, approval_config=policy.approval_config if policy.requires_approval else None, reason=f"Allowed by policy: {policy.name}", ) return None def _policy_matches(self, request: EvaluationRequest, policy: PolicyData) -> bool: """Check if all conditions of a policy match the request context.""" # Evaluate subject conditions if policy.subject_conditions: if not self.condition_engine.evaluate(policy.subject_conditions, request.subject_attributes): return False # Evaluate resource conditions if policy.resource_conditions: if not self.condition_engine.evaluate(policy.resource_conditions, request.resource_attributes): return False # Evaluate environment conditions if policy.environment_conditions: if not self.condition_engine.evaluate(policy.environment_conditions, request.environment_attributes): return False return True