"""OTP service — pipeline for multi-factor authentication.
Supports three device types:
- **Email OTP**: sends a 6-digit code to the user's email
- **TOTP**: standard RFC 6238 time-based OTP (Google Authenticator, Authy, etc.)
- **Static codes**: one-time recovery codes
The OTP pipeline:
1. User logs in with email/password -> gets a partial token (``otp_required=true``)
2. User submits OTP code -> gets a full JWT token pair
"""
from __future__ import annotations
import hashlib
import hmac
import random
import secrets
import struct
import time
from typing import Any
from src.share_kernel.settings import get_setting
def is_otp_enabled() -> bool:
    """Report whether OTP is switched on globally.

    Returns:
        The ``OTP_ENABLED`` setting coerced to ``bool``.
    """
    enabled_flag = get_setting("OTP_ENABLED")
    return bool(enabled_flag)
def get_enabled_device_types() -> list[str]:
    """List the OTP device types whose feature setting is truthy.

    Returns:
        A subset of ``["email", "totp", "static"]``, always in that order.
    """
    # (setting key, device type) pairs, checked in output order.
    toggles = (
        ("OTP_EMAIL_ENABLED", "email"),
        ("OTP_TOTP_ENABLED", "totp"),
        ("OTP_STATIC_ENABLED", "static"),
    )
    return [kind for setting_key, kind in toggles if get_setting(setting_key)]
def user_has_confirmed_device(user: Any) -> bool:
    """Tell whether the user owns at least one usable OTP device.

    Args:
        user: The user whose devices are inspected.

    Returns:
        True if any ``OTPDevice`` row for the user is both confirmed and
        active, False otherwise.
    """
    from src.module.identity.models import OTPDevice

    usable_devices = OTPDevice.objects.filter(
        user=user,
        confirmed=True,
        is_active=True,
    )
    return usable_devices.exists()
def generate_email_code() -> str:
    """Produce a random 6-digit OTP code.

    Returns:
        A string of exactly six decimal digits, left-padded with zeros.
    """
    # SystemRandom draws from the operating system's entropy source.
    rng = random.SystemRandom()
    number = rng.randint(0, 999999)
    return str(number).zfill(6)
def create_email_challenge(user: Any) -> str:
    """Issue a fresh email OTP challenge and mail the code to the user.

    Unused challenges already attached to the user's email device are marked
    as used before the new one is created.

    Args:
        user: Account receiving the code; its ``email`` attribute is used as
            the recipient address.

    Returns:
        The new challenge's ID as a string (for reference, not the code).
    """
    from django.core.mail import send_mail

    from src.module.identity.models import OTPChallenge, OTPDevice

    # Reuse the user's "default" email device, creating it on first use.
    email_device, _created = OTPDevice.objects.get_or_create(
        user=user,
        device_type="email",
        name="default",
        defaults={"confirmed": True, "is_active": True},
    )

    # Retire any outstanding codes for this device.
    outstanding = OTPChallenge.objects.filter(device=email_device, used=False)
    outstanding.update(used=True)

    otp_code = generate_email_code()
    new_challenge = OTPChallenge(device=email_device, code=otp_code)
    new_challenge.save()

    # Deliver the code by email.
    validity_minutes = int(get_setting("OTP_EMAIL_TOKEN_VALIDITY_SECONDS")) // 60
    body = (
        f"Your verification code is: {otp_code}\n\n"
        f"This code expires in {validity_minutes} minutes."
    )
    send_mail(
        subject="Your verification code",
        message=body,
        from_email=None,  # uses DEFAULT_FROM_EMAIL
        recipient_list=[user.email],
        fail_silently=False,
    )
    return str(new_challenge.id)
def verify_email_code(user: Any, code: str) -> bool:
    """Verify and consume an email OTP code.

    Args:
        user: The user the code was issued to.
        code: The code submitted by the user.

    Returns:
        True if an unused, still-valid challenge matched ``code`` and this
        call was the one that consumed it; False otherwise.
    """
    from src.module.identity.models import OTPChallenge, OTPDevice

    # An empty submission can never match an issued code; skip the queries.
    if not code:
        return False

    device = OTPDevice.objects.filter(
        user=user, device_type="email", is_active=True
    ).first()
    if not device:
        return False

    challenge = (
        OTPChallenge.objects.filter(device=device, code=code, used=False)
        .order_by("-created_at")
        .first()
    )
    if not challenge or not challenge.is_valid:
        return False

    # Consume with a conditional UPDATE instead of read-modify-save: if two
    # requests race on the same code, only one of them flips used from False
    # to True, so the one-time code cannot be redeemed twice.
    # NOTE(review): QuerySet.update() bypasses Model.save() and its signals.
    consumed = OTPChallenge.objects.filter(
        pk=challenge.pk, used=False
    ).update(used=True)
    return consumed == 1
