Source code for src.module.identity.domain.otp_service

"""OTP service — pipeline for multi-factor authentication.

Supports three device types:
- **Email OTP**: sends a 6-digit code to the user's email
- **TOTP**: standard RFC 6238 time-based OTP (Google Authenticator, Authy, etc.)
- **Static codes**: one-time recovery codes

The OTP pipeline:
1. User logs in with email/password -> gets a partial token (``otp_required=true``)
2. User submits OTP code -> gets a full JWT token pair
"""

from __future__ import annotations

import hashlib
import hmac
import random
import secrets
import struct
import time
from typing import Any

from src.share_kernel.settings import get_setting


def is_otp_enabled() -> bool:
    """Check if OTP is globally enabled."""
    return bool(get_setting("OTP_ENABLED"))


def get_enabled_device_types() -> list[str]:
    """Return list of enabled OTP device types."""
    types = []
    if get_setting("OTP_EMAIL_ENABLED"):
        types.append("email")
    if get_setting("OTP_TOTP_ENABLED"):
        types.append("totp")
    if get_setting("OTP_STATIC_ENABLED"):
        types.append("static")
    return types


def user_has_confirmed_device(user: Any) -> bool:
    """Check if a user has at least one confirmed, active OTP device."""
    from src.module.identity.models import OTPDevice

    return OTPDevice.objects.filter(
        user=user, confirmed=True, is_active=True
    ).exists()


def generate_email_code() -> str:
    """Generate a random 6-digit OTP code."""
    return f"{random.SystemRandom().randint(0, 999999):06d}"


def create_email_challenge(user: Any) -> str:
    """Create and send an email OTP challenge.

    Returns the challenge ID (for reference, not the code).
    """
    from django.core.mail import send_mail

    from src.module.identity.models import OTPChallenge, OTPDevice

    # Get or create email device
    device, _ = OTPDevice.objects.get_or_create(
        user=user,
        device_type="email",
        name="default",
        defaults={"confirmed": True, "is_active": True},
    )

    # Invalidate previous challenges
    OTPChallenge.objects.filter(device=device, used=False).update(used=True)

    code = generate_email_code()
    challenge = OTPChallenge(device=device, code=code)
    challenge.save()

    # Send the code via email
    send_mail(
        subject="Your verification code",
        message=f"Your verification code is: {code}\n\nThis code expires in "
        f"{int(get_setting('OTP_EMAIL_TOKEN_VALIDITY_SECONDS')) // 60} minutes.",
        from_email=None,  # uses DEFAULT_FROM_EMAIL
        recipient_list=[user.email],
        fail_silently=False,
    )

    return str(challenge.id)


def verify_email_code(user: Any, code: str) -> bool:
    """Verify an email OTP code."""
    from src.module.identity.models import OTPChallenge, OTPDevice

    device = OTPDevice.objects.filter(
        user=user, device_type="email", is_active=True
    ).first()
    if not device:
        return False

    challenge = OTPChallenge.objects.filter(
        device=device, code=code, used=False
    ).order_by("-created_at").first()

    if not challenge or not challenge.is_valid:
        return False

    challenge.used = True
    challenge.save(update_fields=["used"])
    return True


# --- TOTP ---

def generate_totp_secret() -> str:
    """Generate a base32-encoded TOTP secret."""
    import base64

    return base64.b32encode(secrets.token_bytes(20)).decode("ascii")


def get_totp_provisioning_uri(user: Any, secret: str) -> str:
    """Generate an otpauth:// URI for QR code display."""
    import urllib.parse

    issuer = str(get_setting("OTP_TOTP_ISSUER_NAME"))
    label = urllib.parse.quote(f"{issuer}:{user.email}")
    params = urllib.parse.urlencode({
        "secret": secret,
        "issuer": issuer,
        "algorithm": "SHA1",
        "digits": "6",
        "period": "30",
    })
    return f"otpauth://totp/{label}?{params}"


