290 lines
12 KiB
Python
290 lines
12 KiB
Python
"""MCP Server for ai.card system"""
|
|
|
|
from typing import Optional, List, Dict, Any
|
|
from mcp.server.fastmcp import FastMCP
|
|
from fastapi import FastAPI, Depends, HTTPException
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from pathlib import Path
|
|
import logging
|
|
|
|
from app.core.config import settings
|
|
from app.db.base import get_session
|
|
from app.models.card import Card, CardRarity, CardDrawResult
|
|
from app.repositories.card import CardRepository, UniqueCardRepository
|
|
from app.repositories.user import UserRepository
|
|
from app.services.gacha import GachaService
|
|
# from app.services.card_sync import CardSyncService # Temporarily disabled
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class AICardMcpServer:
|
|
"""MCP Server that exposes ai.card functionality to AI assistants"""
|
|
|
|
def __init__(self, enable_mcp: bool = True):
|
|
self.enable_mcp = enable_mcp
|
|
|
|
# Create FastAPI app
|
|
self.app = FastAPI(
|
|
title="AI.Card - Card Game System",
|
|
description="MCP server for ai.card system",
|
|
version=settings.app_version
|
|
)
|
|
|
|
# Create MCP server with FastAPI app
|
|
self.server = None
|
|
if enable_mcp:
|
|
self.server = FastMCP("aicard")
|
|
self._register_mcp_tools()
|
|
|
|
def get_app(self) -> FastAPI:
|
|
"""Get the FastAPI app instance"""
|
|
return self.app
|
|
|
|
def _register_mcp_tools(self):
|
|
"""Register all MCP tools"""
|
|
|
|
@self.app.get("/get_user_cards", operation_id="get_user_cards")
|
|
async def get_user_cards(
|
|
did: str,
|
|
limit: int = 10,
|
|
session: AsyncSession = Depends(get_session)
|
|
) -> List[Dict[str, Any]]:
|
|
"""Get all cards owned by a user"""
|
|
try:
|
|
user_repo = UserRepository(session)
|
|
card_repo = CardRepository(session)
|
|
|
|
# Get user
|
|
user = await user_repo.get_by_did(did)
|
|
if not user:
|
|
return []
|
|
|
|
# Get user cards
|
|
user_cards = await card_repo.get_user_cards(user.id, limit=limit)
|
|
|
|
return [
|
|
{
|
|
"id": card.card_id,
|
|
"cp": card.cp,
|
|
"status": card.status,
|
|
"skill": card.skill,
|
|
"owner_did": did,
|
|
"obtained_at": card.obtained_at.isoformat(),
|
|
"is_unique": card.is_unique,
|
|
"unique_id": str(card.unique_id) if card.unique_id else None
|
|
}
|
|
for card in user_cards
|
|
]
|
|
except Exception as e:
|
|
logger.error(f"Error getting user cards: {e}")
|
|
return []
|
|
|
|
@self.app.post("/draw_card", operation_id="draw_card")
|
|
async def draw_card(
|
|
did: str,
|
|
is_paid: bool = False,
|
|
session: AsyncSession = Depends(get_session)
|
|
) -> Dict[str, Any]:
|
|
"""Draw a new card (gacha) for user"""
|
|
try:
|
|
gacha_service = GachaService(session)
|
|
|
|
# Draw card
|
|
card, is_unique = await gacha_service.draw_card(did, is_paid)
|
|
await session.commit()
|
|
|
|
return {
|
|
"success": True,
|
|
"card": {
|
|
"id": card.id,
|
|
"cp": card.cp,
|
|
"status": card.status,
|
|
"skill": card.skill,
|
|
"owner_did": card.owner_did,
|
|
"obtained_at": card.obtained_at.isoformat(),
|
|
"is_unique": card.is_unique,
|
|
"unique_id": card.unique_id
|
|
},
|
|
"is_unique": is_unique,
|
|
"animation_type": "kira" if card.status in [CardRarity.KIRA, CardRarity.UNIQUE] else "normal"
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Error drawing card: {e}")
|
|
await session.rollback()
|
|
return {
|
|
"success": False,
|
|
"error": str(e)
|
|
}
|
|
|
|
@self.app.get("/get_card_details", operation_id="get_card_details")
|
|
async def get_card_details(
|
|
card_id: int,
|
|
session: AsyncSession = Depends(get_session)
|
|
) -> Dict[str, Any]:
|
|
"""Get detailed information about a card type"""
|
|
try:
|
|
# Get card info from gacha service
|
|
gacha_service = GachaService(session)
|
|
|
|
if card_id not in gacha_service.CARD_INFO:
|
|
return {"error": f"Card ID {card_id} not found"}
|
|
|
|
card_info = gacha_service.CARD_INFO[card_id]
|
|
|
|
# Get unique card availability
|
|
unique_repo = UniqueCardRepository(session)
|
|
is_unique_available = await unique_repo.is_card_available(card_id)
|
|
|
|
return {
|
|
"id": card_id,
|
|
"name": card_info["name"],
|
|
"base_cp_range": card_info["base_cp_range"],
|
|
"is_unique_available": is_unique_available,
|
|
"description": f"Card {card_id}: {card_info['name']}"
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Error getting card details: {e}")
|
|
return {"error": str(e)}
|
|
|
|
@self.app.post("/sync_cards_atproto", operation_id="sync_cards_atproto")
|
|
async def sync_cards_atproto(
|
|
did: str,
|
|
session: AsyncSession = Depends(get_session)
|
|
) -> Dict[str, str]:
|
|
"""Sync user's cards with atproto (temporarily disabled)"""
|
|
return {"status": "atproto sync temporarily disabled due to dependency issues"}
|
|
|
|
@self.app.get("/analyze_card_collection", operation_id="analyze_card_collection")
|
|
async def analyze_card_collection(
|
|
did: str,
|
|
session: AsyncSession = Depends(get_session)
|
|
) -> Dict[str, Any]:
|
|
"""Analyze user's card collection"""
|
|
try:
|
|
user_repo = UserRepository(session)
|
|
card_repo = CardRepository(session)
|
|
|
|
# Get user
|
|
user = await user_repo.get_by_did(did)
|
|
if not user:
|
|
return {
|
|
"total_cards": 0,
|
|
"rarity_distribution": {},
|
|
"message": "User not found"
|
|
}
|
|
|
|
# Get all user cards
|
|
user_cards = await card_repo.get_user_cards(user.id, limit=1000)
|
|
|
|
if not user_cards:
|
|
return {
|
|
"total_cards": 0,
|
|
"rarity_distribution": {},
|
|
"message": "No cards found"
|
|
}
|
|
|
|
# Analyze collection
|
|
rarity_count = {}
|
|
total_cp = 0
|
|
card_type_count = {}
|
|
|
|
for card in user_cards:
|
|
# Rarity distribution
|
|
rarity = card.status
|
|
rarity_count[rarity] = rarity_count.get(rarity, 0) + 1
|
|
|
|
# Total CP
|
|
total_cp += card.cp
|
|
|
|
# Card type distribution
|
|
card_type_count[card.card_id] = card_type_count.get(card.card_id, 0) + 1
|
|
|
|
# Find strongest card
|
|
strongest_card = max(user_cards, key=lambda x: x.cp)
|
|
|
|
return {
|
|
"total_cards": len(user_cards),
|
|
"rarity_distribution": rarity_count,
|
|
"card_type_distribution": card_type_count,
|
|
"average_cp": total_cp / len(user_cards) if user_cards else 0,
|
|
"total_cp": total_cp,
|
|
"strongest_card": {
|
|
"id": strongest_card.card_id,
|
|
"cp": strongest_card.cp,
|
|
"status": strongest_card.status,
|
|
"is_unique": strongest_card.is_unique
|
|
},
|
|
"unique_count": len([c for c in user_cards if c.is_unique])
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Error analyzing collection: {e}")
|
|
return {"error": str(e)}
|
|
|
|
@self.app.get("/get_unique_registry", operation_id="get_unique_registry")
|
|
async def get_unique_registry(
|
|
session: AsyncSession = Depends(get_session)
|
|
) -> Dict[str, Any]:
|
|
"""Get all registered unique cards"""
|
|
try:
|
|
unique_repo = UniqueCardRepository(session)
|
|
|
|
# Get all unique cards
|
|
unique_cards = await unique_repo.get_all_unique_cards()
|
|
|
|
# Get available unique card IDs
|
|
available_ids = await unique_repo.get_available_unique_cards()
|
|
|
|
return {
|
|
"registered_unique_cards": [
|
|
{
|
|
"card_id": card.card_id,
|
|
"unique_id": card.unique_id,
|
|
"owner_did": card.owner_did,
|
|
"obtained_at": card.obtained_at.isoformat()
|
|
}
|
|
for card in unique_cards
|
|
],
|
|
"available_unique_card_ids": available_ids,
|
|
"total_registered": len(unique_cards),
|
|
"total_available": len(available_ids)
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Error getting unique registry: {e}")
|
|
return {"error": str(e)}
|
|
|
|
@self.app.get("/get_gacha_stats", operation_id="get_gacha_stats")
|
|
async def get_gacha_stats(
|
|
session: AsyncSession = Depends(get_session)
|
|
) -> Dict[str, Any]:
|
|
"""Get gacha system statistics"""
|
|
try:
|
|
return {
|
|
"rarity_probabilities": {
|
|
"normal": f"{100 - settings.prob_rare}%",
|
|
"rare": f"{settings.prob_rare - settings.prob_super_rare}%",
|
|
"super_rare": f"{settings.prob_super_rare - settings.prob_kira}%",
|
|
"kira": f"{settings.prob_kira - settings.prob_unique}%",
|
|
"unique": f"{settings.prob_unique}%"
|
|
},
|
|
"total_card_types": 16,
|
|
"card_names": [info["name"] for info in GachaService.CARD_INFO.values()],
|
|
"system_info": {
|
|
"daily_limit": "1 free draw per day",
|
|
"paid_gacha": "Enhanced probabilities",
|
|
"unique_system": "First-come-first-served globally unique cards"
|
|
}
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Error getting gacha stats: {e}")
|
|
return {"error": str(e)}
|
|
|
|
# MCP server will be run separately, not here
|
|
|
|
def get_server(self) -> Optional[FastMCP]:
|
|
"""Get the FastAPI MCP server instance"""
|
|
return self.server
|
|
|
|
def get_app(self) -> FastAPI:
|
|
"""Get the FastAPI app instance"""
|
|
return self.app |