# --- TOTP ---
def generate_totp_secret() -> str:
    """Create a new random TOTP secret.

    Returns:
        20 random bytes encoded as an ASCII base32 string.
    """
    import base64

    raw_key = secrets.token_bytes(20)
    encoded = base64.b32encode(raw_key)
    return encoded.decode("ascii")
def get_totp_provisioning_uri(user: Any, secret: str) -> str:
    """Build the ``otpauth://`` URI that authenticator apps scan as a QR code.

    Args:
        user: Account being enrolled; its ``email`` attribute becomes the
            account name in the label.
        secret: Base32-encoded TOTP secret.

    Returns:
        A URI of the form ``otpauth://totp/<issuer>%3A<email>?<params>``
        advertising SHA1, 6 digits and a 30-second period.
    """
    import urllib.parse

    issuer = str(get_setting("OTP_TOTP_ISSUER_NAME"))

    # safe="" so a "/" in the issuer or email cannot split the URI path.
    label = urllib.parse.quote(f"{issuer}:{user.email}", safe="")

    # urlencode defaults to quote_plus, which turns spaces into "+". The
    # otpauth key-URI format expects percent-encoding ("%20"), so an issuer
    # such as "ACME Co" needs quote() here.
    params = urllib.parse.urlencode(
        {
            "secret": secret,
            "issuer": issuer,
            "algorithm": "SHA1",
            "digits": "6",
            "period": "30",
        },
        quote_via=urllib.parse.quote,
    )
    return f"otpauth://totp/{label}?{params}"
def _compute_totp(secret_b32: str, time_step: int) -> str:
"""Compute a TOTP value for a given time step (RFC 6238)."""
import base64
key = base64.b32decode(secret_b32, casefold=True)
msg = struct.pack(">Q", time_step)
h = hmac.new(key, msg, hashlib.sha1).digest()
offset = h[-1] & 0x0F
code = struct.unpack(">I", h[offset:offset + 4])[0] & 0x7FFFFFFF
return f"{code % 10**6:06d}"
def verify_totp(user: Any, code: str) -> bool:
    """Verify a TOTP code against the user's confirmed, active TOTP device.

    Allows a 1-step window (previous, current, next 30-second step) to
    account for clock skew.

    Args:
        user: The user whose TOTP device is checked.
        code: The code submitted by the user.

    Returns:
        True if ``code`` matches one of the three accepted steps. False if it
        does not match, if ``code`` is not a string, or if the user has no
        confirmed, active TOTP device with a secret.
    """
    from src.module.identity.models import OTPDevice

    # hmac.compare_digest raises TypeError when given a str containing
    # non-ASCII characters (or a non-str), so user input must not reach it as
    # text. Compare bytes instead; undecodable input simply never matches.
    if not isinstance(code, str):
        return False
    submitted = code.encode("utf-8", errors="replace")

    device = OTPDevice.objects.filter(
        user=user, device_type="totp", confirmed=True, is_active=True
    ).first()
    if not device or not device.totp_secret:
        return False

    current_step = int(time.time()) // 30
    for offset in (-1, 0, 1):
        expected = _compute_totp(device.totp_secret, current_step + offset)
        if hmac.compare_digest(expected.encode("ascii"), submitted):
            return True
    return False
def setup_totp_device(user: Any) -> dict[str, str]:
    """Create or reset the user's default TOTP device in an unconfirmed state.

    A new secret is generated on every call and stored on the device, which is
    marked active but not confirmed.

    Args:
        user: The user being enrolled.

    Returns:
        A dict with ``secret``, ``provisioning_uri`` (for QR code display) and
        ``device_id``.
    """
    from src.module.identity.models import OTPDevice

    new_secret = generate_totp_secret()
    pending_state = {
        "totp_secret": new_secret,
        "confirmed": False,
        "is_active": True,
    }
    totp_device, _ = OTPDevice.objects.update_or_create(
        user=user,
        device_type="totp",
        name="default",
        defaults=pending_state,
    )

    uri = get_totp_provisioning_uri(user, new_secret)
    return {
        "secret": new_secret,
        "provisioning_uri": uri,
        "device_id": str(totp_device.id),
    }