def _compute_totp(secret_b32: str, time_step: int) -> str:
    """Compute a TOTP value for a given time step (RFC 6238)."""
    import base64

    key = base64.b32decode(secret_b32, casefold=True)
    msg = struct.pack(">Q", time_step)
    h = hmac.new(key, msg, hashlib.sha1).digest()
    offset = h[-1] & 0x0F
    code = struct.unpack(">I", h[offset:offset + 4])[0] & 0x7FFFFFFF
    return f"{code % 10**6:06d}"


[docs] def verify_totp(user: Any, code: str) -> bool: """Verify a TOTP code against the user's TOTP device. Allows a 1-step window (previous, current, next) to account for clock skew. """ from src.module.identity.models import OTPDevice device = OTPDevice.objects.filter( user=user, device_type="totp", confirmed=True, is_active=True ).first() if not device or not device.totp_secret: return False current_step = int(time.time()) // 30 for offset in (-1, 0, 1): expected = _compute_totp(device.totp_secret, current_step + offset) if hmac.compare_digest(expected, code): return True return False
def setup_totp_device(user: Any) -> dict[str, str]: """Create a TOTP device for a user (not yet confirmed). Returns the secret and provisioning URI for QR code display. """ from src.module.identity.models import OTPDevice secret = generate_totp_secret() device, created = OTPDevice.objects.update_or_create( user=user, device_type="totp", name="default", defaults={ "totp_secret": secret, "confirmed": False, "is_active": True, }, ) return { "secret": secret, "provisioning_uri": get_totp_provisioning_uri(user, secret), "device_id": str(device.id), } def confirm_totp_device(user: Any, code: str) -> bool: """Confirm a TOTP device by verifying a code. This proves the user has successfully configured their authenticator app. """ from src.module.identity.models import OTPDevice device = OTPDevice.objects.filter( user=user, device_type="totp", confirmed=False, is_active=True ).first() if not device or not device.totp_secret: return False current_step = int(time.time()) // 30 for offset in (-1, 0, 1): expected = _compute_totp(device.totp_secret, current_step + offset) if hmac.compare_digest(expected, code): device.confirmed = True device.save(update_fields=["confirmed"]) return True return False # --- Static Recovery Codes ---
[docs] def generate_static_codes(user: Any) -> list[str]: """Generate static recovery codes and store their hashes. Returns the plaintext codes (shown once to the user). """ from src.module.identity.models import OTPDevice count = int(get_setting("OTP_STATIC_TOKEN_COUNT")) codes = [f"{random.SystemRandom().randint(0, 99999999):08d}" for _ in range(count)] hashed = [hashlib.sha256(c.encode()).hexdigest() for c in codes] device, _ = OTPDevice.objects.update_or_create( user=user, device_type="static", name="default", defaults={ "static_codes": hashed, "confirmed": True, "is_active": True, }, ) return codes
[docs] def verify_static_code(user: Any, code: str) -> bool: """Verify and consume a static recovery code.""" from src.module.identity.models import OTPDevice device = OTPDevice.objects.filter( user=user, device_type="static", confirmed=True, is_active=True ).first() if not device: return False hashed = hashlib.sha256(code.encode()).hexdigest() codes = list(device.static_codes) if hashed in codes: codes.remove(hashed) device.static_codes = codes device.save(update_fields=["static_codes"]) return True return False
# --- Unified Verify ---
[docs] def verify_otp(user: Any, code: str, device_type: str | None = None) -> bool: """Verify an OTP code, trying the specified device type or all enabled types. Args: user: The user to verify for. code: The OTP code to verify. device_type: Optional device type to verify against. If None, tries all enabled types in order: totp, email, static. Returns: True if the code is valid, False otherwise. """ if device_type: verifiers = { "email": verify_email_code, "totp": verify_totp, "static": verify_static_code, } verifier = verifiers.get(device_type) if verifier: return verifier(user, code) return False # Try in priority order: TOTP first (most common), then email, then static if verify_totp(user, code): return True if verify_email_code(user, code): return True if verify_static_code(user, code): return True return False