2
0

feat: session isolation and sibling agent context injection

- State dir now uses /tmp/aishell/{pid}/ with a 'latest' symlink,
  enabling parallel session execution without data loss
- Agents receive sibling agent names and tasks in their prompt,
  reducing work duplication across parallel agents

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-24 19:38:01 +09:00
parent 2a1945eb5e
commit 98151276cb
3 changed files with 100 additions and 47 deletions

View File

@@ -65,10 +65,10 @@ pub struct Agent {
impl Agent {
pub fn spawn(id: usize, name: &str, task: &str, cwd: &str) -> Result<Self, String> {
Self::spawn_with_config(id, name, task, cwd, None, None)
Self::spawn_with_config(id, name, task, cwd, None, None, &[])
}
pub fn spawn_with_config(id: usize, name: &str, task: &str, cwd: &str, host: Option<&str>, protocol: Option<&str>) -> Result<Self, String> {
pub fn spawn_with_config(id: usize, name: &str, task: &str, cwd: &str, host: Option<&str>, protocol: Option<&str>, siblings: &[(&str, &str)]) -> Result<Self, String> {
let cwd_path = std::path::Path::new(cwd);
if !cwd_path.is_dir() {
return Err(format!("directory not found: {cwd}"));
@@ -86,7 +86,14 @@ impl Agent {
let history = recent_session_summary();
let history_ctx = if history.is_empty() { String::new() }
else { format!("\n[session history]\n{history}") };
let full_task = format!("{identity}\n[task]\n{task}{git_ctx}{history_ctx}");
let sibling_ctx = if siblings.is_empty() { String::new() }
else {
let lines: Vec<String> = siblings.iter()
.map(|(n, t)| format!("- {n}: {t}"))
.collect();
format!("\n[sibling agents]\n{}", lines.join("\n"))
};
let full_task = format!("{identity}\n[task]\n{task}{git_ctx}{history_ctx}{sibling_ctx}");
claude::send_message(&mut stdin, &full_task);
let (tx, rx) = mpsc::channel();
claude::spawn_reader(child, stdout, tx);

View File

@@ -6,11 +6,41 @@ use crate::agent::Agent;
use crate::ai::{ClaudeManager, OutputEvent};
use crate::config;
const STATE_DIR: &str = "/tmp/aishell";
const STATE_BASE: &str = "/tmp/aishell";
use std::sync::OnceLock;
static SESSION_STATE_DIR: OnceLock<String> = OnceLock::new();
/// Get the active state directory.
/// Writers (after init_session): session-specific dir.
/// Readers (status/log/etc): `latest` symlink path.
fn state_dir() -> String {
SESSION_STATE_DIR.get()
.cloned()
.unwrap_or_else(|| format!("{STATE_BASE}/latest"))
}
/// Initialize a new session directory and update the `latest` symlink.
fn init_session() {
let session_id = std::process::id();
let dir = format!("{STATE_BASE}/{session_id}");
create_state_dir_at(&dir);
// Ensure base dir exists for symlink
create_state_dir_at(STATE_BASE);
// Update latest symlink
let latest = format!("{STATE_BASE}/latest");
let _ = std::fs::remove_file(&latest);
#[cfg(unix)]
{ let _ = std::os::unix::fs::symlink(&dir, &latest); }
#[cfg(not(unix))]
{ let _ = std::fs::write(&latest, &dir); }
let _ = SESSION_STATE_DIR.set(dir);
}
pub fn run(config_or_task: &str, cwd_override: Option<&str>, name_override: Option<&str>) -> Result<(), String> {
let _ = std::fs::remove_dir_all(STATE_DIR);
create_state_dir();
init_session();
let args: Vec<String> = std::env::args().collect();
let loop_mode = args.iter().any(|a| a == "--loop");
@@ -61,7 +91,7 @@ pub fn run(config_or_task: &str, cwd_override: Option<&str>, name_override: Opti
let skip_ai = agents.len() <= 1 && prev_decision.is_empty();
atomic_write(
&format!("{STATE_DIR}/loop.json"),
&format!("{}/loop.json", state_dir()),
b"{\"ready\":false}",
);
@@ -78,7 +108,7 @@ pub fn run(config_or_task: &str, cwd_override: Option<&str>, name_override: Opti
"decision": &d,
});
atomic_write(
&format!("{STATE_DIR}/decision.json"),
&format!("{}/decision.json", state_dir()),
serde_json::to_string_pretty(&data).unwrap_or_default().as_bytes(),
);
eprintln!(" ✓ Done\n");
@@ -113,7 +143,7 @@ pub fn run(config_or_task: &str, cwd_override: Option<&str>, name_override: Opti
for _ in 0..secs * 2 {
if !running.load(Ordering::Relaxed) { break; }
// Check for early stop signal
if let Ok(content) = std::fs::read_to_string(format!("{STATE_DIR}/loop.json")) {
if let Ok(content) = std::fs::read_to_string(format!("{}/loop.json", state_dir())) {
if let Ok(v) = serde_json::from_str::<serde_json::Value>(&content) {
if v["quit"].as_bool() == Some(true) { break; }
}
@@ -156,7 +186,7 @@ pub fn review() {
match execute_once(configs) {
Ok(()) => {
// Check if decision says "no issues" / "問題なし" / "コミット可能"
if let Ok(content) = std::fs::read_to_string(format!("{STATE_DIR}/decision.json")) {
if let Ok(content) = std::fs::read_to_string(format!("{}/decision.json", state_dir())) {
if let Ok(d) = serde_json::from_str::<serde_json::Value>(&content) {
let text = d["decision"].as_str().unwrap_or("").to_lowercase();
if text.contains("コミット可能") || text.contains("no issues") || text.contains("問題なし") || text.contains("commit") {
@@ -191,8 +221,7 @@ pub fn run_preset(preset_name: &str) -> Result<(), String> {
/// Single-cycle execution: agents → AI integration → save → exit.
fn execute_once(configs: Vec<config::AgentConfig>) -> Result<(), String> {
let _ = std::fs::remove_dir_all(STATE_DIR);
create_state_dir();
init_session();
let running = Arc::new(AtomicBool::new(true));
setup_ctrlc(running.clone());
let agents = spawn_and_wait(&configs, &running);
@@ -209,7 +238,7 @@ fn execute_once(configs: Vec<config::AgentConfig>) -> Result<(), String> {
"decision": &d,
});
atomic_write(
&format!("{STATE_DIR}/decision.json"),
&format!("{}/decision.json", state_dir()),
serde_json::to_string_pretty(&data).unwrap_or_default().as_bytes(),
);
eprintln!(" ✓ Done\n");
@@ -229,14 +258,13 @@ fn execute_once(configs: Vec<config::AgentConfig>) -> Result<(), String> {
eprintln!(" saved: {path}");
// Write done marker for background monitoring
atomic_write(&format!("{STATE_DIR}/done"), b"1");
atomic_write(&format!("{}/done", state_dir()), b"1");
Ok(())
}
fn run_with_configs(configs: Vec<config::AgentConfig>) -> Result<(), String> {
let _ = std::fs::remove_dir_all(STATE_DIR);
create_state_dir();
init_session();
let running = Arc::new(AtomicBool::new(true));
setup_ctrlc(running.clone());
@@ -253,7 +281,7 @@ fn run_with_configs(configs: Vec<config::AgentConfig>) -> Result<(), String> {
write_state(&agents);
let skip_ai = agents.len() <= 1 && prev_decision.is_empty();
atomic_write(&format!("{STATE_DIR}/loop.json"), b"{\"ready\":false}");
atomic_write(&format!("{}/loop.json", state_dir()), b"{\"ready\":false}");
let decision = if skip_ai {
eprintln!("\n (AI integration skipped: single agent)");
@@ -268,7 +296,7 @@ fn run_with_configs(configs: Vec<config::AgentConfig>) -> Result<(), String> {
"decision": &d,
});
atomic_write(
&format!("{STATE_DIR}/decision.json"),
&format!("{}/decision.json", state_dir()),
serde_json::to_string_pretty(&data).unwrap_or_default().as_bytes(),
);
eprintln!(" ✓ Done\n");
@@ -328,12 +356,10 @@ fn run_once(configs: &[config::AgentConfig]) -> Result<(), String> {
eprintln!("\n AI proposed {} agent(s):", next.len());
for c in &next { eprintln!(" @{} {}", c.name, c.task); }
let _ = std::fs::remove_dir_all(STATE_DIR);
create_state_dir();
return run_with_configs(next);
}
atomic_write(&format!("{STATE_DIR}/done"), b"1");
atomic_write(&format!("{}/done", state_dir()), b"1");
Ok(())
}
@@ -359,11 +385,20 @@ fn spawn_and_wait(configs: &[config::AgentConfig], running: &Arc<AtomicBool>) ->
let mut pending: Vec<&config::AgentConfig> = Vec::new();
let mut next_id = 1;
// Build sibling list for context injection
let all_siblings: Vec<(String, String)> = configs.iter()
.map(|c| (c.name.clone(), c.task.clone()))
.collect();
// Spawn agents without dependencies immediately, defer the rest
for cfg in configs {
if cfg.depends_on.is_empty() {
let cwd = expand_tilde(&cfg.cwd);
match Agent::spawn_with_config(next_id, &cfg.name, &cfg.task, &cwd, cfg.host.as_deref(), cfg.protocol.as_deref()) {
let siblings: Vec<(&str, &str)> = all_siblings.iter()
.filter(|(n, _)| n != &cfg.name)
.map(|(n, t)| (n.as_str(), t.as_str()))
.collect();
match Agent::spawn_with_config(next_id, &cfg.name, &cfg.task, &cwd, cfg.host.as_deref(), cfg.protocol.as_deref(), &siblings) {
Ok(agent) => {
eprintln!(" started: {} ({})", cfg.name, cwd);
agents.push(agent);
@@ -394,7 +429,11 @@ fn spawn_and_wait(configs: &[config::AgentConfig], running: &Arc<AtomicBool>) ->
});
if deps_met {
let cwd = expand_tilde(&cfg.cwd);
match Agent::spawn_with_config(next_id, &cfg.name, &cfg.task, &cwd, cfg.host.as_deref(), cfg.protocol.as_deref()) {
let siblings: Vec<(&str, &str)> = all_siblings.iter()
.filter(|(n, _)| n != &cfg.name)
.map(|(n, t)| (n.as_str(), t.as_str()))
.collect();
match Agent::spawn_with_config(next_id, &cfg.name, &cfg.task, &cwd, cfg.host.as_deref(), cfg.protocol.as_deref(), &siblings) {
Ok(agent) => {
eprintln!(" started: {} ({})", cfg.name, cwd);
newly_started.push(agent);
@@ -487,14 +526,14 @@ fn write_loop_waiting(agents: &[Agent], has_next: bool) {
});
atomic_write(
&format!("{STATE_DIR}/loop.json"),
&format!("{}/loop.json", state_dir()),
serde_json::to_string_pretty(&control).unwrap_or_default().as_bytes(),
);
}
/// Reset loop.json after resuming: next=false, save=null.
fn reset_loop_control() {
let path = format!("{STATE_DIR}/loop.json");
let path = format!("{}/loop.json", state_dir());
if let Ok(content) = std::fs::read_to_string(&path) {
if let Ok(mut v) = serde_json::from_str::<serde_json::Value>(&content) {
v["next"] = serde_json::json!(false);
@@ -507,7 +546,7 @@ fn reset_loop_control() {
/// Poll loop.json until `next: true` or `quit: true`.
/// Only reads signals when `ready: true` (set by write_loop_waiting).
fn wait_for_loop_signal(running: &Arc<AtomicBool>) -> LoopAction {
let path = format!("{STATE_DIR}/loop.json");
let path = format!("{}/loop.json", state_dir());
loop {
if !running.load(Ordering::Relaxed) {
return LoopAction::Quit;
@@ -693,7 +732,7 @@ fn integrate_results(agents: &[Agent], prev_decision: &str, cycle: usize) -> Res
// ── Status/Log/Decision commands ───────────────────────────
pub fn status(verbose: bool) {
let path = format!("{STATE_DIR}/agents.json");
let path = format!("{}/agents.json", state_dir());
match std::fs::read_to_string(&path) {
Ok(content) => {
if let Ok(agents) = serde_json::from_str::<Vec<serde_json::Value>>(&content) {
@@ -709,7 +748,7 @@ pub fn status(verbose: bool) {
};
println!(" {icon} {id} {name:16} {c:10} {elapsed:>6} {task}");
if verbose {
if let Ok(d) = std::fs::read_to_string(format!("{STATE_DIR}/{id}.json")) {
if let Ok(d) = std::fs::read_to_string(format!("{}/{id}.json", state_dir())) {
if let Ok(d) = serde_json::from_str::<serde_json::Value>(&d) {
let s = d["summary"].as_str().unwrap_or("");
if !s.is_empty() { println!(" {s}"); }
@@ -725,7 +764,7 @@ pub fn status(verbose: bool) {
pub fn log(id_or_name: &str) {
let path = if id_or_name.chars().all(|c| c.is_ascii_digit()) {
format!("{STATE_DIR}/{id_or_name}.json")
format!("{}/{id_or_name}.json", state_dir())
} else {
find_agent_by_name(id_or_name).unwrap_or_default()
};
@@ -747,7 +786,7 @@ pub fn log(id_or_name: &str) {
}
pub fn decision() {
let path = format!("{STATE_DIR}/decision.json");
let path = format!("{}/decision.json", state_dir());
match std::fs::read_to_string(&path) {
Ok(content) => {
if let Ok(d) = serde_json::from_str::<serde_json::Value>(&content) {
@@ -837,7 +876,7 @@ pub fn plan(preset: Option<&str>, config_path: Option<&str>) {
/// Save the latest decision to aigpt memory (explicit, not automatic).
pub fn remember() {
let dec_path = format!("{STATE_DIR}/decision.json");
let dec_path = format!("{}/decision.json", state_dir());
match std::fs::read_to_string(&dec_path) {
Ok(content) => {
if let Ok(d) = serde_json::from_str::<serde_json::Value>(&content) {
@@ -859,7 +898,7 @@ pub fn remember() {
/// Wait for background run to complete, then show summary.
pub fn wait_done() {
let done_path = format!("{STATE_DIR}/done");
let done_path = format!("{}/done", state_dir());
eprint!(" waiting...");
loop {
if std::path::Path::new(&done_path).exists() {
@@ -867,7 +906,7 @@ pub fn wait_done() {
let _ = std::fs::remove_file(&done_path);
// Show compact summary
status(false);
if let Ok(content) = std::fs::read_to_string(format!("{STATE_DIR}/decision.json")) {
if let Ok(content) = std::fs::read_to_string(format!("{}/decision.json", state_dir())) {
if let Ok(d) = serde_json::from_str::<serde_json::Value>(&content) {
let text = d["decision"].as_str().unwrap_or("");
let first: String = text.lines()
@@ -891,7 +930,7 @@ pub fn signal_next(save: Vec<usize>) {
};
let control = serde_json::json!({"ready": true, "next": true, "quit": false, "save": save_val});
atomic_write(
&format!("{STATE_DIR}/loop.json"),
&format!("{}/loop.json", state_dir()),
serde_json::to_string_pretty(&control).unwrap_or_default().as_bytes(),
);
eprintln!("signaled: next (save: {:?})", save);
@@ -900,7 +939,7 @@ pub fn signal_next(save: Vec<usize>) {
pub fn signal_quit() {
let control = serde_json::json!({"ready": true, "quit": true});
atomic_write(
&format!("{STATE_DIR}/loop.json"),
&format!("{}/loop.json", state_dir()),
serde_json::to_string_pretty(&control).unwrap_or_default().as_bytes(),
);
eprintln!("signaled: quit");
@@ -1063,7 +1102,7 @@ pub fn context() {
let git_diff = run("git", &["diff", "--stat"]);
// Session info
let dec_path = format!("{STATE_DIR}/decision.json");
let dec_path = format!("{}/decision.json", state_dir());
let session_dir = config::sessions_dir();
let session_count = std::fs::read_dir(&session_dir)
.into_iter().flatten().flatten()
@@ -1113,7 +1152,7 @@ pub fn context() {
// Agents
println!("\n[agents]");
let agent_path = format!("{STATE_DIR}/agents.json");
let agent_path = format!("{}/agents.json", state_dir());
if let Ok(content) = std::fs::read_to_string(&agent_path) {
if let Ok(agents) = serde_json::from_str::<Vec<serde_json::Value>>(&content) {
if agents.is_empty() {
@@ -1126,7 +1165,7 @@ pub fn context() {
let icon = match c { "success" => "", "error" | "stopped" => "", _ => "" };
println!(" {icon} {name} [{c}]");
// Show first 2 meaningful lines from detail file
if let Ok(detail) = std::fs::read_to_string(format!("{STATE_DIR}/{id}.json")) {
if let Ok(detail) = std::fs::read_to_string(format!("{}/{id}.json", state_dir())) {
if let Ok(d) = serde_json::from_str::<serde_json::Value>(&detail) {
let result = d["result"].as_str().unwrap_or("");
for line in result.lines().filter(|l| !l.is_empty()).take(2) {
@@ -1144,7 +1183,7 @@ pub fn context() {
// Last decision
println!("\n[decision]");
let dec_path = format!("{STATE_DIR}/decision.json");
let dec_path = format!("{}/decision.json", state_dir());
if let Ok(content) = std::fs::read_to_string(&dec_path) {
if let Ok(d) = serde_json::from_str::<serde_json::Value>(&content) {
let text = d["decision"].as_str().unwrap_or("");
@@ -1232,12 +1271,12 @@ fn show_cmd_history(filter: Option<&str>) {
}
fn find_agent_by_name(name: &str) -> Option<String> {
let content = std::fs::read_to_string(format!("{STATE_DIR}/agents.json")).ok()?;
let content = std::fs::read_to_string(format!("{}/agents.json", state_dir())).ok()?;
let agents: Vec<serde_json::Value> = serde_json::from_str(&content).ok()?;
agents.iter()
.find(|a| a["name"].as_str() == Some(name))
.and_then(|a| a["id"].as_u64())
.map(|id| format!("{STATE_DIR}/{id}.json"))
.map(|id| format!("{}/{id}.json", state_dir()))
}
// ── Infra ──────────────────────────────────────────────────
@@ -1265,12 +1304,12 @@ fn setup_ctrlc(running: Arc<AtomicBool>) {
fn write_state(agents: &[Agent]) {
let summary: Vec<_> = agents.iter().map(|a| a.to_summary_json()).collect();
atomic_write(
&format!("{STATE_DIR}/agents.json"),
&format!("{}/agents.json", state_dir()),
serde_json::to_string_pretty(&serde_json::Value::Array(summary)).unwrap_or_default().as_bytes(),
);
for a in agents {
atomic_write(
&format!("{STATE_DIR}/{}.json", a.id),
&format!("{}/{}.json", state_dir(), a.id),
serde_json::to_string_pretty(&a.to_json()).unwrap_or_default().as_bytes(),
);
}
@@ -1296,13 +1335,13 @@ fn atomic_write(path: &str, content: &[u8]) {
}
}
fn create_state_dir() {
fn create_state_dir_at(path: &str) {
#[cfg(unix)] {
use std::os::unix::fs::DirBuilderExt;
let _ = std::fs::DirBuilder::new().mode(0o700).recursive(true).create(STATE_DIR);
let _ = std::fs::DirBuilder::new().mode(0o700).recursive(true).create(path);
}
#[cfg(not(unix))] {
let _ = std::fs::create_dir_all(STATE_DIR);
let _ = std::fs::create_dir_all(path);
}
}

View File

@@ -101,10 +101,17 @@ impl App {
claude.send(&msg);
}
let all_siblings: Vec<(String, String)> = configs.iter()
.map(|c| (c.name.clone(), c.task.clone()))
.collect();
let mut errors = Vec::new();
for cfg in configs {
let cwd = expand_tilde(&cfg.cwd);
match Agent::spawn_with_config(app.next_id, &cfg.name, &cfg.task, &cwd, cfg.host.as_deref(), cfg.protocol.as_deref()) {
let siblings: Vec<(&str, &str)> = all_siblings.iter()
.filter(|(n, _)| n != &cfg.name)
.map(|(n, t)| (n.as_str(), t.as_str()))
.collect();
match Agent::spawn_with_config(app.next_id, &cfg.name, &cfg.task, &cwd, cfg.host.as_deref(), cfg.protocol.as_deref(), &siblings) {
Ok(agent) => {
app.agents.push(agent);
app.next_id += 1;