1
0
card/api/app/auth/dependencies.py
2025-06-01 21:39:53 +09:00

154 lines
4.3 KiB
Python

"""Authentication dependencies"""
import logging
from datetime import datetime, timedelta, timezone
from typing import Optional, Annotated

from fastapi import Depends, HTTPException, Header, Cookie
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt

from app.core.config import settings
from app.services.atproto import AtprotoService
# Bearer token scheme. auto_error=False makes a missing Authorization header
# resolve to None instead of an automatic error response, which is what lets
# get_current_user fall back to the cookie.
bearer_scheme = HTTPBearer(auto_error=False)

# JWT settings
_DEFAULT_SECRET_KEY = "your-secret-key"
SECRET_KEY = getattr(settings, "secret_key", _DEFAULT_SECRET_KEY)
if SECRET_KEY == _DEFAULT_SECRET_KEY:
    # The placeholder is kept so existing setups keep starting, but anyone who
    # knows it can forge tokens, so make the misconfiguration visible.
    logging.getLogger(__name__).warning(
        "settings.secret_key is not configured; JWTs are signed with a "
        "hard-coded placeholder key. Set a real secret before deploying."
    )
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24  # 24 hours
class AuthUser:
    """Authenticated user data.

    Attributes:
        did: The user's DID.
        handle: The user's handle, or ``None`` when it is not known.
    """

    def __init__(self, did: str, handle: Optional[str] = None):
        self.did = did
        self.handle = handle

    def __repr__(self) -> str:
        """Return a readable representation for logs and debugging."""
        return (
            f"{type(self).__name__}"
            f"(did={self.did!r}, handle={self.handle!r})"
        )
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Create a signed JWT access token.

    Args:
        data: Claims to embed in the token. The dict is copied, so the
            caller's object is not modified.
        expires_delta: Lifetime of the token. When ``None``, the token lives
            for ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.

    Returns:
        The encoded JWT produced by ``jwt.encode`` with ``SECRET_KEY`` and
        ``ALGORITHM``.
    """
    # Compare against None explicitly: a zero timedelta is falsy and would
    # otherwise be silently replaced by the default lifetime.
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    # Timezone-aware "now"; datetime.utcnow() is deprecated since Python 3.12.
    expire = datetime.now(timezone.utc) + expires_delta

    to_encode = data.copy()
    to_encode["exp"] = expire
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def verify_token(token: str) -> Optional[AuthUser]:
    """Decode a JWT and build the user it identifies.

    Args:
        token: Encoded JWT, expected to be signed with ``SECRET_KEY`` using
            ``ALGORITHM``.

    Returns:
        An ``AuthUser`` built from the ``did`` and ``handle`` claims, or
        ``None`` when decoding raises ``JWTError`` or the ``did`` claim is
        missing, empty, or not a string.
    """
    # Only the decode call can raise JWTError, so keep the try body minimal.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        return None

    did = payload.get("did")
    # A token without a usable DID does not identify anyone; treat it the
    # same as an invalid token rather than building a user with a bad id.
    if not isinstance(did, str) or not did:
        return None

    handle: Optional[str] = payload.get("handle")
    return AuthUser(did=did, handle=handle)
async def get_current_user(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(bearer_scheme),
    token_cookie: Optional[str] = Cookie(None, alias="ai_card_token")
) -> Optional[AuthUser]:
    """
    Resolve the requesting user from a JWT.

    The token is taken from the Bearer credentials when present and
    non-empty, otherwise from the ``ai_card_token`` cookie.

    Returns:
        The user produced by ``verify_token``, or ``None`` when no token
        was supplied.
    """
    # Bearer credentials win; the cookie is only consulted when they are
    # absent or empty.
    bearer_value = credentials.credentials if credentials else None
    candidate = bearer_value or token_cookie

    if not candidate:
        return None
    return await verify_token(candidate)
async def require_user(
    current_user: Optional[AuthUser] = Depends(get_current_user)
) -> AuthUser:
    """Return the authenticated user, rejecting the request otherwise.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            ``get_current_user`` resolved no user.
    """
    if current_user:
        return current_user

    raise HTTPException(
        status_code=401,
        detail="Not authenticated",
        headers={"WWW-Authenticate": "Bearer"},
    )
async def get_optional_user(
    current_user: Optional[AuthUser] = Depends(get_current_user)
) -> Optional[AuthUser]:
    """Pass through the result of ``get_current_user`` unchanged.

    Returns:
        The authenticated user, or ``None`` when the request carries no
        valid token. Never raises for unauthenticated requests.
    """
    return current_user
class AtprotoAuth:
    """atproto authentication handler.

    Holds one ``AtprotoService`` instance that both methods operate on.
    """

    # Failures are reported to callers as None / False; this logger keeps a
    # record of why they happened.
    _log = logging.getLogger(__name__)

    def __init__(self):
        self.service = AtprotoService()

    async def authenticate(self, identifier: str, password: str) -> Optional[AuthUser]:
        """
        Authenticate user with atproto

        Args:
            identifier: Handle or DID
            password: App password

        Returns:
            AuthUser built from the service client's ``did`` and ``handle``
            if login succeeds and a client is available; ``None`` if there
            is no client or if any exception is raised along the way.
        """
        try:
            # The value returned by login() is not needed: the DID and handle
            # are read from the service's client afterwards.
            await self.service.login(identifier, password)

            client = self.service.client
            if client:
                return AuthUser(did=client.did, handle=client.handle)
            return None
        except Exception:
            # Best-effort contract: any failure means "not authenticated".
            # Log it instead of discarding it so outages are distinguishable
            # from bad credentials. Credentials are deliberately not logged.
            self._log.warning("atproto authentication failed", exc_info=True)
            return None

    async def verify_did_ownership(self, did: str, session_string: str) -> bool:
        """
        Verify user owns the DID by checking session

        Args:
            did: DID to verify
            session_string: Session string from login

        Returns:
            True if, after restoring the session, the service client's DID
            equals ``did``; False if it does not, if there is no client, or
            if any exception is raised.
        """
        try:
            # NOTE(review): restore_session is called without await, unlike
            # login() above - confirm it is a synchronous method.
            self.service.restore_session(session_string)

            client = self.service.client
            return bool(client and client.did == did)
        except Exception:
            # Same best-effort contract as authenticate(): report False, but
            # keep a trace of the failure. The session string is not logged.
            self._log.warning("atproto session verification failed", exc_info=True)
            return False