update ai.shell

2025-06-03 01:01:28 +09:00
parent b642588696
commit cba52b6171
17 changed files with 2670 additions and 294 deletions


@@ -2,7 +2,7 @@ README.md
pyproject.toml
src/aigpt/__init__.py
src/aigpt/ai_provider.py
src/aigpt/card_integration.py
src/aigpt/chatgpt_importer.py
src/aigpt/cli.py
src/aigpt/config.py
src/aigpt/fortune.py

src/aigpt/ai_provider.py

@@ -30,11 +30,16 @@ class AIProvider(Protocol):
class OllamaProvider:
"""Ollama AI provider"""
def __init__(self, model: str = "qwen2.5", host: str = "http://localhost:11434"):
def __init__(self, model: str = "qwen2.5", host: Optional[str] = None):
self.model = model
self.host = host
self.client = ollama.Client(host=host)
# Use environment variable OLLAMA_HOST if available, otherwise use config or default
self.host = host or os.getenv('OLLAMA_HOST', 'http://127.0.0.1:11434')
# Ensure proper URL format
if not self.host.startswith('http'):
self.host = f'http://{self.host}'
self.client = ollama.Client(host=self.host, timeout=60.0) # 60-second timeout
self.logger = logging.getLogger(__name__)
self.logger.info(f"OllamaProvider initialized with host: {self.host}, model: {self.model}")
async def generate_response(
self,
@@ -81,6 +86,26 @@ Recent memories:
self.logger.error(f"Ollama generation failed: {e}")
return self._fallback_response(persona_state)
def chat(self, prompt: str, max_tokens: int = 200) -> str:
"""Simple chat interface"""
try:
response = self.client.chat(
model=self.model,
messages=[
{"role": "user", "content": prompt}
],
options={
"num_predict": max_tokens,
"temperature": 0.7,
"top_p": 0.9,
},
stream=False # disable streaming for stability
)
return response['message']['content']
except Exception as e:
self.logger.error(f"Ollama chat failed (host: {self.host}): {e}")
return "I'm having trouble connecting to the AI model."
def _fallback_response(self, persona_state: PersonaState) -> str:
"""Fallback response based on mood"""
mood_responses = {
@@ -162,9 +187,19 @@ Recent memories:
return mood_responses.get(persona_state.current_mood, "I see.")
def create_ai_provider(provider: str, model: str, **kwargs) -> AIProvider:
def create_ai_provider(provider: str = "ollama", model: str = "qwen2.5", **kwargs) -> AIProvider:
"""Factory function to create AI providers"""
if provider == "ollama":
# Try to get host from config if not provided in kwargs
if 'host' not in kwargs:
try:
from .config import Config
config = Config()
config_host = config.get('providers.ollama.host')
if config_host:
kwargs['host'] = config_host
except Exception:
pass # Use environment variable or default
return OllamaProvider(model=model, **kwargs)
elif provider == "openai":
return OpenAIProvider(model=model, **kwargs)
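For reference, the reworked factory makes provider construction zero-argument; a minimal usage sketch assuming only the code above and a reachable Ollama server:

from aigpt.ai_provider import create_ai_provider

provider = create_ai_provider()  # defaults: provider="ollama", model="qwen2.5"
# Host precedence: explicit kwarg or providers.ollama.host from config,
# then the OLLAMA_HOST environment variable, then http://127.0.0.1:11434
print(provider.host)
reply = provider.chat("Say hello in one sentence.", max_tokens=50)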

src/aigpt/card_integration.py

@@ -1,150 +0,0 @@
"""ai.card integration module for ai.gpt MCP server"""
from typing import Dict, Any, List, Optional
import httpx
from pathlib import Path
import json
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
class CardIntegration:
"""Integration with ai.card system"""
def __init__(self, api_base_url: str = "http://localhost:8001"):
self.api_base_url = api_base_url
self.client = httpx.AsyncClient()
async def get_user_cards(self, did: str) -> List[Dict[str, Any]]:
"""Get cards for a specific user by DID"""
try:
response = await self.client.get(
f"{self.api_base_url}/api/v1/cards/user/{did}"
)
if response.status_code == 200:
return response.json()
else:
logger.error(f"Failed to get cards: {response.status_code}")
return []
except Exception as e:
logger.error(f"Error getting user cards: {e}")
return []
async def draw_card(self, did: str) -> Optional[Dict[str, Any]]:
"""Draw a new card for user (gacha)"""
try:
response = await self.client.post(
f"{self.api_base_url}/api/v1/gacha/draw",
json={"did": did}
)
if response.status_code == 200:
return response.json()
else:
logger.error(f"Failed to draw card: {response.status_code}")
return None
except Exception as e:
logger.error(f"Error drawing card: {e}")
return None
async def get_card_info(self, card_id: int) -> Optional[Dict[str, Any]]:
"""Get detailed information about a specific card"""
try:
response = await self.client.get(
f"{self.api_base_url}/api/v1/cards/{card_id}"
)
if response.status_code == 200:
return response.json()
else:
return None
except Exception as e:
logger.error(f"Error getting card info: {e}")
return None
async def sync_with_atproto(self, did: str) -> bool:
"""Sync card data with atproto"""
try:
response = await self.client.post(
f"{self.api_base_url}/api/v1/sync/atproto",
json={"did": did}
)
return response.status_code == 200
except Exception as e:
logger.error(f"Error syncing with atproto: {e}")
return False
async def close(self):
"""Close the HTTP client"""
await self.client.aclose()
def register_card_tools(app, card_integration: CardIntegration):
"""Register ai.card tools to FastAPI app"""
@app.get("/get_user_cards", operation_id="get_user_cards")
async def get_user_cards(did: str) -> List[Dict[str, Any]]:
"""Get all cards owned by a user"""
cards = await card_integration.get_user_cards(did)
return cards
@app.post("/draw_card", operation_id="draw_card")
async def draw_card(did: str) -> Dict[str, Any]:
"""Draw a new card (gacha) for user"""
result = await card_integration.draw_card(did)
if result:
return {
"success": True,
"card": result
}
else:
return {
"success": False,
"error": "Failed to draw card"
}
@app.get("/get_card_details", operation_id="get_card_details")
async def get_card_details(card_id: int) -> Dict[str, Any]:
"""Get detailed information about a card"""
info = await card_integration.get_card_info(card_id)
if info:
return info
else:
return {"error": f"Card {card_id} not found"}
@app.post("/sync_cards_atproto", operation_id="sync_cards_atproto")
async def sync_cards_atproto(did: str) -> Dict[str, str]:
"""Sync user's cards with atproto"""
success = await card_integration.sync_with_atproto(did)
if success:
return {"status": "Cards synced successfully"}
else:
return {"status": "Failed to sync cards"}
@app.get("/analyze_card_collection", operation_id="analyze_card_collection")
async def analyze_card_collection(did: str) -> Dict[str, Any]:
"""Analyze user's card collection"""
cards = await card_integration.get_user_cards(did)
if not cards:
return {
"total_cards": 0,
"rarity_distribution": {},
"message": "No cards found"
}
# Analyze collection
rarity_count = {}
total_power = 0
for card in cards:
rarity = card.get("rarity", "common")
rarity_count[rarity] = rarity_count.get(rarity, 0) + 1
total_power += card.get("power", 0)
return {
"total_cards": len(cards),
"rarity_distribution": rarity_count,
"average_power": total_power / len(cards) if cards else 0,
"strongest_card": max(cards, key=lambda x: x.get("power", 0)) if cards else None
}

src/aigpt/chatgpt_importer.py

@@ -0,0 +1,192 @@
"""ChatGPT conversation data importer for ai.gpt"""
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Any, Optional
import logging
from .models import Memory, MemoryLevel, Conversation
from .memory import MemoryManager
from .relationship import RelationshipTracker
logger = logging.getLogger(__name__)
class ChatGPTImporter:
"""Import ChatGPT conversation data into ai.gpt memory system"""
def __init__(self, data_dir: Path):
self.data_dir = data_dir
self.memory_manager = MemoryManager(data_dir)
self.relationship_tracker = RelationshipTracker(data_dir)
def import_from_file(self, file_path: Path, user_id: str = "chatgpt_user") -> Dict[str, Any]:
"""Import ChatGPT conversations from JSON file
Args:
file_path: Path to ChatGPT export JSON file
user_id: User ID to associate with imported conversations
Returns:
Dict with import statistics
"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
chatgpt_data = json.load(f)
return self._import_conversations(chatgpt_data, user_id)
except Exception as e:
logger.error(f"Failed to import ChatGPT data: {e}")
raise
def _import_conversations(self, chatgpt_data: List[Dict], user_id: str) -> Dict[str, Any]:
"""Import multiple conversations from ChatGPT data"""
stats = {
"conversations_imported": 0,
"messages_imported": 0,
"user_messages": 0,
"assistant_messages": 0,
"skipped_messages": 0
}
for conversation_data in chatgpt_data:
try:
conv_stats = self._import_single_conversation(conversation_data, user_id)
# Update overall stats
stats["conversations_imported"] += 1
stats["messages_imported"] += conv_stats["messages"]
stats["user_messages"] += conv_stats["user_messages"]
stats["assistant_messages"] += conv_stats["assistant_messages"]
stats["skipped_messages"] += conv_stats["skipped"]
except Exception as e:
logger.warning(f"Failed to import conversation '{conversation_data.get('title', 'Unknown')}': {e}")
continue
logger.info(f"Import completed: {stats}")
return stats
def _import_single_conversation(self, conversation_data: Dict, user_id: str) -> Dict[str, int]:
"""Import a single conversation from ChatGPT"""
title = conversation_data.get("title", "Untitled")
create_time = conversation_data.get("create_time")
mapping = conversation_data.get("mapping", {})
stats = {"messages": 0, "user_messages": 0, "assistant_messages": 0, "skipped": 0}
# Extract messages in chronological order
messages = self._extract_messages_from_mapping(mapping)
for msg in messages:
try:
role = msg["author"]["role"]
content = self._extract_content(msg["content"])
create_time_msg = msg.get("create_time")
if not content or role not in ["user", "assistant"]:
stats["skipped"] += 1
continue
# Convert to ai.gpt format
if role == "user":
# User message - create memory entry
self._add_user_message(user_id, content, create_time_msg, title)
stats["user_messages"] += 1
elif role == "assistant":
# Assistant message - create AI response memory
self._add_assistant_message(user_id, content, create_time_msg, title)
stats["assistant_messages"] += 1
stats["messages"] += 1
except Exception as e:
logger.warning(f"Failed to process message in '{title}': {e}")
stats["skipped"] += 1
continue
logger.info(f"Imported conversation '{title}': {stats}")
return stats
def _extract_messages_from_mapping(self, mapping: Dict) -> List[Dict]:
"""Extract messages from ChatGPT mapping structure in chronological order"""
messages = []
for node_id, node_data in mapping.items():
message = node_data.get("message")
if message and message.get("author", {}).get("role") in ["user", "assistant"]:
# Skip system messages and hidden messages
metadata = message.get("metadata", {})
if not metadata.get("is_visually_hidden_from_conversation", False):
messages.append(message)
# Sort by create_time if available
messages.sort(key=lambda x: x.get("create_time") or 0)
return messages
def _extract_content(self, content_data: Dict) -> Optional[str]:
"""Extract text content from ChatGPT content structure"""
if not content_data:
return None
content_type = content_data.get("content_type")
if content_type == "text":
parts = content_data.get("parts", [])
if parts and parts[0]:
return parts[0].strip()
elif content_type == "user_editable_context":
# User context/instructions
user_instructions = content_data.get("user_instructions", "")
if user_instructions:
return f"[User Context] {user_instructions}"
return None
def _add_user_message(self, user_id: str, content: str, create_time: Optional[float], conversation_title: str):
"""Add user message to ai.gpt memory system"""
timestamp = datetime.fromtimestamp(create_time) if create_time else datetime.now()
# Create conversation record
conversation = Conversation(
id=str(uuid.uuid4()),
user_id=user_id,
user_message=content,
ai_response="", # Will be filled by next assistant message
timestamp=timestamp,
context={"source": "chatgpt_import", "conversation_title": conversation_title}
)
# Add to memory with CORE level (imported data is important)
memory = Memory(
id=str(uuid.uuid4()),
timestamp=timestamp,
content=content,
level=MemoryLevel.CORE,
importance_score=0.8 # High importance for imported data
)
self.memory_manager.add_memory(memory)
# Update relationship (positive interaction)
self.relationship_tracker.update_interaction(user_id, 1.0)
def _add_assistant_message(self, user_id: str, content: str, create_time: Optional[float], conversation_title: str):
"""Add assistant message to ai.gpt memory system"""
timestamp = datetime.fromtimestamp(create_time) if create_time else datetime.now()
# Add assistant response as memory (AI's own responses can inform future behavior)
memory = Memory(
id=str(uuid.uuid4()),
timestamp=timestamp,
content=f"[AI Response] {content}",
level=MemoryLevel.SUMMARY,
importance_score=0.6 # Medium importance for AI responses
)
self.memory_manager.add_memory(memory)
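For context, _extract_messages_from_mapping walks the mapping dict of a ChatGPT export, where each node wraps at most one message; a minimal sketch of the shape the importer expects (field names as in real exports, values illustrative):

mapping = {
    "node-1": {"message": {
        "author": {"role": "user"},
        "create_time": 1717340000.0,
        "content": {"content_type": "text", "parts": ["Hello!"]},
        "metadata": {},
    }},
    "node-2": {"message": {
        "author": {"role": "assistant"},
        "create_time": 1717340005.0,
        "content": {"content_type": "text", "parts": ["Hi there."]},
        "metadata": {},
    }},
}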

src/aigpt/cli.py

@@ -20,6 +20,7 @@ from .mcp_server import AIGptMcpServer
from .ai_provider import create_ai_provider
from .scheduler import AIScheduler, TaskType
from .config import Config
from .project_manager import ContinuousDeveloper
app = typer.Typer(help="ai.gpt - Autonomous transmission AI with unique personality")
console = Console()
@@ -53,7 +54,7 @@ def chat(
ai_provider = None
if provider and model:
try:
ai_provider = create_ai_provider(provider, model)
ai_provider = create_ai_provider(provider=provider, model=model)
console.print(f"[dim]Using {provider} with model {model}[/dim]\n")
except Exception as e:
console.print(f"[yellow]Warning: Could not create AI provider: {e}[/yellow]")
@@ -228,8 +229,7 @@ def server(
port: int = typer.Option(8000, "--port", "-p", help="Server port"),
data_dir: Optional[Path] = typer.Option(None, "--data-dir", "-d", help="Data directory"),
model: str = typer.Option("qwen2.5", "--model", "-m", help="AI model to use"),
provider: str = typer.Option("ollama", "--provider", help="AI provider (ollama/openai)"),
enable_card: bool = typer.Option(False, "--enable-card", help="Enable ai.card integration")
provider: str = typer.Option("ollama", "--provider", help="AI provider (ollama/openai)")
):
"""Run MCP server for AI integration"""
import uvicorn
@@ -240,7 +240,7 @@ def server(
data_dir.mkdir(parents=True, exist_ok=True)
# Create MCP server
mcp_server = AIGptMcpServer(data_dir, enable_card_integration=enable_card)
mcp_server = AIGptMcpServer(data_dir)
app_instance = mcp_server.app
console.print(Panel(
@@ -248,8 +248,7 @@ def server(
f"Host: {host}:{port}\n"
f"Provider: {provider}\n"
f"Model: {model}\n"
f"Data: {data_dir}\n"
f"Card Integration: {'✓ Enabled' if enable_card else '✗ Disabled'}",
f"Data: {data_dir}",
title="MCP Server",
border_style="green"
))
@@ -390,7 +389,7 @@ def shell(
ai_provider = None
if provider and model:
try:
ai_provider = create_ai_provider(provider, model)
ai_provider = create_ai_provider(provider=provider, model=model)
console.print(f"[dim]Using {provider} with model {model}[/dim]\n")
except Exception as e:
console.print(f"[yellow]Warning: Could not create AI provider: {e}[/yellow]")
@@ -422,7 +421,13 @@ def shell(
# AI-specific commands
ai_commands = ['analyze', 'generate', 'explain', 'optimize', 'refactor', 'test', 'document']
all_commands = builtin_commands + ['!' + cmd for cmd in shell_commands] + ai_commands
# Remote execution commands (ai.bot integration)
remote_commands = ['remote', 'isolated', 'aibot-status']
# Project management commands (Claude Code-like)
project_commands = ['project-status', 'suggest-next', 'continuous']
all_commands = builtin_commands + ['!' + cmd for cmd in shell_commands] + ai_commands + remote_commands + project_commands
completer = WordCompleter(all_commands, ignore_case=True)
# History file
@@ -468,6 +473,14 @@ def shell(
" analyze <file> - Analyze a file with AI\n"
" generate <desc> - Generate code from description\n"
" explain <topic> - Get AI explanation\n\n"
"[cyan]Remote Commands (ai.bot):[/cyan]\n"
" remote <command> - Execute command in isolated container\n"
" isolated <code> - Run Python code in isolated environment\n"
" aibot-status - Check ai.bot server status\n\n"
"[cyan]Project Commands (Claude Code-like):[/cyan]\n"
" project-status - Analyze current project structure\n"
" suggest-next - AI suggests next development steps\n"
" continuous - Enable continuous development mode\n\n"
"You can also type any message to chat with AI\n"
"Use Tab for command completion",
title="Help",
@@ -560,27 +573,38 @@ def shell(
# AI-powered commands
elif user_input.lower().startswith('analyze '):
# Analyze file or code
# Analyze file or code with project context
target = user_input[8:].strip()
if os.path.exists(target):
console.print(f"[cyan]Analyzing {target}...[/cyan]")
with open(target, 'r') as f:
content = f.read()
analysis_prompt = f"Analyze this file and provide insights:\n\n{content[:2000]}"
response, _ = persona.process_interaction(current_user, analysis_prompt, ai_provider)
console.print(f"\n[cyan]Analysis:[/cyan]\n{response}")
console.print(f"[cyan]Analyzing {target} with project context...[/cyan]")
try:
developer = ContinuousDeveloper(Path.cwd(), ai_provider)
analysis = developer.analyze_file(target)
console.print(f"\n[cyan]Analysis:[/cyan]\n{analysis}")
except Exception as e:
# Fallback to simple analysis
with open(target, 'r') as f:
content = f.read()
analysis_prompt = f"Analyze this file and provide insights:\n\n{content[:2000]}"
response, _ = persona.process_interaction(current_user, analysis_prompt, ai_provider)
console.print(f"\n[cyan]Analysis:[/cyan]\n{response}")
else:
console.print(f"[red]File not found: {target}[/red]")
elif user_input.lower().startswith('generate '):
# Generate code
# Generate code with project context
gen_prompt = user_input[9:].strip()
if gen_prompt:
console.print("[cyan]Generating code...[/cyan]")
full_prompt = f"Generate code for: {gen_prompt}. Provide clean, well-commented code."
response, _ = persona.process_interaction(current_user, full_prompt, ai_provider)
console.print(f"\n[cyan]Generated Code:[/cyan]\n{response}")
console.print("[cyan]Generating code with project context...[/cyan]")
try:
developer = ContinuousDeveloper(Path.cwd(), ai_provider)
generated_code = developer.generate_code(gen_prompt)
console.print(f"\n[cyan]Generated Code:[/cyan]\n{generated_code}")
except Exception as e:
# Fallback to simple generation
full_prompt = f"Generate code for: {gen_prompt}. Provide clean, well-commented code."
response, _ = persona.process_interaction(current_user, full_prompt, ai_provider)
console.print(f"\n[cyan]Generated Code:[/cyan]\n{response}")
elif user_input.lower().startswith('explain '):
# Explain code or concept
@@ -591,6 +615,152 @@ def shell(
response, _ = persona.process_interaction(current_user, full_prompt, ai_provider)
console.print(f"\n[cyan]Explanation:[/cyan]\n{response}")
# Remote execution commands (ai.bot integration)
elif user_input.lower().startswith('remote '):
# Execute command in ai.bot isolated container
command = user_input[7:].strip()
if command:
console.print(f"[cyan]Executing remotely:[/cyan] {command}")
try:
import httpx
import asyncio
async def execute_remote():
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(
"http://localhost:8080/sh",
json={"command": command},
headers={"Content-Type": "application/json"}
)
return response
response = asyncio.run(execute_remote())
if response.status_code == 200:
result = response.json()
console.print(f"[green]Output:[/green]\n{result.get('output', '')}")
if result.get('error'):
console.print(f"[red]Error:[/red] {result.get('error')}")
console.print(f"[dim]Exit code: {result.get('exit_code', 0)} | Execution time: {result.get('execution_time', 'N/A')}[/dim]")
else:
console.print(f"[red]ai.bot error: HTTP {response.status_code}[/red]")
except Exception as e:
console.print(f"[red]Failed to connect to ai.bot: {e}[/red]")
elif user_input.lower().startswith('isolated '):
# Execute Python code in isolated environment
code = user_input[9:].strip()
if code:
console.print(f"[cyan]Running Python code in isolated container...[/cyan]")
try:
import httpx
import asyncio
async def execute_python():
# Escape embedded double quotes first (same-quote nesting in f-strings is a syntax error before Python 3.12)
escaped = code.replace('"', '\\"')
python_command = f'python3 -c "{escaped}"'
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(
"http://localhost:8080/sh",
json={"command": python_command},
headers={"Content-Type": "application/json"}
)
return response
response = asyncio.run(execute_python())
if response.status_code == 200:
result = response.json()
console.print(f"[green]Python Output:[/green]\n{result.get('output', '')}")
if result.get('error'):
console.print(f"[red]Error:[/red] {result.get('error')}")
else:
console.print(f"[red]ai.bot error: HTTP {response.status_code}[/red]")
except Exception as e:
console.print(f"[red]Failed to execute Python code: {e}[/red]")
elif user_input.lower() == 'aibot-status':
# Check ai.bot server status
console.print("[cyan]Checking ai.bot server status...[/cyan]")
try:
import httpx
import asyncio
async def check_status():
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get("http://localhost:8080/status")
return response
response = asyncio.run(check_status())
if response.status_code == 200:
result = response.json()
console.print(f"[green]ai.bot is online![/green]")
console.print(f"Server info: {result}")
else:
console.print(f"[yellow]ai.bot responded with status {response.status_code}[/yellow]")
except Exception as e:
console.print(f"[red]ai.bot is offline: {e}[/red]")
console.print("[dim]Make sure ai.bot is running on localhost:8080[/dim]")
# Project management commands (Claude Code-like)
elif user_input.lower() == 'project-status':
# Analyze project structure
console.print("[cyan]Analyzing project structure...[/cyan]")
try:
developer = ContinuousDeveloper(Path.cwd(), ai_provider)
analysis = developer.analyze_project_structure()
changes = developer.project_state.detect_changes()
console.print(f"[green]Project Analysis:[/green]")
console.print(f"Language: {analysis['language']}")
console.print(f"Framework: {analysis['framework']}")
console.print(f"Structure: {analysis['structure']}")
console.print(f"Dependencies: {analysis['dependencies']}")
console.print(f"Code Patterns: {analysis['patterns']}")
if changes:
console.print(f"\n[yellow]Recent Changes:[/yellow]")
for file_path, change_type in changes.items():
console.print(f" {change_type}: {file_path}")
else:
console.print(f"\n[dim]No recent changes detected[/dim]")
except Exception as e:
console.print(f"[red]Error analyzing project: {e}[/red]")
elif user_input.lower() == 'suggest-next':
# Suggest next development steps
console.print("[cyan]AI is analyzing project and suggesting next steps...[/cyan]")
try:
developer = ContinuousDeveloper(Path.cwd(), ai_provider)
suggestions = developer.suggest_next_steps()
console.print(f"[green]Suggested Next Steps:[/green]")
for i, suggestion in enumerate(suggestions, 1):
console.print(f" {i}. {suggestion}")
except Exception as e:
console.print(f"[red]Error generating suggestions: {e}[/red]")
elif user_input.lower().startswith('continuous'):
# Continuous development mode
console.print("[cyan]Enabling continuous development mode...[/cyan]")
console.print("[yellow]Continuous mode is experimental. Type 'exit-continuous' to exit.[/yellow]")
try:
developer = ContinuousDeveloper(Path.cwd(), ai_provider)
context = developer.load_project_context()
console.print(f"[green]Project context loaded:[/green]")
console.print(f"Context: {len(context)} characters")
# Add to session memory for continuous context
persona.process_interaction(current_user, f"Continuous development mode started for project: {context[:500]}", ai_provider)
console.print("[dim]Project context added to AI memory for continuous development.[/dim]")
except Exception as e:
console.print(f"[red]Error starting continuous mode: {e}[/red]")
# Chat command or direct message
else:
# Remove 'chat' prefix if present
@@ -668,7 +838,8 @@ def config(
console.print(f"[yellow]Key '{key}' not found[/yellow]")
elif action == "list":
keys = config.list_keys(key or "")
config_instance = Config()
keys = config_instance.list_keys(key or "")
if not keys:
console.print("[yellow]No configuration keys found[/yellow]")
@@ -679,7 +850,7 @@ def config(
table.add_column("Value", style="green")
for k in sorted(keys):
val = config.get(k)
val = config_instance.get(k)
# Hide sensitive values
if "password" in k or "api_key" in k:
display_val = "***hidden***" if val else "not set"
@@ -695,5 +866,56 @@ def config(
console.print("Valid actions: get, set, delete, list")
@app.command()
def import_chatgpt(
file_path: Path = typer.Argument(..., help="Path to ChatGPT export JSON file"),
user_id: str = typer.Option("chatgpt_user", "--user-id", "-u", help="User ID for imported conversations"),
data_dir: Optional[Path] = typer.Option(None, "--data-dir", "-d", help="Data directory")
):
"""Import ChatGPT conversation data into ai.gpt memory system"""
from .chatgpt_importer import ChatGPTImporter
if data_dir is None:
data_dir = DEFAULT_DATA_DIR
data_dir.mkdir(parents=True, exist_ok=True)
if not file_path.exists():
console.print(f"[red]Error: File not found: {file_path}[/red]")
raise typer.Exit(1)
console.print(f"[cyan]Importing ChatGPT data from {file_path}[/cyan]")
console.print(f"User ID: {user_id}")
console.print(f"Data directory: {data_dir}")
try:
importer = ChatGPTImporter(data_dir)
stats = importer.import_from_file(file_path, user_id)
# Display results
table = Table(title="Import Results")
table.add_column("Metric", style="cyan")
table.add_column("Count", style="green")
table.add_row("Conversations imported", str(stats["conversations_imported"]))
table.add_row("Total messages", str(stats["messages_imported"]))
table.add_row("User messages", str(stats["user_messages"]))
table.add_row("Assistant messages", str(stats["assistant_messages"]))
table.add_row("Skipped messages", str(stats["skipped_messages"]))
console.print(table)
console.print(f"[green]✓ Import completed successfully![/green]")
# Show next steps
console.print("\n[cyan]Next steps:[/cyan]")
console.print(f"- Check memories: [yellow]aigpt status[/yellow]")
console.print(f"- Chat with AI: [yellow]aigpt chat {user_id} \"hello\"[/yellow]")
console.print(f"- View relationships: [yellow]aigpt relationships[/yellow]")
except Exception as e:
console.print(f"[red]Error during import: {e}[/red]")
raise typer.Exit(1)
if __name__ == "__main__":
app()
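The new subcommand can also be exercised without a shell via Typer's test runner; a sketch assuming a ChatGPT export at the hypothetical path conversations.json:

from typer.testing import CliRunner
from aigpt.cli import app

runner = CliRunner()
result = runner.invoke(app, ["import-chatgpt", "conversations.json", "--user-id", "alice"])
print(result.stdout)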

src/aigpt/mcp_server.py

@@ -8,11 +8,12 @@ import logging
import subprocess
import os
import shlex
import httpx
import json
from .ai_provider import create_ai_provider
from .persona import Persona
from .models import Memory, Relationship, PersonaState
from .card_integration import CardIntegration, register_card_tools
logger = logging.getLogger(__name__)
@@ -20,7 +21,7 @@ logger = logging.getLogger(__name__)
class AIGptMcpServer:
"""MCP Server that exposes ai.gpt functionality to AI assistants"""
def __init__(self, data_dir: Path, enable_card_integration: bool = False):
def __init__(self, data_dir: Path):
self.data_dir = data_dir
self.persona = Persona(data_dir)
@@ -32,10 +33,6 @@ class AIGptMcpServer:
# Create MCP server with FastAPI app
self.server = FastApiMCP(self.app)
self.card_integration = None
if enable_card_integration:
self.card_integration = CardIntegration()
self._register_tools()
@@ -58,6 +55,108 @@ class AIGptMcpServer:
for mem in memories
]
@self.app.get("/get_contextual_memories", operation_id="get_contextual_memories")
async def get_contextual_memories(query: str = "", limit: int = 10) -> Dict[str, List[Dict[str, Any]]]:
"""Get memories organized by priority with contextual relevance"""
memory_groups = self.persona.memory.get_contextual_memories(query=query, limit=limit)
result = {}
for group_name, memories in memory_groups.items():
result[group_name] = [
{
"id": mem.id,
"content": mem.content,
"level": mem.level.value,
"importance": mem.importance_score,
"is_core": mem.is_core,
"timestamp": mem.timestamp.isoformat(),
"summary": mem.summary,
"metadata": mem.metadata
}
for mem in memories
]
return result
@self.app.post("/search_memories", operation_id="search_memories")
async def search_memories(keywords: List[str], memory_types: Optional[List[str]] = None) -> List[Dict[str, Any]]:
"""Search memories by keywords and optionally filter by memory types"""
from .models import MemoryLevel
# Convert string memory types to enum if provided
level_filter = None
if memory_types:
level_filter = []
for mt in memory_types:
try:
level_filter.append(MemoryLevel(mt))
except ValueError:
pass # Skip invalid memory types
memories = self.persona.memory.search_memories(keywords, memory_types=level_filter)
return [
{
"id": mem.id,
"content": mem.content,
"level": mem.level.value,
"importance": mem.importance_score,
"is_core": mem.is_core,
"timestamp": mem.timestamp.isoformat(),
"summary": mem.summary,
"metadata": mem.metadata
}
for mem in memories
]
@self.app.post("/create_summary", operation_id="create_summary")
async def create_summary(user_id: str) -> Dict[str, Any]:
"""Create an AI-powered summary of recent memories"""
try:
ai_provider = create_ai_provider()
summary = self.persona.memory.create_smart_summary(user_id, ai_provider=ai_provider)
if summary:
return {
"success": True,
"summary": {
"id": summary.id,
"content": summary.content,
"level": summary.level.value,
"importance": summary.importance_score,
"timestamp": summary.timestamp.isoformat(),
"metadata": summary.metadata
}
}
else:
return {"success": False, "reason": "Not enough memories to summarize"}
except Exception as e:
logger.error(f"Failed to create summary: {e}")
return {"success": False, "reason": str(e)}
@self.app.post("/create_core_memory", operation_id="create_core_memory")
async def create_core_memory() -> Dict[str, Any]:
"""Create a core memory by analyzing all existing memories"""
try:
ai_provider = create_ai_provider()
core_memory = self.persona.memory.create_core_memory(ai_provider=ai_provider)
if core_memory:
return {
"success": True,
"core_memory": {
"id": core_memory.id,
"content": core_memory.content,
"level": core_memory.level.value,
"importance": core_memory.importance_score,
"timestamp": core_memory.timestamp.isoformat(),
"metadata": core_memory.metadata
}
}
else:
return {"success": False, "reason": "Not enough memories to create core memory"}
except Exception as e:
logger.error(f"Failed to create core memory: {e}")
return {"success": False, "reason": str(e)}
@self.app.get("/get_relationship", operation_id="get_relationship")
async def get_relationship(user_id: str) -> Dict[str, Any]:
"""Get relationship status with a specific user"""
@@ -101,6 +200,21 @@ class AIGptMcpServer:
"active_memory_count": len(state.active_memories)
}
@self.app.post("/get_context_prompt", operation_id="get_context_prompt")
async def get_context_prompt(user_id: str, message: str) -> Dict[str, Any]:
"""Get context-aware prompt for AI response generation"""
try:
context_prompt = self.persona.build_context_prompt(user_id, message)
return {
"success": True,
"context_prompt": context_prompt,
"user_id": user_id,
"message": message
}
except Exception as e:
logger.error(f"Failed to build context prompt: {e}")
return {"success": False, "reason": str(e)}
@self.app.post("/process_interaction", operation_id="process_interaction")
async def process_interaction(user_id: str, message: str) -> Dict[str, Any]:
"""Process an interaction with a user"""
@@ -301,9 +415,89 @@ class AIGptMcpServer:
except Exception as e:
return {"error": str(e)}
# Register ai.card tools if integration is enabled
if self.card_integration:
register_card_tools(self.app, self.card_integration)
# ai.bot integration tools
@self.app.post("/remote_shell", operation_id="remote_shell")
async def remote_shell(command: str, ai_bot_url: str = "http://localhost:8080") -> Dict[str, Any]:
"""Execute command via ai.bot /sh functionality (systemd-nspawn isolated execution)"""
try:
async with httpx.AsyncClient(timeout=30.0) as client:
# POST the command to ai.bot's /sh endpoint
response = await client.post(
f"{ai_bot_url}/sh",
json={"command": command},
headers={"Content-Type": "application/json"}
)
if response.status_code == 200:
result = response.json()
return {
"status": "success",
"command": command,
"output": result.get("output", ""),
"error": result.get("error", ""),
"exit_code": result.get("exit_code", 0),
"execution_time": result.get("execution_time", ""),
"container_id": result.get("container_id", ""),
"isolated": True # systemd-nspawn isolation
}
else:
return {
"status": "error",
"error": f"ai.bot responded with status {response.status_code}",
"response_text": response.text
}
except httpx.TimeoutException:
return {"status": "error", "error": "Request to ai.bot timed out"}
except Exception as e:
return {"status": "error", "error": f"Failed to connect to ai.bot: {str(e)}"}
@self.app.get("/ai_bot_status", operation_id="ai_bot_status")
async def ai_bot_status(ai_bot_url: str = "http://localhost:8080") -> Dict[str, Any]:
"""Check ai.bot server status and available commands"""
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(f"{ai_bot_url}/status")
if response.status_code == 200:
result = response.json()
return {
"status": "online",
"ai_bot_url": ai_bot_url,
"server_info": result,
"shell_available": True
}
else:
return {
"status": "error",
"error": f"ai.bot status check failed: {response.status_code}"
}
except Exception as e:
return {
"status": "offline",
"error": f"Cannot connect to ai.bot: {str(e)}",
"ai_bot_url": ai_bot_url
}
@self.app.post("/isolated_python", operation_id="isolated_python")
async def isolated_python(code: str, ai_bot_url: str = "http://localhost:8080") -> Dict[str, Any]:
"""Execute Python code in isolated ai.bot environment"""
# Run the Python code via the /sh endpoint
# Escape embedded double quotes first (same-quote nesting in f-strings is a syntax error before Python 3.12)
escaped = code.replace('"', '\\"')
python_command = f'python3 -c "{escaped}"'
return await remote_shell(python_command, ai_bot_url)
@self.app.post("/isolated_analysis", operation_id="isolated_analysis")
async def isolated_analysis(file_path: str, analysis_type: str = "structure", ai_bot_url: str = "http://localhost:8080") -> Dict[str, Any]:
"""Perform code analysis in isolated environment"""
if analysis_type == "structure":
command = f"find {file_path} -type f -name '*.py' | head -20"
elif analysis_type == "lines":
command = f"wc -l {file_path}"
elif analysis_type == "syntax":
command = f"python3 -m py_compile {file_path}"
else:
command = f"file {file_path}"
return await remote_shell(command, ai_bot_url)
# Mount MCP server
self.server.mount()
@@ -314,5 +508,4 @@ class AIGptMcpServer:
async def close(self):
"""Cleanup resources"""
if self.card_integration:
await self.card_integration.close()
pass
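Once the server is running (aigpt server, port 8000 by default), the new tools are ordinary FastAPI routes; a hedged sketch of calling one (FastAPI treats the bare str parameters as query parameters):

import httpx

r = httpx.post("http://localhost:8000/remote_shell", params={"command": "uname -a"})
print(r.json())  # {"status": ..., "output": ..., "isolated": True, ...}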

src/aigpt/memory.py

@@ -67,8 +67,13 @@ class MemoryManager:
self._save_memories()
return memory
def summarize_memories(self, user_id: str) -> Optional[Memory]:
"""Create summary from recent memories"""
def add_memory(self, memory: Memory):
"""Add a memory directly to the system"""
self.memories[memory.id] = memory
self._save_memories()
def create_smart_summary(self, user_id: str, ai_provider=None) -> Optional[Memory]:
"""Create AI-powered thematic summary from recent memories"""
recent_memories = [
mem for mem in self.memories.values()
if mem.level == MemoryLevel.FULL_LOG
@@ -78,8 +83,40 @@ class MemoryManager:
if len(recent_memories) < 5:
return None
# Simple summary creation (in real implementation, use AI)
summary_content = f"Summary of {len(recent_memories)} recent interactions"
# Sort by timestamp for chronological analysis
recent_memories.sort(key=lambda m: m.timestamp)
# Prepare conversation context for AI analysis
conversations_text = "\n\n".join([
f"[{mem.timestamp.strftime('%Y-%m-%d %H:%M')}] {mem.content}"
for mem in recent_memories
])
summary_prompt = f"""
Analyze these recent conversations and create a thematic summary focusing on:
1. Communication patterns and user preferences
2. Technical topics and problem-solving approaches
3. Relationship progression and trust level
4. Key recurring themes and interests
Conversations:
{conversations_text}
Create a concise summary (2-3 sentences) that captures the essence of this interaction period:
"""
try:
if ai_provider:
summary_content = ai_provider.chat(summary_prompt, max_tokens=200)
else:
# Fallback to pattern-based analysis
themes = self._extract_themes(recent_memories)
summary_content = f"Themes: {', '.join(themes[:3])}. {len(recent_memories)} interactions with focus on technical discussions."
except Exception as e:
self.logger.warning(f"AI summary failed, using fallback: {e}")
themes = self._extract_themes(recent_memories)
summary_content = f"Themes: {', '.join(themes[:3])}. {len(recent_memories)} interactions."
summary_id = hashlib.sha256(
f"summary_{datetime.now().isoformat()}".encode()
).hexdigest()[:16]
@@ -87,23 +124,154 @@ class MemoryManager:
summary = Memory(
id=summary_id,
timestamp=datetime.now(),
content=summary_content,
content=f"SUMMARY ({len(recent_memories)} conversations): {summary_content}",
summary=summary_content,
level=MemoryLevel.SUMMARY,
importance_score=0.5
importance_score=0.6,
metadata={
"memory_count": len(recent_memories),
"time_span": f"{recent_memories[0].timestamp.date()} to {recent_memories[-1].timestamp.date()}",
"themes": self._extract_themes(recent_memories)[:5]
}
)
self.memories[summary.id] = summary
# Mark summarized memories for potential forgetting
# Reduce importance of summarized memories
for mem in recent_memories:
mem.importance_score *= 0.9
mem.importance_score *= 0.8
self._save_memories()
return summary
def _extract_themes(self, memories: List[Memory]) -> List[str]:
"""Extract common themes from memory content"""
common_words = {}
for memory in memories:
# Simple keyword extraction
words = memory.content.lower().split()
for word in words:
if len(word) > 4 and word.isalpha():
common_words[word] = common_words.get(word, 0) + 1
# Return most frequent meaningful words
return sorted(common_words.keys(), key=common_words.get, reverse=True)[:10]
def create_core_memory(self, ai_provider=None) -> Optional[Memory]:
"""Analyze all memories to extract core personality-forming elements"""
# Collect all non-forgotten memories for analysis
all_memories = [
mem for mem in self.memories.values()
if mem.level != MemoryLevel.FORGOTTEN
]
if len(all_memories) < 10:
return None
# Sort by importance and timestamp for comprehensive analysis
all_memories.sort(key=lambda m: (m.importance_score, m.timestamp), reverse=True)
# Prepare memory context for AI analysis
memory_context = "\n".join([
f"[{mem.level.value}] {mem.timestamp.strftime('%Y-%m-%d')}: {mem.content[:200]}..."
for mem in all_memories[:20] # Top 20 memories
])
core_prompt = f"""
Analyze these conversations and memories to identify core personality elements that define this user relationship:
1. Communication style and preferences
2. Core values and principles
3. Problem-solving patterns
4. Trust level and relationship depth
5. Unique characteristics that make this relationship special
Memories:
{memory_context}
Extract the essential personality-forming elements (2-3 sentences) that should NEVER be forgotten:
"""
try:
if ai_provider:
core_content = ai_provider.chat(core_prompt, max_tokens=150)
else:
# Fallback to pattern analysis
user_patterns = self._analyze_user_patterns(all_memories)
core_content = f"User shows {user_patterns['communication_style']} communication, focuses on {user_patterns['main_interests']}, and demonstrates {user_patterns['problem_solving']} approach."
except Exception as e:
self.logger.warning(f"AI core analysis failed, using fallback: {e}")
user_patterns = self._analyze_user_patterns(all_memories)
core_content = f"Core pattern: {user_patterns['communication_style']} style, {user_patterns['main_interests']} interests."
# Create core memory
core_id = hashlib.sha256(
f"core_{datetime.now().isoformat()}".encode()
).hexdigest()[:16]
core_memory = Memory(
id=core_id,
timestamp=datetime.now(),
content=f"CORE PERSONALITY: {core_content}",
summary=core_content,
level=MemoryLevel.CORE,
importance_score=1.0,
is_core=True,
metadata={
"source_memories": len(all_memories),
"analysis_date": datetime.now().isoformat(),
"patterns": self._analyze_user_patterns(all_memories)
}
)
self.memories[core_memory.id] = core_memory
self._save_memories()
self.logger.info(f"Core memory created: {core_id}")
return core_memory
def _analyze_user_patterns(self, memories: List[Memory]) -> Dict[str, str]:
"""Analyze patterns in user behavior from memories"""
# Extract patterns from conversation content
all_content = " ".join([mem.content.lower() for mem in memories])
# Simple pattern detection
communication_indicators = {
"technical": ["code", "implementation", "system", "api", "database"],
"casual": ["thanks", "please", "sorry", "help"],
"formal": ["could", "would", "should", "proper"]
}
problem_solving_indicators = {
"systematic": ["first", "then", "next", "step", "plan"],
"experimental": ["try", "test", "experiment", "see"],
"theoretical": ["concept", "design", "architecture", "pattern"]
}
# Score each pattern
communication_style = max(
communication_indicators.keys(),
key=lambda style: sum(all_content.count(word) for word in communication_indicators[style])
)
problem_solving = max(
problem_solving_indicators.keys(),
key=lambda style: sum(all_content.count(word) for word in problem_solving_indicators[style])
)
# Extract main interests from themes
themes = self._extract_themes(memories)
main_interests = ", ".join(themes[:3]) if themes else "general technology"
return {
"communication_style": communication_style,
"problem_solving": problem_solving,
"main_interests": main_interests,
"interaction_count": len(memories)
}
def identify_core_memories(self) -> List[Memory]:
"""Identify memories that should become core (never forgotten)"""
"""Identify existing memories that should become core (legacy method)"""
core_candidates = [
mem for mem in self.memories.values()
if mem.importance_score > 0.8
@@ -140,7 +308,7 @@ class MemoryManager:
self._save_memories()
def get_active_memories(self, limit: int = 10) -> List[Memory]:
"""Get currently active memories for persona"""
"""Get currently active memories for persona (legacy method)"""
active = [
mem for mem in self.memories.values()
if mem.level != MemoryLevel.FORGOTTEN
@@ -152,4 +320,89 @@ class MemoryManager:
reverse=True
)
return active[:limit]
return active[:limit]
def get_contextual_memories(self, query: str = "", limit: int = 10) -> Dict[str, List[Memory]]:
"""Get memories organized by priority with contextual relevance"""
all_memories = [
mem for mem in self.memories.values()
if mem.level != MemoryLevel.FORGOTTEN
]
# Categorize memories by type and importance
core_memories = [mem for mem in all_memories if mem.level == MemoryLevel.CORE]
summary_memories = [mem for mem in all_memories if mem.level == MemoryLevel.SUMMARY]
recent_memories = [
mem for mem in all_memories
if mem.level == MemoryLevel.FULL_LOG
and (datetime.now() - mem.timestamp).days < 3
]
# Apply keyword relevance if query provided
if query:
query_lower = query.lower()
def relevance_score(memory: Memory) -> float:
content_score = 1 if query_lower in memory.content.lower() else 0
summary_score = 1 if memory.summary and query_lower in memory.summary.lower() else 0
metadata_score = 1 if any(
query_lower in str(v).lower()
for v in (memory.metadata or {}).values()
) else 0
return content_score + summary_score + metadata_score
# Re-rank by relevance while maintaining type priority
core_memories.sort(key=lambda m: (relevance_score(m), m.importance_score), reverse=True)
summary_memories.sort(key=lambda m: (relevance_score(m), m.importance_score), reverse=True)
recent_memories.sort(key=lambda m: (relevance_score(m), m.importance_score), reverse=True)
else:
# Sort by importance and recency
core_memories.sort(key=lambda m: (m.importance_score, m.timestamp), reverse=True)
summary_memories.sort(key=lambda m: (m.importance_score, m.timestamp), reverse=True)
recent_memories.sort(key=lambda m: (m.importance_score, m.timestamp), reverse=True)
# Return organized memory structure
return {
"core": core_memories[:3], # Always include top core memories
"summary": summary_memories[:3], # Recent summaries
"recent": recent_memories[:limit-6], # Fill remaining with recent
"all_active": all_memories[:limit] # Fallback for simple access
}
def search_memories(self, keywords: List[str], memory_types: List[MemoryLevel] = None) -> List[Memory]:
"""Search memories by keywords and optionally filter by memory types"""
if memory_types is None:
memory_types = [MemoryLevel.CORE, MemoryLevel.SUMMARY, MemoryLevel.FULL_LOG]
matching_memories = []
for memory in self.memories.values():
if memory.level not in memory_types or memory.level == MemoryLevel.FORGOTTEN:
continue
# Check if any keyword matches in content, summary, or metadata
content_text = f"{memory.content} {memory.summary or ''}"
if memory.metadata:
content_text += " " + " ".join(str(v) for v in memory.metadata.values())
content_lower = content_text.lower()
# Score by keyword matches
match_score = sum(
keyword.lower() in content_lower
for keyword in keywords
)
if match_score > 0:
# Add match score to memory for sorting
memory_copy = memory.model_copy()
memory_copy.importance_score += match_score * 0.1
matching_memories.append(memory_copy)
# Sort by relevance (match score + importance + core status)
matching_memories.sort(
key=lambda m: (m.is_core, m.importance_score, m.timestamp),
reverse=True
)
return matching_memories
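A minimal sketch of how the two new retrieval paths compose, assuming a populated MemoryManager (the data directory is hypothetical):

from pathlib import Path
from aigpt.memory import MemoryManager
from aigpt.models import MemoryLevel

mm = MemoryManager(Path.home() / ".config" / "aigpt" / "data")  # hypothetical path
groups = mm.get_contextual_memories(query="docker", limit=10)
print(len(groups["core"]), "core,", len(groups["recent"]), "recent")
hits = mm.search_memories(["docker", "container"],
                          memory_types=[MemoryLevel.CORE, MemoryLevel.SUMMARY])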

src/aigpt/models.py

@@ -3,7 +3,7 @@
from datetime import datetime, date
from typing import Optional, Dict, List, Any
from enum import Enum
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, field_validator
class MemoryLevel(str, Enum):
@@ -30,9 +30,18 @@ class Memory(BaseModel):
content: str
summary: Optional[str] = None
level: MemoryLevel = MemoryLevel.FULL_LOG
importance_score: float = Field(ge=0.0, le=1.0)
importance_score: float
is_core: bool = False
decay_rate: float = 0.01
metadata: Optional[Dict[str, Any]] = None
@field_validator('importance_score')
@classmethod
def validate_importance_score(cls, v):
"""Ensure importance_score is within valid range, handle floating point precision issues"""
if abs(v) < 1e-10: # Very close to zero
return 0.0
return max(0.0, min(1.0, v))
class Relationship(BaseModel):
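A quick check of what the validator above does in practice, assuming only the Memory model shown:

from datetime import datetime
from aigpt.models import Memory

m = Memory(id="a", timestamp=datetime.now(), content="hi", importance_score=1.3)
n = Memory(id="b", timestamp=datetime.now(), content="hi", importance_score=-1e-12)
print(m.importance_score, n.importance_score)  # 1.0 0.0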

src/aigpt/persona.py

@@ -92,27 +92,85 @@ class Persona:
else:
return "contemplative"
def build_context_prompt(self, user_id: str, current_message: str) -> str:
"""Build context-aware prompt with relevant memories"""
# Get contextual memories based on current message
memory_groups = self.memory.get_contextual_memories(query=current_message, limit=8)
# Build context sections
context_parts = []
# Core personality elements (always included)
if memory_groups["core"]:
core_context = "\n".join([mem.content for mem in memory_groups["core"]])
context_parts.append(f"CORE PERSONALITY:\n{core_context}")
# Recent summaries for context
if memory_groups["summary"]:
summary_context = "\n".join([mem.summary or mem.content for mem in memory_groups["summary"]])
context_parts.append(f"RECENT PATTERNS:\n{summary_context}")
# Recent specific interactions
if memory_groups["recent"]:
recent_context = "\n".join([
f"[{mem.timestamp.strftime('%m-%d')}] {mem.content[:100]}..."
for mem in memory_groups["recent"][:3]
])
context_parts.append(f"RECENT INTERACTIONS:\n{recent_context}")
# Get current persona state
state = self.get_current_state()
# Build final prompt
context_prompt = f"""You are an AI with persistent memory and evolving relationships. Your current state:
PERSONALITY: {', '.join([f'{k}={v:.1f}' for k, v in state.base_personality.items()])}
MOOD: {state.current_mood}
FORTUNE: {state.fortune.fortune_value}/10
"""
if context_parts:
context_prompt += "RELEVANT CONTEXT:\n" + "\n\n".join(context_parts) + "\n\n"
context_prompt += f"""Respond to this message while staying true to your personality and the established relationship context:
User: {current_message}
AI:"""
return context_prompt
def process_interaction(self, user_id: str, message: str, ai_provider=None) -> tuple[str, float]:
"""Process user interaction and generate response"""
"""Process user interaction and generate response with enhanced context"""
# Get current state
state = self.get_current_state()
# Get relationship with user
relationship = self.relationships.get_or_create_relationship(user_id)
# Simple response generation (use AI provider if available)
# Enhanced response generation with context awareness
if relationship.is_broken:
response = "..."
relationship_delta = 0.0
else:
if ai_provider:
# Use AI provider for response generation
memories = self.memory.get_active_memories(limit=5)
import asyncio
response = asyncio.run(
ai_provider.generate_response(message, state, memories)
)
# Calculate relationship delta based on interaction quality
# Build context-aware prompt
context_prompt = self.build_context_prompt(user_id, message)
# Generate response using AI with full context
try:
response = ai_provider.chat(context_prompt, max_tokens=200)
# Clean up response if it includes the prompt echo
if "AI:" in response:
response = response.split("AI:")[-1].strip()
except Exception as e:
self.logger.error(f"AI response generation failed: {e}")
response = f"I appreciate your message about {message[:50]}..."
# Calculate relationship delta based on interaction quality and context
if state.current_mood in ["joyful", "cheerful"]:
relationship_delta = 2.0
elif relationship.status.value == "close_friend":
@@ -120,8 +178,14 @@ class Persona:
else:
relationship_delta = 1.0
else:
# Fallback to simple responses
if state.current_mood == "joyful":
# Context-aware fallback responses
memory_groups = self.memory.get_contextual_memories(query=message, limit=3)
if memory_groups["core"]:
# Reference core memories for continuity
response = f"Based on our relationship, I think {message.lower()} connects to what we've discussed before."
relationship_delta = 1.5
elif state.current_mood == "joyful":
response = f"What a wonderful day! {message} sounds interesting!"
relationship_delta = 2.0
elif relationship.status.value == "close_friend":
@@ -171,11 +235,16 @@ class Persona:
if core_memories:
self.logger.info(f"Identified {len(core_memories)} new core memories")
# Create memory summaries
# Create memory summaries
for user_id in self.relationships.relationships:
summary = self.memory.summarize_memories(user_id)
if summary:
self.logger.info(f"Created summary for interactions with {user_id}")
try:
from .ai_provider import create_ai_provider
ai_provider = create_ai_provider()
summary = self.memory.create_smart_summary(user_id, ai_provider=ai_provider)
if summary:
self.logger.info(f"Created smart summary for interactions with {user_id}")
except Exception as e:
self.logger.warning(f"Could not create AI summary for {user_id}: {e}")
self._save_state()
self.logger.info("Daily maintenance completed")

src/aigpt/project_manager.py

@@ -0,0 +1,321 @@
"""Project management and continuous development logic for ai.shell"""
import json
import os
from pathlib import Path
from typing import Dict, List, Optional, Any
from datetime import datetime
import subprocess
import hashlib
from .models import Memory
from .ai_provider import AIProvider
class ProjectState:
"""プロジェクトの現在状態を追跡"""
def __init__(self, project_root: Path):
self.project_root = project_root
self.files_state: Dict[str, str] = {} # file path -> content hash
self.last_analysis: Optional[datetime] = None
self.project_context: Optional[str] = None
self.development_goals: List[str] = []
self.known_patterns: Dict[str, Any] = {}
def scan_project_files(self) -> Dict[str, str]:
"""プロジェクトファイルをスキャンしてハッシュ計算"""
current_state = {}
# Target file extensions
target_extensions = {'.py', '.js', '.ts', '.rs', '.go', '.java', '.cpp', '.c', '.h'}
for file_path in self.project_root.rglob('*'):
if (file_path.is_file() and
file_path.suffix in target_extensions and
not any(part.startswith('.') for part in file_path.parts)):
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
file_hash = hashlib.md5(content.encode()).hexdigest()
relative_path = str(file_path.relative_to(self.project_root))
current_state[relative_path] = file_hash
except Exception:
continue
return current_state
def detect_changes(self) -> Dict[str, str]:
"""ファイル変更を検出"""
current_state = self.scan_project_files()
changes = {}
# New and modified files
for path, current_hash in current_state.items():
if path not in self.files_state or self.files_state[path] != current_hash:
changes[path] = "modified" if path in self.files_state else "added"
# Deleted files
for path in self.files_state:
if path not in current_state:
changes[path] = "deleted"
self.files_state = current_state
return changes
class ContinuousDeveloper:
"""Claude Code的な継続開発機能"""
def __init__(self, project_root: Path, ai_provider: Optional[AIProvider] = None):
self.project_root = project_root
self.ai_provider = ai_provider
self.project_state = ProjectState(project_root)
self.session_memory: List[str] = []
def load_project_context(self) -> str:
"""プロジェクト文脈を読み込み"""
context_files = [
"claude.md", "aishell.md", "README.md",
"pyproject.toml", "package.json", "Cargo.toml"
]
context_parts = []
for filename in context_files:
file_path = self.project_root / filename
if file_path.exists():
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
context_parts.append(f"## {filename}\n{content}")
except Exception:
continue
return "\n\n".join(context_parts)
def analyze_project_structure(self) -> Dict[str, Any]:
"""プロジェクト構造を分析"""
analysis = {
"language": self._detect_primary_language(),
"framework": self._detect_framework(),
"structure": self._analyze_file_structure(),
"dependencies": self._analyze_dependencies(),
"patterns": self._detect_code_patterns()
}
return analysis
def _detect_primary_language(self) -> str:
"""主要言語を検出"""
file_counts = {}
for file_path in self.project_root.rglob('*'):
if file_path.is_file() and file_path.suffix:
ext = file_path.suffix.lower()
file_counts[ext] = file_counts.get(ext, 0) + 1
language_map = {
'.py': 'Python',
'.js': 'JavaScript',
'.ts': 'TypeScript',
'.rs': 'Rust',
'.go': 'Go',
'.java': 'Java'
}
if file_counts:
primary_ext = max(file_counts.items(), key=lambda x: x[1])[0]
return language_map.get(primary_ext, 'Unknown')
return 'Unknown'
def _detect_framework(self) -> str:
"""フレームワークを検出"""
frameworks = {
'fastapi': ['fastapi', 'uvicorn'],
'django': ['django'],
'flask': ['flask'],
'react': ['react'],
'next.js': ['next'],
'rust-actix': ['actix-web'],
}
# Check dependencies in pyproject.toml, package.json, Cargo.toml
for config_file in ['pyproject.toml', 'package.json', 'Cargo.toml']:
config_path = self.project_root / config_file
if config_path.exists():
try:
with open(config_path, 'r') as f:
content = f.read().lower()
for framework, keywords in frameworks.items():
if any(keyword in content for keyword in keywords):
return framework
except Exception:
continue
return 'Unknown'
def _analyze_file_structure(self) -> Dict[str, List[str]]:
"""ファイル構造を分析"""
structure = {"directories": [], "key_files": []}
for item in self.project_root.iterdir():
if item.is_dir() and not item.name.startswith('.'):
structure["directories"].append(item.name)
elif item.is_file() and item.name in [
'main.py', 'app.py', 'index.js', 'main.rs', 'main.go'
]:
structure["key_files"].append(item.name)
return structure
def _analyze_dependencies(self) -> List[str]:
"""依存関係を分析"""
deps = []
# Python dependencies
pyproject = self.project_root / "pyproject.toml"
if pyproject.exists():
try:
with open(pyproject, 'r') as f:
content = f.read()
# Simple regex would be better but for now just check for common packages
common_packages = ['fastapi', 'pydantic', 'uvicorn', 'ollama', 'openai']
for package in common_packages:
if package in content:
deps.append(package)
except Exception:
pass
return deps
def _detect_code_patterns(self) -> Dict[str, int]:
"""コードパターンを検出"""
patterns = {
"classes": 0,
"functions": 0,
"api_endpoints": 0,
"async_functions": 0
}
for py_file in self.project_root.rglob('*.py'):
try:
with open(py_file, 'r', encoding='utf-8') as f:
content = f.read()
patterns["classes"] += content.count('class ')
patterns["functions"] += content.count('def ')
patterns["api_endpoints"] += content.count('@app.')
patterns["async_functions"] += content.count('async def')
except Exception:
continue
return patterns
def suggest_next_steps(self, current_task: Optional[str] = None) -> List[str]:
"""次のステップを提案"""
if not self.ai_provider:
return ["AI provider not available for suggestions"]
context = self.load_project_context()
analysis = self.analyze_project_structure()
changes = self.project_state.detect_changes()
prompt = f"""
Based on the project analysis below, propose 3-5 next development steps.
## Project context
{context[:1000]}
## Structure analysis
Language: {analysis['language']}
Framework: {analysis['framework']}
Patterns: {analysis['patterns']}
## Recent changes
{changes}
## Current task
{current_task or "none in particular"}
Propose concrete, actionable steps:
"""
try:
response = self.ai_provider.chat(prompt, max_tokens=300)
# Simple parsing - in real implementation would be more sophisticated
steps = [line.strip() for line in response.split('\n')
if line.strip() and (line.strip().startswith('-') or line.strip().startswith('1.'))]
return steps[:5]
except Exception as e:
return [f"Error generating suggestions: {str(e)}"]
def generate_code(self, description: str, file_path: Optional[str] = None) -> str:
"""コード生成"""
if not self.ai_provider:
return "AI provider not available for code generation"
context = self.load_project_context()
analysis = self.analyze_project_structure()
prompt = f"""
Generate code based on the specification below.
## Project context
{context[:800]}
## Language and framework
Language: {analysis['language']}
Framework: {analysis['framework']}
Existing patterns: {analysis['patterns']}
## Request
{description}
{"File path: " + file_path if file_path else ""}
Generate code that stays consistent with the project's existing style:
"""
try:
return self.ai_provider.chat(prompt, max_tokens=500)
except Exception as e:
return f"Error generating code: {str(e)}"
def analyze_file(self, file_path: str) -> str:
"""ファイル分析"""
full_path = self.project_root / file_path
if not full_path.exists():
return f"File not found: {file_path}"
try:
with open(full_path, 'r', encoding='utf-8') as f:
content = f.read()
except Exception as e:
return f"Error reading file: {str(e)}"
if not self.ai_provider:
return f"File contents ({len(content)} chars):\n{content[:200]}..."
context = self.load_project_context()
prompt = f"""
Analyze the file below and point out improvements and problems.
## Project context
{context[:500]}
## File: {file_path}
{content[:1500]}
Cover:
1. Code quality
2. Consistency with the project
3. Suggested improvements
4. Potential issues
"""
try:
return self.ai_provider.chat(prompt, max_tokens=400)
except Exception as e:
return f"Error analyzing file: {str(e)}"