def confirm_totp_device(user: Any, code: str) -> bool:
    """Confirm a pending TOTP device by verifying a code.

    This proves the user has successfully configured their authenticator app.
    The same 1-step window (previous, current, next) as regular verification
    is accepted.

    Args:
        user: The user whose unconfirmed, active TOTP device is checked.
        code: The code submitted by the user.

    Returns:
        True if the code matched and the device was marked confirmed. False
        if it did not match, if ``code`` is not a string, or if there is no
        unconfirmed, active TOTP device with a secret.
    """
    from src.module.identity.models import OTPDevice

    # hmac.compare_digest raises TypeError when given a str containing
    # non-ASCII characters (or a non-str), so user input must not reach it as
    # text. Compare bytes instead; undecodable input simply never matches.
    if not isinstance(code, str):
        return False
    submitted = code.encode("utf-8", errors="replace")

    device = OTPDevice.objects.filter(
        user=user, device_type="totp", confirmed=False, is_active=True
    ).first()
    if not device or not device.totp_secret:
        return False

    current_step = int(time.time()) // 30
    for offset in (-1, 0, 1):
        expected = _compute_totp(device.totp_secret, current_step + offset)
        if hmac.compare_digest(expected.encode("ascii"), submitted):
            device.confirmed = True
            device.save(update_fields=["confirmed"])
            return True
    return False
# --- Static Recovery Codes ---
def generate_static_codes(user: Any) -> list[str]:
    """Generate a new set of static recovery codes for the user.

    Only SHA-256 hex digests of the codes are stored, on the user's default
    static device, which is created or overwritten and marked confirmed and
    active.

    Args:
        user: The user receiving the recovery codes.

    Returns:
        The plaintext 8-digit codes (shown once to the user).
    """
    from src.module.identity.models import OTPDevice

    token_count = int(get_setting("OTP_STATIC_TOKEN_COUNT"))
    rng = random.SystemRandom()
    plaintext_codes = [
        str(rng.randint(0, 99999999)).zfill(8) for _ in range(token_count)
    ]
    digests = [
        hashlib.sha256(plain.encode()).hexdigest() for plain in plaintext_codes
    ]

    OTPDevice.objects.update_or_create(
        user=user,
        device_type="static",
        name="default",
        defaults={
            "static_codes": digests,
            "confirmed": True,
            "is_active": True,
        },
    )
    return plaintext_codes
def verify_static_code(user: Any, code: str) -> bool:
    """Check a static recovery code and consume it on success.

    Args:
        user: The user whose static device is checked.
        code: The plaintext recovery code submitted by the user.

    Returns:
        True if the SHA-256 digest of ``code`` was among the stored digests
        (one occurrence is then removed and the device saved); False if it was
        not, or if the user has no confirmed, active static device.
    """
    from src.module.identity.models import OTPDevice

    recovery_device = OTPDevice.objects.filter(
        user=user,
        device_type="static",
        confirmed=True,
        is_active=True,
    ).first()
    if not recovery_device:
        return False

    digest = hashlib.sha256(code.encode()).hexdigest()
    remaining = list(recovery_device.static_codes)
    if digest not in remaining:
        return False

    # Drop the redeemed digest so the code cannot be used again.
    remaining.remove(digest)
    recovery_device.static_codes = remaining
    recovery_device.save(update_fields=["static_codes"])
    return True
# --- Unified Verify ---
def verify_otp(user: Any, code: str, device_type: str | None = None) -> bool:
    """Verify an OTP code against one device type or all enabled types.

    Only device types reported by ``get_enabled_device_types()`` are
    consulted, so a type that has been switched off in settings cannot be
    used to authenticate even if the user still has such a device.

    Args:
        user: The user to verify for.
        code: The OTP code to verify.
        device_type: Optional device type to verify against ("email", "totp"
            or "static"). If None, tries all enabled types in order:
            totp, email, static.

    Returns:
        True if the code is valid, False otherwise (including an unknown or
        disabled ``device_type``, or a ``code`` that is not an ASCII string).
    """
    # Every code this module issues consists of ASCII digits; anything else
    # cannot match. Rejecting it here also avoids the TypeError that
    # hmac.compare_digest raises for non-ASCII str input.
    if not isinstance(code, str) or not code.isascii():
        return False

    # Insertion order is the priority order: TOTP first (most common), then
    # email, then static.
    verifiers = {
        "totp": verify_totp,
        "email": verify_email_code,
        "static": verify_static_code,
    }
    enabled = get_enabled_device_types()

    if device_type:
        verifier = verifiers.get(device_type)
        if verifier is None or device_type not in enabled:
            return False
        return verifier(user, code)

    # any() short-circuits, so later verifiers are not run after a match.
    return any(
        verifier(user, code)
        for kind, verifier in verifiers.items()
        if kind in enabled
    )