"""Authentication dependencies"""
import logging
from datetime import datetime, timedelta, timezone
from typing import Optional, Annotated

from fastapi import Depends, HTTPException, Header, Cookie
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt

from app.services.atproto import AtprotoService
from app.core.config import settings

logger = logging.getLogger(__name__)

# Bearer token scheme. auto_error=False makes a missing Authorization header
# yield None instead of an immediate error, so the cookie fallback in
# get_current_user can still run.
bearer_scheme = HTTPBearer(auto_error=False)

# JWT settings
_INSECURE_DEFAULT_SECRET = "your-secret-key"
SECRET_KEY = getattr(settings, "secret_key", _INSECURE_DEFAULT_SECRET)
if SECRET_KEY == _INSECURE_DEFAULT_SECRET:
    # The fallback is kept for backward compatibility, but tokens signed with
    # a publicly known key can be forged by anyone, so make it loud.
    logger.warning(
        "settings.secret_key is not configured; JWTs are being signed with an "
        "insecure built-in default key"
    )
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24  # 24 hours


class AuthUser:
    """Authenticated user data.

    Attributes:
        did: The user's DID.
        handle: The user's handle, if known.
    """

    def __init__(self, did: str, handle: Optional[str] = None):
        self.did = did
        self.handle = handle

    def __repr__(self) -> str:
        return f"{type(self).__name__}(did={self.did!r}, handle={self.handle!r})"


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed JWT access token.

    Args:
        data: Claims to embed in the token. The dict is copied, not mutated.
        expires_delta: Lifetime of the token. When None, the default of
            ACCESS_TOKEN_EXPIRE_MINUTES is used.

    Returns:
        The encoded JWT.
    """
    to_encode = data.copy()
    # Compare against None explicitly: timedelta(0) is falsy, and a caller
    # asking for a zero lifetime must not silently get the 24-hour default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    # Timezone-aware "now"; datetime.utcnow() is deprecated and naive.
    expire = datetime.now(timezone.utc) + expires_delta
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


async def verify_token(token: str) -> Optional[AuthUser]:
    """Verify a JWT and return the user it identifies.

    Args:
        token: The encoded JWT.

    Returns:
        An AuthUser built from the "did" and "handle" claims, or None when
        the token fails to decode/validate or carries no usable "did" claim.
    """
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        return None

    did = payload.get("did")
    # Reject a missing, empty, or non-string DID rather than building a user
    # around it.
    if not isinstance(did, str) or not did:
        return None
    handle: Optional[str] = payload.get("handle")
    return AuthUser(did=did, handle=handle)


async def get_current_user(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(bearer_scheme),
    token_cookie: Optional[str] = Cookie(None, alias="ai_card_token")
) -> Optional[AuthUser]:
    """Get the current user from a JWT.

    Supports both a Bearer token and the "ai_card_token" cookie; the Bearer
    token takes precedence when both are present.

    Returns:
        The authenticated AuthUser, or None when no token was supplied or the
        token did not verify.
    """
    token = None

    # Try Bearer token first
    if credentials and credentials.credentials:
        token = credentials.credentials
    # Fall back to cookie
    elif token_cookie:
        token = token_cookie

    if not token:
        return None

    return await verify_token(token)


async def require_user(
    current_user: Optional[AuthUser] = Depends(get_current_user)
) -> AuthUser:
    """Require an authenticated user.

    Raises:
        HTTPException: 401 with a "WWW-Authenticate: Bearer" header when no
            user could be authenticated.
    """
    if not current_user:
        raise HTTPException(
            status_code=401,
            detail="Not authenticated",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return current_user


async def get_optional_user(
    current_user: Optional[AuthUser] = Depends(get_current_user)
) -> Optional[AuthUser]:
    """Get the user if authenticated, None otherwise."""
    return current_user


class AtprotoAuth:
    """atproto authentication handler"""

    def __init__(self):
        # NOTE(review): one AtprotoService (and its client state) is reused
        # for every call on this handler - confirm that is safe if a single
        # AtprotoAuth instance serves concurrent requests.
        self.service = AtprotoService()

    async def authenticate(self, identifier: str, password: str) -> Optional[AuthUser]:
        """Authenticate a user with atproto.

        Args:
            identifier: Handle or DID
            password: App password

        Returns:
            AuthUser if successful, None if login raised or the service has
            no client afterwards.
        """
        try:
            # Login to atproto; the DID and handle are read from the service's
            # client afterwards, so the call's return value is not needed.
            await self.service.login(identifier, password)

            client = self.service.client
            if client:
                return AuthUser(did=client.did, handle=client.handle)
            return None
        except Exception as exc:
            # Best-effort contract: any failure means "not authenticated".
            # Log only the exception type so credentials never reach the logs.
            logger.warning("atproto authentication failed (%s)", type(exc).__name__)
            return None

    async def verify_did_ownership(self, did: str, session_string: str) -> bool:
        """Verify the user owns the DID by checking a session.

        Args:
            did: DID to verify
            session_string: Session string from login

        Returns:
            True if the restored session's client DID equals ``did``; False
            otherwise, including when restoring the session raises.
        """
        try:
            # NOTE(review): restore_session is called without await while
            # login is awaited - confirm restore_session is synchronous.
            self.service.restore_session(session_string)
            client = self.service.client
            return bool(client and client.did == did)
        except Exception as exc:
            logger.warning("atproto session restore failed (%s)", type(exc).__name__)
            return False