"""atproto integration service""" import json from typing import Optional, Dict, Any, List from datetime import datetime import httpx from atproto import Client, SessionString from atproto.exceptions import AtProtocolError from app.core.config import settings from app.models.card import Card, CardRarity class AtprotoService: """atproto integration service""" def __init__(self): self.client = None self.session_string = None async def login(self, identifier: str, password: str) -> SessionString: """ Login to atproto PDS Args: identifier: Handle or DID password: App password Returns: Session string for future requests """ self.client = Client() try: self.client.login(identifier, password) self.session_string = self.client.export_session_string() return self.session_string except AtProtocolError as e: raise Exception(f"Failed to login to atproto: {str(e)}") def restore_session(self, session_string: str): """Restore session from string""" self.client = Client() self.client.login_with_session_string(session_string) self.session_string = session_string async def verify_did(self, did: str, handle: Optional[str] = None) -> bool: """ Verify DID is valid Args: did: DID to verify handle: Optional handle to cross-check Returns: True if valid """ try: # Use public API to resolve DID async with httpx.AsyncClient() as client: response = await client.get( f"https://plc.directory/{did}", follow_redirects=True ) if response.status_code != 200: return False data = response.json() # Verify handle if provided if handle and data.get("alsoKnownAs"): expected_handle = f"at://{handle}" return expected_handle in data["alsoKnownAs"] return True except Exception: return False async def get_profile(self, did: str) -> Optional[Dict[str, Any]]: """Get user profile from atproto""" if not self.client: raise Exception("Not logged in") try: profile = self.client.get_profile(did) return { "did": profile.did, "handle": profile.handle, "display_name": profile.display_name, "avatar": profile.avatar, "description": profile.description } except Exception: return None async def create_card_record( self, did: str, card: Card, collection: str = "ai.card.collection" ) -> str: """ Create card record in user's PDS Args: did: User's DID card: Card data collection: Collection name (lexicon) Returns: Record URI """ if not self.client: raise Exception("Not logged in") # Prepare card data for atproto record_data = { "$type": collection, "cardId": card.id, "cp": card.cp, "status": card.status.value, "skill": card.skill, "obtainedAt": card.obtained_at.isoformat(), "isUnique": card.is_unique, "uniqueId": card.unique_id, "createdAt": datetime.utcnow().isoformat() } try: # Create record response = self.client.com.atproto.repo.create_record( repo=did, collection=collection, record=record_data ) return response.uri except AtProtocolError as e: raise Exception(f"Failed to create card record: {str(e)}") async def get_user_cards( self, did: str, collection: str = "ai.card.collection", limit: int = 100 ) -> List[Dict[str, Any]]: """ Get user's cards from PDS Args: did: User's DID collection: Collection name limit: Maximum records to fetch Returns: List of card records """ if not self.client: raise Exception("Not logged in") try: # List records response = self.client.com.atproto.repo.list_records( repo=did, collection=collection, limit=limit ) cards = [] for record in response.records: card_data = record.value card_data["uri"] = record.uri card_data["cid"] = record.cid cards.append(card_data) return cards except AtProtocolError: # Collection might not exist yet return [] async def delete_card_record(self, did: str, record_uri: str): """Delete a card record from PDS""" if not self.client: raise Exception("Not logged in") try: # Parse collection and rkey from URI # Format: at://did/collection/rkey parts = record_uri.split("/") if len(parts) < 5: raise ValueError("Invalid record URI") collection = parts[3] rkey = parts[4] self.client.com.atproto.repo.delete_record( repo=did, collection=collection, rkey=rkey ) except AtProtocolError as e: raise Exception(f"Failed to delete record: {str(e)}") async def create_oauth_session(self, code: str, redirect_uri: str) -> Dict[str, Any]: """ Handle OAuth callback and create session Args: code: Authorization code redirect_uri: Redirect URI used in authorization Returns: Session data including DID and access token """ # TODO: Implement when atproto OAuth is available raise NotImplementedError("OAuth support is not yet available in atproto") class CardLexicon: """Card collection lexicon definition""" LEXICON_ID = "ai.card.collection" @staticmethod def get_lexicon() -> Dict[str, Any]: """Get lexicon definition for card collection""" return { "lexicon": 1, "id": CardLexicon.LEXICON_ID, "defs": { "main": { "type": "record", "description": "A collectible card", "key": "tid", "record": { "type": "object", "required": ["cardId", "cp", "status", "obtainedAt", "createdAt"], "properties": { "cardId": { "type": "integer", "description": "Card type ID (0-15)", "minimum": 0, "maximum": 15 }, "cp": { "type": "integer", "description": "Card power", "minimum": 1, "maximum": 999 }, "status": { "type": "string", "description": "Card rarity", "enum": ["normal", "rare", "super_rare", "kira", "unique"] }, "skill": { "type": "string", "description": "Card skill", "maxLength": 1000 }, "obtainedAt": { "type": "string", "format": "datetime", "description": "When the card was obtained" }, "isUnique": { "type": "boolean", "description": "Whether this is a unique card", "default": False }, "uniqueId": { "type": "string", "description": "Global unique identifier", "format": "uuid" }, "createdAt": { "type": "string", "format": "datetime", "description": "Record creation time" } } } } } }