diff --git a/Cargo.toml b/Cargo.toml index a90413c..3ea1a29 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,3 +16,4 @@ tracing = "0.1" tracing-subscriber = "0.3" base64 = "0.22" urlencoding = "2" +reqwest = { version = "0.12", features = ["json"] } diff --git a/src/main.rs b/src/main.rs index b2b7f7e..c1cc04a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -274,7 +274,7 @@ struct ErrorResp { // --- App State --- struct AppState { - db: Mutex, + db: Arc>, } // --- Auth --- @@ -398,15 +398,54 @@ fn init_db(conn: &Connection) { } fn ensure_account(conn: &Connection, did: &str) { - // handle must be a valid handle format, not a DID - let handle = format!("{}.chat.invalid", did.split(':').last().unwrap_or("unknown")); + let fallback_handle = format!("{}.chat.invalid", did.split(':').last().unwrap_or("unknown")); conn.execute( "INSERT OR IGNORE INTO accounts (did, handle, created_at) VALUES (?1, ?2, ?3)", - rusqlite::params![did, handle, now_iso()], + rusqlite::params![did, fallback_handle, now_iso()], ) .ok(); } +/// Resolve profile from PDS/appview and update account in DB +async fn resolve_and_update_account(db: Arc>, did: String) { + // Skip if already resolved (handle doesn't end with .chat.invalid) + { + let conn = db.lock().unwrap(); + let handle: Option = conn.query_row( + "SELECT handle FROM accounts WHERE did = ?1", + rusqlite::params![did], + |row| row.get(0), + ).ok(); + if let Some(h) = handle { + if !h.ends_with(".chat.invalid") { + return; + } + } + } + + let appview_url = std::env::var("APPVIEW_URL") + .unwrap_or_else(|_| "https://bsky.syu.is".into()); + let url = format!( + "{}/xrpc/app.bsky.actor.getProfile?actor={}", + appview_url, did + ); + + let Ok(resp) = reqwest::get(&url).await else { return }; + let Ok(profile) = resp.json::().await else { return }; + + let handle = profile.get("handle").and_then(|v| v.as_str()).unwrap_or_default(); + let display_name = profile.get("displayName").and_then(|v| v.as_str()); + let avatar = profile.get("avatar").and_then(|v| v.as_str()); + + if !handle.is_empty() { + let conn = db.lock().unwrap(); + conn.execute( + "UPDATE accounts SET handle = ?1, display_name = ?2, avatar = ?3 WHERE did = ?4", + rusqlite::params![handle, display_name, avatar, did], + ).ok(); + } +} + fn get_or_create_convo(conn: &Connection, members: &[String]) -> ConvoView { let mut sorted = members.to_vec(); sorted.sort(); @@ -779,10 +818,21 @@ async fn get_convo_for_members( message: "Caller must be a member".into(), }))); } - let db = state.db.lock().unwrap(); - for m in &members { - ensure_account(&db, m); + { + let db = state.db.lock().unwrap(); + for m in &members { + ensure_account(&db, m); + } } + // Resolve profiles in background + for m in &members { + let db = state.db.clone(); + let did = m.clone(); + tokio::spawn(async move { + resolve_and_update_account(db, did).await; + }); + } + let db = state.db.lock().unwrap(); let convo = get_or_create_convo(&db, &members); Ok(Json(GetConvoResp { convo })) } @@ -811,23 +861,35 @@ async fn list_convos( Query(params): Query, ) -> Result, (StatusCode, Json)> { let did = require_auth(&headers)?; - let db = state.db.lock().unwrap(); - let limit = params.limit.unwrap_or(20).min(100); - let status_filter = params.status.unwrap_or_else(|| "accepted".into()); + let (convos, member_dids) = { + let db = state.db.lock().unwrap(); + let limit = params.limit.unwrap_or(20).min(100); + let status_filter = params.status.unwrap_or_else(|| "accepted".into()); - let mut stmt = db.prepare( - "SELECT cm.convo_id FROM convo_members cm - JOIN convos c ON c.id = cm.convo_id - WHERE cm.did = ?1 AND cm.status = ?2 - ORDER BY c.updated_at DESC LIMIT ?3", - ).unwrap(); + let mut stmt = db.prepare( + "SELECT cm.convo_id FROM convo_members cm + JOIN convos c ON c.id = cm.convo_id + WHERE cm.did = ?1 AND cm.status = ?2 + ORDER BY c.updated_at DESC LIMIT ?3", + ).unwrap(); - let convo_ids: Vec = stmt - .query_map(rusqlite::params![did, status_filter, limit], |row| row.get(0)) - .unwrap().filter_map(|r| r.ok()).collect(); + let convo_ids: Vec = stmt + .query_map(rusqlite::params![did, status_filter, limit], |row| row.get(0)) + .unwrap().filter_map(|r| r.ok()).collect(); - let convos: Vec = convo_ids.iter() - .map(|id| load_convo(&db, id, Some(&did))).collect(); + let convos: Vec = convo_ids.iter() + .map(|id| load_convo(&db, id, Some(&did))).collect(); + + let member_dids: Vec = convos.iter() + .flat_map(|c| c.members.iter().map(|m| m.did.clone())).collect(); + + (convos, member_dids) + }; + // Resolve profiles in background + for m in member_dids { + let db = state.db.clone(); + tokio::spawn(async move { resolve_and_update_account(db, m).await; }); + } Ok(Json(ListConvosResp { cursor: None, convos })) } @@ -1134,7 +1196,7 @@ async fn main() { let conn = Connection::open(&db_path).expect("Failed to open database"); init_db(&conn); - let state = Arc::new(AppState { db: Mutex::new(conn) }); + let state = Arc::new(AppState { db: Arc::new(Mutex::new(conn)) }); let cors = CorsLayer::new() .allow_origin(Any)