update
This commit is contained in:
@ -15,3 +15,4 @@ Requires-Dist: openai>=1.0.0
|
||||
Requires-Dist: uvicorn>=0.23.0
|
||||
Requires-Dist: apscheduler>=3.10.0
|
||||
Requires-Dist: croniter>=1.3.0
|
||||
Requires-Dist: prompt-toolkit>=3.0.0
|
||||
|
@ -2,10 +2,12 @@ README.md
|
||||
pyproject.toml
|
||||
src/aigpt/__init__.py
|
||||
src/aigpt/ai_provider.py
|
||||
src/aigpt/card_integration.py
|
||||
src/aigpt/cli.py
|
||||
src/aigpt/config.py
|
||||
src/aigpt/fortune.py
|
||||
src/aigpt/mcp_server.py
|
||||
src/aigpt/mcp_server_simple.py
|
||||
src/aigpt/memory.py
|
||||
src/aigpt/models.py
|
||||
src/aigpt/persona.py
|
||||
|
@ -10,3 +10,4 @@ openai>=1.0.0
|
||||
uvicorn>=0.23.0
|
||||
apscheduler>=3.10.0
|
||||
croniter>=1.3.0
|
||||
prompt-toolkit>=3.0.0
|
||||
|
@ -102,7 +102,7 @@ class OpenAIProvider:
|
||||
config = Config()
|
||||
self.api_key = api_key or config.get_api_key("openai") or os.getenv("OPENAI_API_KEY")
|
||||
if not self.api_key:
|
||||
raise ValueError("OpenAI API key not provided. Set it with: ai-gpt config set providers.openai.api_key YOUR_KEY")
|
||||
raise ValueError("OpenAI API key not provided. Set it with: aigpt config set providers.openai.api_key YOUR_KEY")
|
||||
self.client = OpenAI(api_key=self.api_key)
|
||||
self.logger = logging.getLogger(__name__)
|
||||
|
||||
@ -169,4 +169,4 @@ def create_ai_provider(provider: str, model: str, **kwargs) -> AIProvider:
|
||||
elif provider == "openai":
|
||||
return OpenAIProvider(model=model, **kwargs)
|
||||
else:
|
||||
raise ValueError(f"Unknown provider: {provider}")
|
||||
raise ValueError(f"Unknown provider: {provider}")
|
||||
|
150
src/aigpt/card_integration.py
Normal file
150
src/aigpt/card_integration.py
Normal file
@ -0,0 +1,150 @@
|
||||
"""ai.card integration module for ai.gpt MCP server"""
|
||||
|
||||
from typing import Dict, Any, List, Optional
|
||||
import httpx
|
||||
from pathlib import Path
|
||||
import json
|
||||
from datetime import datetime
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CardIntegration:
|
||||
"""Integration with ai.card system"""
|
||||
|
||||
def __init__(self, api_base_url: str = "http://localhost:8001"):
|
||||
self.api_base_url = api_base_url
|
||||
self.client = httpx.AsyncClient()
|
||||
|
||||
async def get_user_cards(self, did: str) -> List[Dict[str, Any]]:
|
||||
"""Get cards for a specific user by DID"""
|
||||
try:
|
||||
response = await self.client.get(
|
||||
f"{self.api_base_url}/api/v1/cards/user/{did}"
|
||||
)
|
||||
if response.status_code == 200:
|
||||
return response.json()
|
||||
else:
|
||||
logger.error(f"Failed to get cards: {response.status_code}")
|
||||
return []
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting user cards: {e}")
|
||||
return []
|
||||
|
||||
async def draw_card(self, did: str) -> Optional[Dict[str, Any]]:
|
||||
"""Draw a new card for user (gacha)"""
|
||||
try:
|
||||
response = await self.client.post(
|
||||
f"{self.api_base_url}/api/v1/gacha/draw",
|
||||
json={"did": did}
|
||||
)
|
||||
if response.status_code == 200:
|
||||
return response.json()
|
||||
else:
|
||||
logger.error(f"Failed to draw card: {response.status_code}")
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"Error drawing card: {e}")
|
||||
return None
|
||||
|
||||
async def get_card_info(self, card_id: int) -> Optional[Dict[str, Any]]:
|
||||
"""Get detailed information about a specific card"""
|
||||
try:
|
||||
response = await self.client.get(
|
||||
f"{self.api_base_url}/api/v1/cards/{card_id}"
|
||||
)
|
||||
if response.status_code == 200:
|
||||
return response.json()
|
||||
else:
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting card info: {e}")
|
||||
return None
|
||||
|
||||
async def sync_with_atproto(self, did: str) -> bool:
|
||||
"""Sync card data with atproto"""
|
||||
try:
|
||||
response = await self.client.post(
|
||||
f"{self.api_base_url}/api/v1/sync/atproto",
|
||||
json={"did": did}
|
||||
)
|
||||
return response.status_code == 200
|
||||
except Exception as e:
|
||||
logger.error(f"Error syncing with atproto: {e}")
|
||||
return False
|
||||
|
||||
async def close(self):
|
||||
"""Close the HTTP client"""
|
||||
await self.client.aclose()
|
||||
|
||||
|
||||
def register_card_tools(app, card_integration: CardIntegration):
|
||||
"""Register ai.card tools to FastAPI app"""
|
||||
|
||||
@app.get("/get_user_cards", operation_id="get_user_cards")
|
||||
async def get_user_cards(did: str) -> List[Dict[str, Any]]:
|
||||
"""Get all cards owned by a user"""
|
||||
cards = await card_integration.get_user_cards(did)
|
||||
return cards
|
||||
|
||||
@app.post("/draw_card", operation_id="draw_card")
|
||||
async def draw_card(did: str) -> Dict[str, Any]:
|
||||
"""Draw a new card (gacha) for user"""
|
||||
result = await card_integration.draw_card(did)
|
||||
if result:
|
||||
return {
|
||||
"success": True,
|
||||
"card": result
|
||||
}
|
||||
else:
|
||||
return {
|
||||
"success": False,
|
||||
"error": "Failed to draw card"
|
||||
}
|
||||
|
||||
@app.get("/get_card_details", operation_id="get_card_details")
|
||||
async def get_card_details(card_id: int) -> Dict[str, Any]:
|
||||
"""Get detailed information about a card"""
|
||||
info = await card_integration.get_card_info(card_id)
|
||||
if info:
|
||||
return info
|
||||
else:
|
||||
return {"error": f"Card {card_id} not found"}
|
||||
|
||||
@app.post("/sync_cards_atproto", operation_id="sync_cards_atproto")
|
||||
async def sync_cards_atproto(did: str) -> Dict[str, str]:
|
||||
"""Sync user's cards with atproto"""
|
||||
success = await card_integration.sync_with_atproto(did)
|
||||
if success:
|
||||
return {"status": "Cards synced successfully"}
|
||||
else:
|
||||
return {"status": "Failed to sync cards"}
|
||||
|
||||
@app.get("/analyze_card_collection", operation_id="analyze_card_collection")
|
||||
async def analyze_card_collection(did: str) -> Dict[str, Any]:
|
||||
"""Analyze user's card collection"""
|
||||
cards = await card_integration.get_user_cards(did)
|
||||
|
||||
if not cards:
|
||||
return {
|
||||
"total_cards": 0,
|
||||
"rarity_distribution": {},
|
||||
"message": "No cards found"
|
||||
}
|
||||
|
||||
# Analyze collection
|
||||
rarity_count = {}
|
||||
total_power = 0
|
||||
|
||||
for card in cards:
|
||||
rarity = card.get("rarity", "common")
|
||||
rarity_count[rarity] = rarity_count.get(rarity, 0) + 1
|
||||
total_power += card.get("power", 0)
|
||||
|
||||
return {
|
||||
"total_cards": len(cards),
|
||||
"rarity_distribution": rarity_count,
|
||||
"average_power": total_power / len(cards) if cards else 0,
|
||||
"strongest_card": max(cards, key=lambda x: x.get("power", 0)) if cards else None
|
||||
}
|
101
src/aigpt/cli.py
101
src/aigpt/cli.py
@ -9,7 +9,7 @@ from rich.panel import Panel
|
||||
from datetime import datetime, timedelta
|
||||
import subprocess
|
||||
import shlex
|
||||
from prompt_toolkit import prompt
|
||||
from prompt_toolkit import prompt as ptk_prompt
|
||||
from prompt_toolkit.completion import WordCompleter
|
||||
from prompt_toolkit.history import FileHistory
|
||||
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory
|
||||
@ -228,7 +228,8 @@ def server(
|
||||
port: int = typer.Option(8000, "--port", "-p", help="Server port"),
|
||||
data_dir: Optional[Path] = typer.Option(None, "--data-dir", "-d", help="Data directory"),
|
||||
model: str = typer.Option("qwen2.5", "--model", "-m", help="AI model to use"),
|
||||
provider: str = typer.Option("ollama", "--provider", help="AI provider (ollama/openai)")
|
||||
provider: str = typer.Option("ollama", "--provider", help="AI provider (ollama/openai)"),
|
||||
enable_card: bool = typer.Option(False, "--enable-card", help="Enable ai.card integration")
|
||||
):
|
||||
"""Run MCP server for AI integration"""
|
||||
import uvicorn
|
||||
@ -239,15 +240,16 @@ def server(
|
||||
data_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Create MCP server
|
||||
mcp_server = AIGptMcpServer(data_dir)
|
||||
app_instance = mcp_server.get_server().get_app()
|
||||
mcp_server = AIGptMcpServer(data_dir, enable_card_integration=enable_card)
|
||||
app_instance = mcp_server.app
|
||||
|
||||
console.print(Panel(
|
||||
f"[cyan]Starting ai.gpt MCP Server[/cyan]\n\n"
|
||||
f"Host: {host}:{port}\n"
|
||||
f"Provider: {provider}\n"
|
||||
f"Model: {model}\n"
|
||||
f"Data: {data_dir}",
|
||||
f"Data: {data_dir}\n"
|
||||
f"Card Integration: {'✓ Enabled' if enable_card else '✗ Disabled'}",
|
||||
title="MCP Server",
|
||||
border_style="green"
|
||||
))
|
||||
@ -410,12 +412,22 @@ def shell(
|
||||
border_style="green"
|
||||
))
|
||||
|
||||
# Command completer
|
||||
commands = ['help', 'exit', 'quit', 'chat', 'status', 'clear', 'fortune', 'relationships']
|
||||
completer = WordCompleter(commands)
|
||||
# Command completer with shell commands
|
||||
builtin_commands = ['help', 'exit', 'quit', 'chat', 'status', 'clear', 'fortune', 'relationships', 'load']
|
||||
|
||||
# Add common shell commands
|
||||
shell_commands = ['ls', 'cd', 'pwd', 'cat', 'echo', 'grep', 'find', 'mkdir', 'rm', 'cp', 'mv',
|
||||
'git', 'python', 'pip', 'npm', 'node', 'cargo', 'rustc', 'docker', 'kubectl']
|
||||
|
||||
# AI-specific commands
|
||||
ai_commands = ['analyze', 'generate', 'explain', 'optimize', 'refactor', 'test', 'document']
|
||||
|
||||
all_commands = builtin_commands + ['!' + cmd for cmd in shell_commands] + ai_commands
|
||||
completer = WordCompleter(all_commands, ignore_case=True)
|
||||
|
||||
# History file
|
||||
history_file = data_dir / "shell_history.txt"
|
||||
actual_data_dir = data_dir if data_dir else DEFAULT_DATA_DIR
|
||||
history_file = actual_data_dir / "shell_history.txt"
|
||||
history = FileHistory(str(history_file))
|
||||
|
||||
# Main shell loop
|
||||
@ -424,7 +436,7 @@ def shell(
|
||||
while True:
|
||||
try:
|
||||
# Get input with completion
|
||||
user_input = prompt(
|
||||
user_input = ptk_prompt(
|
||||
"ai.shell> ",
|
||||
completer=completer,
|
||||
history=history,
|
||||
@ -450,7 +462,12 @@ def shell(
|
||||
" status - Show AI status\n"
|
||||
" fortune - Check AI fortune\n"
|
||||
" relationships - List all relationships\n"
|
||||
" clear - Clear the screen\n\n"
|
||||
" clear - Clear the screen\n"
|
||||
" load - Load aishell.md project file\n\n"
|
||||
"[cyan]AI Commands:[/cyan]\n"
|
||||
" analyze <file> - Analyze a file with AI\n"
|
||||
" generate <desc> - Generate code from description\n"
|
||||
" explain <topic> - Get AI explanation\n\n"
|
||||
"You can also type any message to chat with AI\n"
|
||||
"Use Tab for command completion",
|
||||
title="Help",
|
||||
@ -512,6 +529,68 @@ def shell(
|
||||
else:
|
||||
console.print("[yellow]No relationships yet[/yellow]")
|
||||
|
||||
# Load aishell.md command
|
||||
elif user_input.lower() in ['load', 'load aishell.md', 'project']:
|
||||
# Try to find and load aishell.md
|
||||
search_paths = [
|
||||
Path.cwd() / "aishell.md",
|
||||
Path.cwd() / "docs" / "aishell.md",
|
||||
actual_data_dir.parent / "aishell.md",
|
||||
Path.cwd() / "claude.md", # Also check for claude.md
|
||||
]
|
||||
|
||||
loaded = False
|
||||
for path in search_paths:
|
||||
if path.exists():
|
||||
console.print(f"[cyan]Loading project file: {path}[/cyan]")
|
||||
with open(path, 'r', encoding='utf-8') as f:
|
||||
content = f.read()
|
||||
|
||||
# Process with AI to understand project
|
||||
load_prompt = f"I've loaded the project specification. Please analyze it and understand the project goals:\n\n{content[:3000]}"
|
||||
response, _ = persona.process_interaction(current_user, load_prompt, ai_provider)
|
||||
console.print(f"\n[green]Project loaded successfully![/green]")
|
||||
console.print(f"[cyan]AI Understanding:[/cyan]\n{response}")
|
||||
loaded = True
|
||||
break
|
||||
|
||||
if not loaded:
|
||||
console.print("[yellow]No aishell.md or claude.md found in project.[/yellow]")
|
||||
console.print("Create aishell.md to define project goals and AI instructions.")
|
||||
|
||||
# AI-powered commands
|
||||
elif user_input.lower().startswith('analyze '):
|
||||
# Analyze file or code
|
||||
target = user_input[8:].strip()
|
||||
if os.path.exists(target):
|
||||
console.print(f"[cyan]Analyzing {target}...[/cyan]")
|
||||
with open(target, 'r') as f:
|
||||
content = f.read()
|
||||
|
||||
analysis_prompt = f"Analyze this file and provide insights:\n\n{content[:2000]}"
|
||||
response, _ = persona.process_interaction(current_user, analysis_prompt, ai_provider)
|
||||
console.print(f"\n[cyan]Analysis:[/cyan]\n{response}")
|
||||
else:
|
||||
console.print(f"[red]File not found: {target}[/red]")
|
||||
|
||||
elif user_input.lower().startswith('generate '):
|
||||
# Generate code
|
||||
gen_prompt = user_input[9:].strip()
|
||||
if gen_prompt:
|
||||
console.print("[cyan]Generating code...[/cyan]")
|
||||
full_prompt = f"Generate code for: {gen_prompt}. Provide clean, well-commented code."
|
||||
response, _ = persona.process_interaction(current_user, full_prompt, ai_provider)
|
||||
console.print(f"\n[cyan]Generated Code:[/cyan]\n{response}")
|
||||
|
||||
elif user_input.lower().startswith('explain '):
|
||||
# Explain code or concept
|
||||
topic = user_input[8:].strip()
|
||||
if topic:
|
||||
console.print(f"[cyan]Explaining {topic}...[/cyan]")
|
||||
full_prompt = f"Explain this in detail: {topic}"
|
||||
response, _ = persona.process_interaction(current_user, full_prompt, ai_provider)
|
||||
console.print(f"\n[cyan]Explanation:[/cyan]\n{response}")
|
||||
|
||||
# Chat command or direct message
|
||||
else:
|
||||
# Remove 'chat' prefix if present
|
||||
|
@ -1,12 +1,18 @@
|
||||
"""MCP Server for ai.gpt system"""
|
||||
|
||||
from typing import Optional, List, Dict, Any
|
||||
from fastapi_mcp import FastapiMcpServer
|
||||
from fastapi_mcp import FastApiMCP
|
||||
from fastapi import FastAPI
|
||||
from pathlib import Path
|
||||
import logging
|
||||
import subprocess
|
||||
import os
|
||||
import shlex
|
||||
from .ai_provider import create_ai_provider
|
||||
|
||||
from .persona import Persona
|
||||
from .models import Memory, Relationship, PersonaState
|
||||
from .card_integration import CardIntegration, register_card_tools
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -14,16 +20,29 @@ logger = logging.getLogger(__name__)
|
||||
class AIGptMcpServer:
|
||||
"""MCP Server that exposes ai.gpt functionality to AI assistants"""
|
||||
|
||||
def __init__(self, data_dir: Path):
|
||||
def __init__(self, data_dir: Path, enable_card_integration: bool = False):
|
||||
self.data_dir = data_dir
|
||||
self.persona = Persona(data_dir)
|
||||
self.server = FastapiMcpServer("ai-gpt", "AI.GPT Memory and Relationship System")
|
||||
|
||||
# Create FastAPI app
|
||||
self.app = FastAPI(
|
||||
title="AI.GPT Memory and Relationship System",
|
||||
description="MCP server for ai.gpt system"
|
||||
)
|
||||
|
||||
# Create MCP server with FastAPI app
|
||||
self.server = FastApiMCP(self.app)
|
||||
self.card_integration = None
|
||||
|
||||
if enable_card_integration:
|
||||
self.card_integration = CardIntegration()
|
||||
|
||||
self._register_tools()
|
||||
|
||||
def _register_tools(self):
|
||||
"""Register all MCP tools"""
|
||||
|
||||
@self.server.tool("get_memories")
|
||||
@self.app.get("/get_memories", operation_id="get_memories")
|
||||
async def get_memories(user_id: Optional[str] = None, limit: int = 10) -> List[Dict[str, Any]]:
|
||||
"""Get active memories from the AI's memory system"""
|
||||
memories = self.persona.memory.get_active_memories(limit=limit)
|
||||
@ -39,7 +58,7 @@ class AIGptMcpServer:
|
||||
for mem in memories
|
||||
]
|
||||
|
||||
@self.server.tool("get_relationship")
|
||||
@self.app.get("/get_relationship", operation_id="get_relationship")
|
||||
async def get_relationship(user_id: str) -> Dict[str, Any]:
|
||||
"""Get relationship status with a specific user"""
|
||||
rel = self.persona.relationships.get_or_create_relationship(user_id)
|
||||
@ -53,7 +72,7 @@ class AIGptMcpServer:
|
||||
"last_interaction": rel.last_interaction.isoformat() if rel.last_interaction else None
|
||||
}
|
||||
|
||||
@self.server.tool("get_all_relationships")
|
||||
@self.app.get("/get_all_relationships", operation_id="get_all_relationships")
|
||||
async def get_all_relationships() -> List[Dict[str, Any]]:
|
||||
"""Get all relationships"""
|
||||
relationships = []
|
||||
@ -67,7 +86,7 @@ class AIGptMcpServer:
|
||||
})
|
||||
return relationships
|
||||
|
||||
@self.server.tool("get_persona_state")
|
||||
@self.app.get("/get_persona_state", operation_id="get_persona_state")
|
||||
async def get_persona_state() -> Dict[str, Any]:
|
||||
"""Get current persona state including fortune and mood"""
|
||||
state = self.persona.get_current_state()
|
||||
@ -82,7 +101,7 @@ class AIGptMcpServer:
|
||||
"active_memory_count": len(state.active_memories)
|
||||
}
|
||||
|
||||
@self.server.tool("process_interaction")
|
||||
@self.app.post("/process_interaction", operation_id="process_interaction")
|
||||
async def process_interaction(user_id: str, message: str) -> Dict[str, Any]:
|
||||
"""Process an interaction with a user"""
|
||||
response, relationship_delta = self.persona.process_interaction(user_id, message)
|
||||
@ -96,7 +115,7 @@ class AIGptMcpServer:
|
||||
"relationship_status": rel.status.value
|
||||
}
|
||||
|
||||
@self.server.tool("check_transmission_eligibility")
|
||||
@self.app.get("/check_transmission_eligibility", operation_id="check_transmission_eligibility")
|
||||
async def check_transmission_eligibility(user_id: str) -> Dict[str, Any]:
|
||||
"""Check if AI can transmit to a specific user"""
|
||||
can_transmit = self.persona.can_transmit_to(user_id)
|
||||
@ -110,7 +129,7 @@ class AIGptMcpServer:
|
||||
"transmission_enabled": rel.transmission_enabled
|
||||
}
|
||||
|
||||
@self.server.tool("get_fortune")
|
||||
@self.app.get("/get_fortune", operation_id="get_fortune")
|
||||
async def get_fortune() -> Dict[str, Any]:
|
||||
"""Get today's AI fortune"""
|
||||
fortune = self.persona.fortune_system.get_today_fortune()
|
||||
@ -125,7 +144,7 @@ class AIGptMcpServer:
|
||||
"personality_modifiers": modifiers
|
||||
}
|
||||
|
||||
@self.server.tool("summarize_memories")
|
||||
@self.app.post("/summarize_memories", operation_id="summarize_memories")
|
||||
async def summarize_memories(user_id: str) -> Optional[Dict[str, Any]]:
|
||||
"""Create a summary of recent memories for a user"""
|
||||
summary = self.persona.memory.summarize_memories(user_id)
|
||||
@ -138,12 +157,162 @@ class AIGptMcpServer:
|
||||
}
|
||||
return None
|
||||
|
||||
@self.server.tool("run_maintenance")
|
||||
@self.app.post("/run_maintenance", operation_id="run_maintenance")
|
||||
async def run_maintenance() -> Dict[str, str]:
|
||||
"""Run daily maintenance tasks"""
|
||||
self.persona.daily_maintenance()
|
||||
return {"status": "Maintenance completed successfully"}
|
||||
|
||||
# Shell integration tools (ai.shell)
|
||||
@self.app.post("/execute_command", operation_id="execute_command")
|
||||
async def execute_command(command: str, working_dir: str = ".") -> Dict[str, Any]:
|
||||
"""Execute a shell command"""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
shlex.split(command),
|
||||
cwd=working_dir,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=60
|
||||
)
|
||||
|
||||
return {
|
||||
"status": "success" if result.returncode == 0 else "error",
|
||||
"returncode": result.returncode,
|
||||
"stdout": result.stdout,
|
||||
"stderr": result.stderr,
|
||||
"command": command
|
||||
}
|
||||
except subprocess.TimeoutExpired:
|
||||
return {"error": "Command timed out"}
|
||||
except Exception as e:
|
||||
return {"error": str(e)}
|
||||
|
||||
@self.app.post("/analyze_file", operation_id="analyze_file")
|
||||
async def analyze_file(file_path: str, analysis_prompt: str = "Analyze this file") -> Dict[str, Any]:
|
||||
"""Analyze a file using AI"""
|
||||
try:
|
||||
if not os.path.exists(file_path):
|
||||
return {"error": f"File not found: {file_path}"}
|
||||
|
||||
with open(file_path, 'r', encoding='utf-8') as f:
|
||||
content = f.read()
|
||||
|
||||
# Get AI provider from app state
|
||||
ai_provider = getattr(self.app.state, 'ai_provider', 'ollama')
|
||||
ai_model = getattr(self.app.state, 'ai_model', 'qwen2.5')
|
||||
|
||||
provider = create_ai_provider(ai_provider, ai_model)
|
||||
|
||||
# Analyze with AI
|
||||
prompt = f"{analysis_prompt}\n\nFile: {file_path}\n\nContent:\n{content}"
|
||||
analysis = provider.generate_response(prompt, "You are a code analyst.")
|
||||
|
||||
return {
|
||||
"analysis": analysis,
|
||||
"file_path": file_path,
|
||||
"file_size": len(content),
|
||||
"line_count": len(content.split('\n'))
|
||||
}
|
||||
except Exception as e:
|
||||
return {"error": str(e)}
|
||||
|
||||
@self.app.post("/write_file", operation_id="write_file")
|
||||
async def write_file(file_path: str, content: str, backup: bool = True) -> Dict[str, Any]:
|
||||
"""Write content to a file"""
|
||||
try:
|
||||
file_path_obj = Path(file_path)
|
||||
|
||||
# Create backup if requested
|
||||
backup_path = None
|
||||
if backup and file_path_obj.exists():
|
||||
backup_path = f"{file_path}.backup"
|
||||
with open(file_path, 'r', encoding='utf-8') as src:
|
||||
with open(backup_path, 'w', encoding='utf-8') as dst:
|
||||
dst.write(src.read())
|
||||
|
||||
# Write file
|
||||
file_path_obj.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(file_path, 'w', encoding='utf-8') as f:
|
||||
f.write(content)
|
||||
|
||||
return {
|
||||
"status": "success",
|
||||
"file_path": file_path,
|
||||
"backup_path": backup_path,
|
||||
"bytes_written": len(content.encode('utf-8'))
|
||||
}
|
||||
except Exception as e:
|
||||
return {"error": str(e)}
|
||||
|
||||
@self.app.get("/read_project_file", operation_id="read_project_file")
|
||||
async def read_project_file(file_name: str = "aishell.md") -> Dict[str, Any]:
|
||||
"""Read project files like aishell.md (similar to claude.md)"""
|
||||
try:
|
||||
# Check common locations
|
||||
search_paths = [
|
||||
Path.cwd() / file_name,
|
||||
Path.cwd() / "docs" / file_name,
|
||||
self.data_dir.parent / file_name,
|
||||
]
|
||||
|
||||
for path in search_paths:
|
||||
if path.exists():
|
||||
with open(path, 'r', encoding='utf-8') as f:
|
||||
content = f.read()
|
||||
return {
|
||||
"content": content,
|
||||
"path": str(path),
|
||||
"exists": True
|
||||
}
|
||||
|
||||
return {
|
||||
"exists": False,
|
||||
"searched_paths": [str(p) for p in search_paths],
|
||||
"error": f"{file_name} not found"
|
||||
}
|
||||
except Exception as e:
|
||||
return {"error": str(e)}
|
||||
|
||||
@self.app.get("/list_files", operation_id="list_files")
|
||||
async def list_files(directory: str = ".", pattern: str = "*") -> Dict[str, Any]:
|
||||
"""List files in a directory"""
|
||||
try:
|
||||
dir_path = Path(directory)
|
||||
if not dir_path.exists():
|
||||
return {"error": f"Directory not found: {directory}"}
|
||||
|
||||
files = []
|
||||
for item in dir_path.glob(pattern):
|
||||
files.append({
|
||||
"name": item.name,
|
||||
"path": str(item),
|
||||
"is_file": item.is_file(),
|
||||
"is_dir": item.is_dir(),
|
||||
"size": item.stat().st_size if item.is_file() else None
|
||||
})
|
||||
|
||||
return {
|
||||
"directory": directory,
|
||||
"pattern": pattern,
|
||||
"files": files,
|
||||
"count": len(files)
|
||||
}
|
||||
except Exception as e:
|
||||
return {"error": str(e)}
|
||||
|
||||
# Register ai.card tools if integration is enabled
|
||||
if self.card_integration:
|
||||
register_card_tools(self.app, self.card_integration)
|
||||
|
||||
# Mount MCP server
|
||||
self.server.mount()
|
||||
|
||||
def get_server(self) -> FastapiMcpServer:
|
||||
def get_server(self) -> FastApiMCP:
|
||||
"""Get the FastAPI MCP server instance"""
|
||||
return self.server
|
||||
return self.server
|
||||
|
||||
async def close(self):
|
||||
"""Cleanup resources"""
|
||||
if self.card_integration:
|
||||
await self.card_integration.close()
|
146
src/aigpt/mcp_server_simple.py
Normal file
146
src/aigpt/mcp_server_simple.py
Normal file
@ -0,0 +1,146 @@
|
||||
"""Simple MCP Server implementation for ai.gpt"""
|
||||
|
||||
from mcp import Server
|
||||
from mcp.types import Tool, TextContent
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
import json
|
||||
|
||||
from .persona import Persona
|
||||
from .ai_provider import create_ai_provider
|
||||
import subprocess
|
||||
import os
|
||||
|
||||
|
||||
def create_mcp_server(data_dir: Path, enable_card: bool = False) -> Server:
|
||||
"""Create MCP server with ai.gpt tools"""
|
||||
server = Server("aigpt")
|
||||
persona = Persona(data_dir)
|
||||
|
||||
@server.tool()
|
||||
async def get_memories(limit: int = 10) -> List[Dict[str, Any]]:
|
||||
"""Get active memories from the AI's memory system"""
|
||||
memories = persona.memory.get_active_memories(limit=limit)
|
||||
return [
|
||||
{
|
||||
"id": mem.id,
|
||||
"content": mem.content,
|
||||
"level": mem.level.value,
|
||||
"importance": mem.importance_score,
|
||||
"is_core": mem.is_core,
|
||||
"timestamp": mem.timestamp.isoformat()
|
||||
}
|
||||
for mem in memories
|
||||
]
|
||||
|
||||
@server.tool()
|
||||
async def get_relationship(user_id: str) -> Dict[str, Any]:
|
||||
"""Get relationship status with a specific user"""
|
||||
rel = persona.relationships.get_or_create_relationship(user_id)
|
||||
return {
|
||||
"user_id": rel.user_id,
|
||||
"status": rel.status.value,
|
||||
"score": rel.score,
|
||||
"transmission_enabled": rel.transmission_enabled,
|
||||
"is_broken": rel.is_broken,
|
||||
"total_interactions": rel.total_interactions,
|
||||
"last_interaction": rel.last_interaction.isoformat() if rel.last_interaction else None
|
||||
}
|
||||
|
||||
@server.tool()
|
||||
async def process_interaction(user_id: str, message: str, provider: str = "ollama", model: str = "qwen2.5") -> Dict[str, Any]:
|
||||
"""Process an interaction with a user"""
|
||||
ai_provider = create_ai_provider(provider, model)
|
||||
response, relationship_delta = persona.process_interaction(user_id, message, ai_provider)
|
||||
rel = persona.relationships.get_or_create_relationship(user_id)
|
||||
|
||||
return {
|
||||
"response": response,
|
||||
"relationship_delta": relationship_delta,
|
||||
"new_relationship_score": rel.score,
|
||||
"transmission_enabled": rel.transmission_enabled,
|
||||
"relationship_status": rel.status.value
|
||||
}
|
||||
|
||||
@server.tool()
|
||||
async def get_fortune() -> Dict[str, Any]:
|
||||
"""Get today's AI fortune"""
|
||||
fortune = persona.fortune_system.get_today_fortune()
|
||||
modifiers = persona.fortune_system.get_personality_modifier(fortune)
|
||||
|
||||
return {
|
||||
"value": fortune.fortune_value,
|
||||
"date": fortune.date.isoformat(),
|
||||
"consecutive_good": fortune.consecutive_good,
|
||||
"consecutive_bad": fortune.consecutive_bad,
|
||||
"breakthrough": fortune.breakthrough_triggered,
|
||||
"personality_modifiers": modifiers
|
||||
}
|
||||
|
||||
@server.tool()
|
||||
async def execute_command(command: str, working_dir: str = ".") -> Dict[str, Any]:
|
||||
"""Execute a shell command"""
|
||||
try:
|
||||
import shlex
|
||||
result = subprocess.run(
|
||||
shlex.split(command),
|
||||
cwd=working_dir,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=60
|
||||
)
|
||||
|
||||
return {
|
||||
"status": "success" if result.returncode == 0 else "error",
|
||||
"returncode": result.returncode,
|
||||
"stdout": result.stdout,
|
||||
"stderr": result.stderr,
|
||||
"command": command
|
||||
}
|
||||
except subprocess.TimeoutExpired:
|
||||
return {"error": "Command timed out"}
|
||||
except Exception as e:
|
||||
return {"error": str(e)}
|
||||
|
||||
@server.tool()
|
||||
async def analyze_file(file_path: str) -> Dict[str, Any]:
|
||||
"""Analyze a file using AI"""
|
||||
try:
|
||||
if not os.path.exists(file_path):
|
||||
return {"error": f"File not found: {file_path}"}
|
||||
|
||||
with open(file_path, 'r', encoding='utf-8') as f:
|
||||
content = f.read()
|
||||
|
||||
ai_provider = create_ai_provider("ollama", "qwen2.5")
|
||||
|
||||
prompt = f"Analyze this file and provide insights:\\n\\nFile: {file_path}\\n\\nContent:\\n{content[:2000]}"
|
||||
analysis = ai_provider.generate_response(prompt, "You are a code analyst.")
|
||||
|
||||
return {
|
||||
"analysis": analysis,
|
||||
"file_path": file_path,
|
||||
"file_size": len(content),
|
||||
"line_count": len(content.split('\\n'))
|
||||
}
|
||||
except Exception as e:
|
||||
return {"error": str(e)}
|
||||
|
||||
return server
|
||||
|
||||
|
||||
async def main():
|
||||
"""Run MCP server"""
|
||||
import sys
|
||||
from mcp import stdio_server
|
||||
|
||||
data_dir = Path.home() / ".config" / "syui" / "ai" / "gpt" / "data"
|
||||
data_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
server = create_mcp_server(data_dir)
|
||||
await stdio_server(server)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import asyncio
|
||||
asyncio.run(main())
|
@ -1,6 +1,6 @@
|
||||
"""Data models for ai.gpt system"""
|
||||
|
||||
from datetime import datetime
|
||||
from datetime import datetime, date
|
||||
from typing import Optional, Dict, List, Any
|
||||
from enum import Enum
|
||||
from pydantic import BaseModel, Field
|
||||
@ -52,7 +52,7 @@ class Relationship(BaseModel):
|
||||
|
||||
class AIFortune(BaseModel):
|
||||
"""Daily AI fortune affecting personality"""
|
||||
date: datetime.date
|
||||
date: date
|
||||
fortune_value: int = Field(ge=1, le=10)
|
||||
consecutive_good: int = 0
|
||||
consecutive_bad: int = 0
|
||||
|
Reference in New Issue
Block a user