fix chat
This commit is contained in:
@@ -14,6 +14,9 @@ use crate::tid;
|
|||||||
use crate::types::{PutRecordRequest, PutRecordResponse};
|
use crate::types::{PutRecordRequest, PutRecordResponse};
|
||||||
use crate::xrpc::XrpcClient;
|
use crate::xrpc::XrpcClient;
|
||||||
|
|
||||||
|
/// Default chat proxy DID service
|
||||||
|
const CHAT_PROXY_HEADER: &str = "did:web:bsky.syu.is#bsky_chat";
|
||||||
|
|
||||||
const BOT_RULES: &str = include_str!("../rules/bot.md");
|
const BOT_RULES: &str = include_str!("../rules/bot.md");
|
||||||
|
|
||||||
/// Persistent Claude session using stream-json protocol
|
/// Persistent Claude session using stream-json protocol
|
||||||
@@ -519,3 +522,238 @@ async fn poll_once(
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ============================================================
|
||||||
|
// Chat (DM) bot
|
||||||
|
// ============================================================
|
||||||
|
|
||||||
|
/// Chat bot state: tracks cursor for getLog polling
|
||||||
|
#[derive(Debug, Default, Serialize, Deserialize)]
|
||||||
|
struct ChatBotState {
|
||||||
|
/// Cursor for chat.bsky.convo.getLog (hex timestamp)
|
||||||
|
#[serde(default)]
|
||||||
|
cursor: Option<String>,
|
||||||
|
/// Set of processed message IDs
|
||||||
|
processed: HashSet<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn chat_state_path() -> Result<PathBuf> {
|
||||||
|
let config_dir = dirs::config_dir()
|
||||||
|
.context("Could not find config directory")?
|
||||||
|
.join(token::BUNDLE_ID);
|
||||||
|
fs::create_dir_all(&config_dir)?;
|
||||||
|
Ok(config_dir.join("bot_chat_state.json"))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn load_chat_state() -> ChatBotState {
|
||||||
|
let path = match chat_state_path() {
|
||||||
|
Ok(p) => p,
|
||||||
|
Err(_) => return ChatBotState::default(),
|
||||||
|
};
|
||||||
|
match fs::read_to_string(&path) {
|
||||||
|
Ok(content) => serde_json::from_str(&content).unwrap_or_default(),
|
||||||
|
Err(_) => ChatBotState::default(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn save_chat_state(state: &ChatBotState) -> Result<()> {
|
||||||
|
let path = chat_state_path()?;
|
||||||
|
let content = serde_json::to_string_pretty(state)?;
|
||||||
|
fs::write(&path, content)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Load chat proxy from config.json network field
|
||||||
|
fn load_chat_proxy(config_path: &str) -> Result<String> {
|
||||||
|
let content = fs::read_to_string(config_path)
|
||||||
|
.with_context(|| format!("Config file not found: {}", config_path))?;
|
||||||
|
let config: Value = serde_json::from_str(&content)?;
|
||||||
|
if let Some(network) = config["network"].as_str() {
|
||||||
|
Ok(format!("did:web:bsky.{}#bsky_chat", network))
|
||||||
|
} else {
|
||||||
|
Ok(CHAT_PROXY_HEADER.to_string())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Chat API uses XrpcClient with atproto-proxy header via query_auth_proxy / call_proxy
|
||||||
|
|
||||||
|
/// Main chat bot entry point
|
||||||
|
pub async fn start_chat(interval_secs: u64, config_path: &str) -> Result<()> {
|
||||||
|
let admin_did = load_admin_did(config_path)?;
|
||||||
|
let proxy_did = load_chat_proxy(config_path)?;
|
||||||
|
eprintln!("chat-bot: admin DID = {}", admin_did);
|
||||||
|
eprintln!("chat-bot: proxy = {}", proxy_did);
|
||||||
|
eprintln!("chat-bot: polling interval = {}s", interval_secs);
|
||||||
|
|
||||||
|
let mut state = load_chat_state();
|
||||||
|
|
||||||
|
// Spawn persistent Claude session
|
||||||
|
let mut claude = ClaudeSession::spawn().await?;
|
||||||
|
eprintln!("chat-bot: claude session started");
|
||||||
|
eprintln!("chat-bot: starting chat poll loop...");
|
||||||
|
|
||||||
|
loop {
|
||||||
|
if let Err(e) = chat_poll_once(&admin_did, &proxy_did, &mut state, &mut claude).await {
|
||||||
|
eprintln!("chat-bot: poll error: {}", e);
|
||||||
|
}
|
||||||
|
tokio::time::sleep(std::time::Duration::from_secs(interval_secs)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Single chat poll iteration
|
||||||
|
async fn chat_poll_once(
|
||||||
|
admin_did: &str,
|
||||||
|
proxy_did: &str,
|
||||||
|
state: &mut ChatBotState,
|
||||||
|
claude: &mut ClaudeSession,
|
||||||
|
) -> Result<()> {
|
||||||
|
// Refresh bot session (OAuth/DPoP if available, legacy Bearer fallback)
|
||||||
|
let session = auth::refresh_bot_session().await?;
|
||||||
|
let pds = session.pds.as_deref().unwrap_or("syu.is");
|
||||||
|
let client = XrpcClient::new_bot(pds);
|
||||||
|
|
||||||
|
// Poll getLog for new events
|
||||||
|
let mut params: Vec<(&str, &str)> = vec![("limit", "50")];
|
||||||
|
let cursor_str;
|
||||||
|
if let Some(ref c) = state.cursor {
|
||||||
|
cursor_str = c.clone();
|
||||||
|
params.push(("cursor", &cursor_str));
|
||||||
|
}
|
||||||
|
let log_resp: Value = client.query_auth_proxy(
|
||||||
|
"chat.bsky.convo.getLog", ¶ms, &session.access_jwt, proxy_did
|
||||||
|
).await?;
|
||||||
|
|
||||||
|
// Update cursor
|
||||||
|
if let Some(new_cursor) = log_resp["cursor"].as_str() {
|
||||||
|
state.cursor = Some(new_cursor.to_string());
|
||||||
|
}
|
||||||
|
|
||||||
|
let logs = match log_resp["logs"].as_array() {
|
||||||
|
Some(l) => l.clone(),
|
||||||
|
None => return Ok(()),
|
||||||
|
};
|
||||||
|
|
||||||
|
// Collect new messages from admin
|
||||||
|
let mut messages: Vec<(String, String, String)> = Vec::new(); // (msg_id, convo_id, text)
|
||||||
|
|
||||||
|
for log_entry in &logs {
|
||||||
|
let entry_type = log_entry["$type"].as_str().unwrap_or("");
|
||||||
|
|
||||||
|
// Only process new messages
|
||||||
|
if entry_type != "chat.bsky.convo.defs#logCreateMessage" {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let convo_id = match log_entry["convoId"].as_str() {
|
||||||
|
Some(c) => c,
|
||||||
|
None => continue,
|
||||||
|
};
|
||||||
|
|
||||||
|
let message = &log_entry["message"];
|
||||||
|
let msg_id = match message["id"].as_str() {
|
||||||
|
Some(id) => id,
|
||||||
|
None => continue,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Skip already processed
|
||||||
|
if state.processed.contains(msg_id) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only respond to messages from admin
|
||||||
|
let sender_did = match message["sender"]["did"].as_str() {
|
||||||
|
Some(d) => d,
|
||||||
|
None => continue,
|
||||||
|
};
|
||||||
|
if sender_did != admin_did {
|
||||||
|
// Mark non-admin messages as processed too (don't reply)
|
||||||
|
state.processed.insert(msg_id.to_string());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let text = message["text"].as_str().unwrap_or("");
|
||||||
|
if text.is_empty() {
|
||||||
|
state.processed.insert(msg_id.to_string());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
messages.push((msg_id.to_string(), convo_id.to_string(), text.to_string()));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process messages
|
||||||
|
for (msg_id, convo_id, text) in &messages {
|
||||||
|
eprintln!(
|
||||||
|
"chat-bot: new DM: {}",
|
||||||
|
if text.chars().count() > 50 {
|
||||||
|
format!("{}...", text.chars().take(50).collect::<String>())
|
||||||
|
} else {
|
||||||
|
text.clone()
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
// Send to Claude
|
||||||
|
let response = match claude.send(text).await {
|
||||||
|
Ok(r) => r,
|
||||||
|
Err(e) => {
|
||||||
|
eprintln!("chat-bot: claude error: {}, respawning...", e);
|
||||||
|
match ClaudeSession::spawn().await {
|
||||||
|
Ok(new_session) => {
|
||||||
|
*claude = new_session;
|
||||||
|
match claude.send(text).await {
|
||||||
|
Ok(r) => r,
|
||||||
|
Err(e2) => {
|
||||||
|
eprintln!("chat-bot: claude retry failed: {}", e2);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e2) => {
|
||||||
|
eprintln!("chat-bot: claude respawn failed: {}", e2);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Send reply via chat
|
||||||
|
let send_body = serde_json::json!({
|
||||||
|
"convoId": convo_id,
|
||||||
|
"message": {
|
||||||
|
"text": response
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
match client.call_proxy::<Value, Value>(
|
||||||
|
"chat.bsky.convo.sendMessage", &send_body, &session.access_jwt, proxy_did
|
||||||
|
).await {
|
||||||
|
Ok(_) => {
|
||||||
|
eprintln!("chat-bot: replied in convo {}", convo_id);
|
||||||
|
state.processed.insert(msg_id.clone());
|
||||||
|
if let Err(e) = save_chat_state(state) {
|
||||||
|
eprintln!("chat-bot: failed to save state: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
eprintln!("chat-bot: send failed: {}", e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Save state (cursor update)
|
||||||
|
if let Err(e) = save_chat_state(state) {
|
||||||
|
eprintln!("chat-bot: failed to save state: {}", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Prune old entries if over 1000
|
||||||
|
if state.processed.len() > 1000 {
|
||||||
|
let drain_count = state.processed.len() - 500;
|
||||||
|
let to_remove: Vec<String> = state.processed.iter().take(drain_count).cloned().collect();
|
||||||
|
for key in to_remove {
|
||||||
|
state.processed.remove(&key);
|
||||||
|
}
|
||||||
|
save_chat_state(state)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|||||||
@@ -693,7 +693,7 @@ pub async fn oauth_login(handle: &str, is_bot: bool) -> Result<()> {
|
|||||||
// 5. Client metadata (derived from config.json siteUrl)
|
// 5. Client metadata (derived from config.json siteUrl)
|
||||||
let site_url = load_site_url()?;
|
let site_url = load_site_url()?;
|
||||||
let client_id = format!("{}/client-metadata.json", site_url);
|
let client_id = format!("{}/client-metadata.json", site_url);
|
||||||
let scope = "atproto transition:generic";
|
let scope = "atproto transition:generic transition:chat.bsky";
|
||||||
|
|
||||||
// Try /oauth/cli first, fallback to /oauth/callback
|
// Try /oauth/cli first, fallback to /oauth/callback
|
||||||
let redirect_candidates = [
|
let redirect_candidates = [
|
||||||
|
|||||||
12
src/main.rs
12
src/main.rs
@@ -291,6 +291,15 @@ enum BotCommands {
|
|||||||
#[arg(short, long, default_value = "public/config.json")]
|
#[arg(short, long, default_value = "public/config.json")]
|
||||||
config: String,
|
config: String,
|
||||||
},
|
},
|
||||||
|
/// Start the DM chat bot (poll chat messages and reply)
|
||||||
|
Chat {
|
||||||
|
/// Poll interval in seconds
|
||||||
|
#[arg(short, long, default_value = "30")]
|
||||||
|
interval: u64,
|
||||||
|
/// Path to config.json
|
||||||
|
#[arg(short, long, default_value = "public/config.json")]
|
||||||
|
config: String,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Subcommand)]
|
#[derive(Subcommand)]
|
||||||
@@ -412,6 +421,9 @@ async fn main() -> Result<()> {
|
|||||||
BotCommands::Start { interval, config } => {
|
BotCommands::Start { interval, config } => {
|
||||||
commands::bot::start(interval, &config).await?;
|
commands::bot::start(interval, &config).await?;
|
||||||
}
|
}
|
||||||
|
BotCommands::Chat { interval, config } => {
|
||||||
|
commands::bot::start_chat(interval, &config).await?;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Commands::Pds { command } => {
|
Commands::Pds { command } => {
|
||||||
|
|||||||
78
src/xrpc.rs
78
src/xrpc.rs
@@ -194,6 +194,20 @@ impl XrpcClient {
|
|||||||
url: &str,
|
url: &str,
|
||||||
full_url: &str,
|
full_url: &str,
|
||||||
body: Option<&B>,
|
body: Option<&B>,
|
||||||
|
) -> Result<T> {
|
||||||
|
self.dpop_request_with_retry_proxy(oauth, token, method, url, full_url, body, None).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// DPoP-authenticated request with optional proxy header
|
||||||
|
async fn dpop_request_with_retry_proxy<B: Serialize, T: DeserializeOwned>(
|
||||||
|
&self,
|
||||||
|
oauth: &OAuthSession,
|
||||||
|
token: &str,
|
||||||
|
method: &str,
|
||||||
|
url: &str,
|
||||||
|
full_url: &str,
|
||||||
|
body: Option<&B>,
|
||||||
|
proxy: Option<&str>,
|
||||||
) -> Result<T> {
|
) -> Result<T> {
|
||||||
let mut dpop_nonce: Option<String> = None;
|
let mut dpop_nonce: Option<String> = None;
|
||||||
|
|
||||||
@@ -205,7 +219,7 @@ impl XrpcClient {
|
|||||||
dpop_nonce.as_deref(),
|
dpop_nonce.as_deref(),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
let builder = match method {
|
let mut builder = match method {
|
||||||
"GET" => self.inner.get(url),
|
"GET" => self.inner.get(url),
|
||||||
_ => {
|
_ => {
|
||||||
let b = self.inner.post(url);
|
let b = self.inner.post(url);
|
||||||
@@ -217,6 +231,10 @@ impl XrpcClient {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
if let Some(proxy_did) = proxy {
|
||||||
|
builder = builder.header("atproto-proxy", proxy_did);
|
||||||
|
}
|
||||||
|
|
||||||
let res = builder
|
let res = builder
|
||||||
.header("Authorization", format!("DPoP {}", token))
|
.header("Authorization", format!("DPoP {}", token))
|
||||||
.header("DPoP", dpop_proof)
|
.header("DPoP", dpop_proof)
|
||||||
@@ -302,6 +320,64 @@ impl XrpcClient {
|
|||||||
anyhow::bail!("XRPC DPoP call failed after nonce retry");
|
anyhow::bail!("XRPC DPoP call failed after nonce retry");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Authenticated GET with atproto-proxy header (for chat API)
|
||||||
|
pub async fn query_auth_proxy<T: DeserializeOwned>(
|
||||||
|
&self,
|
||||||
|
nsid: &str,
|
||||||
|
params: &[(&str, &str)],
|
||||||
|
token: &str,
|
||||||
|
proxy: &str,
|
||||||
|
) -> Result<T> {
|
||||||
|
let mut url = format!("{}/xrpc/{}", Self::ensure_scheme(&self.pds_host), nsid);
|
||||||
|
if !params.is_empty() {
|
||||||
|
url.push('?');
|
||||||
|
let qs: Vec<String> = params.iter().map(|(k, v)| format!("{}={}", k, v)).collect();
|
||||||
|
url.push_str(&qs.join("&"));
|
||||||
|
}
|
||||||
|
let full_url = url.clone();
|
||||||
|
|
||||||
|
if let Some(oauth) = self.try_load_oauth() {
|
||||||
|
self.dpop_request_with_retry_proxy(&oauth, token, "GET", &url, &full_url, None::<&()>, Some(proxy))
|
||||||
|
.await
|
||||||
|
} else {
|
||||||
|
let res = self.inner
|
||||||
|
.get(&url)
|
||||||
|
.header("Authorization", format!("Bearer {}", token))
|
||||||
|
.header("atproto-proxy", proxy)
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.context("XRPC proxy query failed")?;
|
||||||
|
self.handle_response(res).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Authenticated POST with atproto-proxy header (for chat API)
|
||||||
|
pub async fn call_proxy<B: Serialize, T: DeserializeOwned>(
|
||||||
|
&self,
|
||||||
|
nsid: &str,
|
||||||
|
body: &B,
|
||||||
|
token: &str,
|
||||||
|
proxy: &str,
|
||||||
|
) -> Result<T> {
|
||||||
|
let url = format!("{}/xrpc/{}", Self::ensure_scheme(&self.pds_host), nsid);
|
||||||
|
let full_url = url.clone();
|
||||||
|
|
||||||
|
if let Some(oauth) = self.try_load_oauth() {
|
||||||
|
self.dpop_request_with_retry_proxy(&oauth, token, "POST", &url, &full_url, Some(body), Some(proxy))
|
||||||
|
.await
|
||||||
|
} else {
|
||||||
|
let res = self.inner
|
||||||
|
.post(&url)
|
||||||
|
.header("Authorization", format!("Bearer {}", token))
|
||||||
|
.header("atproto-proxy", proxy)
|
||||||
|
.json(body)
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.context("XRPC proxy call failed")?;
|
||||||
|
self.handle_response(res).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Handle an XRPC response: check status, parse ATProto errors, deserialize body.
|
/// Handle an XRPC response: check status, parse ATProto errors, deserialize body.
|
||||||
async fn handle_response<T: DeserializeOwned>(&self, res: reqwest::Response) -> Result<T> {
|
async fn handle_response<T: DeserializeOwned>(&self, res: reqwest::Response) -> Result<T> {
|
||||||
if !res.status().is_success() {
|
if !res.status().is_success() {
|
||||||
|
|||||||
Reference in New Issue
Block a user