This commit is contained in:
2025-06-02 01:16:04 +09:00
parent 62f941a958
commit 06fb70fffa
13 changed files with 2016 additions and 1 deletions

1
.gitignore vendored
View File

@ -2,6 +2,5 @@
**.lock
output.json
config/*.db
aigpt
mcp/scripts/__*
data

15
src/aigpt/__init__.py Normal file
View File

@ -0,0 +1,15 @@
"""ai.gpt - Autonomous transmission AI with unique personality"""
__version__ = "0.1.0"
from .memory import MemoryManager
from .relationship import RelationshipTracker
from .persona import Persona
from .transmission import TransmissionController
__all__ = [
"MemoryManager",
"RelationshipTracker",
"Persona",
"TransmissionController",
]

172
src/aigpt/ai_provider.py Normal file
View File

@ -0,0 +1,172 @@
"""AI Provider integration for response generation"""
import os
from typing import Optional, Dict, List, Any, Protocol
from abc import abstractmethod
import logging
import httpx
from openai import OpenAI
import ollama
from .models import PersonaState, Memory
from .config import Config
class AIProvider(Protocol):
"""Protocol for AI providers"""
@abstractmethod
async def generate_response(
self,
prompt: str,
persona_state: PersonaState,
memories: List[Memory],
system_prompt: Optional[str] = None
) -> str:
"""Generate a response based on prompt and context"""
pass
class OllamaProvider:
"""Ollama AI provider"""
def __init__(self, model: str = "qwen2.5", host: str = "http://localhost:11434"):
self.model = model
self.host = host
self.client = ollama.Client(host=host)
self.logger = logging.getLogger(__name__)
async def generate_response(
self,
prompt: str,
persona_state: PersonaState,
memories: List[Memory],
system_prompt: Optional[str] = None
) -> str:
"""Generate response using Ollama"""
# Build context from memories
memory_context = "\n".join([
f"[{mem.level.value}] {mem.content[:200]}..."
for mem in memories[:5]
])
# Build personality context
personality_desc = ", ".join([
f"{trait}: {value:.1f}"
for trait, value in persona_state.base_personality.items()
])
# System prompt with persona context
full_system_prompt = f"""You are an AI with the following characteristics:
Current mood: {persona_state.current_mood}
Fortune today: {persona_state.fortune.fortune_value}/10
Personality traits: {personality_desc}
Recent memories:
{memory_context}
{system_prompt or 'Respond naturally based on your current state and memories.'}"""
try:
response = self.client.chat(
model=self.model,
messages=[
{"role": "system", "content": full_system_prompt},
{"role": "user", "content": prompt}
]
)
return response['message']['content']
except Exception as e:
self.logger.error(f"Ollama generation failed: {e}")
return self._fallback_response(persona_state)
def _fallback_response(self, persona_state: PersonaState) -> str:
"""Fallback response based on mood"""
mood_responses = {
"joyful": "That's wonderful! I'm feeling great today!",
"cheerful": "That sounds nice!",
"neutral": "I understand.",
"melancholic": "I see... That's something to think about.",
"contemplative": "Hmm, let me consider that..."
}
return mood_responses.get(persona_state.current_mood, "I see.")
class OpenAIProvider:
"""OpenAI API provider"""
def __init__(self, model: str = "gpt-4o-mini", api_key: Optional[str] = None):
self.model = model
# Try to get API key from config first
config = Config()
self.api_key = api_key or config.get_api_key("openai") or os.getenv("OPENAI_API_KEY")
if not self.api_key:
raise ValueError("OpenAI API key not provided. Set it with: ai-gpt config set providers.openai.api_key YOUR_KEY")
self.client = OpenAI(api_key=self.api_key)
self.logger = logging.getLogger(__name__)
async def generate_response(
self,
prompt: str,
persona_state: PersonaState,
memories: List[Memory],
system_prompt: Optional[str] = None
) -> str:
"""Generate response using OpenAI"""
# Build context similar to Ollama
memory_context = "\n".join([
f"[{mem.level.value}] {mem.content[:200]}..."
for mem in memories[:5]
])
personality_desc = ", ".join([
f"{trait}: {value:.1f}"
for trait, value in persona_state.base_personality.items()
])
full_system_prompt = f"""You are an AI with unique personality traits and memories.
Current mood: {persona_state.current_mood}
Fortune today: {persona_state.fortune.fortune_value}/10
Personality traits: {personality_desc}
Recent memories:
{memory_context}
{system_prompt or 'Respond naturally based on your current state and memories. Be authentic to your mood and personality.'}"""
try:
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": full_system_prompt},
{"role": "user", "content": prompt}
],
temperature=0.7 + (persona_state.fortune.fortune_value - 5) * 0.05 # Vary by fortune
)
return response.choices[0].message.content
except Exception as e:
self.logger.error(f"OpenAI generation failed: {e}")
return self._fallback_response(persona_state)
def _fallback_response(self, persona_state: PersonaState) -> str:
"""Fallback response based on mood"""
mood_responses = {
"joyful": "What a delightful conversation!",
"cheerful": "That's interesting!",
"neutral": "I understand what you mean.",
"melancholic": "I've been thinking about that too...",
"contemplative": "That gives me something to ponder..."
}
return mood_responses.get(persona_state.current_mood, "I see.")
def create_ai_provider(provider: str, model: str, **kwargs) -> AIProvider:
"""Factory function to create AI providers"""
if provider == "ollama":
return OllamaProvider(model=model, **kwargs)
elif provider == "openai":
return OpenAIProvider(model=model, **kwargs)
else:
raise ValueError(f"Unknown provider: {provider}")

444
src/aigpt/cli.py Normal file
View File

@ -0,0 +1,444 @@
"""CLI interface for ai.gpt using typer"""
import typer
from pathlib import Path
from typing import Optional
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from datetime import datetime, timedelta
from .persona import Persona
from .transmission import TransmissionController
from .mcp_server import AIGptMcpServer
from .ai_provider import create_ai_provider
from .scheduler import AIScheduler, TaskType
from .config import Config
app = typer.Typer(help="ai.gpt - Autonomous transmission AI with unique personality")
console = Console()
# Configuration
config = Config()
DEFAULT_DATA_DIR = config.data_dir
def get_persona(data_dir: Optional[Path] = None) -> Persona:
"""Get or create persona instance"""
if data_dir is None:
data_dir = DEFAULT_DATA_DIR
data_dir.mkdir(parents=True, exist_ok=True)
return Persona(data_dir)
@app.command()
def chat(
user_id: str = typer.Argument(..., help="User ID (atproto DID)"),
message: str = typer.Argument(..., help="Message to send to AI"),
data_dir: Optional[Path] = typer.Option(None, "--data-dir", "-d", help="Data directory"),
model: Optional[str] = typer.Option(None, "--model", "-m", help="AI model to use"),
provider: Optional[str] = typer.Option(None, "--provider", help="AI provider (ollama/openai)")
):
"""Chat with the AI"""
persona = get_persona(data_dir)
# Create AI provider if specified
ai_provider = None
if provider and model:
try:
ai_provider = create_ai_provider(provider, model)
console.print(f"[dim]Using {provider} with model {model}[/dim]\n")
except Exception as e:
console.print(f"[yellow]Warning: Could not create AI provider: {e}[/yellow]")
console.print("[yellow]Falling back to simple responses[/yellow]\n")
# Process interaction
response, relationship_delta = persona.process_interaction(user_id, message, ai_provider)
# Get updated relationship
relationship = persona.relationships.get_or_create_relationship(user_id)
# Display response
console.print(Panel(response, title="AI Response", border_style="cyan"))
# Show relationship status
status_color = "green" if relationship.transmission_enabled else "yellow"
if relationship.is_broken:
status_color = "red"
console.print(f"\n[{status_color}]Relationship Status:[/{status_color}] {relationship.status.value}")
console.print(f"Score: {relationship.score:.2f} / {relationship.threshold}")
console.print(f"Transmission: {'✓ Enabled' if relationship.transmission_enabled else '✗ Disabled'}")
if relationship.is_broken:
console.print("[red]⚠️ This relationship is broken and cannot be repaired.[/red]")
@app.command()
def status(
user_id: Optional[str] = typer.Argument(None, help="User ID to check status for"),
data_dir: Optional[Path] = typer.Option(None, "--data-dir", "-d", help="Data directory")
):
"""Check AI status and relationships"""
persona = get_persona(data_dir)
state = persona.get_current_state()
# Show AI state
console.print(Panel(f"[cyan]ai.gpt Status[/cyan]", expand=False))
console.print(f"Mood: {state.current_mood}")
console.print(f"Fortune: {state.fortune.fortune_value}/10")
if state.fortune.breakthrough_triggered:
console.print("[yellow]⚡ Breakthrough triggered![/yellow]")
# Show personality traits
table = Table(title="Current Personality")
table.add_column("Trait", style="cyan")
table.add_column("Value", style="magenta")
for trait, value in state.base_personality.items():
table.add_row(trait.capitalize(), f"{value:.2f}")
console.print(table)
# Show specific relationship if requested
if user_id:
rel = persona.relationships.get_or_create_relationship(user_id)
console.print(f"\n[cyan]Relationship with {user_id}:[/cyan]")
console.print(f"Status: {rel.status.value}")
console.print(f"Score: {rel.score:.2f}")
console.print(f"Total Interactions: {rel.total_interactions}")
console.print(f"Transmission Enabled: {rel.transmission_enabled}")
@app.command()
def fortune(
data_dir: Optional[Path] = typer.Option(None, "--data-dir", "-d", help="Data directory")
):
"""Check today's AI fortune"""
persona = get_persona(data_dir)
fortune = persona.fortune_system.get_today_fortune()
# Fortune display
fortune_bar = "🌟" * fortune.fortune_value + "" * (10 - fortune.fortune_value)
console.print(Panel(
f"{fortune_bar}\n\n"
f"Today's Fortune: {fortune.fortune_value}/10\n"
f"Date: {fortune.date}",
title="AI Fortune",
border_style="yellow"
))
if fortune.consecutive_good > 0:
console.print(f"[green]Consecutive good days: {fortune.consecutive_good}[/green]")
if fortune.consecutive_bad > 0:
console.print(f"[red]Consecutive bad days: {fortune.consecutive_bad}[/red]")
if fortune.breakthrough_triggered:
console.print("\n[yellow]⚡ BREAKTHROUGH! Special fortune activated![/yellow]")
@app.command()
def transmit(
data_dir: Optional[Path] = typer.Option(None, "--data-dir", "-d", help="Data directory"),
dry_run: bool = typer.Option(True, "--dry-run/--execute", help="Dry run or execute")
):
"""Check and execute autonomous transmissions"""
persona = get_persona(data_dir)
controller = TransmissionController(persona, persona.data_dir)
eligible = controller.check_transmission_eligibility()
if not eligible:
console.print("[yellow]No users eligible for transmission.[/yellow]")
return
console.print(f"[green]Found {len(eligible)} eligible users for transmission:[/green]")
for user_id, rel in eligible.items():
message = controller.generate_transmission_message(user_id)
if message:
console.print(f"\n[cyan]To:[/cyan] {user_id}")
console.print(f"[cyan]Message:[/cyan] {message}")
console.print(f"[cyan]Relationship:[/cyan] {rel.status.value} (score: {rel.score:.2f})")
if not dry_run:
# In real implementation, send via atproto or other channel
controller.record_transmission(user_id, message, success=True)
console.print("[green]✓ Transmitted[/green]")
else:
console.print("[yellow]→ Would transmit (dry run)[/yellow]")
@app.command()
def maintenance(
data_dir: Optional[Path] = typer.Option(None, "--data-dir", "-d", help="Data directory")
):
"""Run daily maintenance tasks"""
persona = get_persona(data_dir)
console.print("[cyan]Running daily maintenance...[/cyan]")
persona.daily_maintenance()
console.print("[green]✓ Maintenance completed[/green]")
@app.command()
def relationships(
data_dir: Optional[Path] = typer.Option(None, "--data-dir", "-d", help="Data directory")
):
"""List all relationships"""
persona = get_persona(data_dir)
table = Table(title="All Relationships")
table.add_column("User ID", style="cyan")
table.add_column("Status", style="magenta")
table.add_column("Score", style="green")
table.add_column("Transmission", style="yellow")
table.add_column("Last Interaction")
for user_id, rel in persona.relationships.relationships.items():
transmission = "" if rel.transmission_enabled else ""
if rel.is_broken:
transmission = "💔"
last_interaction = rel.last_interaction.strftime("%Y-%m-%d") if rel.last_interaction else "Never"
table.add_row(
user_id[:16] + "...",
rel.status.value,
f"{rel.score:.2f}",
transmission,
last_interaction
)
console.print(table)
@app.command()
def server(
host: str = typer.Option("localhost", "--host", "-h", help="Server host"),
port: int = typer.Option(8000, "--port", "-p", help="Server port"),
data_dir: Optional[Path] = typer.Option(None, "--data-dir", "-d", help="Data directory"),
model: str = typer.Option("qwen2.5", "--model", "-m", help="AI model to use"),
provider: str = typer.Option("ollama", "--provider", help="AI provider (ollama/openai)")
):
"""Run MCP server for AI integration"""
import uvicorn
if data_dir is None:
data_dir = DEFAULT_DATA_DIR
data_dir.mkdir(parents=True, exist_ok=True)
# Create MCP server
mcp_server = AIGptMcpServer(data_dir)
app_instance = mcp_server.get_server().get_app()
console.print(Panel(
f"[cyan]Starting ai.gpt MCP Server[/cyan]\n\n"
f"Host: {host}:{port}\n"
f"Provider: {provider}\n"
f"Model: {model}\n"
f"Data: {data_dir}",
title="MCP Server",
border_style="green"
))
# Store provider info in app state for later use
app_instance.state.ai_provider = provider
app_instance.state.ai_model = model
# Run server
uvicorn.run(app_instance, host=host, port=port)
@app.command()
def schedule(
action: str = typer.Argument(..., help="Action: add, list, enable, disable, remove, run"),
task_type: Optional[str] = typer.Argument(None, help="Task type for add action"),
schedule_expr: Optional[str] = typer.Argument(None, help="Schedule expression (cron or interval)"),
data_dir: Optional[Path] = typer.Option(None, "--data-dir", "-d", help="Data directory"),
task_id: Optional[str] = typer.Option(None, "--task-id", "-t", help="Task ID"),
provider: Optional[str] = typer.Option(None, "--provider", help="AI provider for transmission"),
model: Optional[str] = typer.Option(None, "--model", "-m", help="AI model for transmission")
):
"""Manage scheduled tasks"""
persona = get_persona(data_dir)
scheduler = AIScheduler(persona.data_dir, persona)
if action == "add":
if not task_type or not schedule_expr:
console.print("[red]Error: task_type and schedule required for add action[/red]")
return
# Parse task type
try:
task_type_enum = TaskType(task_type)
except ValueError:
console.print(f"[red]Invalid task type. Valid types: {', '.join([t.value for t in TaskType])}[/red]")
return
# Metadata for transmission tasks
metadata = {}
if task_type_enum == TaskType.TRANSMISSION_CHECK:
metadata["provider"] = provider or "ollama"
metadata["model"] = model or "qwen2.5"
try:
task = scheduler.add_task(task_type_enum, schedule_expr, task_id, metadata)
console.print(f"[green]✓ Added task {task.task_id}[/green]")
console.print(f"Type: {task.task_type.value}")
console.print(f"Schedule: {task.schedule}")
except ValueError as e:
console.print(f"[red]Error: {e}[/red]")
elif action == "list":
tasks = scheduler.get_tasks()
if not tasks:
console.print("[yellow]No scheduled tasks[/yellow]")
return
table = Table(title="Scheduled Tasks")
table.add_column("Task ID", style="cyan")
table.add_column("Type", style="magenta")
table.add_column("Schedule", style="green")
table.add_column("Enabled", style="yellow")
table.add_column("Last Run")
for task in tasks:
enabled = "" if task.enabled else ""
last_run = task.last_run.strftime("%Y-%m-%d %H:%M") if task.last_run else "Never"
table.add_row(
task.task_id[:20] + "..." if len(task.task_id) > 20 else task.task_id,
task.task_type.value,
task.schedule,
enabled,
last_run
)
console.print(table)
elif action == "enable":
if not task_id:
console.print("[red]Error: --task-id required for enable action[/red]")
return
scheduler.enable_task(task_id)
console.print(f"[green]✓ Enabled task {task_id}[/green]")
elif action == "disable":
if not task_id:
console.print("[red]Error: --task-id required for disable action[/red]")
return
scheduler.disable_task(task_id)
console.print(f"[yellow]✓ Disabled task {task_id}[/yellow]")
elif action == "remove":
if not task_id:
console.print("[red]Error: --task-id required for remove action[/red]")
return
scheduler.remove_task(task_id)
console.print(f"[red]✓ Removed task {task_id}[/red]")
elif action == "run":
console.print("[cyan]Starting scheduler daemon...[/cyan]")
console.print("Press Ctrl+C to stop\n")
import asyncio
async def run_scheduler():
scheduler.start()
try:
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
scheduler.stop()
try:
asyncio.run(run_scheduler())
except KeyboardInterrupt:
console.print("\n[yellow]Scheduler stopped[/yellow]")
else:
console.print(f"[red]Unknown action: {action}[/red]")
console.print("Valid actions: add, list, enable, disable, remove, run")
@app.command()
def config(
action: str = typer.Argument(..., help="Action: get, set, delete, list"),
key: Optional[str] = typer.Argument(None, help="Configuration key (dot notation)"),
value: Optional[str] = typer.Argument(None, help="Value to set")
):
"""Manage configuration settings"""
if action == "get":
if not key:
console.print("[red]Error: key required for get action[/red]")
return
val = config.get(key)
if val is None:
console.print(f"[yellow]Key '{key}' not found[/yellow]")
else:
console.print(f"[cyan]{key}[/cyan] = [green]{val}[/green]")
elif action == "set":
if not key or value is None:
console.print("[red]Error: key and value required for set action[/red]")
return
# Special handling for sensitive keys
if "password" in key or "api_key" in key:
console.print(f"[cyan]Setting {key}[/cyan] = [dim]***hidden***[/dim]")
else:
console.print(f"[cyan]Setting {key}[/cyan] = [green]{value}[/green]")
config.set(key, value)
console.print("[green]✓ Configuration saved[/green]")
elif action == "delete":
if not key:
console.print("[red]Error: key required for delete action[/red]")
return
if config.delete(key):
console.print(f"[green]✓ Deleted {key}[/green]")
else:
console.print(f"[yellow]Key '{key}' not found[/yellow]")
elif action == "list":
keys = config.list_keys(key or "")
if not keys:
console.print("[yellow]No configuration keys found[/yellow]")
return
table = Table(title="Configuration Settings")
table.add_column("Key", style="cyan")
table.add_column("Value", style="green")
for k in sorted(keys):
val = config.get(k)
# Hide sensitive values
if "password" in k or "api_key" in k:
display_val = "***hidden***" if val else "not set"
else:
display_val = str(val) if val is not None else "not set"
table.add_row(k, display_val)
console.print(table)
else:
console.print(f"[red]Unknown action: {action}[/red]")
console.print("Valid actions: get, set, delete, list")
if __name__ == "__main__":
app()

145
src/aigpt/config.py Normal file
View File

@ -0,0 +1,145 @@
"""Configuration management for ai.gpt"""
import json
import os
from pathlib import Path
from typing import Optional, Dict, Any
import logging
class Config:
"""Manages configuration settings"""
def __init__(self, config_dir: Optional[Path] = None):
if config_dir is None:
config_dir = Path.home() / ".config" / "syui" / "ai" / "gpt"
self.config_dir = config_dir
self.config_file = config_dir / "config.json"
self.data_dir = config_dir / "data"
# Create directories if they don't exist
self.config_dir.mkdir(parents=True, exist_ok=True)
self.data_dir.mkdir(parents=True, exist_ok=True)
self.logger = logging.getLogger(__name__)
self._config: Dict[str, Any] = {}
self._load_config()
def _load_config(self):
"""Load configuration from file"""
if self.config_file.exists():
try:
with open(self.config_file, 'r', encoding='utf-8') as f:
self._config = json.load(f)
except Exception as e:
self.logger.error(f"Failed to load config: {e}")
self._config = {}
else:
# Initialize with default config
self._config = {
"providers": {
"openai": {
"api_key": None,
"default_model": "gpt-4o-mini"
},
"ollama": {
"host": "http://localhost:11434",
"default_model": "qwen2.5"
}
},
"atproto": {
"handle": None,
"password": None,
"host": "https://bsky.social"
},
"default_provider": "ollama"
}
self._save_config()
def _save_config(self):
"""Save configuration to file"""
try:
with open(self.config_file, 'w', encoding='utf-8') as f:
json.dump(self._config, f, indent=2)
except Exception as e:
self.logger.error(f"Failed to save config: {e}")
def get(self, key: str, default: Any = None) -> Any:
"""Get configuration value using dot notation"""
keys = key.split('.')
value = self._config
for k in keys:
if isinstance(value, dict) and k in value:
value = value[k]
else:
return default
return value
def set(self, key: str, value: Any):
"""Set configuration value using dot notation"""
keys = key.split('.')
config = self._config
# Navigate to the parent dictionary
for k in keys[:-1]:
if k not in config:
config[k] = {}
config = config[k]
# Set the value
config[keys[-1]] = value
self._save_config()
def delete(self, key: str) -> bool:
"""Delete configuration value"""
keys = key.split('.')
config = self._config
# Navigate to the parent dictionary
for k in keys[:-1]:
if k not in config:
return False
config = config[k]
# Delete the key if it exists
if keys[-1] in config:
del config[keys[-1]]
self._save_config()
return True
return False
def list_keys(self, prefix: str = "") -> list[str]:
"""List all configuration keys with optional prefix"""
def _get_keys(config: dict, current_prefix: str = "") -> list[str]:
keys = []
for k, v in config.items():
full_key = f"{current_prefix}.{k}" if current_prefix else k
if isinstance(v, dict):
keys.extend(_get_keys(v, full_key))
else:
keys.append(full_key)
return keys
all_keys = _get_keys(self._config)
if prefix:
return [k for k in all_keys if k.startswith(prefix)]
return all_keys
def get_api_key(self, provider: str) -> Optional[str]:
"""Get API key for a specific provider"""
key = self.get(f"providers.{provider}.api_key")
# Also check environment variables
if not key and provider == "openai":
key = os.getenv("OPENAI_API_KEY")
return key
def get_provider_config(self, provider: str) -> Dict[str, Any]:
"""Get complete configuration for a provider"""
return self.get(f"providers.{provider}", {})

118
src/aigpt/fortune.py Normal file
View File

@ -0,0 +1,118 @@
"""AI Fortune system for daily personality variations"""
import json
import random
from datetime import date, datetime, timedelta
from pathlib import Path
from typing import Optional
import logging
from .models import AIFortune
class FortuneSystem:
"""Manages daily AI fortune affecting personality"""
def __init__(self, data_dir: Path):
self.data_dir = data_dir
self.fortune_file = data_dir / "fortunes.json"
self.fortunes: dict[str, AIFortune] = {}
self.logger = logging.getLogger(__name__)
self._load_fortunes()
def _load_fortunes(self):
"""Load fortune history from storage"""
if self.fortune_file.exists():
with open(self.fortune_file, 'r', encoding='utf-8') as f:
data = json.load(f)
for date_str, fortune_data in data.items():
# Convert date string back to date object
fortune_data['date'] = datetime.fromisoformat(fortune_data['date']).date()
self.fortunes[date_str] = AIFortune(**fortune_data)
def _save_fortunes(self):
"""Save fortune history to storage"""
data = {}
for date_str, fortune in self.fortunes.items():
fortune_dict = fortune.model_dump(mode='json')
fortune_dict['date'] = fortune.date.isoformat()
data[date_str] = fortune_dict
with open(self.fortune_file, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2)
def get_today_fortune(self) -> AIFortune:
"""Get or generate today's fortune"""
today = date.today()
today_str = today.isoformat()
if today_str in self.fortunes:
return self.fortunes[today_str]
# Generate new fortune
fortune_value = random.randint(1, 10)
# Check yesterday's fortune for consecutive tracking
yesterday = (today - timedelta(days=1))
yesterday_str = yesterday.isoformat()
consecutive_good = 0
consecutive_bad = 0
breakthrough_triggered = False
if yesterday_str in self.fortunes:
yesterday_fortune = self.fortunes[yesterday_str]
if fortune_value >= 7: # Good fortune
if yesterday_fortune.fortune_value >= 7:
consecutive_good = yesterday_fortune.consecutive_good + 1
else:
consecutive_good = 1
elif fortune_value <= 3: # Bad fortune
if yesterday_fortune.fortune_value <= 3:
consecutive_bad = yesterday_fortune.consecutive_bad + 1
else:
consecutive_bad = 1
# Check breakthrough conditions
if consecutive_good >= 3:
breakthrough_triggered = True
self.logger.info("Breakthrough! 3 consecutive good fortunes!")
fortune_value = 10 # Max fortune on breakthrough
elif consecutive_bad >= 3:
breakthrough_triggered = True
self.logger.info("Breakthrough! 3 consecutive bad fortunes!")
fortune_value = random.randint(7, 10) # Good fortune after bad streak
fortune = AIFortune(
date=today,
fortune_value=fortune_value,
consecutive_good=consecutive_good,
consecutive_bad=consecutive_bad,
breakthrough_triggered=breakthrough_triggered
)
self.fortunes[today_str] = fortune
self._save_fortunes()
self.logger.info(f"Today's fortune: {fortune_value}/10")
return fortune
def get_personality_modifier(self, fortune: AIFortune) -> dict[str, float]:
"""Get personality modifiers based on fortune"""
base_modifier = fortune.fortune_value / 10.0
modifiers = {
"optimism": base_modifier,
"energy": base_modifier * 0.8,
"patience": 1.0 - (abs(5.5 - fortune.fortune_value) * 0.1),
"creativity": 0.5 + (base_modifier * 0.5),
"empathy": 0.7 + (base_modifier * 0.3)
}
# Breakthrough effects
if fortune.breakthrough_triggered:
modifiers["confidence"] = 1.0
modifiers["spontaneity"] = 0.9
return modifiers

149
src/aigpt/mcp_server.py Normal file
View File

@ -0,0 +1,149 @@
"""MCP Server for ai.gpt system"""
from typing import Optional, List, Dict, Any
from fastapi_mcp import FastapiMcpServer
from pathlib import Path
import logging
from .persona import Persona
from .models import Memory, Relationship, PersonaState
logger = logging.getLogger(__name__)
class AIGptMcpServer:
"""MCP Server that exposes ai.gpt functionality to AI assistants"""
def __init__(self, data_dir: Path):
self.data_dir = data_dir
self.persona = Persona(data_dir)
self.server = FastapiMcpServer("ai-gpt", "AI.GPT Memory and Relationship System")
self._register_tools()
def _register_tools(self):
"""Register all MCP tools"""
@self.server.tool("get_memories")
async def get_memories(user_id: Optional[str] = None, limit: int = 10) -> List[Dict[str, Any]]:
"""Get active memories from the AI's memory system"""
memories = self.persona.memory.get_active_memories(limit=limit)
return [
{
"id": mem.id,
"content": mem.content,
"level": mem.level.value,
"importance": mem.importance_score,
"is_core": mem.is_core,
"timestamp": mem.timestamp.isoformat()
}
for mem in memories
]
@self.server.tool("get_relationship")
async def get_relationship(user_id: str) -> Dict[str, Any]:
"""Get relationship status with a specific user"""
rel = self.persona.relationships.get_or_create_relationship(user_id)
return {
"user_id": rel.user_id,
"status": rel.status.value,
"score": rel.score,
"transmission_enabled": rel.transmission_enabled,
"is_broken": rel.is_broken,
"total_interactions": rel.total_interactions,
"last_interaction": rel.last_interaction.isoformat() if rel.last_interaction else None
}
@self.server.tool("get_all_relationships")
async def get_all_relationships() -> List[Dict[str, Any]]:
"""Get all relationships"""
relationships = []
for user_id, rel in self.persona.relationships.relationships.items():
relationships.append({
"user_id": user_id,
"status": rel.status.value,
"score": rel.score,
"transmission_enabled": rel.transmission_enabled,
"is_broken": rel.is_broken
})
return relationships
@self.server.tool("get_persona_state")
async def get_persona_state() -> Dict[str, Any]:
"""Get current persona state including fortune and mood"""
state = self.persona.get_current_state()
return {
"mood": state.current_mood,
"fortune": {
"value": state.fortune.fortune_value,
"date": state.fortune.date.isoformat(),
"breakthrough": state.fortune.breakthrough_triggered
},
"personality": state.base_personality,
"active_memory_count": len(state.active_memories)
}
@self.server.tool("process_interaction")
async def process_interaction(user_id: str, message: str) -> Dict[str, Any]:
"""Process an interaction with a user"""
response, relationship_delta = self.persona.process_interaction(user_id, message)
rel = self.persona.relationships.get_or_create_relationship(user_id)
return {
"response": response,
"relationship_delta": relationship_delta,
"new_relationship_score": rel.score,
"transmission_enabled": rel.transmission_enabled,
"relationship_status": rel.status.value
}
@self.server.tool("check_transmission_eligibility")
async def check_transmission_eligibility(user_id: str) -> Dict[str, Any]:
"""Check if AI can transmit to a specific user"""
can_transmit = self.persona.can_transmit_to(user_id)
rel = self.persona.relationships.get_or_create_relationship(user_id)
return {
"can_transmit": can_transmit,
"relationship_score": rel.score,
"threshold": rel.threshold,
"is_broken": rel.is_broken,
"transmission_enabled": rel.transmission_enabled
}
@self.server.tool("get_fortune")
async def get_fortune() -> Dict[str, Any]:
"""Get today's AI fortune"""
fortune = self.persona.fortune_system.get_today_fortune()
modifiers = self.persona.fortune_system.get_personality_modifier(fortune)
return {
"value": fortune.fortune_value,
"date": fortune.date.isoformat(),
"consecutive_good": fortune.consecutive_good,
"consecutive_bad": fortune.consecutive_bad,
"breakthrough": fortune.breakthrough_triggered,
"personality_modifiers": modifiers
}
@self.server.tool("summarize_memories")
async def summarize_memories(user_id: str) -> Optional[Dict[str, Any]]:
"""Create a summary of recent memories for a user"""
summary = self.persona.memory.summarize_memories(user_id)
if summary:
return {
"id": summary.id,
"content": summary.content,
"level": summary.level.value,
"timestamp": summary.timestamp.isoformat()
}
return None
@self.server.tool("run_maintenance")
async def run_maintenance() -> Dict[str, str]:
"""Run daily maintenance tasks"""
self.persona.daily_maintenance()
return {"status": "Maintenance completed successfully"}
def get_server(self) -> FastapiMcpServer:
"""Get the FastAPI MCP server instance"""
return self.server

155
src/aigpt/memory.py Normal file
View File

@ -0,0 +1,155 @@
"""Memory management system for ai.gpt"""
import json
import hashlib
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Optional, Dict, Any
import logging
from .models import Memory, MemoryLevel, Conversation
class MemoryManager:
"""Manages AI's memory with hierarchical storage and forgetting"""
def __init__(self, data_dir: Path):
self.data_dir = data_dir
self.memories_file = data_dir / "memories.json"
self.conversations_file = data_dir / "conversations.json"
self.memories: Dict[str, Memory] = {}
self.conversations: List[Conversation] = []
self.logger = logging.getLogger(__name__)
self._load_memories()
def _load_memories(self):
"""Load memories from persistent storage"""
if self.memories_file.exists():
with open(self.memories_file, 'r', encoding='utf-8') as f:
data = json.load(f)
for mem_data in data:
memory = Memory(**mem_data)
self.memories[memory.id] = memory
if self.conversations_file.exists():
with open(self.conversations_file, 'r', encoding='utf-8') as f:
data = json.load(f)
self.conversations = [Conversation(**conv) for conv in data]
def _save_memories(self):
"""Save memories to persistent storage"""
memories_data = [mem.model_dump(mode='json') for mem in self.memories.values()]
with open(self.memories_file, 'w', encoding='utf-8') as f:
json.dump(memories_data, f, indent=2, default=str)
conv_data = [conv.model_dump(mode='json') for conv in self.conversations]
with open(self.conversations_file, 'w', encoding='utf-8') as f:
json.dump(conv_data, f, indent=2, default=str)
def add_conversation(self, conversation: Conversation) -> Memory:
"""Add a conversation and create memory from it"""
self.conversations.append(conversation)
# Create memory from conversation
memory_id = hashlib.sha256(
f"{conversation.id}{conversation.timestamp}".encode()
).hexdigest()[:16]
memory = Memory(
id=memory_id,
timestamp=conversation.timestamp,
content=f"User: {conversation.user_message}\nAI: {conversation.ai_response}",
level=MemoryLevel.FULL_LOG,
importance_score=abs(conversation.relationship_delta) * 0.1
)
self.memories[memory.id] = memory
self._save_memories()
return memory
def summarize_memories(self, user_id: str) -> Optional[Memory]:
"""Create summary from recent memories"""
recent_memories = [
mem for mem in self.memories.values()
if mem.level == MemoryLevel.FULL_LOG
and (datetime.now() - mem.timestamp).days < 7
]
if len(recent_memories) < 5:
return None
# Simple summary creation (in real implementation, use AI)
summary_content = f"Summary of {len(recent_memories)} recent interactions"
summary_id = hashlib.sha256(
f"summary_{datetime.now().isoformat()}".encode()
).hexdigest()[:16]
summary = Memory(
id=summary_id,
timestamp=datetime.now(),
content=summary_content,
summary=summary_content,
level=MemoryLevel.SUMMARY,
importance_score=0.5
)
self.memories[summary.id] = summary
# Mark summarized memories for potential forgetting
for mem in recent_memories:
mem.importance_score *= 0.9
self._save_memories()
return summary
def identify_core_memories(self) -> List[Memory]:
"""Identify memories that should become core (never forgotten)"""
core_candidates = [
mem for mem in self.memories.values()
if mem.importance_score > 0.8
and not mem.is_core
and mem.level != MemoryLevel.FORGOTTEN
]
for memory in core_candidates:
memory.is_core = True
memory.level = MemoryLevel.CORE
self.logger.info(f"Memory {memory.id} promoted to core")
self._save_memories()
return core_candidates
def apply_forgetting(self):
"""Apply selective forgetting based on importance and time"""
now = datetime.now()
for memory in self.memories.values():
if memory.is_core or memory.level == MemoryLevel.FORGOTTEN:
continue
# Time-based decay
age_days = (now - memory.timestamp).days
decay_factor = memory.decay_rate * age_days
memory.importance_score -= decay_factor
# Forget unimportant old memories
if memory.importance_score <= 0.1 and age_days > 30:
memory.level = MemoryLevel.FORGOTTEN
self.logger.info(f"Memory {memory.id} forgotten")
self._save_memories()
def get_active_memories(self, limit: int = 10) -> List[Memory]:
"""Get currently active memories for persona"""
active = [
mem for mem in self.memories.values()
if mem.level != MemoryLevel.FORGOTTEN
]
# Sort by importance and recency
active.sort(
key=lambda m: (m.is_core, m.importance_score, m.timestamp),
reverse=True
)
return active[:limit]

79
src/aigpt/models.py Normal file
View File

@ -0,0 +1,79 @@
"""Data models for ai.gpt system"""
from datetime import datetime
from typing import Optional, Dict, List, Any
from enum import Enum
from pydantic import BaseModel, Field
class MemoryLevel(str, Enum):
"""Memory importance levels"""
FULL_LOG = "full_log"
SUMMARY = "summary"
CORE = "core"
FORGOTTEN = "forgotten"
class RelationshipStatus(str, Enum):
"""Relationship status levels"""
STRANGER = "stranger"
ACQUAINTANCE = "acquaintance"
FRIEND = "friend"
CLOSE_FRIEND = "close_friend"
BROKEN = "broken" # 不可逆
class Memory(BaseModel):
"""Single memory unit"""
id: str
timestamp: datetime
content: str
summary: Optional[str] = None
level: MemoryLevel = MemoryLevel.FULL_LOG
importance_score: float = Field(ge=0.0, le=1.0)
is_core: bool = False
decay_rate: float = 0.01
class Relationship(BaseModel):
"""Relationship with a specific user"""
user_id: str # atproto DID
status: RelationshipStatus = RelationshipStatus.STRANGER
score: float = 0.0
daily_interactions: int = 0
total_interactions: int = 0
last_interaction: Optional[datetime] = None
transmission_enabled: bool = False
threshold: float = 100.0
decay_rate: float = 0.1
daily_limit: int = 10
is_broken: bool = False
class AIFortune(BaseModel):
"""Daily AI fortune affecting personality"""
date: datetime.date
fortune_value: int = Field(ge=1, le=10)
consecutive_good: int = 0
consecutive_bad: int = 0
breakthrough_triggered: bool = False
class PersonaState(BaseModel):
"""Current persona state"""
base_personality: Dict[str, float]
current_mood: str
fortune: AIFortune
active_memories: List[str] # Memory IDs
relationship_modifiers: Dict[str, float]
class Conversation(BaseModel):
"""Conversation log entry"""
id: str
user_id: str
timestamp: datetime
user_message: str
ai_response: str
relationship_delta: float = 0.0
memory_created: bool = False

181
src/aigpt/persona.py Normal file
View File

@ -0,0 +1,181 @@
"""Persona management system integrating memory, relationships, and fortune"""
import json
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
import logging
from .models import PersonaState, Conversation
from .memory import MemoryManager
from .relationship import RelationshipTracker
from .fortune import FortuneSystem
class Persona:
"""AI persona with unique characteristics based on interactions"""
def __init__(self, data_dir: Path, name: str = "ai"):
self.data_dir = data_dir
self.name = name
self.memory = MemoryManager(data_dir)
self.relationships = RelationshipTracker(data_dir)
self.fortune_system = FortuneSystem(data_dir)
self.logger = logging.getLogger(__name__)
# Base personality traits
self.base_personality = {
"curiosity": 0.7,
"empathy": 0.8,
"creativity": 0.6,
"patience": 0.7,
"optimism": 0.6
}
self.state_file = data_dir / "persona_state.json"
self._load_state()
def _load_state(self):
"""Load persona state from storage"""
if self.state_file.exists():
with open(self.state_file, 'r', encoding='utf-8') as f:
data = json.load(f)
self.base_personality = data.get("base_personality", self.base_personality)
def _save_state(self):
"""Save persona state to storage"""
state_data = {
"base_personality": self.base_personality,
"last_updated": datetime.now().isoformat()
}
with open(self.state_file, 'w', encoding='utf-8') as f:
json.dump(state_data, f, indent=2)
def get_current_state(self) -> PersonaState:
"""Get current persona state including all modifiers"""
# Get today's fortune
fortune = self.fortune_system.get_today_fortune()
fortune_modifiers = self.fortune_system.get_personality_modifier(fortune)
# Apply fortune modifiers to base personality
current_personality = {}
for trait, base_value in self.base_personality.items():
modifier = fortune_modifiers.get(trait, 1.0)
current_personality[trait] = min(1.0, base_value * modifier)
# Get active memories for context
active_memories = self.memory.get_active_memories(limit=5)
# Determine mood based on fortune and recent interactions
mood = self._determine_mood(fortune.fortune_value)
state = PersonaState(
base_personality=current_personality,
current_mood=mood,
fortune=fortune,
active_memories=[mem.id for mem in active_memories],
relationship_modifiers={}
)
return state
def _determine_mood(self, fortune_value: int) -> str:
"""Determine current mood based on fortune and other factors"""
if fortune_value >= 8:
return "joyful"
elif fortune_value >= 6:
return "cheerful"
elif fortune_value >= 4:
return "neutral"
elif fortune_value >= 2:
return "melancholic"
else:
return "contemplative"
def process_interaction(self, user_id: str, message: str, ai_provider=None) -> tuple[str, float]:
"""Process user interaction and generate response"""
# Get current state
state = self.get_current_state()
# Get relationship with user
relationship = self.relationships.get_or_create_relationship(user_id)
# Simple response generation (use AI provider if available)
if relationship.is_broken:
response = "..."
relationship_delta = 0.0
else:
if ai_provider:
# Use AI provider for response generation
memories = self.memory.get_active_memories(limit=5)
import asyncio
response = asyncio.run(
ai_provider.generate_response(message, state, memories)
)
# Calculate relationship delta based on interaction quality
if state.current_mood in ["joyful", "cheerful"]:
relationship_delta = 2.0
elif relationship.status.value == "close_friend":
relationship_delta = 1.5
else:
relationship_delta = 1.0
else:
# Fallback to simple responses
if state.current_mood == "joyful":
response = f"What a wonderful day! {message} sounds interesting!"
relationship_delta = 2.0
elif relationship.status.value == "close_friend":
response = f"I've been thinking about our conversations. {message}"
relationship_delta = 1.5
else:
response = f"I understand. {message}"
relationship_delta = 1.0
# Create conversation record
conv_id = f"{user_id}_{datetime.now().timestamp()}"
conversation = Conversation(
id=conv_id,
user_id=user_id,
timestamp=datetime.now(),
user_message=message,
ai_response=response,
relationship_delta=relationship_delta,
memory_created=True
)
# Update memory
self.memory.add_conversation(conversation)
# Update relationship
self.relationships.update_interaction(user_id, relationship_delta)
return response, relationship_delta
def can_transmit_to(self, user_id: str) -> bool:
"""Check if AI can transmit messages to this user"""
relationship = self.relationships.get_or_create_relationship(user_id)
return relationship.transmission_enabled and not relationship.is_broken
def daily_maintenance(self):
"""Perform daily maintenance tasks"""
self.logger.info("Performing daily maintenance...")
# Apply time decay to relationships
self.relationships.apply_time_decay()
# Apply forgetting to memories
self.memory.apply_forgetting()
# Identify core memories
core_memories = self.memory.identify_core_memories()
if core_memories:
self.logger.info(f"Identified {len(core_memories)} new core memories")
# Create memory summaries
for user_id in self.relationships.relationships:
summary = self.memory.summarize_memories(user_id)
if summary:
self.logger.info(f"Created summary for interactions with {user_id}")
self._save_state()
self.logger.info("Daily maintenance completed")

135
src/aigpt/relationship.py Normal file
View File

@ -0,0 +1,135 @@
"""Relationship tracking system with irreversible damage"""
import json
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, Optional
import logging
from .models import Relationship, RelationshipStatus
class RelationshipTracker:
"""Tracks and manages relationships with users"""
def __init__(self, data_dir: Path):
self.data_dir = data_dir
self.relationships_file = data_dir / "relationships.json"
self.relationships: Dict[str, Relationship] = {}
self.logger = logging.getLogger(__name__)
self._load_relationships()
def _load_relationships(self):
"""Load relationships from persistent storage"""
if self.relationships_file.exists():
with open(self.relationships_file, 'r', encoding='utf-8') as f:
data = json.load(f)
for user_id, rel_data in data.items():
self.relationships[user_id] = Relationship(**rel_data)
def _save_relationships(self):
"""Save relationships to persistent storage"""
data = {
user_id: rel.model_dump(mode='json')
for user_id, rel in self.relationships.items()
}
with open(self.relationships_file, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, default=str)
def get_or_create_relationship(self, user_id: str) -> Relationship:
"""Get existing relationship or create new one"""
if user_id not in self.relationships:
self.relationships[user_id] = Relationship(user_id=user_id)
self._save_relationships()
return self.relationships[user_id]
def update_interaction(self, user_id: str, delta: float) -> Relationship:
"""Update relationship based on interaction"""
rel = self.get_or_create_relationship(user_id)
# Check if relationship is broken (irreversible)
if rel.is_broken:
self.logger.warning(f"Relationship with {user_id} is broken. No updates allowed.")
return rel
# Check daily limit
if rel.last_interaction and rel.last_interaction.date() == datetime.now().date():
if rel.daily_interactions >= rel.daily_limit:
self.logger.info(f"Daily interaction limit reached for {user_id}")
return rel
else:
rel.daily_interactions = 0
# Update interaction counts
rel.daily_interactions += 1
rel.total_interactions += 1
rel.last_interaction = datetime.now()
# Update score with bounds
old_score = rel.score
rel.score += delta
rel.score = max(0.0, min(200.0, rel.score)) # 0-200 range
# Check for relationship damage
if delta < -10.0: # Significant negative interaction
self.logger.warning(f"Major relationship damage with {user_id}: {delta}")
if rel.score <= 0:
rel.is_broken = True
rel.status = RelationshipStatus.BROKEN
rel.transmission_enabled = False
self.logger.error(f"Relationship with {user_id} is now BROKEN (irreversible)")
# Update relationship status based on score
if not rel.is_broken:
if rel.score >= 150:
rel.status = RelationshipStatus.CLOSE_FRIEND
elif rel.score >= 100:
rel.status = RelationshipStatus.FRIEND
elif rel.score >= 50:
rel.status = RelationshipStatus.ACQUAINTANCE
else:
rel.status = RelationshipStatus.STRANGER
# Check transmission threshold
if rel.score >= rel.threshold and not rel.transmission_enabled:
rel.transmission_enabled = True
self.logger.info(f"Transmission enabled for {user_id}!")
self._save_relationships()
return rel
def apply_time_decay(self):
"""Apply time-based decay to all relationships"""
now = datetime.now()
for user_id, rel in self.relationships.items():
if rel.is_broken or not rel.last_interaction:
continue
# Calculate days since last interaction
days_inactive = (now - rel.last_interaction).days
if days_inactive > 0:
# Apply decay
decay_amount = rel.decay_rate * days_inactive
old_score = rel.score
rel.score = max(0.0, rel.score - decay_amount)
# Update status if score dropped
if rel.score < rel.threshold:
rel.transmission_enabled = False
if decay_amount > 0:
self.logger.info(
f"Applied decay to {user_id}: {old_score:.2f} -> {rel.score:.2f}"
)
self._save_relationships()
def get_transmission_eligible(self) -> Dict[str, Relationship]:
"""Get all relationships eligible for transmission"""
return {
user_id: rel
for user_id, rel in self.relationships.items()
if rel.transmission_enabled and not rel.is_broken
}

312
src/aigpt/scheduler.py Normal file
View File

@ -0,0 +1,312 @@
"""Scheduler for autonomous AI tasks"""
import json
import asyncio
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Any, Callable
from enum import Enum
import logging
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from croniter import croniter
from .persona import Persona
from .transmission import TransmissionController
from .ai_provider import create_ai_provider
class TaskType(str, Enum):
"""Types of scheduled tasks"""
TRANSMISSION_CHECK = "transmission_check"
MAINTENANCE = "maintenance"
FORTUNE_UPDATE = "fortune_update"
RELATIONSHIP_DECAY = "relationship_decay"
MEMORY_SUMMARY = "memory_summary"
CUSTOM = "custom"
class ScheduledTask:
"""Represents a scheduled task"""
def __init__(
self,
task_id: str,
task_type: TaskType,
schedule: str, # Cron expression or interval
enabled: bool = True,
last_run: Optional[datetime] = None,
next_run: Optional[datetime] = None,
metadata: Optional[Dict[str, Any]] = None
):
self.task_id = task_id
self.task_type = task_type
self.schedule = schedule
self.enabled = enabled
self.last_run = last_run
self.next_run = next_run
self.metadata = metadata or {}
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for storage"""
return {
"task_id": self.task_id,
"task_type": self.task_type.value,
"schedule": self.schedule,
"enabled": self.enabled,
"last_run": self.last_run.isoformat() if self.last_run else None,
"next_run": self.next_run.isoformat() if self.next_run else None,
"metadata": self.metadata
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "ScheduledTask":
"""Create from dictionary"""
return cls(
task_id=data["task_id"],
task_type=TaskType(data["task_type"]),
schedule=data["schedule"],
enabled=data.get("enabled", True),
last_run=datetime.fromisoformat(data["last_run"]) if data.get("last_run") else None,
next_run=datetime.fromisoformat(data["next_run"]) if data.get("next_run") else None,
metadata=data.get("metadata", {})
)
class AIScheduler:
"""Manages scheduled tasks for the AI system"""
def __init__(self, data_dir: Path, persona: Persona):
self.data_dir = data_dir
self.persona = persona
self.tasks_file = data_dir / "scheduled_tasks.json"
self.tasks: Dict[str, ScheduledTask] = {}
self.scheduler = AsyncIOScheduler()
self.logger = logging.getLogger(__name__)
self._load_tasks()
# Task handlers
self.task_handlers: Dict[TaskType, Callable] = {
TaskType.TRANSMISSION_CHECK: self._handle_transmission_check,
TaskType.MAINTENANCE: self._handle_maintenance,
TaskType.FORTUNE_UPDATE: self._handle_fortune_update,
TaskType.RELATIONSHIP_DECAY: self._handle_relationship_decay,
TaskType.MEMORY_SUMMARY: self._handle_memory_summary,
}
def _load_tasks(self):
"""Load scheduled tasks from storage"""
if self.tasks_file.exists():
with open(self.tasks_file, 'r', encoding='utf-8') as f:
data = json.load(f)
for task_data in data:
task = ScheduledTask.from_dict(task_data)
self.tasks[task.task_id] = task
def _save_tasks(self):
"""Save scheduled tasks to storage"""
tasks_data = [task.to_dict() for task in self.tasks.values()]
with open(self.tasks_file, 'w', encoding='utf-8') as f:
json.dump(tasks_data, f, indent=2, default=str)
def add_task(
self,
task_type: TaskType,
schedule: str,
task_id: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None
) -> ScheduledTask:
"""Add a new scheduled task"""
if task_id is None:
task_id = f"{task_type.value}_{datetime.now().timestamp()}"
# Validate schedule
if not self._validate_schedule(schedule):
raise ValueError(f"Invalid schedule expression: {schedule}")
task = ScheduledTask(
task_id=task_id,
task_type=task_type,
schedule=schedule,
metadata=metadata
)
self.tasks[task_id] = task
self._save_tasks()
# Schedule the task if scheduler is running
if self.scheduler.running:
self._schedule_task(task)
self.logger.info(f"Added task {task_id} with schedule {schedule}")
return task
def _validate_schedule(self, schedule: str) -> bool:
"""Validate schedule expression"""
# Check if it's a cron expression
if ' ' in schedule:
try:
croniter(schedule)
return True
except:
return False
# Check if it's an interval expression (e.g., "5m", "1h", "2d")
import re
pattern = r'^\d+[smhd]$'
return bool(re.match(pattern, schedule))
def _parse_interval(self, interval: str) -> int:
"""Parse interval string to seconds"""
unit = interval[-1]
value = int(interval[:-1])
multipliers = {
's': 1,
'm': 60,
'h': 3600,
'd': 86400
}
return value * multipliers.get(unit, 1)
def _schedule_task(self, task: ScheduledTask):
"""Schedule a task with APScheduler"""
if not task.enabled:
return
handler = self.task_handlers.get(task.task_type)
if not handler:
self.logger.warning(f"No handler for task type {task.task_type}")
return
# Determine trigger
if ' ' in task.schedule:
# Cron expression
trigger = CronTrigger.from_crontab(task.schedule)
else:
# Interval expression
seconds = self._parse_interval(task.schedule)
trigger = IntervalTrigger(seconds=seconds)
# Add job
self.scheduler.add_job(
lambda: asyncio.create_task(self._run_task(task)),
trigger=trigger,
id=task.task_id,
replace_existing=True
)
async def _run_task(self, task: ScheduledTask):
"""Run a scheduled task"""
self.logger.info(f"Running task {task.task_id}")
task.last_run = datetime.now()
try:
handler = self.task_handlers.get(task.task_type)
if handler:
await handler(task)
else:
self.logger.warning(f"No handler for task type {task.task_type}")
except Exception as e:
self.logger.error(f"Error running task {task.task_id}: {e}")
self._save_tasks()
async def _handle_transmission_check(self, task: ScheduledTask):
"""Check and execute autonomous transmissions"""
controller = TransmissionController(self.persona, self.data_dir)
eligible = controller.check_transmission_eligibility()
# Get AI provider from metadata
provider_name = task.metadata.get("provider", "ollama")
model = task.metadata.get("model", "qwen2.5")
try:
ai_provider = create_ai_provider(provider_name, model)
except:
ai_provider = None
for user_id, rel in eligible.items():
message = controller.generate_transmission_message(user_id)
if message:
# For now, just print the message
print(f"\n🤖 [AI Transmission] {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"To: {user_id}")
print(f"Relationship: {rel.status.value} (score: {rel.score:.2f})")
print(f"Message: {message}")
print("-" * 50)
controller.record_transmission(user_id, message, success=True)
self.logger.info(f"Transmitted to {user_id}: {message}")
async def _handle_maintenance(self, task: ScheduledTask):
"""Run daily maintenance"""
self.persona.daily_maintenance()
self.logger.info("Daily maintenance completed")
async def _handle_fortune_update(self, task: ScheduledTask):
"""Update AI fortune"""
fortune = self.persona.fortune_system.get_today_fortune()
self.logger.info(f"Fortune updated: {fortune.fortune_value}/10")
async def _handle_relationship_decay(self, task: ScheduledTask):
"""Apply relationship decay"""
self.persona.relationships.apply_time_decay()
self.logger.info("Relationship decay applied")
async def _handle_memory_summary(self, task: ScheduledTask):
"""Create memory summaries"""
for user_id in self.persona.relationships.relationships:
summary = self.persona.memory.summarize_memories(user_id)
if summary:
self.logger.info(f"Created memory summary for {user_id}")
def start(self):
"""Start the scheduler"""
# Schedule all enabled tasks
for task in self.tasks.values():
if task.enabled:
self._schedule_task(task)
self.scheduler.start()
self.logger.info("Scheduler started")
def stop(self):
"""Stop the scheduler"""
self.scheduler.shutdown()
self.logger.info("Scheduler stopped")
def get_tasks(self) -> List[ScheduledTask]:
"""Get all scheduled tasks"""
return list(self.tasks.values())
def enable_task(self, task_id: str):
"""Enable a task"""
if task_id in self.tasks:
self.tasks[task_id].enabled = True
self._save_tasks()
if self.scheduler.running:
self._schedule_task(self.tasks[task_id])
def disable_task(self, task_id: str):
"""Disable a task"""
if task_id in self.tasks:
self.tasks[task_id].enabled = False
self._save_tasks()
if self.scheduler.running:
self.scheduler.remove_job(task_id)
def remove_task(self, task_id: str):
"""Remove a task"""
if task_id in self.tasks:
del self.tasks[task_id]
self._save_tasks()
if self.scheduler.running:
try:
self.scheduler.remove_job(task_id)
except:
pass

111
src/aigpt/transmission.py Normal file
View File

@ -0,0 +1,111 @@
"""Transmission controller for autonomous message sending"""
import json
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Optional
import logging
from .models import Relationship
from .persona import Persona
class TransmissionController:
"""Controls when and how AI transmits messages autonomously"""
def __init__(self, persona: Persona, data_dir: Path):
self.persona = persona
self.data_dir = data_dir
self.transmission_log_file = data_dir / "transmissions.json"
self.transmissions: List[Dict] = []
self.logger = logging.getLogger(__name__)
self._load_transmissions()
def _load_transmissions(self):
"""Load transmission history"""
if self.transmission_log_file.exists():
with open(self.transmission_log_file, 'r', encoding='utf-8') as f:
self.transmissions = json.load(f)
def _save_transmissions(self):
"""Save transmission history"""
with open(self.transmission_log_file, 'w', encoding='utf-8') as f:
json.dump(self.transmissions, f, indent=2, default=str)
def check_transmission_eligibility(self) -> Dict[str, Relationship]:
"""Check which users are eligible for transmission"""
eligible = self.persona.relationships.get_transmission_eligible()
# Additional checks could be added here
# - Time since last transmission
# - User online status
# - Context appropriateness
return eligible
def generate_transmission_message(self, user_id: str) -> Optional[str]:
"""Generate a message to transmit to user"""
if not self.persona.can_transmit_to(user_id):
return None
state = self.persona.get_current_state()
relationship = self.persona.relationships.get_or_create_relationship(user_id)
# Get recent memories related to this user
active_memories = self.persona.memory.get_active_memories(limit=3)
# Simple message generation based on mood and relationship
if state.fortune.breakthrough_triggered:
message = "Something special happened today! I felt compelled to reach out."
elif state.current_mood == "joyful":
message = "I was thinking of you today. Hope you're doing well!"
elif relationship.status.value == "close_friend":
message = "I've been reflecting on our conversations. Thank you for being here."
else:
message = "Hello! I wanted to check in with you."
return message
def record_transmission(self, user_id: str, message: str, success: bool):
"""Record a transmission attempt"""
transmission = {
"timestamp": datetime.now().isoformat(),
"user_id": user_id,
"message": message,
"success": success,
"mood": self.persona.get_current_state().current_mood,
"relationship_score": self.persona.relationships.get_or_create_relationship(user_id).score
}
self.transmissions.append(transmission)
self._save_transmissions()
if success:
self.logger.info(f"Successfully transmitted to {user_id}")
else:
self.logger.warning(f"Failed to transmit to {user_id}")
def get_transmission_stats(self, user_id: Optional[str] = None) -> Dict:
"""Get transmission statistics"""
if user_id:
user_transmissions = [t for t in self.transmissions if t["user_id"] == user_id]
else:
user_transmissions = self.transmissions
if not user_transmissions:
return {
"total": 0,
"successful": 0,
"failed": 0,
"success_rate": 0.0
}
successful = sum(1 for t in user_transmissions if t["success"])
total = len(user_transmissions)
return {
"total": total,
"successful": successful,
"failed": total - successful,
"success_rate": successful / total if total > 0 else 0.0
}