diff --git a/Cargo.toml b/Cargo.toml index 36933e4..4d2d139 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,3 +23,4 @@ chrono = { version = "0.4", features = ["serde"] } rand = "0.8" dotenvy = "0.15" rustyline = "15" +thiserror = "2" diff --git a/src/commands/auth.rs b/src/commands/auth.rs index 76e3e1f..4bb2675 100644 --- a/src/commands/auth.rs +++ b/src/commands/auth.rs @@ -1,28 +1,13 @@ -use anyhow::{Context, Result}; -use serde::{Deserialize, Serialize}; +use anyhow::Result; use super::token::{self, Session}; -use crate::lexicons::{self, com_atproto_server}; - -#[derive(Debug, Serialize)] -struct CreateSessionRequest { - identifier: String, - password: String, -} - -#[derive(Debug, Deserialize)] -#[serde(rename_all = "camelCase")] -struct CreateSessionResponse { - did: String, - handle: String, - access_jwt: String, - refresh_jwt: String, -} +use crate::lexicons::com_atproto_server; +use crate::types::{CreateSessionRequest, CreateSessionResponse}; +use crate::xrpc::XrpcClient; /// Login to ATProto PDS pub async fn login(handle: &str, password: &str, pds: &str, is_bot: bool) -> Result<()> { - let client = reqwest::Client::new(); - let url = lexicons::url(pds, &com_atproto_server::CREATE_SESSION); + let client = XrpcClient::new(pds); let req = CreateSessionRequest { identifier: handle.to_string(), @@ -32,20 +17,8 @@ pub async fn login(handle: &str, password: &str, pds: &str, is_bot: bool) -> Res let account_type = if is_bot { "bot" } else { "user" }; println!("Logging in to {} as {} ({})...", pds, handle, account_type); - let res = client - .post(&url) - .json(&req) - .send() - .await - .context("Failed to send login request")?; - - if !res.status().is_success() { - let status = res.status(); - let body = res.text().await.unwrap_or_default(); - anyhow::bail!("Login failed: {} - {}", status, body); - } - - let session_res: CreateSessionResponse = res.json().await?; + let session_res: CreateSessionResponse = + client.call_unauth(&com_atproto_server::CREATE_SESSION, &req).await?; let session = Session { did: session_res.did, @@ -65,40 +38,32 @@ pub async fn login(handle: &str, password: &str, pds: &str, is_bot: bool) -> Res Ok(()) } -/// Refresh access token -pub async fn refresh_session() -> Result { - let session = token::load_session()?; - let pds = session.pds.as_deref().unwrap_or("bsky.social"); +/// Refresh a session (shared logic for user and bot) +async fn do_refresh(session: &Session, pds: &str) -> Result { + let client = XrpcClient::new(pds); - let client = reqwest::Client::new(); - let url = lexicons::url(pds, &com_atproto_server::REFRESH_SESSION); + let new_session: CreateSessionResponse = client + .call_bearer(&com_atproto_server::REFRESH_SESSION, &session.refresh_jwt) + .await?; - let res = client - .post(&url) - .header("Authorization", format!("Bearer {}", session.refresh_jwt)) - .send() - .await - .context("Failed to refresh session")?; - - if !res.status().is_success() { - let status = res.status(); - let body = res.text().await.unwrap_or_default(); - anyhow::bail!("Refresh failed: {} - {}. Try logging in again.", status, body); - } - - let new_session: CreateSessionResponse = res.json().await?; - - let session = Session { + Ok(Session { did: new_session.did, handle: new_session.handle, access_jwt: new_session.access_jwt, refresh_jwt: new_session.refresh_jwt, pds: Some(pds.to_string()), - }; + }) +} - token::save_session(&session)?; +/// Refresh access token +pub async fn refresh_session() -> Result { + let session = token::load_session()?; + let pds = session.pds.as_deref().unwrap_or("bsky.social"); - Ok(session) + let new_session = do_refresh(&session, pds).await?; + token::save_session(&new_session)?; + + Ok(new_session) } /// Refresh bot access token @@ -106,33 +71,8 @@ pub async fn refresh_bot_session() -> Result { let session = token::load_bot_session()?; let pds = session.pds.as_deref().unwrap_or("bsky.social"); - let client = reqwest::Client::new(); - let url = lexicons::url(pds, &com_atproto_server::REFRESH_SESSION); + let new_session = do_refresh(&session, pds).await?; + token::save_bot_session(&new_session)?; - let res = client - .post(&url) - .header("Authorization", format!("Bearer {}", session.refresh_jwt)) - .send() - .await - .context("Failed to refresh bot session")?; - - if !res.status().is_success() { - let status = res.status(); - let body = res.text().await.unwrap_or_default(); - anyhow::bail!("Bot refresh failed: {} - {}. Try 'ailog login --bot' again.", status, body); - } - - let new_session: CreateSessionResponse = res.json().await?; - - let session = Session { - did: new_session.did, - handle: new_session.handle, - access_jwt: new_session.access_jwt, - refresh_jwt: new_session.refresh_jwt, - pds: Some(pds.to_string()), - }; - - token::save_bot_session(&session)?; - - Ok(session) + Ok(new_session) } diff --git a/src/commands/mod.rs b/src/commands/mod.rs index 6da7743..10d6ca1 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -1,6 +1,9 @@ pub mod auth; pub mod token; -pub mod post; +pub mod record; +pub mod sync; +pub mod push; +pub mod notify; pub mod gen; pub mod lang; pub mod did; diff --git a/src/commands/notify.rs b/src/commands/notify.rs new file mode 100644 index 0000000..21a0026 --- /dev/null +++ b/src/commands/notify.rs @@ -0,0 +1,67 @@ +use anyhow::Result; +use serde_json::Value; + +use super::auth; +use crate::lexicons::app_bsky_notification; +use crate::xrpc::XrpcClient; + +/// List notifications (JSON output) +pub async fn list(limit: u32) -> Result<()> { + let session = auth::refresh_session().await?; + let pds = session.pds.as_deref().unwrap_or("bsky.social"); + let client = XrpcClient::new(pds); + let limit_str = limit.to_string(); + + let body: Value = client + .query_auth( + &app_bsky_notification::LIST_NOTIFICATIONS, + &[("limit", &limit_str)], + &session.access_jwt, + ) + .await?; + + println!("{}", serde_json::to_string_pretty(&body)?); + Ok(()) +} + +/// Get unread notification count (JSON output) +pub async fn count() -> Result<()> { + let session = auth::refresh_session().await?; + let pds = session.pds.as_deref().unwrap_or("bsky.social"); + let client = XrpcClient::new(pds); + + let body: Value = client + .query_auth( + &app_bsky_notification::GET_UNREAD_COUNT, + &[], + &session.access_jwt, + ) + .await?; + + println!("{}", serde_json::to_string_pretty(&body)?); + Ok(()) +} + +/// Mark notifications as seen (up to now) +pub async fn update_seen() -> Result<()> { + let session = auth::refresh_session().await?; + let pds = session.pds.as_deref().unwrap_or("bsky.social"); + let client = XrpcClient::new(pds); + let now = chrono::Utc::now() + .format("%Y-%m-%dT%H:%M:%S%.3fZ") + .to_string(); + + let body = serde_json::json!({ "seenAt": now }); + + client + .call_no_response( + &app_bsky_notification::UPDATE_SEEN, + &body, + &session.access_jwt, + ) + .await?; + + let result = serde_json::json!({ "success": true, "seenAt": now }); + println!("{}", serde_json::to_string_pretty(&result)?); + Ok(()) +} diff --git a/src/commands/post.rs b/src/commands/post.rs deleted file mode 100644 index c2790f9..0000000 --- a/src/commands/post.rs +++ /dev/null @@ -1,466 +0,0 @@ -use anyhow::{Context, Result}; -use rand::Rng; -use serde::{Deserialize, Serialize}; -use serde_json::Value; -use std::fs; - -use super::{auth, token}; -use crate::lexicons::{self, com_atproto_repo, com_atproto_identity}; - -#[derive(Debug, Serialize)] -struct PutRecordRequest { - repo: String, - collection: String, - rkey: String, - record: Value, -} - -#[derive(Debug, Serialize)] -struct DeleteRecordRequest { - repo: String, - collection: String, - rkey: String, -} - -#[derive(Debug, Deserialize)] -struct PutRecordResponse { - uri: String, - cid: String, -} - -#[derive(Debug, Deserialize)] -struct ListRecordsResponse { - records: Vec, - #[serde(default)] - #[allow(dead_code)] - cursor: Option, -} - -#[derive(Debug, Deserialize)] -struct Record { - uri: String, - cid: String, - value: Value, -} - -/// Generate TID (timestamp-based ID) -fn generate_tid() -> String { - const CHARSET: &[u8] = b"234567abcdefghijklmnopqrstuvwxyz"; - let mut rng = rand::thread_rng(); - (0..13) - .map(|_| { - let idx = rng.gen_range(0..CHARSET.len()); - CHARSET[idx] as char - }) - .collect() -} - -/// Put a record to ATProto -pub async fn put_record(file: &str, collection: &str, rkey: Option<&str>) -> Result<()> { - let session = auth::refresh_session().await?; - let pds = session.pds.as_deref().unwrap_or("bsky.social"); - - let content = fs::read_to_string(file) - .with_context(|| format!("Failed to read file: {}", file))?; - let record: Value = serde_json::from_str(&content)?; - - let rkey = rkey.map(|s| s.to_string()).unwrap_or_else(generate_tid); - - let client = reqwest::Client::new(); - let url = lexicons::url(pds, &com_atproto_repo::PUT_RECORD); - - let req = PutRecordRequest { - repo: session.did.clone(), - collection: collection.to_string(), - rkey: rkey.clone(), - record, - }; - - println!("Posting to {} with rkey: {}", collection, rkey); - println!("{}", serde_json::to_string_pretty(&req)?); - - let res = client - .post(&url) - .header("Authorization", format!("Bearer {}", session.access_jwt)) - .json(&req) - .send() - .await?; - - if !res.status().is_success() { - let status = res.status(); - let body = res.text().await.unwrap_or_default(); - anyhow::bail!("Put record failed: {} - {}", status, body); - } - - let result: PutRecordResponse = res.json().await?; - println!("Success!"); - println!(" URI: {}", result.uri); - println!(" CID: {}", result.cid); - - Ok(()) -} - -/// Put a lexicon schema -pub async fn put_lexicon(file: &str) -> Result<()> { - let session = auth::refresh_session().await?; - let pds = session.pds.as_deref().unwrap_or("bsky.social"); - - let content = fs::read_to_string(file) - .with_context(|| format!("Failed to read file: {}", file))?; - let lexicon: Value = serde_json::from_str(&content)?; - - let lexicon_id = lexicon["id"] - .as_str() - .context("Lexicon file must have 'id' field")? - .to_string(); - - let client = reqwest::Client::new(); - let url = lexicons::url(pds, &com_atproto_repo::PUT_RECORD); - - let req = PutRecordRequest { - repo: session.did.clone(), - collection: "com.atproto.lexicon.schema".to_string(), - rkey: lexicon_id.clone(), - record: lexicon, - }; - - println!("Putting lexicon: {}", lexicon_id); - println!("{}", serde_json::to_string_pretty(&req)?); - - let res = client - .post(&url) - .header("Authorization", format!("Bearer {}", session.access_jwt)) - .json(&req) - .send() - .await?; - - if !res.status().is_success() { - let status = res.status(); - let body = res.text().await.unwrap_or_default(); - anyhow::bail!("Put lexicon failed: {} - {}", status, body); - } - - let result: PutRecordResponse = res.json().await?; - println!("Success!"); - println!(" URI: {}", result.uri); - println!(" CID: {}", result.cid); - - Ok(()) -} - -/// Get records from a collection -pub async fn get_records(collection: &str, limit: u32) -> Result<()> { - let session = auth::refresh_session().await?; - let pds = session.pds.as_deref().unwrap_or("bsky.social"); - - let client = reqwest::Client::new(); - let base_url = lexicons::url(pds, &com_atproto_repo::LIST_RECORDS); - let url = format!( - "{}?repo={}&collection={}&limit={}", - base_url, session.did, collection, limit - ); - - let res = client - .get(&url) - .header("Authorization", format!("Bearer {}", session.access_jwt)) - .send() - .await?; - - if !res.status().is_success() { - let status = res.status(); - let body = res.text().await.unwrap_or_default(); - anyhow::bail!("Get records failed: {} - {}", status, body); - } - - let result: ListRecordsResponse = res.json().await?; - - println!("Found {} records in {}", result.records.len(), collection); - for record in &result.records { - println!("---"); - println!("URI: {}", record.uri); - println!("CID: {}", record.cid); - println!("{}", serde_json::to_string_pretty(&record.value)?); - } - - Ok(()) -} - -/// Delete a record -pub async fn delete_record(collection: &str, rkey: &str) -> Result<()> { - let session = auth::refresh_session().await?; - let pds = session.pds.as_deref().unwrap_or("bsky.social"); - - let client = reqwest::Client::new(); - let url = lexicons::url(pds, &com_atproto_repo::DELETE_RECORD); - - let req = DeleteRecordRequest { - repo: session.did.clone(), - collection: collection.to_string(), - rkey: rkey.to_string(), - }; - - println!("Deleting {} from {}", rkey, collection); - - let res = client - .post(&url) - .header("Authorization", format!("Bearer {}", session.access_jwt)) - .json(&req) - .send() - .await?; - - if !res.status().is_success() { - let status = res.status(); - let body = res.text().await.unwrap_or_default(); - anyhow::bail!("Delete failed: {} - {}", status, body); - } - - println!("Deleted successfully"); - - Ok(()) -} - -#[derive(Debug, Deserialize)] -struct Config { - handle: String, - #[serde(default)] - collection: Option, -} - -#[derive(Debug, Deserialize)] -struct DescribeRepoResponse { - did: String, - handle: String, - collections: Vec, -} - -/// Sync PDS data to local content directory -pub async fn sync_to_local(output: &str, is_bot: bool, collection_override: Option<&str>) -> Result<()> { - let client = reqwest::Client::new(); - - let (did, pds, _handle, collection) = if is_bot { - // Bot mode: use bot.json - let session = token::load_bot_session()?; - let pds = session.pds.as_deref().unwrap_or("bsky.social"); - let collection = collection_override.unwrap_or("ai.syui.log.chat"); - println!("Syncing bot data for {} ({})", session.handle, session.did); - (session.did.clone(), format!("https://{}", pds), session.handle.clone(), collection.to_string()) - } else { - // User mode: use config.json - let config_content = fs::read_to_string("public/config.json") - .context("config.json not found")?; - let config: Config = serde_json::from_str(&config_content)?; - - println!("Syncing data for {}", config.handle); - - // Resolve handle to DID - let resolve_url = format!( - "{}?handle={}", - lexicons::url("public.api.bsky.app", &com_atproto_identity::RESOLVE_HANDLE), - config.handle - ); - let res = client.get(&resolve_url).send().await?; - let resolve: serde_json::Value = res.json().await?; - let did = resolve["did"].as_str().context("Could not resolve handle")?.to_string(); - - // Get PDS from DID document - let plc_url = format!("https://plc.directory/{}", did); - let res = client.get(&plc_url).send().await?; - let did_doc: serde_json::Value = res.json().await?; - let pds = did_doc["service"] - .as_array() - .and_then(|services| { - services.iter().find(|s| s["type"] == "AtprotoPersonalDataServer") - }) - .and_then(|s| s["serviceEndpoint"].as_str()) - .context("Could not find PDS")? - .to_string(); - - let collection = collection_override - .map(|s| s.to_string()) - .unwrap_or_else(|| config.collection.as_deref().unwrap_or("ai.syui.log.post").to_string()); - - (did, pds, config.handle.clone(), collection) - }; - - println!("DID: {}", did); - println!("PDS: {}", pds); - - // Remove https:// prefix for lexicons::url - let pds_host = pds.trim_start_matches("https://"); - - // Create output directory - let did_dir = format!("{}/{}", output, did); - fs::create_dir_all(&did_dir)?; - - // 1. Sync describeRepo - let describe_url = format!( - "{}?repo={}", - lexicons::url(pds_host, &com_atproto_repo::DESCRIBE_REPO), - did - ); - let res = client.get(&describe_url).send().await?; - let describe: DescribeRepoResponse = res.json().await?; - - let describe_path = format!("{}/describe.json", did_dir); - let describe_json = serde_json::to_string_pretty(&serde_json::json!({ - "did": describe.did, - "handle": describe.handle, - "collections": describe.collections, - }))?; - fs::write(&describe_path, &describe_json)?; - println!("Saved: {}", describe_path); - - // 2. Sync profile - let profile_url = format!( - "{}?repo={}&collection=app.bsky.actor.profile&rkey=self", - lexicons::url(pds_host, &com_atproto_repo::GET_RECORD), - did - ); - let res = client.get(&profile_url).send().await?; - if res.status().is_success() { - let profile: serde_json::Value = res.json().await?; - let profile_dir = format!("{}/app.bsky.actor.profile", did_dir); - fs::create_dir_all(&profile_dir)?; - let profile_path = format!("{}/self.json", profile_dir); - fs::write(&profile_path, serde_json::to_string_pretty(&profile)?)?; - println!("Saved: {}", profile_path); - - // Download avatar blob if present - if let Some(avatar_cid) = profile["value"]["avatar"]["ref"]["$link"].as_str() { - let blob_dir = format!("{}/blob", did_dir); - fs::create_dir_all(&blob_dir)?; - let blob_path = format!("{}/{}", blob_dir, avatar_cid); - - let blob_url = format!( - "{}/xrpc/com.atproto.sync.getBlob?did={}&cid={}", - pds, did, avatar_cid - ); - println!("Downloading avatar: {}", avatar_cid); - let blob_res = client.get(&blob_url).send().await?; - if blob_res.status().is_success() { - let blob_bytes = blob_res.bytes().await?; - fs::write(&blob_path, &blob_bytes)?; - println!("Saved: {}", blob_path); - } else { - println!("Failed to download avatar: {}", blob_res.status()); - } - } - } - - // 3. Sync collection records - let records_url = format!( - "{}?repo={}&collection={}&limit=100", - lexicons::url(pds_host, &com_atproto_repo::LIST_RECORDS), - did, collection - ); - let res = client.get(&records_url).send().await?; - if res.status().is_success() { - let list: ListRecordsResponse = res.json().await?; - let collection_dir = format!("{}/{}", did_dir, collection); - fs::create_dir_all(&collection_dir)?; - - let mut rkeys: Vec = Vec::new(); - for record in &list.records { - let rkey = record.uri.split('/').last().unwrap_or("unknown"); - rkeys.push(rkey.to_string()); - let record_path = format!("{}/{}.json", collection_dir, rkey); - let record_json = serde_json::json!({ - "uri": record.uri, - "cid": record.cid, - "value": record.value, - }); - fs::write(&record_path, serde_json::to_string_pretty(&record_json)?)?; - println!("Saved: {}", record_path); - } - - // Create index.json with list of rkeys - let index_path = format!("{}/index.json", collection_dir); - fs::write(&index_path, serde_json::to_string_pretty(&rkeys)?)?; - println!("Saved: {}", index_path); - - println!("Synced {} records from {}", list.records.len(), collection); - } - - println!("Sync complete!"); - - Ok(()) -} - -/// Push local content to PDS -pub async fn push_to_remote(input: &str, collection: &str, is_bot: bool) -> Result<()> { - let session = if is_bot { - auth::refresh_bot_session().await? - } else { - auth::refresh_session().await? - }; - let pds = session.pds.as_deref().unwrap_or("bsky.social"); - let did = &session.did; - - // Build collection directory path - let collection_dir = format!("{}/{}/{}", input, did, collection); - - if !std::path::Path::new(&collection_dir).exists() { - anyhow::bail!("Collection directory not found: {}", collection_dir); - } - - println!("Pushing records from {} to {}", collection_dir, collection); - - let client = reqwest::Client::new(); - let url = lexicons::url(pds, &com_atproto_repo::PUT_RECORD); - - let mut count = 0; - for entry in fs::read_dir(&collection_dir)? { - let entry = entry?; - let path = entry.path(); - - // Skip non-JSON files and index.json - if path.extension().map(|e| e != "json").unwrap_or(true) { - continue; - } - let filename = path.file_stem().and_then(|s| s.to_str()).unwrap_or(""); - if filename == "index" { - continue; - } - - let rkey = filename.to_string(); - let content = fs::read_to_string(&path)?; - let record_data: Value = serde_json::from_str(&content)?; - - // Extract value from record (sync saves as {uri, cid, value}) - let record = if record_data.get("value").is_some() { - record_data["value"].clone() - } else { - record_data - }; - - let req = PutRecordRequest { - repo: did.clone(), - collection: collection.to_string(), - rkey: rkey.clone(), - record, - }; - - println!("Pushing: {}", rkey); - - let res = client - .post(&url) - .header("Authorization", format!("Bearer {}", session.access_jwt)) - .json(&req) - .send() - .await?; - - if !res.status().is_success() { - let status = res.status(); - let body = res.text().await.unwrap_or_default(); - println!(" Failed: {} - {}", status, body); - } else { - let result: PutRecordResponse = res.json().await?; - println!(" OK: {}", result.uri); - count += 1; - } - } - - println!("Pushed {} records to {}", count, collection); - - Ok(()) -} diff --git a/src/commands/push.rs b/src/commands/push.rs new file mode 100644 index 0000000..e3f1e41 --- /dev/null +++ b/src/commands/push.rs @@ -0,0 +1,85 @@ +use anyhow::Result; +use serde_json::Value; +use std::fs; + +use super::auth; +use crate::lexicons::com_atproto_repo; +use crate::types::{PutRecordRequest, PutRecordResponse}; +use crate::xrpc::XrpcClient; + +/// Push local content to PDS +pub async fn push_to_remote(input: &str, collection: &str, is_bot: bool) -> Result<()> { + let session = if is_bot { + auth::refresh_bot_session().await? + } else { + auth::refresh_session().await? + }; + let pds = session.pds.as_deref().unwrap_or("bsky.social"); + let did = &session.did; + let client = XrpcClient::new(pds); + + // Build collection directory path + let collection_dir = format!("{}/{}/{}", input, did, collection); + + if !std::path::Path::new(&collection_dir).exists() { + anyhow::bail!("Collection directory not found: {}", collection_dir); + } + + println!("Pushing records from {} to {}", collection_dir, collection); + + let mut count = 0; + for entry in fs::read_dir(&collection_dir)? { + let entry = entry?; + let path = entry.path(); + + // Skip non-JSON files and index.json + if path.extension().map(|e| e != "json").unwrap_or(true) { + continue; + } + let filename = path.file_stem().and_then(|s| s.to_str()).unwrap_or(""); + if filename == "index" { + continue; + } + + let rkey = filename.to_string(); + let content = fs::read_to_string(&path)?; + let record_data: Value = serde_json::from_str(&content)?; + + // Extract value from record (sync saves as {uri, cid, value}) + let record = if record_data.get("value").is_some() { + record_data["value"].clone() + } else { + record_data + }; + + let req = PutRecordRequest { + repo: did.clone(), + collection: collection.to_string(), + rkey: rkey.clone(), + record, + }; + + println!("Pushing: {}", rkey); + + match client + .call::<_, PutRecordResponse>( + &com_atproto_repo::PUT_RECORD, + &req, + &session.access_jwt, + ) + .await + { + Ok(result) => { + println!(" OK: {}", result.uri); + count += 1; + } + Err(e) => { + println!(" Failed: {}", e); + } + } + } + + println!("Pushed {} records to {}", count, collection); + + Ok(()) +} diff --git a/src/commands/record.rs b/src/commands/record.rs new file mode 100644 index 0000000..49ac3cb --- /dev/null +++ b/src/commands/record.rs @@ -0,0 +1,135 @@ +use anyhow::{Context, Result}; +use serde_json::Value; +use std::fs; + +use super::auth; +use crate::lexicons::com_atproto_repo; +use crate::tid; +use crate::types::{ + DeleteRecordRequest, ListRecordsResponse, PutRecordRequest, PutRecordResponse, +}; +use crate::xrpc::XrpcClient; + +/// Put a record to ATProto +pub async fn put_record(file: &str, collection: &str, rkey: Option<&str>) -> Result<()> { + let session = auth::refresh_session().await?; + let pds = session.pds.as_deref().unwrap_or("bsky.social"); + let client = XrpcClient::new(pds); + + let content = fs::read_to_string(file) + .with_context(|| format!("Failed to read file: {}", file))?; + let record: Value = serde_json::from_str(&content)?; + + let rkey = rkey + .map(|s| s.to_string()) + .unwrap_or_else(tid::generate_tid); + + let req = PutRecordRequest { + repo: session.did.clone(), + collection: collection.to_string(), + rkey: rkey.clone(), + record, + }; + + println!("Posting to {} with rkey: {}", collection, rkey); + println!("{}", serde_json::to_string_pretty(&req)?); + + let result: PutRecordResponse = client + .call(&com_atproto_repo::PUT_RECORD, &req, &session.access_jwt) + .await?; + + println!("Success!"); + println!(" URI: {}", result.uri); + println!(" CID: {}", result.cid); + + Ok(()) +} + +/// Put a lexicon schema +pub async fn put_lexicon(file: &str) -> Result<()> { + let session = auth::refresh_session().await?; + let pds = session.pds.as_deref().unwrap_or("bsky.social"); + let client = XrpcClient::new(pds); + + let content = fs::read_to_string(file) + .with_context(|| format!("Failed to read file: {}", file))?; + let lexicon: Value = serde_json::from_str(&content)?; + + let lexicon_id = lexicon["id"] + .as_str() + .context("Lexicon file must have 'id' field")? + .to_string(); + + let req = PutRecordRequest { + repo: session.did.clone(), + collection: "com.atproto.lexicon.schema".to_string(), + rkey: lexicon_id.clone(), + record: lexicon, + }; + + println!("Putting lexicon: {}", lexicon_id); + println!("{}", serde_json::to_string_pretty(&req)?); + + let result: PutRecordResponse = client + .call(&com_atproto_repo::PUT_RECORD, &req, &session.access_jwt) + .await?; + + println!("Success!"); + println!(" URI: {}", result.uri); + println!(" CID: {}", result.cid); + + Ok(()) +} + +/// Get records from a collection +pub async fn get_records(collection: &str, limit: u32) -> Result<()> { + let session = auth::refresh_session().await?; + let pds = session.pds.as_deref().unwrap_or("bsky.social"); + let client = XrpcClient::new(pds); + let limit_str = limit.to_string(); + + let result: ListRecordsResponse = client + .query_auth( + &com_atproto_repo::LIST_RECORDS, + &[ + ("repo", &session.did), + ("collection", collection), + ("limit", &limit_str), + ], + &session.access_jwt, + ) + .await?; + + println!("Found {} records in {}", result.records.len(), collection); + for record in &result.records { + println!("---"); + println!("URI: {}", record.uri); + println!("CID: {}", record.cid); + println!("{}", serde_json::to_string_pretty(&record.value)?); + } + + Ok(()) +} + +/// Delete a record +pub async fn delete_record(collection: &str, rkey: &str) -> Result<()> { + let session = auth::refresh_session().await?; + let pds = session.pds.as_deref().unwrap_or("bsky.social"); + let client = XrpcClient::new(pds); + + let req = DeleteRecordRequest { + repo: session.did.clone(), + collection: collection.to_string(), + rkey: rkey.to_string(), + }; + + println!("Deleting {} from {}", rkey, collection); + + client + .call_no_response(&com_atproto_repo::DELETE_RECORD, &req, &session.access_jwt) + .await?; + + println!("Deleted successfully"); + + Ok(()) +} diff --git a/src/commands/sync.rs b/src/commands/sync.rs new file mode 100644 index 0000000..1c72190 --- /dev/null +++ b/src/commands/sync.rs @@ -0,0 +1,190 @@ +use anyhow::{Context, Result}; +use std::fs; + +use super::token; +use crate::lexicons::{self, com_atproto_identity, com_atproto_repo}; +use crate::types::{Config, DescribeRepoResponse, ListRecordsResponse}; + +/// Sync PDS data to local content directory +pub async fn sync_to_local( + output: &str, + is_bot: bool, + collection_override: Option<&str>, +) -> Result<()> { + let client = reqwest::Client::new(); + + let (did, pds, _handle, collection) = if is_bot { + // Bot mode: use bot.json + let session = token::load_bot_session()?; + let pds = session.pds.as_deref().unwrap_or("bsky.social"); + let collection = collection_override.unwrap_or("ai.syui.log.chat"); + println!( + "Syncing bot data for {} ({})", + session.handle, session.did + ); + ( + session.did.clone(), + format!("https://{}", pds), + session.handle.clone(), + collection.to_string(), + ) + } else { + // User mode: use config.json + let config_content = + fs::read_to_string("public/config.json").context("config.json not found")?; + let config: Config = serde_json::from_str(&config_content)?; + + println!("Syncing data for {}", config.handle); + + // Resolve handle to DID + let resolve_url = format!( + "{}?handle={}", + lexicons::url( + "public.api.bsky.app", + &com_atproto_identity::RESOLVE_HANDLE + ), + config.handle + ); + let res = client.get(&resolve_url).send().await?; + let resolve: serde_json::Value = res.json().await?; + let did = resolve["did"] + .as_str() + .context("Could not resolve handle")? + .to_string(); + + // Get PDS from DID document + let plc_url = format!("https://plc.directory/{}", did); + let res = client.get(&plc_url).send().await?; + let did_doc: serde_json::Value = res.json().await?; + let pds = did_doc["service"] + .as_array() + .and_then(|services| { + services + .iter() + .find(|s| s["type"] == "AtprotoPersonalDataServer") + }) + .and_then(|s| s["serviceEndpoint"].as_str()) + .context("Could not find PDS")? + .to_string(); + + let collection = collection_override + .map(|s| s.to_string()) + .unwrap_or_else(|| { + config + .collection + .as_deref() + .unwrap_or("ai.syui.log.post") + .to_string() + }); + + (did, pds, config.handle.clone(), collection) + }; + + println!("DID: {}", did); + println!("PDS: {}", pds); + + // Remove https:// prefix for lexicons::url + let pds_host = pds.trim_start_matches("https://"); + + // Create output directory + let did_dir = format!("{}/{}", output, did); + fs::create_dir_all(&did_dir)?; + + // 1. Sync describeRepo + let describe_url = format!( + "{}?repo={}", + lexicons::url(pds_host, &com_atproto_repo::DESCRIBE_REPO), + did + ); + let res = client.get(&describe_url).send().await?; + let describe: DescribeRepoResponse = res.json().await?; + + let describe_path = format!("{}/describe.json", did_dir); + let describe_json = serde_json::to_string_pretty(&serde_json::json!({ + "did": describe.did, + "handle": describe.handle, + "collections": describe.collections, + }))?; + fs::write(&describe_path, &describe_json)?; + println!("Saved: {}", describe_path); + + // 2. Sync profile + let profile_url = format!( + "{}?repo={}&collection=app.bsky.actor.profile&rkey=self", + lexicons::url(pds_host, &com_atproto_repo::GET_RECORD), + did + ); + let res = client.get(&profile_url).send().await?; + if res.status().is_success() { + let profile: serde_json::Value = res.json().await?; + let profile_dir = format!("{}/app.bsky.actor.profile", did_dir); + fs::create_dir_all(&profile_dir)?; + let profile_path = format!("{}/self.json", profile_dir); + fs::write(&profile_path, serde_json::to_string_pretty(&profile)?)?; + println!("Saved: {}", profile_path); + + // Download avatar blob if present + if let Some(avatar_cid) = profile["value"]["avatar"]["ref"]["$link"].as_str() { + let blob_dir = format!("{}/blob", did_dir); + fs::create_dir_all(&blob_dir)?; + let blob_path = format!("{}/{}", blob_dir, avatar_cid); + + let blob_url = format!( + "{}/xrpc/com.atproto.sync.getBlob?did={}&cid={}", + pds, did, avatar_cid + ); + println!("Downloading avatar: {}", avatar_cid); + let blob_res = client.get(&blob_url).send().await?; + if blob_res.status().is_success() { + let blob_bytes = blob_res.bytes().await?; + fs::write(&blob_path, &blob_bytes)?; + println!("Saved: {}", blob_path); + } else { + println!("Failed to download avatar: {}", blob_res.status()); + } + } + } + + // 3. Sync collection records + let records_url = format!( + "{}?repo={}&collection={}&limit=100", + lexicons::url(pds_host, &com_atproto_repo::LIST_RECORDS), + did, + collection + ); + let res = client.get(&records_url).send().await?; + if res.status().is_success() { + let list: ListRecordsResponse = res.json().await?; + let collection_dir = format!("{}/{}", did_dir, collection); + fs::create_dir_all(&collection_dir)?; + + let mut rkeys: Vec = Vec::new(); + for record in &list.records { + let rkey = record.uri.split('/').next_back().unwrap_or("unknown"); + rkeys.push(rkey.to_string()); + let record_path = format!("{}/{}.json", collection_dir, rkey); + let record_json = serde_json::json!({ + "uri": record.uri, + "cid": record.cid, + "value": record.value, + }); + fs::write(&record_path, serde_json::to_string_pretty(&record_json)?)?; + println!("Saved: {}", record_path); + } + + // Create index.json with list of rkeys + let index_path = format!("{}/index.json", collection_dir); + fs::write(&index_path, serde_json::to_string_pretty(&rkeys)?)?; + println!("Saved: {}", index_path); + + println!( + "Synced {} records from {}", + list.records.len(), + collection + ); + } + + println!("Sync complete!"); + + Ok(()) +} diff --git a/src/commands/token.rs b/src/commands/token.rs index db2b7d8..8d2efaa 100644 --- a/src/commands/token.rs +++ b/src/commands/token.rs @@ -51,7 +51,6 @@ pub fn save_session(session: &Session) -> Result<()> { let path = token_path()?; let content = serde_json::to_string_pretty(session)?; fs::write(&path, content)?; - println!("Token saved to {:?}", path); Ok(()) } @@ -69,6 +68,5 @@ pub fn save_bot_session(session: &Session) -> Result<()> { let path = bot_token_path()?; let content = serde_json::to_string_pretty(session)?; fs::write(&path, content)?; - println!("Bot token saved to {:?}", path); Ok(()) } diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 0000000..450dbd2 --- /dev/null +++ b/src/error.rs @@ -0,0 +1,57 @@ +use thiserror::Error; + +/// Structured error types for ailog +#[derive(Debug, Error)] +pub enum AppError { + #[error("Session expired: {0}")] + SessionExpired(String), + + #[error("XRPC error ({status}): {message}")] + Xrpc { + status: u16, + message: String, + #[source] + source: Option>, + }, + + #[error("Network error: {0}")] + Network(#[from] reqwest::Error), + + #[error("Rate limited: retry after {retry_after_secs}s")] + RateLimited { retry_after_secs: u64 }, + + #[error("IO error: {0}")] + Io(#[from] std::io::Error), + + #[error("JSON error: {0}")] + Json(#[from] serde_json::Error), +} + +impl AppError { + pub fn xrpc(status: u16, message: impl Into) -> Self { + Self::Xrpc { + status, + message: message.into(), + source: None, + } + } +} + +/// ATProto error response body +#[derive(Debug, serde::Deserialize)] +pub struct AtprotoErrorResponse { + pub error: Option, + pub message: Option, +} + +impl AtprotoErrorResponse { + /// Format as a human-readable string + pub fn to_display_string(&self) -> String { + match (&self.error, &self.message) { + (Some(e), Some(m)) => format!("{}: {}", e, m), + (Some(e), None) => e.clone(), + (None, Some(m)) => m.clone(), + (None, None) => "Unknown error".to_string(), + } + } +} diff --git a/src/lms/chat.rs b/src/lms/chat.rs index d280c30..2b4f3ee 100644 --- a/src/lms/chat.rs +++ b/src/lms/chat.rs @@ -8,6 +8,7 @@ use std::fs; use std::path::Path; use crate::commands::token::{self, BUNDLE_ID}; +use crate::tid; #[derive(Debug, Clone, Serialize, Deserialize)] struct ChatMessage { @@ -135,19 +136,6 @@ fn save_session(session: &ChatSession) -> Result<()> { Ok(()) } -/// Generate TID -fn generate_tid() -> String { - use rand::Rng; - const CHARSET: &[u8] = b"234567abcdefghijklmnopqrstuvwxyz"; - let mut rng = rand::thread_rng(); - (0..13) - .map(|_| { - let idx = rng.gen_range(0..CHARSET.len()); - CHARSET[idx] as char - }) - .collect() -} - /// Call LLM API async fn call_llm(client: &reqwest::Client, url: &str, model: &str, messages: &[ChatMessage]) -> Result { let max_tokens = env::var("CHAT_MAX_TOKENS") @@ -185,7 +173,7 @@ fn save_chat_local( root_uri: Option<&str>, parent_uri: Option<&str>, ) -> Result { - let rkey = generate_tid(); + let rkey = tid::generate_tid(); let now = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(); let uri = format!("at://{}/ai.syui.log.chat/{}", did, rkey); diff --git a/src/main.rs b/src/main.rs index 6f60264..edfacd2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,11 @@ mod commands; +mod error; mod lexicons; mod lms; mod mcp; +mod tid; +mod types; +mod xrpc; use anyhow::Result; use clap::{Parser, Subcommand}; @@ -157,6 +161,13 @@ enum Commands { #[command(alias = "v")] Version, + /// Notification commands + #[command(alias = "n")] + Notify { + #[command(subcommand)] + command: NotifyCommands, + }, + /// PDS commands Pds { #[command(subcommand)] @@ -164,6 +175,21 @@ enum Commands { }, } +#[derive(Subcommand)] +enum NotifyCommands { + /// List notifications (JSON) + #[command(alias = "ls")] + List { + /// Max number of notifications + #[arg(short, long, default_value = "25")] + limit: u32, + }, + /// Get unread count (JSON) + Count, + /// Mark all notifications as seen + Seen, +} + #[derive(Subcommand)] enum PdsCommands { /// Check PDS versions @@ -187,22 +213,22 @@ async fn main() -> Result<()> { commands::auth::login(&handle, &password, &server, bot).await?; } Commands::Lexicon { file } => { - commands::post::put_lexicon(&file).await?; + commands::record::put_lexicon(&file).await?; } Commands::Post { file, collection, rkey } => { - commands::post::put_record(&file, &collection, rkey.as_deref()).await?; + commands::record::put_record(&file, &collection, rkey.as_deref()).await?; } Commands::Get { collection, limit } => { - commands::post::get_records(&collection, limit).await?; + commands::record::get_records(&collection, limit).await?; } Commands::Delete { collection, rkey } => { - commands::post::delete_record(&collection, &rkey).await?; + commands::record::delete_record(&collection, &rkey).await?; } Commands::Sync { output, bot, collection } => { - commands::post::sync_to_local(&output, bot, collection.as_deref()).await?; + commands::sync::sync_to_local(&output, bot, collection.as_deref()).await?; } Commands::Push { input, collection, bot } => { - commands::post::push_to_remote(&input, &collection, bot).await?; + commands::push::push_to_remote(&input, &collection, bot).await?; } Commands::Gen { input, output } => { commands::gen::generate(&input, &output)?; @@ -225,6 +251,19 @@ async fn main() -> Result<()> { Commands::Version => { println!("{}", env!("CARGO_PKG_VERSION")); } + Commands::Notify { command } => { + match command { + NotifyCommands::List { limit } => { + commands::notify::list(limit).await?; + } + NotifyCommands::Count => { + commands::notify::count().await?; + } + NotifyCommands::Seen => { + commands::notify::update_seen().await?; + } + } + } Commands::Pds { command } => { match command { PdsCommands::Version { networks } => { diff --git a/src/mcp/mod.rs b/src/mcp/mod.rs index bd40be6..e2e4c51 100644 --- a/src/mcp/mod.rs +++ b/src/mcp/mod.rs @@ -6,6 +6,7 @@ use std::fs; use std::env; use crate::commands::token; +use crate::tid; const BUNDLE_ID: &str = "ai.syui.log"; @@ -146,19 +147,6 @@ fn save_mcp_session(session: &McpSession) -> Result<()> { Ok(()) } -/// Generate TID (timestamp-based ID) -fn generate_tid() -> String { - const CHARSET: &[u8] = b"234567abcdefghijklmnopqrstuvwxyz"; - use rand::Rng; - let mut rng = rand::thread_rng(); - (0..13) - .map(|_| { - let idx = rng.gen_range(0..CHARSET.len()); - CHARSET[idx] as char - }) - .collect() -} - /// Save chat record to local file fn save_chat_record( output_dir: &str, @@ -169,7 +157,7 @@ fn save_chat_record( parent_uri: Option<&str>, translations: Option<&std::collections::HashMap>, ) -> Result { - let rkey = generate_tid(); + let rkey = tid::generate_tid(); let now = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(); let uri = format!("at://{}/ai.syui.log.chat/{}", did, rkey); diff --git a/src/tid.rs b/src/tid.rs new file mode 100644 index 0000000..7e631f8 --- /dev/null +++ b/src/tid.rs @@ -0,0 +1,83 @@ +use std::sync::atomic::{AtomicU32, Ordering}; +use std::time::{SystemTime, UNIX_EPOCH}; + +/// Base32-sort character set used by ATProto TIDs +const BASE32_SORT: &[u8; 32] = b"234567abcdefghijklmnopqrstuvwxyz"; + +/// Atomic counter for clock ID to avoid collisions within the same microsecond +static CLOCK_ID: AtomicU32 = AtomicU32::new(0); + +/// Generate a TID (Timestamp Identifier) per the ATProto specification. +/// +/// Format: 13 characters of base32-sort encoding +/// - Bits 63..10: microsecond timestamp (54 bits) +/// - Bits 9..0: clock ID (10 bits, wrapping counter) +/// +/// The high bit (bit 63) is always 0 to keep the value positive. +pub fn generate_tid() -> String { + let micros = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("system clock before UNIX epoch") + .as_micros() as u64; + + let clk = CLOCK_ID.fetch_add(1, Ordering::Relaxed) & 0x3FF; // 10-bit wrap + + // Combine: timestamp in upper 54 bits, clock ID in lower 10 bits + let tid_value: u64 = (micros << 10) | (clk as u64); + + encode_base32_sort(tid_value) +} + +/// Encode a u64 into a 13-character base32-sort string (big-endian, zero-padded). +fn encode_base32_sort(mut value: u64) -> String { + let mut buf = [b'2'; 13]; // '2' is 0 in base32-sort + + for i in (0..13).rev() { + buf[i] = BASE32_SORT[(value & 0x1F) as usize]; + value >>= 5; + } + + // Safety: all chars are ASCII + String::from_utf8(buf.to_vec()).expect("base32-sort is always valid UTF-8") +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn tid_length() { + let tid = generate_tid(); + assert_eq!(tid.len(), 13); + } + + #[test] + fn tid_charset() { + let tid = generate_tid(); + let valid: &str = "234567abcdefghijklmnopqrstuvwxyz"; + for c in tid.chars() { + assert!(valid.contains(c), "invalid char in TID: {}", c); + } + } + + #[test] + fn tid_monotonic() { + let a = generate_tid(); + let b = generate_tid(); + // TIDs generated in sequence should sort correctly + assert!(a < b || a == b, "TIDs should be monotonically increasing: {} >= {}", a, b); + } + + #[test] + fn encode_zero() { + let encoded = encode_base32_sort(0); + assert_eq!(encoded, "2222222222222"); + } + + #[test] + fn encode_known_value() { + // Verify encoding produces consistent results + let encoded = encode_base32_sort(1); + assert_eq!(encoded, "2222222222223"); + } +} diff --git a/src/types.rs b/src/types.rs new file mode 100644 index 0000000..fcddaf5 --- /dev/null +++ b/src/types.rs @@ -0,0 +1,76 @@ +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +/// ATProto putRecord request body +#[derive(Debug, Serialize)] +pub struct PutRecordRequest { + pub repo: String, + pub collection: String, + pub rkey: String, + pub record: Value, +} + +/// ATProto deleteRecord request body +#[derive(Debug, Serialize)] +pub struct DeleteRecordRequest { + pub repo: String, + pub collection: String, + pub rkey: String, +} + +/// ATProto putRecord response +#[derive(Debug, Deserialize)] +pub struct PutRecordResponse { + pub uri: String, + pub cid: String, +} + +/// ATProto listRecords response +#[derive(Debug, Deserialize)] +pub struct ListRecordsResponse { + pub records: Vec, + #[serde(default)] + #[allow(dead_code)] + pub cursor: Option, +} + +/// A single ATProto record (from listRecords / getRecord) +#[derive(Debug, Deserialize)] +pub struct Record { + pub uri: String, + pub cid: String, + pub value: Value, +} + +/// ATProto describeRepo response +#[derive(Debug, Deserialize)] +pub struct DescribeRepoResponse { + pub did: String, + pub handle: String, + pub collections: Vec, +} + +/// ATProto createSession request +#[derive(Debug, Serialize)] +pub struct CreateSessionRequest { + pub identifier: String, + pub password: String, +} + +/// ATProto createSession / refreshSession response +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct CreateSessionResponse { + pub did: String, + pub handle: String, + pub access_jwt: String, + pub refresh_jwt: String, +} + +/// Local config.json structure +#[derive(Debug, Deserialize)] +pub struct Config { + pub handle: String, + #[serde(default)] + pub collection: Option, +} diff --git a/src/xrpc.rs b/src/xrpc.rs new file mode 100644 index 0000000..64d8454 --- /dev/null +++ b/src/xrpc.rs @@ -0,0 +1,187 @@ +use anyhow::{Context, Result}; +use serde::de::DeserializeOwned; +use serde::Serialize; + +use crate::error::{AppError, AtprotoErrorResponse}; +use crate::lexicons::{self, Endpoint}; + +/// XRPC client wrapping reqwest with ATProto-specific error handling. +#[derive(Clone)] +pub struct XrpcClient { + inner: reqwest::Client, + pds_host: String, +} + +impl XrpcClient { + /// Create a new XrpcClient targeting the given PDS host (e.g. "syu.is"). + pub fn new(pds_host: &str) -> Self { + Self { + inner: reqwest::Client::new(), + pds_host: pds_host.to_string(), + } + } + + /// Build XRPC URL for an endpoint + fn url(&self, endpoint: &Endpoint) -> String { + lexicons::url(&self.pds_host, endpoint) + } + + /// Authenticated GET query + pub async fn query_auth( + &self, + endpoint: &Endpoint, + params: &[(&str, &str)], + token: &str, + ) -> Result { + let mut url = self.url(endpoint); + if !params.is_empty() { + url.push('?'); + let qs: Vec = params + .iter() + .map(|(k, v)| format!("{}={}", k, v)) + .collect(); + url.push_str(&qs.join("&")); + } + + let res = self + .inner + .get(&url) + .header("Authorization", format!("Bearer {}", token)) + .send() + .await + .context("XRPC authenticated query failed")?; + + self.handle_response(res).await + } + + /// Authenticated POST call (procedure) + pub async fn call( + &self, + endpoint: &Endpoint, + body: &B, + token: &str, + ) -> Result { + let url = self.url(endpoint); + + let res = self + .inner + .post(&url) + .header("Authorization", format!("Bearer {}", token)) + .json(body) + .send() + .await + .context("XRPC call failed")?; + + self.handle_response(res).await + } + + /// Unauthenticated POST call (e.g. createSession) + pub async fn call_unauth( + &self, + endpoint: &Endpoint, + body: &B, + ) -> Result { + let url = self.url(endpoint); + + let res = self + .inner + .post(&url) + .json(body) + .send() + .await + .context("XRPC unauthenticated call failed")?; + + self.handle_response(res).await + } + + /// Authenticated POST that returns no body (or we ignore the response body) + pub async fn call_no_response( + &self, + endpoint: &Endpoint, + body: &B, + token: &str, + ) -> Result<()> { + let url = self.url(endpoint); + + let res = self + .inner + .post(&url) + .header("Authorization", format!("Bearer {}", token)) + .json(body) + .send() + .await + .context("XRPC call failed")?; + + if !res.status().is_success() { + return Err(self.parse_error(res).await); + } + + Ok(()) + } + + /// Authenticated POST with only a bearer token (no JSON body, e.g. refreshSession) + pub async fn call_bearer( + &self, + endpoint: &Endpoint, + token: &str, + ) -> Result { + let url = self.url(endpoint); + + let res = self + .inner + .post(&url) + .header("Authorization", format!("Bearer {}", token)) + .send() + .await + .context("XRPC bearer call failed")?; + + self.handle_response(res).await + } + + /// Handle an XRPC response: check status, parse ATProto errors, deserialize body. + async fn handle_response(&self, res: reqwest::Response) -> Result { + if !res.status().is_success() { + return Err(self.parse_error(res).await); + } + + let body = res.json::().await.context("Failed to parse XRPC response body")?; + Ok(body) + } + + /// Parse an error response into an AppError + async fn parse_error(&self, res: reqwest::Response) -> anyhow::Error { + let status = res.status().as_u16(); + + // Check for rate limiting + if status == 429 { + let retry_after = res + .headers() + .get("retry-after") + .and_then(|v| v.to_str().ok()) + .and_then(|v| v.parse::().ok()) + .unwrap_or(5); + return AppError::RateLimited { + retry_after_secs: retry_after, + } + .into(); + } + + // Try to parse ATProto error body + let body_text = res.text().await.unwrap_or_default(); + let message = if let Ok(at_err) = serde_json::from_str::(&body_text) + { + // Check for expired token + if at_err.error.as_deref() == Some("ExpiredToken") { + return AppError::SessionExpired( + at_err.message.unwrap_or_else(|| "Token expired".to_string()), + ) + .into(); + } + at_err.to_display_string() + } else { + body_text + }; + + AppError::xrpc(status, message).into() + } +}