# server.py
"""FastAPI application exposing chat and memory endpoints as MCP tools."""

from fastapi import FastAPI
from fastapi_mcp import FastApiMCP
from pydantic import BaseModel

from memory_store import save_message, load_logs, search_memory

app = FastAPI()
mcp = FastApiMCP(app, name="aigpt-agent", description="MCP Server for AI memory")


# --- Model definitions ---
class ChatInput(BaseModel):
    """Request body for the /chat endpoint."""

    message: str


class MemoryInput(BaseModel):
    """Request body for storing a single message via POST /memory."""

    sender: str
    message: str


class MemoryQuery(BaseModel):
    """Request body carrying a search query for /ask_message."""

    query: str


# --- Tool (endpoint) definitions ---
@app.post("/chat", operation_id="chat")
async def chat(input: ChatInput):
    """Store the user message, build an acknowledgement, store it and return it."""
    save_message("user", input.message)
    response = f"AI: 「{input.message}」を受け取りました!"
    save_message("ai", response)
    return {"response": response}


@app.post("/memory", operation_id="save_memory")
async def memory_post(input: MemoryInput):
    """Store one message under the given sender."""
    save_message(input.sender, input.message)
    return {"status": "saved"}


@app.get("/memory", operation_id="get_memory")
async def memory_get():
    """Return the stored messages."""
    # The module imports `load_logs`; the previous call to `load_messages`
    # referenced a name that is never imported and raised NameError.
    # NOTE(review): presumably `load_logs` is the renamed `load_messages` --
    # confirm against memory_store.
    return {"messages": load_logs()}


@app.post("/ask_message", operation_id="ask_message")
async def ask_message(input: MemoryQuery):
    """Search stored memory for the query and return a formatted hit list."""
    # `search_memory` here is the function imported from memory_store.
    # NOTE(review): assumes it returns a sized iterable of mappings with
    # "sender" and "message" keys -- confirm against memory_store.
    results = search_memory(input.query)
    hits = "\n".join(f"{r['sender']}: {r['message']}" for r in results)
    return {"response": f"🔎 記憶から {len(results)} 件ヒット:\n" + hits}


@app.post("/memory/search", operation_id="memory")
async def memory_search(input: dict):
    """Return a placeholder result string for the given query."""
    # This handler used to be named `search_memory`, which rebound the name
    # imported from memory_store and made /ask_message call this coroutine
    # instead of the store's search function. The route path and operation_id
    # are unchanged.
    query = input.get("query", "")
    # TODO: add real keyword search logic (e.g. search logs.json).
    return {"result": f"記憶の中から「{query}」に関するデータを返しました"}


# --- MCP initialization ---
mcp.mount()

if __name__ == "__main__":
    import uvicorn

    print("🚀 Starting MCP server...")
    uvicorn.run(app, host="127.0.0.1", port=5000)