From 98151276cbd566d73d787795cecd42a7166c6edb Mon Sep 17 00:00:00 2001 From: syui Date: Tue, 24 Mar 2026 19:38:01 +0900 Subject: [PATCH] feat: session isolation and sibling agent context injection - State dir now uses /tmp/aishell/{pid}/ with a 'latest' symlink, enabling parallel session execution without data loss - Agents receive sibling agent names and tasks in their prompt, reducing work duplication across parallel agents Co-Authored-By: Claude Opus 4.6 (1M context) --- src/agent.rs | 13 +++-- src/headless.rs | 125 +++++++++++++++++++++++++++++++----------------- src/tui.rs | 9 +++- 3 files changed, 100 insertions(+), 47 deletions(-) diff --git a/src/agent.rs b/src/agent.rs index c630a1b..f3611e2 100644 --- a/src/agent.rs +++ b/src/agent.rs @@ -65,10 +65,10 @@ pub struct Agent { impl Agent { pub fn spawn(id: usize, name: &str, task: &str, cwd: &str) -> Result { - Self::spawn_with_config(id, name, task, cwd, None, None) + Self::spawn_with_config(id, name, task, cwd, None, None, &[]) } - pub fn spawn_with_config(id: usize, name: &str, task: &str, cwd: &str, host: Option<&str>, protocol: Option<&str>) -> Result { + pub fn spawn_with_config(id: usize, name: &str, task: &str, cwd: &str, host: Option<&str>, protocol: Option<&str>, siblings: &[(&str, &str)]) -> Result { let cwd_path = std::path::Path::new(cwd); if !cwd_path.is_dir() { return Err(format!("directory not found: {cwd}")); @@ -86,7 +86,14 @@ impl Agent { let history = recent_session_summary(); let history_ctx = if history.is_empty() { String::new() } else { format!("\n[session history]\n{history}") }; - let full_task = format!("{identity}\n[task]\n{task}{git_ctx}{history_ctx}"); + let sibling_ctx = if siblings.is_empty() { String::new() } + else { + let lines: Vec = siblings.iter() + .map(|(n, t)| format!("- {n}: {t}")) + .collect(); + format!("\n[sibling agents]\n{}", lines.join("\n")) + }; + let full_task = format!("{identity}\n[task]\n{task}{git_ctx}{history_ctx}{sibling_ctx}"); claude::send_message(&mut stdin, &full_task); let (tx, rx) = mpsc::channel(); claude::spawn_reader(child, stdout, tx); diff --git a/src/headless.rs b/src/headless.rs index 446d998..a0af3f3 100644 --- a/src/headless.rs +++ b/src/headless.rs @@ -6,11 +6,41 @@ use crate::agent::Agent; use crate::ai::{ClaudeManager, OutputEvent}; use crate::config; -const STATE_DIR: &str = "/tmp/aishell"; +const STATE_BASE: &str = "/tmp/aishell"; + +use std::sync::OnceLock; +static SESSION_STATE_DIR: OnceLock = OnceLock::new(); + +/// Get the active state directory. +/// Writers (after init_session): session-specific dir. +/// Readers (status/log/etc): `latest` symlink path. +fn state_dir() -> String { + SESSION_STATE_DIR.get() + .cloned() + .unwrap_or_else(|| format!("{STATE_BASE}/latest")) +} + +/// Initialize a new session directory and update the `latest` symlink. +fn init_session() { + let session_id = std::process::id(); + let dir = format!("{STATE_BASE}/{session_id}"); + create_state_dir_at(&dir); + // Ensure base dir exists for symlink + create_state_dir_at(STATE_BASE); + + // Update latest symlink + let latest = format!("{STATE_BASE}/latest"); + let _ = std::fs::remove_file(&latest); + #[cfg(unix)] + { let _ = std::os::unix::fs::symlink(&dir, &latest); } + #[cfg(not(unix))] + { let _ = std::fs::write(&latest, &dir); } + + let _ = SESSION_STATE_DIR.set(dir); +} pub fn run(config_or_task: &str, cwd_override: Option<&str>, name_override: Option<&str>) -> Result<(), String> { - let _ = std::fs::remove_dir_all(STATE_DIR); - create_state_dir(); + init_session(); let args: Vec = std::env::args().collect(); let loop_mode = args.iter().any(|a| a == "--loop"); @@ -61,7 +91,7 @@ pub fn run(config_or_task: &str, cwd_override: Option<&str>, name_override: Opti let skip_ai = agents.len() <= 1 && prev_decision.is_empty(); atomic_write( - &format!("{STATE_DIR}/loop.json"), + &format!("{}/loop.json", state_dir()), b"{\"ready\":false}", ); @@ -78,7 +108,7 @@ pub fn run(config_or_task: &str, cwd_override: Option<&str>, name_override: Opti "decision": &d, }); atomic_write( - &format!("{STATE_DIR}/decision.json"), + &format!("{}/decision.json", state_dir()), serde_json::to_string_pretty(&data).unwrap_or_default().as_bytes(), ); eprintln!(" ✓ Done\n"); @@ -113,7 +143,7 @@ pub fn run(config_or_task: &str, cwd_override: Option<&str>, name_override: Opti for _ in 0..secs * 2 { if !running.load(Ordering::Relaxed) { break; } // Check for early stop signal - if let Ok(content) = std::fs::read_to_string(format!("{STATE_DIR}/loop.json")) { + if let Ok(content) = std::fs::read_to_string(format!("{}/loop.json", state_dir())) { if let Ok(v) = serde_json::from_str::(&content) { if v["quit"].as_bool() == Some(true) { break; } } @@ -156,7 +186,7 @@ pub fn review() { match execute_once(configs) { Ok(()) => { // Check if decision says "no issues" / "問題なし" / "コミット可能" - if let Ok(content) = std::fs::read_to_string(format!("{STATE_DIR}/decision.json")) { + if let Ok(content) = std::fs::read_to_string(format!("{}/decision.json", state_dir())) { if let Ok(d) = serde_json::from_str::(&content) { let text = d["decision"].as_str().unwrap_or("").to_lowercase(); if text.contains("コミット可能") || text.contains("no issues") || text.contains("問題なし") || text.contains("commit") { @@ -191,8 +221,7 @@ pub fn run_preset(preset_name: &str) -> Result<(), String> { /// Single-cycle execution: agents → AI integration → save → exit. fn execute_once(configs: Vec) -> Result<(), String> { - let _ = std::fs::remove_dir_all(STATE_DIR); - create_state_dir(); + init_session(); let running = Arc::new(AtomicBool::new(true)); setup_ctrlc(running.clone()); let agents = spawn_and_wait(&configs, &running); @@ -209,7 +238,7 @@ fn execute_once(configs: Vec) -> Result<(), String> { "decision": &d, }); atomic_write( - &format!("{STATE_DIR}/decision.json"), + &format!("{}/decision.json", state_dir()), serde_json::to_string_pretty(&data).unwrap_or_default().as_bytes(), ); eprintln!(" ✓ Done\n"); @@ -229,14 +258,13 @@ fn execute_once(configs: Vec) -> Result<(), String> { eprintln!(" saved: {path}"); // Write done marker for background monitoring - atomic_write(&format!("{STATE_DIR}/done"), b"1"); + atomic_write(&format!("{}/done", state_dir()), b"1"); Ok(()) } fn run_with_configs(configs: Vec) -> Result<(), String> { - let _ = std::fs::remove_dir_all(STATE_DIR); - create_state_dir(); + init_session(); let running = Arc::new(AtomicBool::new(true)); setup_ctrlc(running.clone()); @@ -253,7 +281,7 @@ fn run_with_configs(configs: Vec) -> Result<(), String> { write_state(&agents); let skip_ai = agents.len() <= 1 && prev_decision.is_empty(); - atomic_write(&format!("{STATE_DIR}/loop.json"), b"{\"ready\":false}"); + atomic_write(&format!("{}/loop.json", state_dir()), b"{\"ready\":false}"); let decision = if skip_ai { eprintln!("\n (AI integration skipped: single agent)"); @@ -268,7 +296,7 @@ fn run_with_configs(configs: Vec) -> Result<(), String> { "decision": &d, }); atomic_write( - &format!("{STATE_DIR}/decision.json"), + &format!("{}/decision.json", state_dir()), serde_json::to_string_pretty(&data).unwrap_or_default().as_bytes(), ); eprintln!(" ✓ Done\n"); @@ -328,12 +356,10 @@ fn run_once(configs: &[config::AgentConfig]) -> Result<(), String> { eprintln!("\n AI proposed {} agent(s):", next.len()); for c in &next { eprintln!(" @{} {}", c.name, c.task); } - let _ = std::fs::remove_dir_all(STATE_DIR); - create_state_dir(); return run_with_configs(next); } - atomic_write(&format!("{STATE_DIR}/done"), b"1"); + atomic_write(&format!("{}/done", state_dir()), b"1"); Ok(()) } @@ -359,11 +385,20 @@ fn spawn_and_wait(configs: &[config::AgentConfig], running: &Arc) -> let mut pending: Vec<&config::AgentConfig> = Vec::new(); let mut next_id = 1; + // Build sibling list for context injection + let all_siblings: Vec<(String, String)> = configs.iter() + .map(|c| (c.name.clone(), c.task.clone())) + .collect(); + // Spawn agents without dependencies immediately, defer the rest for cfg in configs { if cfg.depends_on.is_empty() { let cwd = expand_tilde(&cfg.cwd); - match Agent::spawn_with_config(next_id, &cfg.name, &cfg.task, &cwd, cfg.host.as_deref(), cfg.protocol.as_deref()) { + let siblings: Vec<(&str, &str)> = all_siblings.iter() + .filter(|(n, _)| n != &cfg.name) + .map(|(n, t)| (n.as_str(), t.as_str())) + .collect(); + match Agent::spawn_with_config(next_id, &cfg.name, &cfg.task, &cwd, cfg.host.as_deref(), cfg.protocol.as_deref(), &siblings) { Ok(agent) => { eprintln!(" started: {} ({})", cfg.name, cwd); agents.push(agent); @@ -394,7 +429,11 @@ fn spawn_and_wait(configs: &[config::AgentConfig], running: &Arc) -> }); if deps_met { let cwd = expand_tilde(&cfg.cwd); - match Agent::spawn_with_config(next_id, &cfg.name, &cfg.task, &cwd, cfg.host.as_deref(), cfg.protocol.as_deref()) { + let siblings: Vec<(&str, &str)> = all_siblings.iter() + .filter(|(n, _)| n != &cfg.name) + .map(|(n, t)| (n.as_str(), t.as_str())) + .collect(); + match Agent::spawn_with_config(next_id, &cfg.name, &cfg.task, &cwd, cfg.host.as_deref(), cfg.protocol.as_deref(), &siblings) { Ok(agent) => { eprintln!(" started: {} ({})", cfg.name, cwd); newly_started.push(agent); @@ -487,14 +526,14 @@ fn write_loop_waiting(agents: &[Agent], has_next: bool) { }); atomic_write( - &format!("{STATE_DIR}/loop.json"), + &format!("{}/loop.json", state_dir()), serde_json::to_string_pretty(&control).unwrap_or_default().as_bytes(), ); } /// Reset loop.json after resuming: next=false, save=null. fn reset_loop_control() { - let path = format!("{STATE_DIR}/loop.json"); + let path = format!("{}/loop.json", state_dir()); if let Ok(content) = std::fs::read_to_string(&path) { if let Ok(mut v) = serde_json::from_str::(&content) { v["next"] = serde_json::json!(false); @@ -507,7 +546,7 @@ fn reset_loop_control() { /// Poll loop.json until `next: true` or `quit: true`. /// Only reads signals when `ready: true` (set by write_loop_waiting). fn wait_for_loop_signal(running: &Arc) -> LoopAction { - let path = format!("{STATE_DIR}/loop.json"); + let path = format!("{}/loop.json", state_dir()); loop { if !running.load(Ordering::Relaxed) { return LoopAction::Quit; @@ -693,7 +732,7 @@ fn integrate_results(agents: &[Agent], prev_decision: &str, cycle: usize) -> Res // ── Status/Log/Decision commands ─────────────────────────── pub fn status(verbose: bool) { - let path = format!("{STATE_DIR}/agents.json"); + let path = format!("{}/agents.json", state_dir()); match std::fs::read_to_string(&path) { Ok(content) => { if let Ok(agents) = serde_json::from_str::>(&content) { @@ -709,7 +748,7 @@ pub fn status(verbose: bool) { }; println!(" {icon} {id} {name:16} {c:10} {elapsed:>6} {task}"); if verbose { - if let Ok(d) = std::fs::read_to_string(format!("{STATE_DIR}/{id}.json")) { + if let Ok(d) = std::fs::read_to_string(format!("{}/{id}.json", state_dir())) { if let Ok(d) = serde_json::from_str::(&d) { let s = d["summary"].as_str().unwrap_or(""); if !s.is_empty() { println!(" {s}"); } @@ -725,7 +764,7 @@ pub fn status(verbose: bool) { pub fn log(id_or_name: &str) { let path = if id_or_name.chars().all(|c| c.is_ascii_digit()) { - format!("{STATE_DIR}/{id_or_name}.json") + format!("{}/{id_or_name}.json", state_dir()) } else { find_agent_by_name(id_or_name).unwrap_or_default() }; @@ -747,7 +786,7 @@ pub fn log(id_or_name: &str) { } pub fn decision() { - let path = format!("{STATE_DIR}/decision.json"); + let path = format!("{}/decision.json", state_dir()); match std::fs::read_to_string(&path) { Ok(content) => { if let Ok(d) = serde_json::from_str::(&content) { @@ -837,7 +876,7 @@ pub fn plan(preset: Option<&str>, config_path: Option<&str>) { /// Save the latest decision to aigpt memory (explicit, not automatic). pub fn remember() { - let dec_path = format!("{STATE_DIR}/decision.json"); + let dec_path = format!("{}/decision.json", state_dir()); match std::fs::read_to_string(&dec_path) { Ok(content) => { if let Ok(d) = serde_json::from_str::(&content) { @@ -859,7 +898,7 @@ pub fn remember() { /// Wait for background run to complete, then show summary. pub fn wait_done() { - let done_path = format!("{STATE_DIR}/done"); + let done_path = format!("{}/done", state_dir()); eprint!(" waiting..."); loop { if std::path::Path::new(&done_path).exists() { @@ -867,7 +906,7 @@ pub fn wait_done() { let _ = std::fs::remove_file(&done_path); // Show compact summary status(false); - if let Ok(content) = std::fs::read_to_string(format!("{STATE_DIR}/decision.json")) { + if let Ok(content) = std::fs::read_to_string(format!("{}/decision.json", state_dir())) { if let Ok(d) = serde_json::from_str::(&content) { let text = d["decision"].as_str().unwrap_or(""); let first: String = text.lines() @@ -891,7 +930,7 @@ pub fn signal_next(save: Vec) { }; let control = serde_json::json!({"ready": true, "next": true, "quit": false, "save": save_val}); atomic_write( - &format!("{STATE_DIR}/loop.json"), + &format!("{}/loop.json", state_dir()), serde_json::to_string_pretty(&control).unwrap_or_default().as_bytes(), ); eprintln!("signaled: next (save: {:?})", save); @@ -900,7 +939,7 @@ pub fn signal_next(save: Vec) { pub fn signal_quit() { let control = serde_json::json!({"ready": true, "quit": true}); atomic_write( - &format!("{STATE_DIR}/loop.json"), + &format!("{}/loop.json", state_dir()), serde_json::to_string_pretty(&control).unwrap_or_default().as_bytes(), ); eprintln!("signaled: quit"); @@ -1063,7 +1102,7 @@ pub fn context() { let git_diff = run("git", &["diff", "--stat"]); // Session info - let dec_path = format!("{STATE_DIR}/decision.json"); + let dec_path = format!("{}/decision.json", state_dir()); let session_dir = config::sessions_dir(); let session_count = std::fs::read_dir(&session_dir) .into_iter().flatten().flatten() @@ -1113,7 +1152,7 @@ pub fn context() { // Agents println!("\n[agents]"); - let agent_path = format!("{STATE_DIR}/agents.json"); + let agent_path = format!("{}/agents.json", state_dir()); if let Ok(content) = std::fs::read_to_string(&agent_path) { if let Ok(agents) = serde_json::from_str::>(&content) { if agents.is_empty() { @@ -1126,7 +1165,7 @@ pub fn context() { let icon = match c { "success" => "✓", "error" | "stopped" => "✗", _ => "●" }; println!(" {icon} {name} [{c}]"); // Show first 2 meaningful lines from detail file - if let Ok(detail) = std::fs::read_to_string(format!("{STATE_DIR}/{id}.json")) { + if let Ok(detail) = std::fs::read_to_string(format!("{}/{id}.json", state_dir())) { if let Ok(d) = serde_json::from_str::(&detail) { let result = d["result"].as_str().unwrap_or(""); for line in result.lines().filter(|l| !l.is_empty()).take(2) { @@ -1144,7 +1183,7 @@ pub fn context() { // Last decision println!("\n[decision]"); - let dec_path = format!("{STATE_DIR}/decision.json"); + let dec_path = format!("{}/decision.json", state_dir()); if let Ok(content) = std::fs::read_to_string(&dec_path) { if let Ok(d) = serde_json::from_str::(&content) { let text = d["decision"].as_str().unwrap_or(""); @@ -1232,12 +1271,12 @@ fn show_cmd_history(filter: Option<&str>) { } fn find_agent_by_name(name: &str) -> Option { - let content = std::fs::read_to_string(format!("{STATE_DIR}/agents.json")).ok()?; + let content = std::fs::read_to_string(format!("{}/agents.json", state_dir())).ok()?; let agents: Vec = serde_json::from_str(&content).ok()?; agents.iter() .find(|a| a["name"].as_str() == Some(name)) .and_then(|a| a["id"].as_u64()) - .map(|id| format!("{STATE_DIR}/{id}.json")) + .map(|id| format!("{}/{id}.json", state_dir())) } // ── Infra ────────────────────────────────────────────────── @@ -1265,12 +1304,12 @@ fn setup_ctrlc(running: Arc) { fn write_state(agents: &[Agent]) { let summary: Vec<_> = agents.iter().map(|a| a.to_summary_json()).collect(); atomic_write( - &format!("{STATE_DIR}/agents.json"), + &format!("{}/agents.json", state_dir()), serde_json::to_string_pretty(&serde_json::Value::Array(summary)).unwrap_or_default().as_bytes(), ); for a in agents { atomic_write( - &format!("{STATE_DIR}/{}.json", a.id), + &format!("{}/{}.json", state_dir(), a.id), serde_json::to_string_pretty(&a.to_json()).unwrap_or_default().as_bytes(), ); } @@ -1296,13 +1335,13 @@ fn atomic_write(path: &str, content: &[u8]) { } } -fn create_state_dir() { +fn create_state_dir_at(path: &str) { #[cfg(unix)] { use std::os::unix::fs::DirBuilderExt; - let _ = std::fs::DirBuilder::new().mode(0o700).recursive(true).create(STATE_DIR); + let _ = std::fs::DirBuilder::new().mode(0o700).recursive(true).create(path); } #[cfg(not(unix))] { - let _ = std::fs::create_dir_all(STATE_DIR); + let _ = std::fs::create_dir_all(path); } } diff --git a/src/tui.rs b/src/tui.rs index 1fb1f09..d0fb678 100644 --- a/src/tui.rs +++ b/src/tui.rs @@ -101,10 +101,17 @@ impl App { claude.send(&msg); } + let all_siblings: Vec<(String, String)> = configs.iter() + .map(|c| (c.name.clone(), c.task.clone())) + .collect(); let mut errors = Vec::new(); for cfg in configs { let cwd = expand_tilde(&cfg.cwd); - match Agent::spawn_with_config(app.next_id, &cfg.name, &cfg.task, &cwd, cfg.host.as_deref(), cfg.protocol.as_deref()) { + let siblings: Vec<(&str, &str)> = all_siblings.iter() + .filter(|(n, _)| n != &cfg.name) + .map(|(n, t)| (n.as_str(), t.as_str())) + .collect(); + match Agent::spawn_with_config(app.next_id, &cfg.name, &cfg.task, &cwd, cfg.host.as_deref(), cfg.protocol.as_deref(), &siblings) { Ok(agent) => { app.agents.push(agent); app.next_id += 1;