Source code for src.module.authorization.domain.services
"""Authorization Context domain services — Policy Decision Point (PDP).
The PDP is the core of ABAC evaluation. It is a pure domain service that
receives all dependencies via parameters.
"""
from __future__ import annotations
import logging
from src.module.authorization.domain.value_objects import (
EvaluationRequest,
EvaluationResult,
PolicyData,
)
from src.share_kernel.domain.interfaces import ConditionEngine
logger = logging.getLogger(__name__)
[docs]
class PolicyEvaluationService:
"""Policy Decision Point (PDP) — evaluates ABAC policies.
This is a pure domain service. It receives policies and a condition engine
as parameters and produces an EvaluationResult.
"""
def __init__(self, condition_engine: ConditionEngine) -> None:
self.condition_engine = condition_engine
[docs]
def evaluate(
self,
request: EvaluationRequest,
policies: list[PolicyData],
) -> EvaluationResult:
"""Evaluate ABAC policies against the request context.
Algorithm:
1. Filter policies by resource_type (exact match or wildcard '*')
2. Filter by action (exact match or wildcard '*')
3. Separate into object-level and type-level groups
4. Sort each group by priority (descending)
5. Evaluate object-level first (absolute priority)
6. Then type-level
7. First matching policy wins
8. No match = deny (deny by default)
Returns:
EvaluationResult with decision 'allow' or 'deny'.
"""
# Step 1+2: Filter applicable policies
applicable = self._filter_applicable(request, policies)
# Step 3: Separate into object-level and type-level
object_level = [p for p in applicable if p.resource_id is not None and p.resource_id == request.resource_id]
type_level = [p for p in applicable if p.resource_id is None]
# Step 4: Sort by priority descending
object_level.sort(key=lambda p: p.priority, reverse=True)
type_level.sort(key=lambda p: p.priority, reverse=True)
# Step 5: Evaluate object-level first
result = self._evaluate_group(request, object_level)
if result is not None:
return result
# Step 6: Evaluate type-level
result = self._evaluate_group(request, type_level)
if result is not None:
return result
# Step 7: No match = deny by default
return EvaluationResult(
decision="deny",
reason="No matching policy found (deny by default)",
)
def _filter_applicable(
self,
request: EvaluationRequest,
policies: list[PolicyData],
) -> list[PolicyData]:
"""Filter policies by resource_type and action."""
result = []
for policy in policies:
# Check resource_type match
if policy.resource_type != "*" and policy.resource_type != request.resource_type:
continue
# Check action match
if "*" not in policy.actions and request.action not in policy.actions:
continue
result.append(policy)
return result
def _evaluate_group(
self,
request: EvaluationRequest,
policies: list[PolicyData],
) -> EvaluationResult | None:
"""Evaluate a group of policies in priority order. Returns on first match."""
for policy in policies:
if self._policy_matches(request, policy):
if policy.effect == "deny":
return EvaluationResult(
decision="deny",
matching_policy_id=policy.id,
matching_policy_name=policy.name,
reason=f"Denied by policy: {policy.name}",
)
else: # allow
return EvaluationResult(
decision="allow",
matching_policy_id=policy.id,
matching_policy_name=policy.name,
requires_approval=policy.requires_approval,
approval_config=policy.approval_config if policy.requires_approval else None,
reason=f"Allowed by policy: {policy.name}",
)
return None
def _policy_matches(self, request: EvaluationRequest, policy: PolicyData) -> bool:
"""Check if all conditions of a policy match the request context."""
# Evaluate subject conditions
if policy.subject_conditions:
if not self.condition_engine.evaluate(policy.subject_conditions, request.subject_attributes):
return False
# Evaluate resource conditions
if policy.resource_conditions:
if not self.condition_engine.evaluate(policy.resource_conditions, request.resource_attributes):
return False
# Evaluate environment conditions
if policy.environment_conditions:
if not self.condition_engine.evaluate(policy.environment_conditions, request.environment_attributes):
return False
return True