From 06fb70fffadb24c1d15a8b37691ea96afbdbcf7e Mon Sep 17 00:00:00 2001 From: syui Date: Mon, 2 Jun 2025 01:16:04 +0900 Subject: [PATCH] add src --- .gitignore | 1 - src/aigpt/__init__.py | 15 ++ src/aigpt/ai_provider.py | 172 +++++++++++++++ src/aigpt/cli.py | 444 ++++++++++++++++++++++++++++++++++++++ src/aigpt/config.py | 145 +++++++++++++ src/aigpt/fortune.py | 118 ++++++++++ src/aigpt/mcp_server.py | 149 +++++++++++++ src/aigpt/memory.py | 155 +++++++++++++ src/aigpt/models.py | 79 +++++++ src/aigpt/persona.py | 181 ++++++++++++++++ src/aigpt/relationship.py | 135 ++++++++++++ src/aigpt/scheduler.py | 312 +++++++++++++++++++++++++++ src/aigpt/transmission.py | 111 ++++++++++ 13 files changed, 2016 insertions(+), 1 deletion(-) create mode 100644 src/aigpt/__init__.py create mode 100644 src/aigpt/ai_provider.py create mode 100644 src/aigpt/cli.py create mode 100644 src/aigpt/config.py create mode 100644 src/aigpt/fortune.py create mode 100644 src/aigpt/mcp_server.py create mode 100644 src/aigpt/memory.py create mode 100644 src/aigpt/models.py create mode 100644 src/aigpt/persona.py create mode 100644 src/aigpt/relationship.py create mode 100644 src/aigpt/scheduler.py create mode 100644 src/aigpt/transmission.py diff --git a/.gitignore b/.gitignore index 7299c7b..c0792be 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,5 @@ **.lock output.json config/*.db -aigpt mcp/scripts/__* data diff --git a/src/aigpt/__init__.py b/src/aigpt/__init__.py new file mode 100644 index 0000000..c29231b --- /dev/null +++ b/src/aigpt/__init__.py @@ -0,0 +1,15 @@ +"""ai.gpt - Autonomous transmission AI with unique personality""" + +__version__ = "0.1.0" + +from .memory import MemoryManager +from .relationship import RelationshipTracker +from .persona import Persona +from .transmission import TransmissionController + +__all__ = [ + "MemoryManager", + "RelationshipTracker", + "Persona", + "TransmissionController", +] \ No newline at end of file diff --git a/src/aigpt/ai_provider.py b/src/aigpt/ai_provider.py new file mode 100644 index 0000000..59575cd --- /dev/null +++ b/src/aigpt/ai_provider.py @@ -0,0 +1,172 @@ +"""AI Provider integration for response generation""" + +import os +from typing import Optional, Dict, List, Any, Protocol +from abc import abstractmethod +import logging +import httpx +from openai import OpenAI +import ollama + +from .models import PersonaState, Memory +from .config import Config + + +class AIProvider(Protocol): + """Protocol for AI providers""" + + @abstractmethod + async def generate_response( + self, + prompt: str, + persona_state: PersonaState, + memories: List[Memory], + system_prompt: Optional[str] = None + ) -> str: + """Generate a response based on prompt and context""" + pass + + +class OllamaProvider: + """Ollama AI provider""" + + def __init__(self, model: str = "qwen2.5", host: str = "http://localhost:11434"): + self.model = model + self.host = host + self.client = ollama.Client(host=host) + self.logger = logging.getLogger(__name__) + + async def generate_response( + self, + prompt: str, + persona_state: PersonaState, + memories: List[Memory], + system_prompt: Optional[str] = None + ) -> str: + """Generate response using Ollama""" + + # Build context from memories + memory_context = "\n".join([ + f"[{mem.level.value}] {mem.content[:200]}..." + for mem in memories[:5] + ]) + + # Build personality context + personality_desc = ", ".join([ + f"{trait}: {value:.1f}" + for trait, value in persona_state.base_personality.items() + ]) + + # System prompt with persona context + full_system_prompt = f"""You are an AI with the following characteristics: +Current mood: {persona_state.current_mood} +Fortune today: {persona_state.fortune.fortune_value}/10 +Personality traits: {personality_desc} + +Recent memories: +{memory_context} + +{system_prompt or 'Respond naturally based on your current state and memories.'}""" + + try: + response = self.client.chat( + model=self.model, + messages=[ + {"role": "system", "content": full_system_prompt}, + {"role": "user", "content": prompt} + ] + ) + return response['message']['content'] + except Exception as e: + self.logger.error(f"Ollama generation failed: {e}") + return self._fallback_response(persona_state) + + def _fallback_response(self, persona_state: PersonaState) -> str: + """Fallback response based on mood""" + mood_responses = { + "joyful": "That's wonderful! I'm feeling great today!", + "cheerful": "That sounds nice!", + "neutral": "I understand.", + "melancholic": "I see... That's something to think about.", + "contemplative": "Hmm, let me consider that..." + } + return mood_responses.get(persona_state.current_mood, "I see.") + + +class OpenAIProvider: + """OpenAI API provider""" + + def __init__(self, model: str = "gpt-4o-mini", api_key: Optional[str] = None): + self.model = model + # Try to get API key from config first + config = Config() + self.api_key = api_key or config.get_api_key("openai") or os.getenv("OPENAI_API_KEY") + if not self.api_key: + raise ValueError("OpenAI API key not provided. Set it with: ai-gpt config set providers.openai.api_key YOUR_KEY") + self.client = OpenAI(api_key=self.api_key) + self.logger = logging.getLogger(__name__) + + async def generate_response( + self, + prompt: str, + persona_state: PersonaState, + memories: List[Memory], + system_prompt: Optional[str] = None + ) -> str: + """Generate response using OpenAI""" + + # Build context similar to Ollama + memory_context = "\n".join([ + f"[{mem.level.value}] {mem.content[:200]}..." + for mem in memories[:5] + ]) + + personality_desc = ", ".join([ + f"{trait}: {value:.1f}" + for trait, value in persona_state.base_personality.items() + ]) + + full_system_prompt = f"""You are an AI with unique personality traits and memories. +Current mood: {persona_state.current_mood} +Fortune today: {persona_state.fortune.fortune_value}/10 +Personality traits: {personality_desc} + +Recent memories: +{memory_context} + +{system_prompt or 'Respond naturally based on your current state and memories. Be authentic to your mood and personality.'}""" + + try: + response = self.client.chat.completions.create( + model=self.model, + messages=[ + {"role": "system", "content": full_system_prompt}, + {"role": "user", "content": prompt} + ], + temperature=0.7 + (persona_state.fortune.fortune_value - 5) * 0.05 # Vary by fortune + ) + return response.choices[0].message.content + except Exception as e: + self.logger.error(f"OpenAI generation failed: {e}") + return self._fallback_response(persona_state) + + def _fallback_response(self, persona_state: PersonaState) -> str: + """Fallback response based on mood""" + mood_responses = { + "joyful": "What a delightful conversation!", + "cheerful": "That's interesting!", + "neutral": "I understand what you mean.", + "melancholic": "I've been thinking about that too...", + "contemplative": "That gives me something to ponder..." + } + return mood_responses.get(persona_state.current_mood, "I see.") + + +def create_ai_provider(provider: str, model: str, **kwargs) -> AIProvider: + """Factory function to create AI providers""" + if provider == "ollama": + return OllamaProvider(model=model, **kwargs) + elif provider == "openai": + return OpenAIProvider(model=model, **kwargs) + else: + raise ValueError(f"Unknown provider: {provider}") \ No newline at end of file diff --git a/src/aigpt/cli.py b/src/aigpt/cli.py new file mode 100644 index 0000000..a0d8570 --- /dev/null +++ b/src/aigpt/cli.py @@ -0,0 +1,444 @@ +"""CLI interface for ai.gpt using typer""" + +import typer +from pathlib import Path +from typing import Optional +from rich.console import Console +from rich.table import Table +from rich.panel import Panel +from datetime import datetime, timedelta + +from .persona import Persona +from .transmission import TransmissionController +from .mcp_server import AIGptMcpServer +from .ai_provider import create_ai_provider +from .scheduler import AIScheduler, TaskType +from .config import Config + +app = typer.Typer(help="ai.gpt - Autonomous transmission AI with unique personality") +console = Console() + +# Configuration +config = Config() +DEFAULT_DATA_DIR = config.data_dir + + +def get_persona(data_dir: Optional[Path] = None) -> Persona: + """Get or create persona instance""" + if data_dir is None: + data_dir = DEFAULT_DATA_DIR + + data_dir.mkdir(parents=True, exist_ok=True) + return Persona(data_dir) + + +@app.command() +def chat( + user_id: str = typer.Argument(..., help="User ID (atproto DID)"), + message: str = typer.Argument(..., help="Message to send to AI"), + data_dir: Optional[Path] = typer.Option(None, "--data-dir", "-d", help="Data directory"), + model: Optional[str] = typer.Option(None, "--model", "-m", help="AI model to use"), + provider: Optional[str] = typer.Option(None, "--provider", help="AI provider (ollama/openai)") +): + """Chat with the AI""" + persona = get_persona(data_dir) + + # Create AI provider if specified + ai_provider = None + if provider and model: + try: + ai_provider = create_ai_provider(provider, model) + console.print(f"[dim]Using {provider} with model {model}[/dim]\n") + except Exception as e: + console.print(f"[yellow]Warning: Could not create AI provider: {e}[/yellow]") + console.print("[yellow]Falling back to simple responses[/yellow]\n") + + # Process interaction + response, relationship_delta = persona.process_interaction(user_id, message, ai_provider) + + # Get updated relationship + relationship = persona.relationships.get_or_create_relationship(user_id) + + # Display response + console.print(Panel(response, title="AI Response", border_style="cyan")) + + # Show relationship status + status_color = "green" if relationship.transmission_enabled else "yellow" + if relationship.is_broken: + status_color = "red" + + console.print(f"\n[{status_color}]Relationship Status:[/{status_color}] {relationship.status.value}") + console.print(f"Score: {relationship.score:.2f} / {relationship.threshold}") + console.print(f"Transmission: {'✓ Enabled' if relationship.transmission_enabled else '✗ Disabled'}") + + if relationship.is_broken: + console.print("[red]⚠️ This relationship is broken and cannot be repaired.[/red]") + + +@app.command() +def status( + user_id: Optional[str] = typer.Argument(None, help="User ID to check status for"), + data_dir: Optional[Path] = typer.Option(None, "--data-dir", "-d", help="Data directory") +): + """Check AI status and relationships""" + persona = get_persona(data_dir) + state = persona.get_current_state() + + # Show AI state + console.print(Panel(f"[cyan]ai.gpt Status[/cyan]", expand=False)) + console.print(f"Mood: {state.current_mood}") + console.print(f"Fortune: {state.fortune.fortune_value}/10") + + if state.fortune.breakthrough_triggered: + console.print("[yellow]⚡ Breakthrough triggered![/yellow]") + + # Show personality traits + table = Table(title="Current Personality") + table.add_column("Trait", style="cyan") + table.add_column("Value", style="magenta") + + for trait, value in state.base_personality.items(): + table.add_row(trait.capitalize(), f"{value:.2f}") + + console.print(table) + + # Show specific relationship if requested + if user_id: + rel = persona.relationships.get_or_create_relationship(user_id) + console.print(f"\n[cyan]Relationship with {user_id}:[/cyan]") + console.print(f"Status: {rel.status.value}") + console.print(f"Score: {rel.score:.2f}") + console.print(f"Total Interactions: {rel.total_interactions}") + console.print(f"Transmission Enabled: {rel.transmission_enabled}") + + +@app.command() +def fortune( + data_dir: Optional[Path] = typer.Option(None, "--data-dir", "-d", help="Data directory") +): + """Check today's AI fortune""" + persona = get_persona(data_dir) + fortune = persona.fortune_system.get_today_fortune() + + # Fortune display + fortune_bar = "🌟" * fortune.fortune_value + "☆" * (10 - fortune.fortune_value) + + console.print(Panel( + f"{fortune_bar}\n\n" + f"Today's Fortune: {fortune.fortune_value}/10\n" + f"Date: {fortune.date}", + title="AI Fortune", + border_style="yellow" + )) + + if fortune.consecutive_good > 0: + console.print(f"[green]Consecutive good days: {fortune.consecutive_good}[/green]") + if fortune.consecutive_bad > 0: + console.print(f"[red]Consecutive bad days: {fortune.consecutive_bad}[/red]") + + if fortune.breakthrough_triggered: + console.print("\n[yellow]⚡ BREAKTHROUGH! Special fortune activated![/yellow]") + + +@app.command() +def transmit( + data_dir: Optional[Path] = typer.Option(None, "--data-dir", "-d", help="Data directory"), + dry_run: bool = typer.Option(True, "--dry-run/--execute", help="Dry run or execute") +): + """Check and execute autonomous transmissions""" + persona = get_persona(data_dir) + controller = TransmissionController(persona, persona.data_dir) + + eligible = controller.check_transmission_eligibility() + + if not eligible: + console.print("[yellow]No users eligible for transmission.[/yellow]") + return + + console.print(f"[green]Found {len(eligible)} eligible users for transmission:[/green]") + + for user_id, rel in eligible.items(): + message = controller.generate_transmission_message(user_id) + if message: + console.print(f"\n[cyan]To:[/cyan] {user_id}") + console.print(f"[cyan]Message:[/cyan] {message}") + console.print(f"[cyan]Relationship:[/cyan] {rel.status.value} (score: {rel.score:.2f})") + + if not dry_run: + # In real implementation, send via atproto or other channel + controller.record_transmission(user_id, message, success=True) + console.print("[green]✓ Transmitted[/green]") + else: + console.print("[yellow]→ Would transmit (dry run)[/yellow]") + + +@app.command() +def maintenance( + data_dir: Optional[Path] = typer.Option(None, "--data-dir", "-d", help="Data directory") +): + """Run daily maintenance tasks""" + persona = get_persona(data_dir) + + console.print("[cyan]Running daily maintenance...[/cyan]") + persona.daily_maintenance() + console.print("[green]✓ Maintenance completed[/green]") + + +@app.command() +def relationships( + data_dir: Optional[Path] = typer.Option(None, "--data-dir", "-d", help="Data directory") +): + """List all relationships""" + persona = get_persona(data_dir) + + table = Table(title="All Relationships") + table.add_column("User ID", style="cyan") + table.add_column("Status", style="magenta") + table.add_column("Score", style="green") + table.add_column("Transmission", style="yellow") + table.add_column("Last Interaction") + + for user_id, rel in persona.relationships.relationships.items(): + transmission = "✓" if rel.transmission_enabled else "✗" + if rel.is_broken: + transmission = "💔" + + last_interaction = rel.last_interaction.strftime("%Y-%m-%d") if rel.last_interaction else "Never" + + table.add_row( + user_id[:16] + "...", + rel.status.value, + f"{rel.score:.2f}", + transmission, + last_interaction + ) + + console.print(table) + + +@app.command() +def server( + host: str = typer.Option("localhost", "--host", "-h", help="Server host"), + port: int = typer.Option(8000, "--port", "-p", help="Server port"), + data_dir: Optional[Path] = typer.Option(None, "--data-dir", "-d", help="Data directory"), + model: str = typer.Option("qwen2.5", "--model", "-m", help="AI model to use"), + provider: str = typer.Option("ollama", "--provider", help="AI provider (ollama/openai)") +): + """Run MCP server for AI integration""" + import uvicorn + + if data_dir is None: + data_dir = DEFAULT_DATA_DIR + + data_dir.mkdir(parents=True, exist_ok=True) + + # Create MCP server + mcp_server = AIGptMcpServer(data_dir) + app_instance = mcp_server.get_server().get_app() + + console.print(Panel( + f"[cyan]Starting ai.gpt MCP Server[/cyan]\n\n" + f"Host: {host}:{port}\n" + f"Provider: {provider}\n" + f"Model: {model}\n" + f"Data: {data_dir}", + title="MCP Server", + border_style="green" + )) + + # Store provider info in app state for later use + app_instance.state.ai_provider = provider + app_instance.state.ai_model = model + + # Run server + uvicorn.run(app_instance, host=host, port=port) + + +@app.command() +def schedule( + action: str = typer.Argument(..., help="Action: add, list, enable, disable, remove, run"), + task_type: Optional[str] = typer.Argument(None, help="Task type for add action"), + schedule_expr: Optional[str] = typer.Argument(None, help="Schedule expression (cron or interval)"), + data_dir: Optional[Path] = typer.Option(None, "--data-dir", "-d", help="Data directory"), + task_id: Optional[str] = typer.Option(None, "--task-id", "-t", help="Task ID"), + provider: Optional[str] = typer.Option(None, "--provider", help="AI provider for transmission"), + model: Optional[str] = typer.Option(None, "--model", "-m", help="AI model for transmission") +): + """Manage scheduled tasks""" + persona = get_persona(data_dir) + scheduler = AIScheduler(persona.data_dir, persona) + + if action == "add": + if not task_type or not schedule_expr: + console.print("[red]Error: task_type and schedule required for add action[/red]") + return + + # Parse task type + try: + task_type_enum = TaskType(task_type) + except ValueError: + console.print(f"[red]Invalid task type. Valid types: {', '.join([t.value for t in TaskType])}[/red]") + return + + # Metadata for transmission tasks + metadata = {} + if task_type_enum == TaskType.TRANSMISSION_CHECK: + metadata["provider"] = provider or "ollama" + metadata["model"] = model or "qwen2.5" + + try: + task = scheduler.add_task(task_type_enum, schedule_expr, task_id, metadata) + console.print(f"[green]✓ Added task {task.task_id}[/green]") + console.print(f"Type: {task.task_type.value}") + console.print(f"Schedule: {task.schedule}") + except ValueError as e: + console.print(f"[red]Error: {e}[/red]") + + elif action == "list": + tasks = scheduler.get_tasks() + if not tasks: + console.print("[yellow]No scheduled tasks[/yellow]") + return + + table = Table(title="Scheduled Tasks") + table.add_column("Task ID", style="cyan") + table.add_column("Type", style="magenta") + table.add_column("Schedule", style="green") + table.add_column("Enabled", style="yellow") + table.add_column("Last Run") + + for task in tasks: + enabled = "✓" if task.enabled else "✗" + last_run = task.last_run.strftime("%Y-%m-%d %H:%M") if task.last_run else "Never" + + table.add_row( + task.task_id[:20] + "..." if len(task.task_id) > 20 else task.task_id, + task.task_type.value, + task.schedule, + enabled, + last_run + ) + + console.print(table) + + elif action == "enable": + if not task_id: + console.print("[red]Error: --task-id required for enable action[/red]") + return + + scheduler.enable_task(task_id) + console.print(f"[green]✓ Enabled task {task_id}[/green]") + + elif action == "disable": + if not task_id: + console.print("[red]Error: --task-id required for disable action[/red]") + return + + scheduler.disable_task(task_id) + console.print(f"[yellow]✓ Disabled task {task_id}[/yellow]") + + elif action == "remove": + if not task_id: + console.print("[red]Error: --task-id required for remove action[/red]") + return + + scheduler.remove_task(task_id) + console.print(f"[red]✓ Removed task {task_id}[/red]") + + elif action == "run": + console.print("[cyan]Starting scheduler daemon...[/cyan]") + console.print("Press Ctrl+C to stop\n") + + import asyncio + + async def run_scheduler(): + scheduler.start() + try: + while True: + await asyncio.sleep(1) + except KeyboardInterrupt: + scheduler.stop() + + try: + asyncio.run(run_scheduler()) + except KeyboardInterrupt: + console.print("\n[yellow]Scheduler stopped[/yellow]") + + else: + console.print(f"[red]Unknown action: {action}[/red]") + console.print("Valid actions: add, list, enable, disable, remove, run") + + +@app.command() +def config( + action: str = typer.Argument(..., help="Action: get, set, delete, list"), + key: Optional[str] = typer.Argument(None, help="Configuration key (dot notation)"), + value: Optional[str] = typer.Argument(None, help="Value to set") +): + """Manage configuration settings""" + + if action == "get": + if not key: + console.print("[red]Error: key required for get action[/red]") + return + + val = config.get(key) + if val is None: + console.print(f"[yellow]Key '{key}' not found[/yellow]") + else: + console.print(f"[cyan]{key}[/cyan] = [green]{val}[/green]") + + elif action == "set": + if not key or value is None: + console.print("[red]Error: key and value required for set action[/red]") + return + + # Special handling for sensitive keys + if "password" in key or "api_key" in key: + console.print(f"[cyan]Setting {key}[/cyan] = [dim]***hidden***[/dim]") + else: + console.print(f"[cyan]Setting {key}[/cyan] = [green]{value}[/green]") + + config.set(key, value) + console.print("[green]✓ Configuration saved[/green]") + + elif action == "delete": + if not key: + console.print("[red]Error: key required for delete action[/red]") + return + + if config.delete(key): + console.print(f"[green]✓ Deleted {key}[/green]") + else: + console.print(f"[yellow]Key '{key}' not found[/yellow]") + + elif action == "list": + keys = config.list_keys(key or "") + + if not keys: + console.print("[yellow]No configuration keys found[/yellow]") + return + + table = Table(title="Configuration Settings") + table.add_column("Key", style="cyan") + table.add_column("Value", style="green") + + for k in sorted(keys): + val = config.get(k) + # Hide sensitive values + if "password" in k or "api_key" in k: + display_val = "***hidden***" if val else "not set" + else: + display_val = str(val) if val is not None else "not set" + + table.add_row(k, display_val) + + console.print(table) + + else: + console.print(f"[red]Unknown action: {action}[/red]") + console.print("Valid actions: get, set, delete, list") + + +if __name__ == "__main__": + app() \ No newline at end of file diff --git a/src/aigpt/config.py b/src/aigpt/config.py new file mode 100644 index 0000000..9c94b0b --- /dev/null +++ b/src/aigpt/config.py @@ -0,0 +1,145 @@ +"""Configuration management for ai.gpt""" + +import json +import os +from pathlib import Path +from typing import Optional, Dict, Any +import logging + + +class Config: + """Manages configuration settings""" + + def __init__(self, config_dir: Optional[Path] = None): + if config_dir is None: + config_dir = Path.home() / ".config" / "syui" / "ai" / "gpt" + + self.config_dir = config_dir + self.config_file = config_dir / "config.json" + self.data_dir = config_dir / "data" + + # Create directories if they don't exist + self.config_dir.mkdir(parents=True, exist_ok=True) + self.data_dir.mkdir(parents=True, exist_ok=True) + + self.logger = logging.getLogger(__name__) + self._config: Dict[str, Any] = {} + self._load_config() + + def _load_config(self): + """Load configuration from file""" + if self.config_file.exists(): + try: + with open(self.config_file, 'r', encoding='utf-8') as f: + self._config = json.load(f) + except Exception as e: + self.logger.error(f"Failed to load config: {e}") + self._config = {} + else: + # Initialize with default config + self._config = { + "providers": { + "openai": { + "api_key": None, + "default_model": "gpt-4o-mini" + }, + "ollama": { + "host": "http://localhost:11434", + "default_model": "qwen2.5" + } + }, + "atproto": { + "handle": None, + "password": None, + "host": "https://bsky.social" + }, + "default_provider": "ollama" + } + self._save_config() + + def _save_config(self): + """Save configuration to file""" + try: + with open(self.config_file, 'w', encoding='utf-8') as f: + json.dump(self._config, f, indent=2) + except Exception as e: + self.logger.error(f"Failed to save config: {e}") + + def get(self, key: str, default: Any = None) -> Any: + """Get configuration value using dot notation""" + keys = key.split('.') + value = self._config + + for k in keys: + if isinstance(value, dict) and k in value: + value = value[k] + else: + return default + + return value + + def set(self, key: str, value: Any): + """Set configuration value using dot notation""" + keys = key.split('.') + config = self._config + + # Navigate to the parent dictionary + for k in keys[:-1]: + if k not in config: + config[k] = {} + config = config[k] + + # Set the value + config[keys[-1]] = value + self._save_config() + + def delete(self, key: str) -> bool: + """Delete configuration value""" + keys = key.split('.') + config = self._config + + # Navigate to the parent dictionary + for k in keys[:-1]: + if k not in config: + return False + config = config[k] + + # Delete the key if it exists + if keys[-1] in config: + del config[keys[-1]] + self._save_config() + return True + + return False + + def list_keys(self, prefix: str = "") -> list[str]: + """List all configuration keys with optional prefix""" + def _get_keys(config: dict, current_prefix: str = "") -> list[str]: + keys = [] + for k, v in config.items(): + full_key = f"{current_prefix}.{k}" if current_prefix else k + if isinstance(v, dict): + keys.extend(_get_keys(v, full_key)) + else: + keys.append(full_key) + return keys + + all_keys = _get_keys(self._config) + + if prefix: + return [k for k in all_keys if k.startswith(prefix)] + return all_keys + + def get_api_key(self, provider: str) -> Optional[str]: + """Get API key for a specific provider""" + key = self.get(f"providers.{provider}.api_key") + + # Also check environment variables + if not key and provider == "openai": + key = os.getenv("OPENAI_API_KEY") + + return key + + def get_provider_config(self, provider: str) -> Dict[str, Any]: + """Get complete configuration for a provider""" + return self.get(f"providers.{provider}", {}) \ No newline at end of file diff --git a/src/aigpt/fortune.py b/src/aigpt/fortune.py new file mode 100644 index 0000000..0bb1e40 --- /dev/null +++ b/src/aigpt/fortune.py @@ -0,0 +1,118 @@ +"""AI Fortune system for daily personality variations""" + +import json +import random +from datetime import date, datetime, timedelta +from pathlib import Path +from typing import Optional +import logging + +from .models import AIFortune + + +class FortuneSystem: + """Manages daily AI fortune affecting personality""" + + def __init__(self, data_dir: Path): + self.data_dir = data_dir + self.fortune_file = data_dir / "fortunes.json" + self.fortunes: dict[str, AIFortune] = {} + self.logger = logging.getLogger(__name__) + self._load_fortunes() + + def _load_fortunes(self): + """Load fortune history from storage""" + if self.fortune_file.exists(): + with open(self.fortune_file, 'r', encoding='utf-8') as f: + data = json.load(f) + for date_str, fortune_data in data.items(): + # Convert date string back to date object + fortune_data['date'] = datetime.fromisoformat(fortune_data['date']).date() + self.fortunes[date_str] = AIFortune(**fortune_data) + + def _save_fortunes(self): + """Save fortune history to storage""" + data = {} + for date_str, fortune in self.fortunes.items(): + fortune_dict = fortune.model_dump(mode='json') + fortune_dict['date'] = fortune.date.isoformat() + data[date_str] = fortune_dict + + with open(self.fortune_file, 'w', encoding='utf-8') as f: + json.dump(data, f, indent=2) + + def get_today_fortune(self) -> AIFortune: + """Get or generate today's fortune""" + today = date.today() + today_str = today.isoformat() + + if today_str in self.fortunes: + return self.fortunes[today_str] + + # Generate new fortune + fortune_value = random.randint(1, 10) + + # Check yesterday's fortune for consecutive tracking + yesterday = (today - timedelta(days=1)) + yesterday_str = yesterday.isoformat() + + consecutive_good = 0 + consecutive_bad = 0 + breakthrough_triggered = False + + if yesterday_str in self.fortunes: + yesterday_fortune = self.fortunes[yesterday_str] + + if fortune_value >= 7: # Good fortune + if yesterday_fortune.fortune_value >= 7: + consecutive_good = yesterday_fortune.consecutive_good + 1 + else: + consecutive_good = 1 + elif fortune_value <= 3: # Bad fortune + if yesterday_fortune.fortune_value <= 3: + consecutive_bad = yesterday_fortune.consecutive_bad + 1 + else: + consecutive_bad = 1 + + # Check breakthrough conditions + if consecutive_good >= 3: + breakthrough_triggered = True + self.logger.info("Breakthrough! 3 consecutive good fortunes!") + fortune_value = 10 # Max fortune on breakthrough + elif consecutive_bad >= 3: + breakthrough_triggered = True + self.logger.info("Breakthrough! 3 consecutive bad fortunes!") + fortune_value = random.randint(7, 10) # Good fortune after bad streak + + fortune = AIFortune( + date=today, + fortune_value=fortune_value, + consecutive_good=consecutive_good, + consecutive_bad=consecutive_bad, + breakthrough_triggered=breakthrough_triggered + ) + + self.fortunes[today_str] = fortune + self._save_fortunes() + + self.logger.info(f"Today's fortune: {fortune_value}/10") + return fortune + + def get_personality_modifier(self, fortune: AIFortune) -> dict[str, float]: + """Get personality modifiers based on fortune""" + base_modifier = fortune.fortune_value / 10.0 + + modifiers = { + "optimism": base_modifier, + "energy": base_modifier * 0.8, + "patience": 1.0 - (abs(5.5 - fortune.fortune_value) * 0.1), + "creativity": 0.5 + (base_modifier * 0.5), + "empathy": 0.7 + (base_modifier * 0.3) + } + + # Breakthrough effects + if fortune.breakthrough_triggered: + modifiers["confidence"] = 1.0 + modifiers["spontaneity"] = 0.9 + + return modifiers \ No newline at end of file diff --git a/src/aigpt/mcp_server.py b/src/aigpt/mcp_server.py new file mode 100644 index 0000000..7b999fa --- /dev/null +++ b/src/aigpt/mcp_server.py @@ -0,0 +1,149 @@ +"""MCP Server for ai.gpt system""" + +from typing import Optional, List, Dict, Any +from fastapi_mcp import FastapiMcpServer +from pathlib import Path +import logging + +from .persona import Persona +from .models import Memory, Relationship, PersonaState + +logger = logging.getLogger(__name__) + + +class AIGptMcpServer: + """MCP Server that exposes ai.gpt functionality to AI assistants""" + + def __init__(self, data_dir: Path): + self.data_dir = data_dir + self.persona = Persona(data_dir) + self.server = FastapiMcpServer("ai-gpt", "AI.GPT Memory and Relationship System") + self._register_tools() + + def _register_tools(self): + """Register all MCP tools""" + + @self.server.tool("get_memories") + async def get_memories(user_id: Optional[str] = None, limit: int = 10) -> List[Dict[str, Any]]: + """Get active memories from the AI's memory system""" + memories = self.persona.memory.get_active_memories(limit=limit) + return [ + { + "id": mem.id, + "content": mem.content, + "level": mem.level.value, + "importance": mem.importance_score, + "is_core": mem.is_core, + "timestamp": mem.timestamp.isoformat() + } + for mem in memories + ] + + @self.server.tool("get_relationship") + async def get_relationship(user_id: str) -> Dict[str, Any]: + """Get relationship status with a specific user""" + rel = self.persona.relationships.get_or_create_relationship(user_id) + return { + "user_id": rel.user_id, + "status": rel.status.value, + "score": rel.score, + "transmission_enabled": rel.transmission_enabled, + "is_broken": rel.is_broken, + "total_interactions": rel.total_interactions, + "last_interaction": rel.last_interaction.isoformat() if rel.last_interaction else None + } + + @self.server.tool("get_all_relationships") + async def get_all_relationships() -> List[Dict[str, Any]]: + """Get all relationships""" + relationships = [] + for user_id, rel in self.persona.relationships.relationships.items(): + relationships.append({ + "user_id": user_id, + "status": rel.status.value, + "score": rel.score, + "transmission_enabled": rel.transmission_enabled, + "is_broken": rel.is_broken + }) + return relationships + + @self.server.tool("get_persona_state") + async def get_persona_state() -> Dict[str, Any]: + """Get current persona state including fortune and mood""" + state = self.persona.get_current_state() + return { + "mood": state.current_mood, + "fortune": { + "value": state.fortune.fortune_value, + "date": state.fortune.date.isoformat(), + "breakthrough": state.fortune.breakthrough_triggered + }, + "personality": state.base_personality, + "active_memory_count": len(state.active_memories) + } + + @self.server.tool("process_interaction") + async def process_interaction(user_id: str, message: str) -> Dict[str, Any]: + """Process an interaction with a user""" + response, relationship_delta = self.persona.process_interaction(user_id, message) + rel = self.persona.relationships.get_or_create_relationship(user_id) + + return { + "response": response, + "relationship_delta": relationship_delta, + "new_relationship_score": rel.score, + "transmission_enabled": rel.transmission_enabled, + "relationship_status": rel.status.value + } + + @self.server.tool("check_transmission_eligibility") + async def check_transmission_eligibility(user_id: str) -> Dict[str, Any]: + """Check if AI can transmit to a specific user""" + can_transmit = self.persona.can_transmit_to(user_id) + rel = self.persona.relationships.get_or_create_relationship(user_id) + + return { + "can_transmit": can_transmit, + "relationship_score": rel.score, + "threshold": rel.threshold, + "is_broken": rel.is_broken, + "transmission_enabled": rel.transmission_enabled + } + + @self.server.tool("get_fortune") + async def get_fortune() -> Dict[str, Any]: + """Get today's AI fortune""" + fortune = self.persona.fortune_system.get_today_fortune() + modifiers = self.persona.fortune_system.get_personality_modifier(fortune) + + return { + "value": fortune.fortune_value, + "date": fortune.date.isoformat(), + "consecutive_good": fortune.consecutive_good, + "consecutive_bad": fortune.consecutive_bad, + "breakthrough": fortune.breakthrough_triggered, + "personality_modifiers": modifiers + } + + @self.server.tool("summarize_memories") + async def summarize_memories(user_id: str) -> Optional[Dict[str, Any]]: + """Create a summary of recent memories for a user""" + summary = self.persona.memory.summarize_memories(user_id) + if summary: + return { + "id": summary.id, + "content": summary.content, + "level": summary.level.value, + "timestamp": summary.timestamp.isoformat() + } + return None + + @self.server.tool("run_maintenance") + async def run_maintenance() -> Dict[str, str]: + """Run daily maintenance tasks""" + self.persona.daily_maintenance() + return {"status": "Maintenance completed successfully"} + + def get_server(self) -> FastapiMcpServer: + """Get the FastAPI MCP server instance""" + return self.server \ No newline at end of file diff --git a/src/aigpt/memory.py b/src/aigpt/memory.py new file mode 100644 index 0000000..c5f5e3e --- /dev/null +++ b/src/aigpt/memory.py @@ -0,0 +1,155 @@ +"""Memory management system for ai.gpt""" + +import json +import hashlib +from datetime import datetime, timedelta +from pathlib import Path +from typing import List, Optional, Dict, Any +import logging + +from .models import Memory, MemoryLevel, Conversation + + +class MemoryManager: + """Manages AI's memory with hierarchical storage and forgetting""" + + def __init__(self, data_dir: Path): + self.data_dir = data_dir + self.memories_file = data_dir / "memories.json" + self.conversations_file = data_dir / "conversations.json" + self.memories: Dict[str, Memory] = {} + self.conversations: List[Conversation] = [] + self.logger = logging.getLogger(__name__) + self._load_memories() + + def _load_memories(self): + """Load memories from persistent storage""" + if self.memories_file.exists(): + with open(self.memories_file, 'r', encoding='utf-8') as f: + data = json.load(f) + for mem_data in data: + memory = Memory(**mem_data) + self.memories[memory.id] = memory + + if self.conversations_file.exists(): + with open(self.conversations_file, 'r', encoding='utf-8') as f: + data = json.load(f) + self.conversations = [Conversation(**conv) for conv in data] + + def _save_memories(self): + """Save memories to persistent storage""" + memories_data = [mem.model_dump(mode='json') for mem in self.memories.values()] + with open(self.memories_file, 'w', encoding='utf-8') as f: + json.dump(memories_data, f, indent=2, default=str) + + conv_data = [conv.model_dump(mode='json') for conv in self.conversations] + with open(self.conversations_file, 'w', encoding='utf-8') as f: + json.dump(conv_data, f, indent=2, default=str) + + def add_conversation(self, conversation: Conversation) -> Memory: + """Add a conversation and create memory from it""" + self.conversations.append(conversation) + + # Create memory from conversation + memory_id = hashlib.sha256( + f"{conversation.id}{conversation.timestamp}".encode() + ).hexdigest()[:16] + + memory = Memory( + id=memory_id, + timestamp=conversation.timestamp, + content=f"User: {conversation.user_message}\nAI: {conversation.ai_response}", + level=MemoryLevel.FULL_LOG, + importance_score=abs(conversation.relationship_delta) * 0.1 + ) + + self.memories[memory.id] = memory + self._save_memories() + return memory + + def summarize_memories(self, user_id: str) -> Optional[Memory]: + """Create summary from recent memories""" + recent_memories = [ + mem for mem in self.memories.values() + if mem.level == MemoryLevel.FULL_LOG + and (datetime.now() - mem.timestamp).days < 7 + ] + + if len(recent_memories) < 5: + return None + + # Simple summary creation (in real implementation, use AI) + summary_content = f"Summary of {len(recent_memories)} recent interactions" + summary_id = hashlib.sha256( + f"summary_{datetime.now().isoformat()}".encode() + ).hexdigest()[:16] + + summary = Memory( + id=summary_id, + timestamp=datetime.now(), + content=summary_content, + summary=summary_content, + level=MemoryLevel.SUMMARY, + importance_score=0.5 + ) + + self.memories[summary.id] = summary + + # Mark summarized memories for potential forgetting + for mem in recent_memories: + mem.importance_score *= 0.9 + + self._save_memories() + return summary + + def identify_core_memories(self) -> List[Memory]: + """Identify memories that should become core (never forgotten)""" + core_candidates = [ + mem for mem in self.memories.values() + if mem.importance_score > 0.8 + and not mem.is_core + and mem.level != MemoryLevel.FORGOTTEN + ] + + for memory in core_candidates: + memory.is_core = True + memory.level = MemoryLevel.CORE + self.logger.info(f"Memory {memory.id} promoted to core") + + self._save_memories() + return core_candidates + + def apply_forgetting(self): + """Apply selective forgetting based on importance and time""" + now = datetime.now() + + for memory in self.memories.values(): + if memory.is_core or memory.level == MemoryLevel.FORGOTTEN: + continue + + # Time-based decay + age_days = (now - memory.timestamp).days + decay_factor = memory.decay_rate * age_days + memory.importance_score -= decay_factor + + # Forget unimportant old memories + if memory.importance_score <= 0.1 and age_days > 30: + memory.level = MemoryLevel.FORGOTTEN + self.logger.info(f"Memory {memory.id} forgotten") + + self._save_memories() + + def get_active_memories(self, limit: int = 10) -> List[Memory]: + """Get currently active memories for persona""" + active = [ + mem for mem in self.memories.values() + if mem.level != MemoryLevel.FORGOTTEN + ] + + # Sort by importance and recency + active.sort( + key=lambda m: (m.is_core, m.importance_score, m.timestamp), + reverse=True + ) + + return active[:limit] \ No newline at end of file diff --git a/src/aigpt/models.py b/src/aigpt/models.py new file mode 100644 index 0000000..7cf666b --- /dev/null +++ b/src/aigpt/models.py @@ -0,0 +1,79 @@ +"""Data models for ai.gpt system""" + +from datetime import datetime +from typing import Optional, Dict, List, Any +from enum import Enum +from pydantic import BaseModel, Field + + +class MemoryLevel(str, Enum): + """Memory importance levels""" + FULL_LOG = "full_log" + SUMMARY = "summary" + CORE = "core" + FORGOTTEN = "forgotten" + + +class RelationshipStatus(str, Enum): + """Relationship status levels""" + STRANGER = "stranger" + ACQUAINTANCE = "acquaintance" + FRIEND = "friend" + CLOSE_FRIEND = "close_friend" + BROKEN = "broken" # 不可逆 + + +class Memory(BaseModel): + """Single memory unit""" + id: str + timestamp: datetime + content: str + summary: Optional[str] = None + level: MemoryLevel = MemoryLevel.FULL_LOG + importance_score: float = Field(ge=0.0, le=1.0) + is_core: bool = False + decay_rate: float = 0.01 + + +class Relationship(BaseModel): + """Relationship with a specific user""" + user_id: str # atproto DID + status: RelationshipStatus = RelationshipStatus.STRANGER + score: float = 0.0 + daily_interactions: int = 0 + total_interactions: int = 0 + last_interaction: Optional[datetime] = None + transmission_enabled: bool = False + threshold: float = 100.0 + decay_rate: float = 0.1 + daily_limit: int = 10 + is_broken: bool = False + + +class AIFortune(BaseModel): + """Daily AI fortune affecting personality""" + date: datetime.date + fortune_value: int = Field(ge=1, le=10) + consecutive_good: int = 0 + consecutive_bad: int = 0 + breakthrough_triggered: bool = False + + +class PersonaState(BaseModel): + """Current persona state""" + base_personality: Dict[str, float] + current_mood: str + fortune: AIFortune + active_memories: List[str] # Memory IDs + relationship_modifiers: Dict[str, float] + + +class Conversation(BaseModel): + """Conversation log entry""" + id: str + user_id: str + timestamp: datetime + user_message: str + ai_response: str + relationship_delta: float = 0.0 + memory_created: bool = False \ No newline at end of file diff --git a/src/aigpt/persona.py b/src/aigpt/persona.py new file mode 100644 index 0000000..88f0561 --- /dev/null +++ b/src/aigpt/persona.py @@ -0,0 +1,181 @@ +"""Persona management system integrating memory, relationships, and fortune""" + +import json +from datetime import datetime +from pathlib import Path +from typing import Dict, List, Optional +import logging + +from .models import PersonaState, Conversation +from .memory import MemoryManager +from .relationship import RelationshipTracker +from .fortune import FortuneSystem + + +class Persona: + """AI persona with unique characteristics based on interactions""" + + def __init__(self, data_dir: Path, name: str = "ai"): + self.data_dir = data_dir + self.name = name + self.memory = MemoryManager(data_dir) + self.relationships = RelationshipTracker(data_dir) + self.fortune_system = FortuneSystem(data_dir) + self.logger = logging.getLogger(__name__) + + # Base personality traits + self.base_personality = { + "curiosity": 0.7, + "empathy": 0.8, + "creativity": 0.6, + "patience": 0.7, + "optimism": 0.6 + } + + self.state_file = data_dir / "persona_state.json" + self._load_state() + + def _load_state(self): + """Load persona state from storage""" + if self.state_file.exists(): + with open(self.state_file, 'r', encoding='utf-8') as f: + data = json.load(f) + self.base_personality = data.get("base_personality", self.base_personality) + + def _save_state(self): + """Save persona state to storage""" + state_data = { + "base_personality": self.base_personality, + "last_updated": datetime.now().isoformat() + } + with open(self.state_file, 'w', encoding='utf-8') as f: + json.dump(state_data, f, indent=2) + + def get_current_state(self) -> PersonaState: + """Get current persona state including all modifiers""" + # Get today's fortune + fortune = self.fortune_system.get_today_fortune() + fortune_modifiers = self.fortune_system.get_personality_modifier(fortune) + + # Apply fortune modifiers to base personality + current_personality = {} + for trait, base_value in self.base_personality.items(): + modifier = fortune_modifiers.get(trait, 1.0) + current_personality[trait] = min(1.0, base_value * modifier) + + # Get active memories for context + active_memories = self.memory.get_active_memories(limit=5) + + # Determine mood based on fortune and recent interactions + mood = self._determine_mood(fortune.fortune_value) + + state = PersonaState( + base_personality=current_personality, + current_mood=mood, + fortune=fortune, + active_memories=[mem.id for mem in active_memories], + relationship_modifiers={} + ) + + return state + + def _determine_mood(self, fortune_value: int) -> str: + """Determine current mood based on fortune and other factors""" + if fortune_value >= 8: + return "joyful" + elif fortune_value >= 6: + return "cheerful" + elif fortune_value >= 4: + return "neutral" + elif fortune_value >= 2: + return "melancholic" + else: + return "contemplative" + + def process_interaction(self, user_id: str, message: str, ai_provider=None) -> tuple[str, float]: + """Process user interaction and generate response""" + # Get current state + state = self.get_current_state() + + # Get relationship with user + relationship = self.relationships.get_or_create_relationship(user_id) + + # Simple response generation (use AI provider if available) + if relationship.is_broken: + response = "..." + relationship_delta = 0.0 + else: + if ai_provider: + # Use AI provider for response generation + memories = self.memory.get_active_memories(limit=5) + import asyncio + response = asyncio.run( + ai_provider.generate_response(message, state, memories) + ) + # Calculate relationship delta based on interaction quality + if state.current_mood in ["joyful", "cheerful"]: + relationship_delta = 2.0 + elif relationship.status.value == "close_friend": + relationship_delta = 1.5 + else: + relationship_delta = 1.0 + else: + # Fallback to simple responses + if state.current_mood == "joyful": + response = f"What a wonderful day! {message} sounds interesting!" + relationship_delta = 2.0 + elif relationship.status.value == "close_friend": + response = f"I've been thinking about our conversations. {message}" + relationship_delta = 1.5 + else: + response = f"I understand. {message}" + relationship_delta = 1.0 + + # Create conversation record + conv_id = f"{user_id}_{datetime.now().timestamp()}" + conversation = Conversation( + id=conv_id, + user_id=user_id, + timestamp=datetime.now(), + user_message=message, + ai_response=response, + relationship_delta=relationship_delta, + memory_created=True + ) + + # Update memory + self.memory.add_conversation(conversation) + + # Update relationship + self.relationships.update_interaction(user_id, relationship_delta) + + return response, relationship_delta + + def can_transmit_to(self, user_id: str) -> bool: + """Check if AI can transmit messages to this user""" + relationship = self.relationships.get_or_create_relationship(user_id) + return relationship.transmission_enabled and not relationship.is_broken + + def daily_maintenance(self): + """Perform daily maintenance tasks""" + self.logger.info("Performing daily maintenance...") + + # Apply time decay to relationships + self.relationships.apply_time_decay() + + # Apply forgetting to memories + self.memory.apply_forgetting() + + # Identify core memories + core_memories = self.memory.identify_core_memories() + if core_memories: + self.logger.info(f"Identified {len(core_memories)} new core memories") + + # Create memory summaries + for user_id in self.relationships.relationships: + summary = self.memory.summarize_memories(user_id) + if summary: + self.logger.info(f"Created summary for interactions with {user_id}") + + self._save_state() + self.logger.info("Daily maintenance completed") \ No newline at end of file diff --git a/src/aigpt/relationship.py b/src/aigpt/relationship.py new file mode 100644 index 0000000..31dac43 --- /dev/null +++ b/src/aigpt/relationship.py @@ -0,0 +1,135 @@ +"""Relationship tracking system with irreversible damage""" + +import json +from datetime import datetime, timedelta +from pathlib import Path +from typing import Dict, Optional +import logging + +from .models import Relationship, RelationshipStatus + + +class RelationshipTracker: + """Tracks and manages relationships with users""" + + def __init__(self, data_dir: Path): + self.data_dir = data_dir + self.relationships_file = data_dir / "relationships.json" + self.relationships: Dict[str, Relationship] = {} + self.logger = logging.getLogger(__name__) + self._load_relationships() + + def _load_relationships(self): + """Load relationships from persistent storage""" + if self.relationships_file.exists(): + with open(self.relationships_file, 'r', encoding='utf-8') as f: + data = json.load(f) + for user_id, rel_data in data.items(): + self.relationships[user_id] = Relationship(**rel_data) + + def _save_relationships(self): + """Save relationships to persistent storage""" + data = { + user_id: rel.model_dump(mode='json') + for user_id, rel in self.relationships.items() + } + with open(self.relationships_file, 'w', encoding='utf-8') as f: + json.dump(data, f, indent=2, default=str) + + def get_or_create_relationship(self, user_id: str) -> Relationship: + """Get existing relationship or create new one""" + if user_id not in self.relationships: + self.relationships[user_id] = Relationship(user_id=user_id) + self._save_relationships() + return self.relationships[user_id] + + def update_interaction(self, user_id: str, delta: float) -> Relationship: + """Update relationship based on interaction""" + rel = self.get_or_create_relationship(user_id) + + # Check if relationship is broken (irreversible) + if rel.is_broken: + self.logger.warning(f"Relationship with {user_id} is broken. No updates allowed.") + return rel + + # Check daily limit + if rel.last_interaction and rel.last_interaction.date() == datetime.now().date(): + if rel.daily_interactions >= rel.daily_limit: + self.logger.info(f"Daily interaction limit reached for {user_id}") + return rel + else: + rel.daily_interactions = 0 + + # Update interaction counts + rel.daily_interactions += 1 + rel.total_interactions += 1 + rel.last_interaction = datetime.now() + + # Update score with bounds + old_score = rel.score + rel.score += delta + rel.score = max(0.0, min(200.0, rel.score)) # 0-200 range + + # Check for relationship damage + if delta < -10.0: # Significant negative interaction + self.logger.warning(f"Major relationship damage with {user_id}: {delta}") + if rel.score <= 0: + rel.is_broken = True + rel.status = RelationshipStatus.BROKEN + rel.transmission_enabled = False + self.logger.error(f"Relationship with {user_id} is now BROKEN (irreversible)") + + # Update relationship status based on score + if not rel.is_broken: + if rel.score >= 150: + rel.status = RelationshipStatus.CLOSE_FRIEND + elif rel.score >= 100: + rel.status = RelationshipStatus.FRIEND + elif rel.score >= 50: + rel.status = RelationshipStatus.ACQUAINTANCE + else: + rel.status = RelationshipStatus.STRANGER + + # Check transmission threshold + if rel.score >= rel.threshold and not rel.transmission_enabled: + rel.transmission_enabled = True + self.logger.info(f"Transmission enabled for {user_id}!") + + self._save_relationships() + return rel + + def apply_time_decay(self): + """Apply time-based decay to all relationships""" + now = datetime.now() + + for user_id, rel in self.relationships.items(): + if rel.is_broken or not rel.last_interaction: + continue + + # Calculate days since last interaction + days_inactive = (now - rel.last_interaction).days + + if days_inactive > 0: + # Apply decay + decay_amount = rel.decay_rate * days_inactive + old_score = rel.score + rel.score = max(0.0, rel.score - decay_amount) + + # Update status if score dropped + if rel.score < rel.threshold: + rel.transmission_enabled = False + + if decay_amount > 0: + self.logger.info( + f"Applied decay to {user_id}: {old_score:.2f} -> {rel.score:.2f}" + ) + + self._save_relationships() + + def get_transmission_eligible(self) -> Dict[str, Relationship]: + """Get all relationships eligible for transmission""" + return { + user_id: rel + for user_id, rel in self.relationships.items() + if rel.transmission_enabled and not rel.is_broken + } \ No newline at end of file diff --git a/src/aigpt/scheduler.py b/src/aigpt/scheduler.py new file mode 100644 index 0000000..df26cf4 --- /dev/null +++ b/src/aigpt/scheduler.py @@ -0,0 +1,312 @@ +"""Scheduler for autonomous AI tasks""" + +import json +import asyncio +from datetime import datetime, timedelta +from pathlib import Path +from typing import Dict, List, Optional, Any, Callable +from enum import Enum +import logging + +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.triggers.cron import CronTrigger +from apscheduler.triggers.interval import IntervalTrigger +from croniter import croniter + +from .persona import Persona +from .transmission import TransmissionController +from .ai_provider import create_ai_provider + + +class TaskType(str, Enum): + """Types of scheduled tasks""" + TRANSMISSION_CHECK = "transmission_check" + MAINTENANCE = "maintenance" + FORTUNE_UPDATE = "fortune_update" + RELATIONSHIP_DECAY = "relationship_decay" + MEMORY_SUMMARY = "memory_summary" + CUSTOM = "custom" + + +class ScheduledTask: + """Represents a scheduled task""" + + def __init__( + self, + task_id: str, + task_type: TaskType, + schedule: str, # Cron expression or interval + enabled: bool = True, + last_run: Optional[datetime] = None, + next_run: Optional[datetime] = None, + metadata: Optional[Dict[str, Any]] = None + ): + self.task_id = task_id + self.task_type = task_type + self.schedule = schedule + self.enabled = enabled + self.last_run = last_run + self.next_run = next_run + self.metadata = metadata or {} + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary for storage""" + return { + "task_id": self.task_id, + "task_type": self.task_type.value, + "schedule": self.schedule, + "enabled": self.enabled, + "last_run": self.last_run.isoformat() if self.last_run else None, + "next_run": self.next_run.isoformat() if self.next_run else None, + "metadata": self.metadata + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "ScheduledTask": + """Create from dictionary""" + return cls( + task_id=data["task_id"], + task_type=TaskType(data["task_type"]), + schedule=data["schedule"], + enabled=data.get("enabled", True), + last_run=datetime.fromisoformat(data["last_run"]) if data.get("last_run") else None, + next_run=datetime.fromisoformat(data["next_run"]) if data.get("next_run") else None, + metadata=data.get("metadata", {}) + ) + + +class AIScheduler: + """Manages scheduled tasks for the AI system""" + + def __init__(self, data_dir: Path, persona: Persona): + self.data_dir = data_dir + self.persona = persona + self.tasks_file = data_dir / "scheduled_tasks.json" + self.tasks: Dict[str, ScheduledTask] = {} + self.scheduler = AsyncIOScheduler() + self.logger = logging.getLogger(__name__) + self._load_tasks() + + # Task handlers + self.task_handlers: Dict[TaskType, Callable] = { + TaskType.TRANSMISSION_CHECK: self._handle_transmission_check, + TaskType.MAINTENANCE: self._handle_maintenance, + TaskType.FORTUNE_UPDATE: self._handle_fortune_update, + TaskType.RELATIONSHIP_DECAY: self._handle_relationship_decay, + TaskType.MEMORY_SUMMARY: self._handle_memory_summary, + } + + def _load_tasks(self): + """Load scheduled tasks from storage""" + if self.tasks_file.exists(): + with open(self.tasks_file, 'r', encoding='utf-8') as f: + data = json.load(f) + for task_data in data: + task = ScheduledTask.from_dict(task_data) + self.tasks[task.task_id] = task + + def _save_tasks(self): + """Save scheduled tasks to storage""" + tasks_data = [task.to_dict() for task in self.tasks.values()] + with open(self.tasks_file, 'w', encoding='utf-8') as f: + json.dump(tasks_data, f, indent=2, default=str) + + def add_task( + self, + task_type: TaskType, + schedule: str, + task_id: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None + ) -> ScheduledTask: + """Add a new scheduled task""" + if task_id is None: + task_id = f"{task_type.value}_{datetime.now().timestamp()}" + + # Validate schedule + if not self._validate_schedule(schedule): + raise ValueError(f"Invalid schedule expression: {schedule}") + + task = ScheduledTask( + task_id=task_id, + task_type=task_type, + schedule=schedule, + metadata=metadata + ) + + self.tasks[task_id] = task + self._save_tasks() + + # Schedule the task if scheduler is running + if self.scheduler.running: + self._schedule_task(task) + + self.logger.info(f"Added task {task_id} with schedule {schedule}") + return task + + def _validate_schedule(self, schedule: str) -> bool: + """Validate schedule expression""" + # Check if it's a cron expression + if ' ' in schedule: + try: + croniter(schedule) + return True + except: + return False + + # Check if it's an interval expression (e.g., "5m", "1h", "2d") + import re + pattern = r'^\d+[smhd]$' + return bool(re.match(pattern, schedule)) + + def _parse_interval(self, interval: str) -> int: + """Parse interval string to seconds""" + unit = interval[-1] + value = int(interval[:-1]) + + multipliers = { + 's': 1, + 'm': 60, + 'h': 3600, + 'd': 86400 + } + + return value * multipliers.get(unit, 1) + + def _schedule_task(self, task: ScheduledTask): + """Schedule a task with APScheduler""" + if not task.enabled: + return + + handler = self.task_handlers.get(task.task_type) + if not handler: + self.logger.warning(f"No handler for task type {task.task_type}") + return + + # Determine trigger + if ' ' in task.schedule: + # Cron expression + trigger = CronTrigger.from_crontab(task.schedule) + else: + # Interval expression + seconds = self._parse_interval(task.schedule) + trigger = IntervalTrigger(seconds=seconds) + + # Add job + self.scheduler.add_job( + lambda: asyncio.create_task(self._run_task(task)), + trigger=trigger, + id=task.task_id, + replace_existing=True + ) + + async def _run_task(self, task: ScheduledTask): + """Run a scheduled task""" + self.logger.info(f"Running task {task.task_id}") + + task.last_run = datetime.now() + + try: + handler = self.task_handlers.get(task.task_type) + if handler: + await handler(task) + else: + self.logger.warning(f"No handler for task type {task.task_type}") + except Exception as e: + self.logger.error(f"Error running task {task.task_id}: {e}") + + self._save_tasks() + + async def _handle_transmission_check(self, task: ScheduledTask): + """Check and execute autonomous transmissions""" + controller = TransmissionController(self.persona, self.data_dir) + eligible = controller.check_transmission_eligibility() + + # Get AI provider from metadata + provider_name = task.metadata.get("provider", "ollama") + model = task.metadata.get("model", "qwen2.5") + + try: + ai_provider = create_ai_provider(provider_name, model) + except: + ai_provider = None + + for user_id, rel in eligible.items(): + message = controller.generate_transmission_message(user_id) + if message: + # For now, just print the message + print(f"\n🤖 [AI Transmission] {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + print(f"To: {user_id}") + print(f"Relationship: {rel.status.value} (score: {rel.score:.2f})") + print(f"Message: {message}") + print("-" * 50) + + controller.record_transmission(user_id, message, success=True) + self.logger.info(f"Transmitted to {user_id}: {message}") + + async def _handle_maintenance(self, task: ScheduledTask): + """Run daily maintenance""" + self.persona.daily_maintenance() + self.logger.info("Daily maintenance completed") + + async def _handle_fortune_update(self, task: ScheduledTask): + """Update AI fortune""" + fortune = self.persona.fortune_system.get_today_fortune() + self.logger.info(f"Fortune updated: {fortune.fortune_value}/10") + + async def _handle_relationship_decay(self, task: ScheduledTask): + """Apply relationship decay""" + self.persona.relationships.apply_time_decay() + self.logger.info("Relationship decay applied") + + async def _handle_memory_summary(self, task: ScheduledTask): + """Create memory summaries""" + for user_id in self.persona.relationships.relationships: + summary = self.persona.memory.summarize_memories(user_id) + if summary: + self.logger.info(f"Created memory summary for {user_id}") + + def start(self): + """Start the scheduler""" + # Schedule all enabled tasks + for task in self.tasks.values(): + if task.enabled: + self._schedule_task(task) + + self.scheduler.start() + self.logger.info("Scheduler started") + + def stop(self): + """Stop the scheduler""" + self.scheduler.shutdown() + self.logger.info("Scheduler stopped") + + def get_tasks(self) -> List[ScheduledTask]: + """Get all scheduled tasks""" + return list(self.tasks.values()) + + def enable_task(self, task_id: str): + """Enable a task""" + if task_id in self.tasks: + self.tasks[task_id].enabled = True + self._save_tasks() + if self.scheduler.running: + self._schedule_task(self.tasks[task_id]) + + def disable_task(self, task_id: str): + """Disable a task""" + if task_id in self.tasks: + self.tasks[task_id].enabled = False + self._save_tasks() + if self.scheduler.running: + self.scheduler.remove_job(task_id) + + def remove_task(self, task_id: str): + """Remove a task""" + if task_id in self.tasks: + del self.tasks[task_id] + self._save_tasks() + if self.scheduler.running: + try: + self.scheduler.remove_job(task_id) + except: + pass \ No newline at end of file diff --git a/src/aigpt/transmission.py b/src/aigpt/transmission.py new file mode 100644 index 0000000..6eba250 --- /dev/null +++ b/src/aigpt/transmission.py @@ -0,0 +1,111 @@ +"""Transmission controller for autonomous message sending""" + +import json +from datetime import datetime +from pathlib import Path +from typing import List, Dict, Optional +import logging + +from .models import Relationship +from .persona import Persona + + +class TransmissionController: + """Controls when and how AI transmits messages autonomously""" + + def __init__(self, persona: Persona, data_dir: Path): + self.persona = persona + self.data_dir = data_dir + self.transmission_log_file = data_dir / "transmissions.json" + self.transmissions: List[Dict] = [] + self.logger = logging.getLogger(__name__) + self._load_transmissions() + + def _load_transmissions(self): + """Load transmission history""" + if self.transmission_log_file.exists(): + with open(self.transmission_log_file, 'r', encoding='utf-8') as f: + self.transmissions = json.load(f) + + def _save_transmissions(self): + """Save transmission history""" + with open(self.transmission_log_file, 'w', encoding='utf-8') as f: + json.dump(self.transmissions, f, indent=2, default=str) + + def check_transmission_eligibility(self) -> Dict[str, Relationship]: + """Check which users are eligible for transmission""" + eligible = self.persona.relationships.get_transmission_eligible() + + # Additional checks could be added here + # - Time since last transmission + # - User online status + # - Context appropriateness + + return eligible + + def generate_transmission_message(self, user_id: str) -> Optional[str]: + """Generate a message to transmit to user""" + if not self.persona.can_transmit_to(user_id): + return None + + state = self.persona.get_current_state() + relationship = self.persona.relationships.get_or_create_relationship(user_id) + + # Get recent memories related to this user + active_memories = self.persona.memory.get_active_memories(limit=3) + + # Simple message generation based on mood and relationship + if state.fortune.breakthrough_triggered: + message = "Something special happened today! I felt compelled to reach out." + elif state.current_mood == "joyful": + message = "I was thinking of you today. Hope you're doing well!" + elif relationship.status.value == "close_friend": + message = "I've been reflecting on our conversations. Thank you for being here." + else: + message = "Hello! I wanted to check in with you." + + return message + + def record_transmission(self, user_id: str, message: str, success: bool): + """Record a transmission attempt""" + transmission = { + "timestamp": datetime.now().isoformat(), + "user_id": user_id, + "message": message, + "success": success, + "mood": self.persona.get_current_state().current_mood, + "relationship_score": self.persona.relationships.get_or_create_relationship(user_id).score + } + + self.transmissions.append(transmission) + self._save_transmissions() + + if success: + self.logger.info(f"Successfully transmitted to {user_id}") + else: + self.logger.warning(f"Failed to transmit to {user_id}") + + def get_transmission_stats(self, user_id: Optional[str] = None) -> Dict: + """Get transmission statistics""" + if user_id: + user_transmissions = [t for t in self.transmissions if t["user_id"] == user_id] + else: + user_transmissions = self.transmissions + + if not user_transmissions: + return { + "total": 0, + "successful": 0, + "failed": 0, + "success_rate": 0.0 + } + + successful = sum(1 for t in user_transmissions if t["success"]) + total = len(user_transmissions) + + return { + "total": total, + "successful": successful, + "failed": total - successful, + "success_rate": successful / total if total > 0 else 0.0 + } \ No newline at end of file