1
0
card/api/app/auth/dependencies.py
2025-06-02 18:24:43 +09:00

116 lines
3.5 KiB
Python

"""Authentication dependencies"""
from datetime import datetime, timedelta, timezone
from typing import Optional, Annotated

from fastapi import Depends, HTTPException, Header, Cookie
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt

# from app.services.atproto import AtprotoService # Temporarily disabled
from app.core.config import settings

# Bearer token scheme. auto_error=False makes the scheme optional, so the
# dependencies below can fall back to the cookie when no header is sent.
bearer_scheme = HTTPBearer(auto_error=False)

# JWT settings.
# NOTE(review): the literal fallback is a placeholder secret; confirm that every
# deployment defines settings.secret_key so tokens are not signed with it.
SECRET_KEY = getattr(settings, "secret_key", "your-secret-key")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 24 * 60  # 24 hours
class AuthUser:
    """Authenticated user data.

    Args:
        did: Identifier of the user, stored as ``self.did``.
        handle: Optional handle of the user, stored as ``self.handle``;
            defaults to ``None``.
    """

    def __init__(self, did: str, handle: Optional[str] = None):
        self.did = did
        self.handle = handle

    def __repr__(self) -> str:
        # Readable form for logs and debuggers instead of the default
        # "<AuthUser object at 0x...>".
        return f"{type(self).__name__}(did={self.did!r}, handle={self.handle!r})"
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Create a signed JWT access token.

    Args:
        data: Claims to embed in the token. The mapping is shallow-copied, so
            the caller's dict is not modified.
        expires_delta: Lifetime of the token measured from now. When ``None``,
            the module default of ``ACCESS_TOKEN_EXPIRE_MINUTES`` is used.

    Returns:
        The token produced by ``jwt.encode`` using ``SECRET_KEY`` and
        ``ALGORITHM``, with an ``exp`` claim added.
    """
    # Compare against None rather than truthiness: timedelta(0) is falsy, and
    # an explicit zero lifetime must not silently become the 24-hour default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    claims = data.copy()
    # Timezone-aware UTC timestamp; datetime.utcnow() is naive and deprecated
    # since Python 3.12.
    claims["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
async def verify_token(token: str) -> Optional[AuthUser]:
    """Decode a JWT and build the corresponding user.

    Args:
        token: Encoded JWT to check against ``SECRET_KEY`` / ``ALGORITHM``.

    Returns:
        An ``AuthUser`` built from the ``did`` and ``handle`` claims, or
        ``None`` when decoding raises ``JWTError`` or the ``did`` claim is
        absent.
    """
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        # Any decode failure is reported to the caller as "no user".
        return None

    user_did = claims.get("did")
    if user_did is None:
        return None
    return AuthUser(did=user_did, handle=claims.get("handle"))
async def get_current_user(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(bearer_scheme),
    token_cookie: Optional[str] = Cookie(None, alias="ai_card_token")
) -> Optional[AuthUser]:
    """Resolve the current user from a JWT.

    The Bearer credentials take precedence; the ``ai_card_token`` cookie is
    used only when no Bearer token value is present.

    Returns:
        The result of ``verify_token`` for the selected token, or ``None``
        when neither source supplies a non-empty token.
    """
    bearer_value = credentials.credentials if credentials else None
    # Bearer wins; an empty or missing value falls through to the cookie.
    candidate = bearer_value or token_cookie
    if not candidate:
        return None
    return await verify_token(candidate)
async def require_user(
    current_user: Optional[AuthUser] = Depends(get_current_user)
) -> AuthUser:
    """Return the authenticated user or reject the request.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            ``get_current_user`` yielded no user.
    """
    if current_user:
        return current_user

    raise HTTPException(
        status_code=401,
        detail="Not authenticated",
        headers={"WWW-Authenticate": "Bearer"},
    )
async def get_optional_user(
    current_user: Optional[AuthUser] = Depends(get_current_user)
) -> Optional[AuthUser]:
    """Pass through whatever ``get_current_user`` resolved.

    Returns:
        The authenticated user, or ``None`` when there is none; never raises
        for a missing user.
    """
    return current_user
# Temporarily disabled due to atproto dependency issues
class AtprotoAuth:
    """atproto authentication handler (mock implementation).

    NOTE(review): both methods are stubs that perform no real verification;
    confirm this class is not reachable from production login paths.
    """

    def __init__(self):
        # The real service is not wired in yet:
        # self.service = AtprotoService()
        pass

    async def authenticate(self, identifier: str, password: str) -> Optional[AuthUser]:
        """Mock authentication.

        Returns:
            A fixed test user whose handle is ``identifier`` when both
            arguments are non-empty, otherwise ``None``. The password is not
            checked against anything.
        """
        if not (identifier and password):
            return None
        return AuthUser(did="did:plc:test123", handle=identifier)

    async def verify_did_ownership(self, did: str, session_string: str) -> bool:
        """Mock verification; returns ``True`` regardless of the arguments."""
        return True