diff --git a/src/commands/bot.rs b/src/commands/bot.rs index 1879d51..fdac91d 100644 --- a/src/commands/bot.rs +++ b/src/commands/bot.rs @@ -14,6 +14,9 @@ use crate::tid; use crate::types::{PutRecordRequest, PutRecordResponse}; use crate::xrpc::XrpcClient; +/// Default chat proxy DID service +const CHAT_PROXY_HEADER: &str = "did:web:bsky.syu.is#bsky_chat"; + const BOT_RULES: &str = include_str!("../rules/bot.md"); /// Persistent Claude session using stream-json protocol @@ -519,3 +522,238 @@ async fn poll_once( Ok(()) } + +// ============================================================ +// Chat (DM) bot +// ============================================================ + +/// Chat bot state: tracks cursor for getLog polling +#[derive(Debug, Default, Serialize, Deserialize)] +struct ChatBotState { + /// Cursor for chat.bsky.convo.getLog (hex timestamp) + #[serde(default)] + cursor: Option, + /// Set of processed message IDs + processed: HashSet, +} + +fn chat_state_path() -> Result { + let config_dir = dirs::config_dir() + .context("Could not find config directory")? + .join(token::BUNDLE_ID); + fs::create_dir_all(&config_dir)?; + Ok(config_dir.join("bot_chat_state.json")) +} + +fn load_chat_state() -> ChatBotState { + let path = match chat_state_path() { + Ok(p) => p, + Err(_) => return ChatBotState::default(), + }; + match fs::read_to_string(&path) { + Ok(content) => serde_json::from_str(&content).unwrap_or_default(), + Err(_) => ChatBotState::default(), + } +} + +fn save_chat_state(state: &ChatBotState) -> Result<()> { + let path = chat_state_path()?; + let content = serde_json::to_string_pretty(state)?; + fs::write(&path, content)?; + Ok(()) +} + +/// Load chat proxy from config.json network field +fn load_chat_proxy(config_path: &str) -> Result { + let content = fs::read_to_string(config_path) + .with_context(|| format!("Config file not found: {}", config_path))?; + let config: Value = serde_json::from_str(&content)?; + if let Some(network) = config["network"].as_str() { + Ok(format!("did:web:bsky.{}#bsky_chat", network)) + } else { + Ok(CHAT_PROXY_HEADER.to_string()) + } +} + +// Chat API uses XrpcClient with atproto-proxy header via query_auth_proxy / call_proxy + +/// Main chat bot entry point +pub async fn start_chat(interval_secs: u64, config_path: &str) -> Result<()> { + let admin_did = load_admin_did(config_path)?; + let proxy_did = load_chat_proxy(config_path)?; + eprintln!("chat-bot: admin DID = {}", admin_did); + eprintln!("chat-bot: proxy = {}", proxy_did); + eprintln!("chat-bot: polling interval = {}s", interval_secs); + + let mut state = load_chat_state(); + + // Spawn persistent Claude session + let mut claude = ClaudeSession::spawn().await?; + eprintln!("chat-bot: claude session started"); + eprintln!("chat-bot: starting chat poll loop..."); + + loop { + if let Err(e) = chat_poll_once(&admin_did, &proxy_did, &mut state, &mut claude).await { + eprintln!("chat-bot: poll error: {}", e); + } + tokio::time::sleep(std::time::Duration::from_secs(interval_secs)).await; + } +} + +/// Single chat poll iteration +async fn chat_poll_once( + admin_did: &str, + proxy_did: &str, + state: &mut ChatBotState, + claude: &mut ClaudeSession, +) -> Result<()> { + // Refresh bot session (OAuth/DPoP if available, legacy Bearer fallback) + let session = auth::refresh_bot_session().await?; + let pds = session.pds.as_deref().unwrap_or("syu.is"); + let client = XrpcClient::new_bot(pds); + + // Poll getLog for new events + let mut params: Vec<(&str, &str)> = vec![("limit", "50")]; + let cursor_str; + if let Some(ref c) = state.cursor { + cursor_str = c.clone(); + params.push(("cursor", &cursor_str)); + } + let log_resp: Value = client.query_auth_proxy( + "chat.bsky.convo.getLog", ¶ms, &session.access_jwt, proxy_did + ).await?; + + // Update cursor + if let Some(new_cursor) = log_resp["cursor"].as_str() { + state.cursor = Some(new_cursor.to_string()); + } + + let logs = match log_resp["logs"].as_array() { + Some(l) => l.clone(), + None => return Ok(()), + }; + + // Collect new messages from admin + let mut messages: Vec<(String, String, String)> = Vec::new(); // (msg_id, convo_id, text) + + for log_entry in &logs { + let entry_type = log_entry["$type"].as_str().unwrap_or(""); + + // Only process new messages + if entry_type != "chat.bsky.convo.defs#logCreateMessage" { + continue; + } + + let convo_id = match log_entry["convoId"].as_str() { + Some(c) => c, + None => continue, + }; + + let message = &log_entry["message"]; + let msg_id = match message["id"].as_str() { + Some(id) => id, + None => continue, + }; + + // Skip already processed + if state.processed.contains(msg_id) { + continue; + } + + // Only respond to messages from admin + let sender_did = match message["sender"]["did"].as_str() { + Some(d) => d, + None => continue, + }; + if sender_did != admin_did { + // Mark non-admin messages as processed too (don't reply) + state.processed.insert(msg_id.to_string()); + continue; + } + + let text = message["text"].as_str().unwrap_or(""); + if text.is_empty() { + state.processed.insert(msg_id.to_string()); + continue; + } + + messages.push((msg_id.to_string(), convo_id.to_string(), text.to_string())); + } + + // Process messages + for (msg_id, convo_id, text) in &messages { + eprintln!( + "chat-bot: new DM: {}", + if text.chars().count() > 50 { + format!("{}...", text.chars().take(50).collect::()) + } else { + text.clone() + } + ); + + // Send to Claude + let response = match claude.send(text).await { + Ok(r) => r, + Err(e) => { + eprintln!("chat-bot: claude error: {}, respawning...", e); + match ClaudeSession::spawn().await { + Ok(new_session) => { + *claude = new_session; + match claude.send(text).await { + Ok(r) => r, + Err(e2) => { + eprintln!("chat-bot: claude retry failed: {}", e2); + continue; + } + } + } + Err(e2) => { + eprintln!("chat-bot: claude respawn failed: {}", e2); + continue; + } + } + } + }; + + // Send reply via chat + let send_body = serde_json::json!({ + "convoId": convo_id, + "message": { + "text": response + } + }); + + match client.call_proxy::( + "chat.bsky.convo.sendMessage", &send_body, &session.access_jwt, proxy_did + ).await { + Ok(_) => { + eprintln!("chat-bot: replied in convo {}", convo_id); + state.processed.insert(msg_id.clone()); + if let Err(e) = save_chat_state(state) { + eprintln!("chat-bot: failed to save state: {}", e); + } + } + Err(e) => { + eprintln!("chat-bot: send failed: {}", e); + continue; + } + } + } + + // Save state (cursor update) + if let Err(e) = save_chat_state(state) { + eprintln!("chat-bot: failed to save state: {}", e); + } + + // Prune old entries if over 1000 + if state.processed.len() > 1000 { + let drain_count = state.processed.len() - 500; + let to_remove: Vec = state.processed.iter().take(drain_count).cloned().collect(); + for key in to_remove { + state.processed.remove(&key); + } + save_chat_state(state)?; + } + + Ok(()) +} diff --git a/src/commands/oauth.rs b/src/commands/oauth.rs index fe04ea7..d0d70b8 100644 --- a/src/commands/oauth.rs +++ b/src/commands/oauth.rs @@ -693,7 +693,7 @@ pub async fn oauth_login(handle: &str, is_bot: bool) -> Result<()> { // 5. Client metadata (derived from config.json siteUrl) let site_url = load_site_url()?; let client_id = format!("{}/client-metadata.json", site_url); - let scope = "atproto transition:generic"; + let scope = "atproto transition:generic transition:chat.bsky"; // Try /oauth/cli first, fallback to /oauth/callback let redirect_candidates = [ diff --git a/src/main.rs b/src/main.rs index 4b2d9fa..a2e30d3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -291,6 +291,15 @@ enum BotCommands { #[arg(short, long, default_value = "public/config.json")] config: String, }, + /// Start the DM chat bot (poll chat messages and reply) + Chat { + /// Poll interval in seconds + #[arg(short, long, default_value = "30")] + interval: u64, + /// Path to config.json + #[arg(short, long, default_value = "public/config.json")] + config: String, + }, } #[derive(Subcommand)] @@ -412,6 +421,9 @@ async fn main() -> Result<()> { BotCommands::Start { interval, config } => { commands::bot::start(interval, &config).await?; } + BotCommands::Chat { interval, config } => { + commands::bot::start_chat(interval, &config).await?; + } } } Commands::Pds { command } => { diff --git a/src/xrpc.rs b/src/xrpc.rs index 35a7e3a..5651597 100644 --- a/src/xrpc.rs +++ b/src/xrpc.rs @@ -194,6 +194,20 @@ impl XrpcClient { url: &str, full_url: &str, body: Option<&B>, + ) -> Result { + self.dpop_request_with_retry_proxy(oauth, token, method, url, full_url, body, None).await + } + + /// DPoP-authenticated request with optional proxy header + async fn dpop_request_with_retry_proxy( + &self, + oauth: &OAuthSession, + token: &str, + method: &str, + url: &str, + full_url: &str, + body: Option<&B>, + proxy: Option<&str>, ) -> Result { let mut dpop_nonce: Option = None; @@ -205,7 +219,7 @@ impl XrpcClient { dpop_nonce.as_deref(), )?; - let builder = match method { + let mut builder = match method { "GET" => self.inner.get(url), _ => { let b = self.inner.post(url); @@ -217,6 +231,10 @@ impl XrpcClient { } }; + if let Some(proxy_did) = proxy { + builder = builder.header("atproto-proxy", proxy_did); + } + let res = builder .header("Authorization", format!("DPoP {}", token)) .header("DPoP", dpop_proof) @@ -302,6 +320,64 @@ impl XrpcClient { anyhow::bail!("XRPC DPoP call failed after nonce retry"); } + /// Authenticated GET with atproto-proxy header (for chat API) + pub async fn query_auth_proxy( + &self, + nsid: &str, + params: &[(&str, &str)], + token: &str, + proxy: &str, + ) -> Result { + let mut url = format!("{}/xrpc/{}", Self::ensure_scheme(&self.pds_host), nsid); + if !params.is_empty() { + url.push('?'); + let qs: Vec = params.iter().map(|(k, v)| format!("{}={}", k, v)).collect(); + url.push_str(&qs.join("&")); + } + let full_url = url.clone(); + + if let Some(oauth) = self.try_load_oauth() { + self.dpop_request_with_retry_proxy(&oauth, token, "GET", &url, &full_url, None::<&()>, Some(proxy)) + .await + } else { + let res = self.inner + .get(&url) + .header("Authorization", format!("Bearer {}", token)) + .header("atproto-proxy", proxy) + .send() + .await + .context("XRPC proxy query failed")?; + self.handle_response(res).await + } + } + + /// Authenticated POST with atproto-proxy header (for chat API) + pub async fn call_proxy( + &self, + nsid: &str, + body: &B, + token: &str, + proxy: &str, + ) -> Result { + let url = format!("{}/xrpc/{}", Self::ensure_scheme(&self.pds_host), nsid); + let full_url = url.clone(); + + if let Some(oauth) = self.try_load_oauth() { + self.dpop_request_with_retry_proxy(&oauth, token, "POST", &url, &full_url, Some(body), Some(proxy)) + .await + } else { + let res = self.inner + .post(&url) + .header("Authorization", format!("Bearer {}", token)) + .header("atproto-proxy", proxy) + .json(body) + .send() + .await + .context("XRPC proxy call failed")?; + self.handle_response(res).await + } + } + /// Handle an XRPC response: check status, parse ATProto errors, deserialize body. async fn handle_response(&self, res: reqwest::Response) -> Result { if !res.status().is_success() {