use anyhow::{Context, Result}; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::collections::{HashMap, HashSet}; use std::fs; use std::path::PathBuf; use std::process::Stdio; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use super::auth; use super::token; use crate::lexicons::{app_bsky_notification, com_atproto_repo}; use crate::tid; use crate::types::{PutRecordRequest, PutRecordResponse}; use crate::xrpc::XrpcClient; /// Default chat proxy DID service const CHAT_PROXY_HEADER: &str = "did:web:bsky.syu.is#bsky_chat"; const BOT_RULES: &str = include_str!("../rules/bot.md"); /// Persistent Claude session using stream-json protocol struct ClaudeSession { stdin: tokio::process::ChildStdin, response_rx: tokio::sync::mpsc::Receiver, } impl ClaudeSession { /// Spawn a persistent claude process with stream-json I/O async fn spawn(user_mode: bool) -> Result { // Run claude inside a dedicated directory under config let work_dir = dirs::config_dir() .context("Could not find config directory")? .join(token::BUNDLE_ID) .join("bot"); fs::create_dir_all(&work_dir)?; // Always overwrite CLAUDE.md with latest rules let rules_path = work_dir.join("CLAUDE.md"); fs::write(&rules_path, BOT_RULES)?; // Manage settings.json: create in user mode, remove otherwise let settings_dir = work_dir.join(".claude"); let settings_path = settings_dir.join("settings.json"); if user_mode { fs::create_dir_all(&settings_dir)?; let settings = serde_json::json!({ "permissions": { "deny": [ "Bash", "Edit(.claude/**)", "Write(.claude/**)", "Edit(.mcp.json)", "Write(.mcp.json)" ], "allow": [ "Read", "Glob", "Grep" ] } }); fs::write(&settings_path, serde_json::to_string_pretty(&settings)?)?; } else { // Remove restrictive settings from previous --user run let _ = fs::remove_file(&settings_path); } eprintln!("bot: claude working directory = {}", work_dir.display()); eprintln!("bot: user_mode = {}", user_mode); let mut cmd = tokio::process::Command::new("claude"); cmd.arg("--input-format") .arg("stream-json") .arg("--output-format") .arg("stream-json") .arg("--verbose"); if user_mode { cmd.arg("--permission-mode").arg("dontAsk"); } else { cmd.arg("--dangerously-skip-permissions"); } let mut child = cmd .current_dir(&work_dir) .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn() .context("failed to start claude process")?; let stdin = child.stdin.take().context("failed to capture claude stdin")?; let stdout = child.stdout.take().context("failed to capture claude stdout")?; let stderr = child.stderr.take().context("failed to capture claude stderr")?; // Background task: log stderr tokio::spawn(async move { let reader = BufReader::new(stderr); let mut lines = reader.lines(); while let Ok(Some(line)) = lines.next_line().await { eprintln!("bot: claude stderr: {}", line); } }); // Brief wait to check if the process exits immediately tokio::time::sleep(std::time::Duration::from_millis(500)).await; let (tx, rx) = tokio::sync::mpsc::channel::(16); // Background task: read stdout JSON lines and extract responses tokio::spawn(async move { let reader = BufReader::new(stdout); let mut lines = reader.lines(); let mut accumulated = String::new(); loop { let line = match lines.next_line().await { Ok(Some(l)) => l, Ok(None) => { eprintln!("bot: claude stdout closed"); break; } Err(e) => { eprintln!("bot: claude stdout error: {}", e); break; } }; if line.trim().is_empty() { continue; } let json: Value = match serde_json::from_str(&line) { Ok(v) => v, Err(_) => continue, }; let msg_type = json.get("type").and_then(|t| t.as_str()); match msg_type { Some("assistant") => { // Extract text from content array if let Some(content) = json.pointer("/message/content").and_then(|c| c.as_array()) { for item in content { if item.get("type").and_then(|t| t.as_str()) == Some("text") { if let Some(text) = item.get("text").and_then(|t| t.as_str()) { accumulated = text.to_string(); } } } } } Some("result") => { // Use accumulated text if available, otherwise use result field let response = if !accumulated.is_empty() { std::mem::take(&mut accumulated) } else { json.get("result") .and_then(|r| r.as_str()) .unwrap_or("") .to_string() }; if tx.send(response).await.is_err() { break; } accumulated.clear(); } _ => {} } } // Wait for child to exit let status = child.wait().await; eprintln!("bot: claude process exited: {:?}", status); }); Ok(Self { stdin, response_rx: rx, }) } /// Send a message and wait for the response (with 120s timeout) async fn send(&mut self, text: &str) -> Result { let msg = serde_json::json!({ "type": "user", "message": { "role": "user", "content": text } }); let mut line = serde_json::to_string(&msg)?; line.push('\n'); self.stdin .write_all(line.as_bytes()) .await .context("failed to write to claude stdin")?; self.stdin.flush().await?; // Wait for response (no timeout — claude may use tools) let response = self.response_rx.recv() .await .context("claude session closed unexpectedly")?; if response.is_empty() { anyhow::bail!("claude returned empty response"); } // Truncate to 300 graphemes (Bluesky limit) let truncated: String = response.chars().take(300).collect(); Ok(truncated) } } /// Persistent bot state: tracks processed notifications #[derive(Debug, Default, Serialize, Deserialize)] struct BotState { /// Set of processed notification URIs processed: HashSet, /// Timestamp of the last seen notification #[serde(default)] last_seen: Option, /// Per-user daily message counts: DID -> (date "YYYY-MM-DD", count) #[serde(default)] user_counts: HashMap, } /// Parsed notification for processing struct Notification { uri: String, #[allow(dead_code)] cid: String, #[allow(dead_code)] author_did: String, author_handle: String, text: String, #[allow(dead_code)] indexed_at: String, root_uri: String, root_cid: String, parent_uri: String, parent_cid: String, } /// State file path: ~/.config/ai.syui.log/bot_state.json fn state_path() -> Result { let config_dir = dirs::config_dir() .context("Could not find config directory")? .join(token::BUNDLE_ID); fs::create_dir_all(&config_dir)?; Ok(config_dir.join("bot_state.json")) } fn load_state() -> BotState { let path = match state_path() { Ok(p) => p, Err(_) => return BotState::default(), }; match fs::read_to_string(&path) { Ok(content) => serde_json::from_str(&content).unwrap_or_default(), Err(_) => BotState::default(), } } fn save_state(state: &BotState) -> Result<()> { let path = state_path()?; let content = serde_json::to_string_pretty(state)?; fs::write(&path, content)?; Ok(()) } /// Load admin DID from config.json ($cfg first, then fallback) fn load_admin_did() -> Result { let config = token::load_config()?; config["did"] .as_str() .map(|s| s.to_string()) .context("config.json missing 'did' field") } /// Load daily rate limit from config.json (bot.limit, default 3) fn load_user_limit() -> u32 { token::load_config() .ok() .and_then(|c| c["bot"]["limit"].as_u64()) .unwrap_or(100) as u32 } /// Check if a user is within daily rate limit. Returns true if allowed. /// Increments count on success. fn check_rate_limit(state: &mut BotState, did: &str, limit: u32) -> bool { let today = chrono::Utc::now().format("%Y-%m-%d").to_string(); let entry = state.user_counts.entry(did.to_string()).or_insert_with(|| (today.clone(), 0)); // Reset if new day if entry.0 != today { *entry = (today, 0); } if entry.1 >= limit { false } else { entry.1 += 1; true } } /// Fetch notifications using the bot session async fn fetch_notifications( client: &XrpcClient, token: &str, ) -> Result> { let body: Value = client .query_auth( &app_bsky_notification::LIST_NOTIFICATIONS, &[("limit", "50")], token, ) .await?; Ok(body["notifications"] .as_array() .cloned() .unwrap_or_default()) } /// Parse a notification JSON value into a Notification struct fn parse_notification(notif: &Value) -> Option { let uri = notif["uri"].as_str()?; let cid = notif["cid"].as_str()?; let author_did = notif["author"]["did"].as_str()?; let author_handle = notif["author"]["handle"].as_str().unwrap_or("unknown"); let text = notif["record"]["text"].as_str().unwrap_or(""); let indexed_at = notif["indexedAt"].as_str().unwrap_or(""); // Determine root: use reply root if present, otherwise this post is root let root_uri = notif["record"]["reply"]["root"]["uri"] .as_str() .unwrap_or(uri); let root_cid = notif["record"]["reply"]["root"]["cid"] .as_str() .unwrap_or(cid); // Parent is always this notification's post Some(Notification { uri: uri.to_string(), cid: cid.to_string(), author_did: author_did.to_string(), author_handle: author_handle.to_string(), text: text.to_string(), indexed_at: indexed_at.to_string(), root_uri: root_uri.to_string(), root_cid: root_cid.to_string(), parent_uri: uri.to_string(), parent_cid: cid.to_string(), }) } /// Post a reply using the bot session async fn post_reply( client: &XrpcClient, session: &token::Session, text: &str, root_uri: &str, root_cid: &str, parent_uri: &str, parent_cid: &str, ) -> Result { let now = chrono::Utc::now() .format("%Y-%m-%dT%H:%M:%S%.3fZ") .to_string(); let record = serde_json::json!({ "$type": "app.bsky.feed.post", "text": text, "reply": { "root": { "uri": root_uri, "cid": root_cid, }, "parent": { "uri": parent_uri, "cid": parent_cid, }, }, "createdAt": now, }); let rkey = tid::generate_tid(); let req = PutRecordRequest { repo: session.did.clone(), collection: "app.bsky.feed.post".to_string(), rkey, record, }; let result: PutRecordResponse = client .call(&com_atproto_repo::PUT_RECORD, &req, &session.access_jwt) .await?; Ok(result) } /// Main bot entry point pub async fn start(interval_secs: u64, user_mode: bool) -> Result<()> { let admin_did = load_admin_did()?; let user_limit = if user_mode { load_user_limit() } else { 0 }; eprintln!("bot: admin DID = {}", admin_did); eprintln!("bot: user_mode = {}, limit = {}/day", user_mode, user_limit); eprintln!("bot: polling interval = {}s", interval_secs); let mut state = load_state(); // Spawn persistent Claude session let mut session = ClaudeSession::spawn(user_mode).await?; eprintln!("bot: claude session started"); eprintln!("bot: starting notification loop..."); loop { if let Err(e) = poll_once(&admin_did, &mut state, &mut session, user_mode, user_limit).await { eprintln!("bot: poll error: {}", e); } tokio::time::sleep(std::time::Duration::from_secs(interval_secs)).await; } } /// Single poll iteration async fn poll_once( admin_did: &str, state: &mut BotState, claude: &mut ClaudeSession, user_mode: bool, user_limit: u32, ) -> Result<()> { // Refresh bot session let session = auth::refresh_bot_session().await?; let pds = session.pds.as_deref().unwrap_or("bsky.social"); let client = XrpcClient::new_bot(pds); // Fetch notifications let notifications = fetch_notifications(&client, &session.access_jwt).await?; let mut new_items: Vec = Vec::new(); for notif in ¬ifications { let reason = notif["reason"].as_str().unwrap_or(""); // Only process mentions and replies if reason != "mention" && reason != "reply" { continue; } let uri = match notif["uri"].as_str() { Some(u) => u, None => continue, }; // Skip already processed if state.processed.contains(uri) { continue; } // Skip if before last_seen on first load if let Some(ref last) = state.last_seen { if let Some(indexed) = notif["indexedAt"].as_str() { if indexed <= last.as_str() && !state.processed.is_empty() { continue; } } } let author_did = match notif["author"]["did"].as_str() { Some(d) => d, None => continue, }; if author_did == admin_did { // Admin: always pass } else if user_mode { // User mode: check rate limit if !check_rate_limit(state, author_did, user_limit) { eprintln!("bot: rate limited user {} (limit {}/day)", author_did, user_limit); state.processed.insert(uri.to_string()); continue; } } else { // Admin-only mode: skip non-admin continue; } if let Some(parsed) = parse_notification(notif) { new_items.push(parsed); } } // Process in chronological order (oldest first) new_items.reverse(); for notif in &new_items { eprintln!( "bot: processing notification from @{}: {}", notif.author_handle, if notif.text.chars().count() > 50 { format!("{}...", notif.text.chars().take(50).collect::()) } else { notif.text.clone() } ); // Send to persistent Claude session let response = match claude.send(¬if.text).await { Ok(r) => r, Err(e) => { eprintln!("bot: claude error: {}, respawning session...", e); // Try to respawn session match ClaudeSession::spawn(user_mode).await { Ok(new_session) => { *claude = new_session; eprintln!("bot: claude session respawned"); // Retry once with fresh session match claude.send(¬if.text).await { Ok(r) => r, Err(e2) => { eprintln!("bot: claude retry failed: {}", e2); continue; } } } Err(e2) => { eprintln!("bot: claude respawn failed: {}", e2); continue; } } } }; // Post reply match post_reply( &client, &session, &response, ¬if.root_uri, ¬if.root_cid, ¬if.parent_uri, ¬if.parent_cid, ) .await { Ok(result) => { eprintln!("bot: replied -> {}", result.uri); state.processed.insert(notif.uri.clone()); if let Err(e) = save_state(state) { eprintln!("bot: failed to save state: {}", e); } } Err(e) => { eprintln!("bot: reply failed for {}: {}", notif.uri, e); continue; } } } // Update last_seen to newest notification timestamp if let Some(newest) = notifications.first() { if let Some(ts) = newest["indexedAt"].as_str() { state.last_seen = Some(ts.to_string()); } } // Mark notifications as seen if !new_items.is_empty() { let now = chrono::Utc::now() .format("%Y-%m-%dT%H:%M:%S%.3fZ") .to_string(); let seen_body = serde_json::json!({ "seenAt": now }); let _ = client .call_no_response( &app_bsky_notification::UPDATE_SEEN, &seen_body, &session.access_jwt, ) .await; } // Prune old entries if over 1000 if state.processed.len() > 1000 { let drain_count = state.processed.len() - 500; let to_remove: Vec = state.processed.iter().take(drain_count).cloned().collect(); for key in to_remove { state.processed.remove(&key); } if let Err(e) = save_state(state) { eprintln!("bot: failed to save pruned state: {}", e); } } Ok(()) } // ============================================================ // Chat (DM) bot // ============================================================ /// Chat bot state: tracks cursor for getLog polling #[derive(Debug, Default, Serialize, Deserialize)] struct ChatBotState { /// Cursor for chat.bsky.convo.getLog (hex timestamp) #[serde(default)] cursor: Option, /// Set of processed message IDs processed: HashSet, } fn chat_state_path() -> Result { let config_dir = dirs::config_dir() .context("Could not find config directory")? .join(token::BUNDLE_ID); fs::create_dir_all(&config_dir)?; Ok(config_dir.join("bot_chat_state.json")) } fn load_chat_state() -> ChatBotState { let path = match chat_state_path() { Ok(p) => p, Err(_) => return ChatBotState::default(), }; match fs::read_to_string(&path) { Ok(content) => serde_json::from_str(&content).unwrap_or_default(), Err(_) => ChatBotState::default(), } } fn save_chat_state(state: &ChatBotState) -> Result<()> { let path = chat_state_path()?; let content = serde_json::to_string_pretty(state)?; fs::write(&path, content)?; Ok(()) } /// Load chat proxy from config.json network field fn load_chat_proxy() -> Result { let config = token::load_config()?; if let Some(network) = config["network"].as_str() { Ok(format!("did:web:bsky.{}#bsky_chat", network)) } else { Ok(CHAT_PROXY_HEADER.to_string()) } } // Chat API uses XrpcClient with atproto-proxy header via query_auth_proxy / call_proxy /// Main chat bot entry point pub async fn start_chat(interval_secs: u64) -> Result<()> { let admin_did = load_admin_did()?; let proxy_did = load_chat_proxy()?; eprintln!("chat-bot: admin DID = {}", admin_did); eprintln!("chat-bot: proxy = {}", proxy_did); eprintln!("chat-bot: polling interval = {}s", interval_secs); let mut state = load_chat_state(); // Spawn persistent Claude session let mut claude = ClaudeSession::spawn(false).await?; eprintln!("chat-bot: claude session started"); eprintln!("chat-bot: starting chat poll loop..."); loop { if let Err(e) = chat_poll_once(&admin_did, &proxy_did, &mut state, &mut claude).await { eprintln!("chat-bot: poll error: {}", e); } tokio::time::sleep(std::time::Duration::from_secs(interval_secs)).await; } } /// Single chat poll iteration async fn chat_poll_once( admin_did: &str, proxy_did: &str, state: &mut ChatBotState, claude: &mut ClaudeSession, ) -> Result<()> { // Refresh bot session (OAuth/DPoP if available, legacy Bearer fallback) let session = auth::refresh_bot_session().await?; let pds = session.pds.as_deref().unwrap_or("syu.is"); let client = XrpcClient::new_bot(pds); // Poll getLog for new events let mut params: Vec<(&str, &str)> = vec![("limit", "50")]; let cursor_str; if let Some(ref c) = state.cursor { cursor_str = c.clone(); params.push(("cursor", &cursor_str)); } let log_resp: Value = client.query_auth_proxy( "chat.bsky.convo.getLog", ¶ms, &session.access_jwt, proxy_did ).await?; // Update cursor if let Some(new_cursor) = log_resp["cursor"].as_str() { state.cursor = Some(new_cursor.to_string()); } let logs = match log_resp["logs"].as_array() { Some(l) => l.clone(), None => return Ok(()), }; // Collect new messages from admin let mut messages: Vec<(String, String, String)> = Vec::new(); // (msg_id, convo_id, text) for log_entry in &logs { let entry_type = log_entry["$type"].as_str().unwrap_or(""); // Only process new messages if entry_type != "chat.bsky.convo.defs#logCreateMessage" { continue; } let convo_id = match log_entry["convoId"].as_str() { Some(c) => c, None => continue, }; let message = &log_entry["message"]; let msg_id = match message["id"].as_str() { Some(id) => id, None => continue, }; // Skip already processed if state.processed.contains(msg_id) { continue; } // Only respond to messages from admin let sender_did = match message["sender"]["did"].as_str() { Some(d) => d, None => continue, }; if sender_did != admin_did { // Mark non-admin messages as processed too (don't reply) state.processed.insert(msg_id.to_string()); continue; } let text = message["text"].as_str().unwrap_or(""); if text.is_empty() { state.processed.insert(msg_id.to_string()); continue; } messages.push((msg_id.to_string(), convo_id.to_string(), text.to_string())); } // Process messages for (msg_id, convo_id, text) in &messages { eprintln!( "chat-bot: new DM: {}", if text.chars().count() > 50 { format!("{}...", text.chars().take(50).collect::()) } else { text.clone() } ); // Send to Claude let response = match claude.send(text).await { Ok(r) => r, Err(e) => { eprintln!("chat-bot: claude error: {}, respawning...", e); match ClaudeSession::spawn(false).await { Ok(new_session) => { *claude = new_session; match claude.send(text).await { Ok(r) => r, Err(e2) => { eprintln!("chat-bot: claude retry failed: {}", e2); continue; } } } Err(e2) => { eprintln!("chat-bot: claude respawn failed: {}", e2); continue; } } } }; // Send reply via chat let send_body = serde_json::json!({ "convoId": convo_id, "message": { "text": response } }); match client.call_proxy::( "chat.bsky.convo.sendMessage", &send_body, &session.access_jwt, proxy_did ).await { Ok(_) => { eprintln!("chat-bot: replied in convo {}", convo_id); state.processed.insert(msg_id.clone()); if let Err(e) = save_chat_state(state) { eprintln!("chat-bot: failed to save state: {}", e); } } Err(e) => { eprintln!("chat-bot: send failed: {}", e); continue; } } } // Save state (cursor update) if let Err(e) = save_chat_state(state) { eprintln!("chat-bot: failed to save state: {}", e); } // Prune old entries if over 1000 if state.processed.len() > 1000 { let drain_count = state.processed.len() - 500; let to_remove: Vec = state.processed.iter().take(drain_count).cloned().collect(); for key in to_remove { state.processed.remove(&key); } save_chat_state(state)?; } Ok(()) }