4 Commits

Author SHA1 Message Date
1b7d37243c fix 2025-06-19 15:17:51 +09:00
50516fee21 fix 2025-06-19 15:09:35 +09:00
01c4c543fc fix 2025-06-19 15:02:54 +09:00
ca31760728 fix hugo callback 2025-06-19 14:48:54 +09:00
5 changed files with 63 additions and 335 deletions

View File

@@ -53,9 +53,7 @@
"WebFetch(domain:atproto.com)", "WebFetch(domain:atproto.com)",
"WebFetch(domain:syu.is)", "WebFetch(domain:syu.is)",
"Bash(sed:*)", "Bash(sed:*)",
"Bash(./scpt/run.zsh:*)", "Bash(./scpt/run.zsh:*)"
"Bash(RUST_LOG=debug cargo run -- stream status)",
"Bash(RUST_LOG=debug cargo run -- stream test-api)"
], ],
"deny": [] "deny": []
} }

1
.gitignore vendored
View File

@@ -19,4 +19,3 @@ cloudflared-config.yml
atproto atproto
oauth_old oauth_old
oauth_example oauth_example
my-blog/static/oauth/assets/comment-atproto*

View File

@@ -1,3 +0,0 @@
<!-- OAuth Comment System - Load globally for session management -->
<script type="module" crossorigin src="/assets/comment-atproto-BQKPMV57.js"></script>
<link rel="stylesheet" crossorigin href="/assets/comment-atproto-BUFiApUA.css">

View File

@@ -315,8 +315,6 @@ struct JetstreamMessage {
struct JetstreamCommit { struct JetstreamCommit {
operation: Option<String>, operation: Option<String>,
uri: Option<String>, uri: Option<String>,
record: Option<Value>,
collection: Option<String>,
} }
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -335,6 +333,7 @@ struct UserListRecord {
created_at: String, created_at: String,
#[serde(rename = "updatedBy")] #[serde(rename = "updatedBy")]
updated_by: UserInfo, updated_by: UserInfo,
metadata: Option<Value>,
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
@@ -424,7 +423,10 @@ pub async fn init_user_list(project_dir: Option<PathBuf>, handles: Option<String
// Create the initial user list // Create the initial user list
println!("{}", format!("📝 Creating user list with {} users...", users.len()).cyan()); println!("{}", format!("📝 Creating user list with {} users...", users.len()).cyan());
match post_user_list(&mut config, &users).await { match post_user_list(&mut config, &users, json!({
"reason": "initial_setup",
"created_by": "ailog_stream_init"
})).await {
Ok(_) => println!("{}", "✅ User list created successfully!".green()), Ok(_) => println!("{}", "✅ User list created successfully!".green()),
Err(e) => { Err(e) => {
println!("{}", format!("❌ Failed to create user list: {}", e).red()); println!("{}", format!("❌ Failed to create user list: {}", e).red());
@@ -604,15 +606,13 @@ async fn run_monitor(config: &mut AuthConfig) -> Result<()> {
let (mut write, mut read) = ws_stream.split(); let (mut write, mut read) = ws_stream.split();
// Subscribe to collections using Jetstream 2.0 format // Subscribe to collections
let subscribe_msg = json!({ let subscribe_msg = json!({
"wantedCollections": config.jetstream.collections, "wantedCollections": config.jetstream.collections
"wantedDids": [config.admin.did]
}); });
write.send(Message::Text(subscribe_msg.to_string())).await?; write.send(Message::Text(subscribe_msg.to_string())).await?;
println!("{}", format!("📨 Subscribed to collections: {:?} for DID: {}", println!("{}", "📨 Subscribed to collections".blue());
config.jetstream.collections, config.admin.did).blue());
// Start periodic polling task // Start periodic polling task
let config_clone = config.clone(); let config_clone = config.clone();
@@ -625,27 +625,15 @@ async fn run_monitor(config: &mut AuthConfig) -> Result<()> {
while let Some(msg) = read.next().await { while let Some(msg) = read.next().await {
match msg? { match msg? {
Message::Text(text) => { Message::Text(text) => {
// Check if this is a commit message with our collection // Filter out standard Bluesky collections for cleaner output
let is_custom_collection = text.contains("ai.syui.log") ||
text.contains(&config.admin.did);
let should_debug = std::env::var("AILOG_DEBUG").is_ok(); let should_debug = std::env::var("AILOG_DEBUG").is_ok();
let is_standard_collection = text.contains("app.bsky.feed.") || let is_standard_collection = text.contains("app.bsky.feed.") ||
text.contains("app.bsky.actor.") || text.contains("app.bsky.actor.") ||
text.contains("app.bsky.graph.") || text.contains("app.bsky.graph.");
text.contains("blue.flashes.") ||
text.contains("\"kind\":\"identity\"") ||
text.contains("\"kind\":\"account\"");
// Always show custom collection messages // Only show debug for custom collections or when explicitly requested
if is_custom_collection {
println!("{}", format!("🎯 Custom collection message: {}", text).green().bold());
}
// Only show debug for non-standard collections or when explicitly requested
if should_debug && (!is_standard_collection || std::env::var("AILOG_DEBUG_ALL").is_ok()) { if should_debug && (!is_standard_collection || std::env::var("AILOG_DEBUG_ALL").is_ok()) {
if !is_custom_collection { // Avoid double printing println!("{}", format!("🔍 Received: {}", text).blue());
println!("{}", format!("🔍 Received: {}", text).blue());
}
} }
if let Err(e) = handle_message(&text, config).await { if let Err(e) = handle_message(&text, config).await {
@@ -683,17 +671,7 @@ async fn run_monitor(config: &mut AuthConfig) -> Result<()> {
} }
async fn handle_message(text: &str, config: &mut AuthConfig) -> Result<()> { async fn handle_message(text: &str, config: &mut AuthConfig) -> Result<()> {
// println!("🔧 handle_message called with text length: {}", text.len()); let message: JetstreamMessage = serde_json::from_str(text)?;
let message: JetstreamMessage = match serde_json::from_str(text) {
Ok(msg) => {
// println!("✅ JSON parsed successfully");
msg
}
Err(e) => {
println!("❌ JSON parse error: {}", e);
return Err(e.into());
}
};
// Debug: Check all received collections (but filter standard ones) // Debug: Check all received collections (but filter standard ones)
if let Some(collection) = &message.collection { if let Some(collection) = &message.collection {
@@ -710,60 +688,35 @@ async fn handle_message(text: &str, config: &mut AuthConfig) -> Result<()> {
} }
// Check if this is a comment creation // Check if this is a comment creation
if let (Some(commit), Some(did)) = (&message.commit, &message.did) { if let (Some(collection), Some(commit), Some(did)) =
if let Some(collection) = &commit.collection { (&message.collection, &message.commit, &message.did) {
// Monitor both ai.syui.log and ai.syui.log.chat.comment collections if collection == &config.collections.comment() && commit.operation.as_deref() == Some("create") {
let is_main_collection = collection == &config.collections.comment();
let is_chat_comment_collection = collection == "ai.syui.log.chat.comment";
if collection == "ai.syui.log" || collection == "ai.syui.log.chat.comment" {
println!(" 🔍 Debug: collection='{}', expected='{}', is_main={}, is_chat={}",
collection, config.collections.comment(), is_main_collection, is_chat_comment_collection);
println!(" 🔍 Debug: operation={:?}", commit.operation);
}
if (is_main_collection || is_chat_comment_collection) && commit.operation.as_deref() == Some("create") {
let unknown_uri = "unknown".to_string(); let unknown_uri = "unknown".to_string();
let uri = commit.uri.as_ref().unwrap_or(&unknown_uri); let uri = commit.uri.as_ref().unwrap_or(&unknown_uri);
let collection_type = if is_main_collection { println!("{}", "🆕 New comment detected!".green().bold());
"main collection (ai.syui.log)"
} else {
"chat comment (ai.syui.log.chat.comment)"
};
println!("{}", format!("🆕 New comment detected from {}!", collection_type).green().bold());
println!(" 📝 URI: {}", uri); println!(" 📝 URI: {}", uri);
println!(" 👤 Author DID: {}", did); println!(" 👤 Author DID: {}", did);
// Extract author info from the jetstream record // Resolve handle
if let Some(record) = &commit.record { let ai_config = load_ai_config_from_project().unwrap_or_default();
if let Some(author) = record.get("author") { match resolve_handle(did, &ai_config.network).await {
if let (Some(author_did), Some(author_handle)) = ( Ok(handle) => {
author.get("did").and_then(|v| v.as_str()), println!(" 🏷️ Handle: {}", handle.cyan());
author.get("handle").and_then(|v| v.as_str())
) { // Update user list
println!(" 🏷️ Handle from record: {}", author_handle.cyan()); if let Err(e) = update_user_list(config, did, &handle).await {
println!("{}", format!(" ⚠️ Failed to update user list: {}", e).yellow());
// Update user list with jetstream author info
if let Err(e) = update_user_list(config, author_did, author_handle).await {
println!("{}", format!(" ⚠️ Failed to update user list: {}", e).yellow());
} else {
println!("{}", " ✅ User list updated successfully".green());
}
} else {
println!("{}", " ⚠️ Missing author DID or handle in record".yellow());
} }
} else {
println!("{}", " ⚠️ No author info in record".yellow());
} }
} else { Err(e) => {
println!("{}", " ⚠️ No record data in commit".yellow()); println!("{}", format!(" ⚠️ Failed to resolve handle: {}", e).yellow());
}
} }
println!(); println!();
} }
}
} }
Ok(()) Ok(())
@@ -909,7 +862,11 @@ async fn update_user_list(config: &mut AuthConfig, did: &str, handle: &str) -> R
updated_users.push(new_user); updated_users.push(new_user);
// Post updated user list // Post updated user list
post_user_list(config, &updated_users).await?; post_user_list(config, &updated_users, json!({
"reason": "auto_add_commenter",
"trigger_did": did,
"trigger_handle": handle
})).await?;
println!("{}", " ✅ User list updated successfully".green()); println!("{}", " ✅ User list updated successfully".green());
@@ -973,7 +930,7 @@ async fn get_current_user_list(config: &mut AuthConfig) -> Result<Vec<UserRecord
Ok(user_list) Ok(user_list)
} }
async fn post_user_list(config: &mut AuthConfig, users: &[UserRecord]) -> Result<()> { async fn post_user_list(config: &mut AuthConfig, users: &[UserRecord], metadata: Value) -> Result<()> {
let client = reqwest::Client::new(); let client = reqwest::Client::new();
let now = chrono::Utc::now(); let now = chrono::Utc::now();
@@ -991,6 +948,7 @@ async fn post_user_list(config: &mut AuthConfig, users: &[UserRecord]) -> Result
did: config.admin.did.clone(), did: config.admin.did.clone(),
handle: config.admin.handle.clone(), handle: config.admin.handle.clone(),
}, },
metadata: Some(metadata.clone()),
}; };
let url = format!("{}/xrpc/com.atproto.repo.putRecord", config.admin.pds); let url = format!("{}/xrpc/com.atproto.repo.putRecord", config.admin.pds);
@@ -1017,7 +975,7 @@ async fn post_user_list(config: &mut AuthConfig, users: &[UserRecord]) -> Result
if let Ok(_) = super::auth::load_config_with_refresh().await { if let Ok(_) = super::auth::load_config_with_refresh().await {
let refreshed_config = super::auth::load_config()?; let refreshed_config = super::auth::load_config()?;
*config = refreshed_config; *config = refreshed_config;
return Box::pin(post_user_list(config, users)).await; return Box::pin(post_user_list(config, users, metadata)).await;
} }
} }
let error_text = response.text().await?; let error_text = response.text().await?;
@@ -1155,35 +1113,17 @@ async fn poll_comments_periodically(mut config: AuthConfig) -> Result<()> {
let mut known_comments = HashSet::new(); let mut known_comments = HashSet::new();
let mut interval = interval(Duration::from_secs(30)); // Poll every 30 seconds let mut interval = interval(Duration::from_secs(30)); // Poll every 30 seconds
// Initial population of known comments - only add comments older than 5 minutes to allow recent ones to be processed // Initial population of known comments
if let Ok(comments) = get_recent_comments(&mut config).await { if let Ok(comments) = get_recent_comments(&mut config).await {
for comment in &comments { for comment in &comments {
if let Some(uri) = comment.get("uri").and_then(|v| v.as_str()) { if let Some(uri) = comment.get("uri").and_then(|v| v.as_str()) {
// Check if this comment is old enough to be considered "existing" known_comments.insert(uri.to_string());
let is_old_comment = if let Some(value) = comment.get("value") { if std::env::var("AILOG_DEBUG").is_ok() {
if let Some(created_at) = value.get("createdAt").and_then(|v| v.as_str()) { println!("{}", format!("🔍 Existing comment: {}", uri).blue());
!is_recent_comment(created_at)
} else {
true // If no timestamp, consider it old
}
} else {
true
};
if is_old_comment {
known_comments.insert(uri.to_string());
if std::env::var("AILOG_DEBUG").is_ok() {
println!("{}", format!("🔍 Existing comment: {}", uri).blue());
}
} else {
if std::env::var("AILOG_DEBUG").is_ok() {
println!("{}", format!("🆕 Recent comment will be processed: {}", uri).green());
}
} }
} }
} }
println!("{}", format!("📝 Found {} existing comments, {} recent comments will be processed", println!("{}", format!("📝 Found {} existing comments", known_comments.len()).blue());
known_comments.len(), comments.len() - known_comments.len()).blue());
// Debug: Show full response for first comment // Debug: Show full response for first comment
if std::env::var("AILOG_DEBUG").is_ok() && !comments.is_empty() { if std::env::var("AILOG_DEBUG").is_ok() && !comments.is_empty() {
@@ -1221,23 +1161,24 @@ async fn poll_comments_periodically(mut config: AuthConfig) -> Result<()> {
println!("{}", "🆕 New comment detected via polling!".green().bold()); println!("{}", "🆕 New comment detected via polling!".green().bold());
println!(" 📝 URI: {}", uri); println!(" 📝 URI: {}", uri);
// Extract author DID and handle from comment value // Extract author DID from URI
if let Some(author) = value.get("author") { if let Some(did) = extract_did_from_uri(uri) {
if let (Some(author_did), Some(author_handle)) = ( println!(" 👤 Author DID: {}", did);
author.get("did").and_then(|v| v.as_str()),
author.get("handle").and_then(|v| v.as_str()) // Resolve handle and update user list
) { let ai_config = load_ai_config_from_project().unwrap_or_default();
println!(" 👤 Author DID: {}", author_did); match resolve_handle(&did, &ai_config.network).await {
println!(" 🏷️ Handle: {}", author_handle.cyan()); Ok(handle) => {
println!(" 🏷️ Handle: {}", handle.cyan());
if let Err(e) = update_user_list(&mut config, author_did, author_handle).await {
println!("{}", format!(" ⚠️ Failed to update user list: {}", e).yellow()); if let Err(e) = update_user_list(&mut config, &did, &handle).await {
println!("{}", format!(" ⚠️ Failed to update user list: {}", e).yellow());
}
}
Err(e) => {
println!("{}", format!(" ⚠️ Failed to resolve handle: {}", e).yellow());
} }
} else {
println!("{}", " ⚠️ Comment missing author DID or handle".yellow());
} }
} else {
println!("{}", " ⚠️ Comment missing author information".yellow());
} }
println!(); println!();
@@ -1307,8 +1248,8 @@ fn is_recent_comment(created_at: &str) -> bool {
let comment_utc = comment_time.with_timezone(&Utc); let comment_utc = comment_time.with_timezone(&Utc);
let diff = now.signed_duration_since(comment_utc); let diff = now.signed_duration_since(comment_utc);
// Consider comments from the last 15 minutes as "recent" (more generous for testing) // Consider comments from the last 5 minutes as "recent"
diff <= Duration::minutes(15) && diff >= Duration::zero() diff <= Duration::minutes(5) && diff >= Duration::zero()
} else { } else {
false false
} }
@@ -1386,197 +1327,6 @@ async fn resolve_handle_to_did(handle: &str, network_config: &NetworkConfig) ->
Ok(did.to_string()) Ok(did.to_string())
} }
pub async fn test_polling_cycle() -> Result<()> {
println!("{}", "🧪 Testing complete polling cycle logic...".cyan().bold());
let mut config = load_config_with_refresh().await?;
println!("👤 Testing as: {}", config.admin.handle.green());
println!("🌐 PDS: {}", config.admin.pds);
println!("🆔 DID: {}", config.admin.did);
println!();
// Simulate the polling logic exactly as it runs in the stream
let mut known_comments = HashSet::new();
// Initial population (skip recent comments)
println!("{}", "📊 Step 1: Initial population of known comments".cyan());
if let Ok(comments) = get_recent_comments(&mut config).await {
for comment in &comments {
if let Some(uri) = comment.get("uri").and_then(|v| v.as_str()) {
let is_old_comment = if let Some(value) = comment.get("value") {
if let Some(created_at) = value.get("createdAt").and_then(|v| v.as_str()) {
!is_recent_comment(created_at)
} else {
true
}
} else {
true
};
if is_old_comment {
known_comments.insert(uri.to_string());
println!(" 🔍 Added to known: {}", uri);
} else {
println!(" 🆕 Skipped (recent): {}", uri);
}
}
}
println!("{}", format!("📝 Found {} existing comments, {} recent comments will be processed",
known_comments.len(), comments.len() - known_comments.len()).blue());
}
// Simulate first polling cycle
println!("{}", "\n📊 Step 2: First polling cycle".cyan());
if let Ok(comments) = get_recent_comments(&mut config).await {
for comment in comments {
if let (Some(uri), Some(value)) = (
comment.get("uri").and_then(|v| v.as_str()),
comment.get("value")
) {
if !known_comments.contains(uri) {
known_comments.insert(uri.to_string());
if let Some(created_at) = value.get("createdAt").and_then(|v| v.as_str()) {
if is_recent_comment(created_at) {
println!("{}", "🆕 New comment detected via polling!".green().bold());
println!(" 📝 URI: {}", uri);
// Extract author DID and handle from comment value
if let Some(author) = value.get("author") {
if let (Some(author_did), Some(author_handle)) = (
author.get("did").and_then(|v| v.as_str()),
author.get("handle").and_then(|v| v.as_str())
) {
println!(" 👤 Author DID: {}", author_did);
println!(" 🏷️ Handle: {}", author_handle.cyan());
println!(" 🧪 Calling update_user_list...");
match update_user_list(&mut config, author_did, author_handle).await {
Ok(_) => {
println!(" ✅ User list updated successfully!");
}
Err(e) => {
println!("{}", format!(" ❌ Failed to update user list: {}", e).red());
}
}
} else {
println!("{}", " ⚠️ Comment missing author DID or handle".yellow());
}
} else {
println!("{}", " ⚠️ Comment missing author information".yellow());
}
println!();
} else {
println!(" ⏭️ Not recent: {}", uri);
}
}
} else {
println!(" ⏭️ Already known: {}", uri);
}
}
}
}
Ok(())
}
pub async fn test_recent_detection() -> Result<()> {
println!("{}", "🧪 Testing recent comment detection logic...".cyan().bold());
let mut config = load_config_with_refresh().await?;
println!("👤 Testing as: {}", config.admin.handle.green());
println!("🌐 PDS: {}", config.admin.pds);
println!("🆔 DID: {}", config.admin.did);
println!();
// Get recent comments and test the detection logic
match get_recent_comments(&mut config).await {
Ok(comments) => {
println!("{}", format!("📊 Retrieved {} comments from API", comments.len()).cyan());
for (i, comment) in comments.iter().enumerate() {
if let (Some(uri), Some(value)) = (
comment.get("uri").and_then(|v| v.as_str()),
comment.get("value")
) {
println!(" {}. URI: {}", i + 1, uri);
if let Some(created_at) = value.get("createdAt").and_then(|v| v.as_str()) {
let is_recent = is_recent_comment(created_at);
println!(" Created: {} - Recent: {}", created_at,
if is_recent { "✅ YES".green() } else { "❌ NO".red() });
if is_recent {
if let Some(did) = extract_did_from_uri(uri) {
println!(" 🎯 Would process: DID {} from recent comment", did.cyan());
}
}
}
if let Some(author) = value.get("author") {
if let Some(author_did) = author.get("did").and_then(|v| v.as_str()) {
if let Some(handle) = author.get("handle").and_then(|v| v.as_str()) {
println!(" 👤 Author: {} ({})", handle, author_did);
}
}
}
println!();
}
}
}
Err(e) => {
println!("{}", format!("❌ Failed to get comments: {}", e).red());
return Err(e);
}
}
Ok(())
}
pub async fn test_user_update() -> Result<()> {
println!("{}", "🧪 Testing user list update functionality...".cyan().bold());
let mut config = load_config_with_refresh().await?;
println!("👤 Testing as: {}", config.admin.handle.green());
println!("🌐 PDS: {}", config.admin.pds);
println!("🆔 DID: {}", config.admin.did);
println!();
// Get existing user list
let current_users = get_current_user_list(&mut config).await?;
println!("{}", format!("📋 Current user list has {} users", current_users.len()).cyan());
for user in &current_users {
println!(" 👤 {} ({}) - {}", user.handle, user.did, user.pds);
}
// Test adding a dummy user (simulate a new commenter)
let test_did = "did:plc:test-user-update-12345";
let test_handle = "test.user.bsky.social";
println!("{}", format!("🧪 Simulating new user: {} ({})", test_handle, test_did).yellow());
match update_user_list(&mut config, test_did, test_handle).await {
Ok(_) => {
println!("{}", "✅ User list update test successful!".green());
// Verify the update
let updated_users = get_current_user_list(&mut config).await?;
println!("{}", format!("📋 Updated user list now has {} users", updated_users.len()).cyan());
}
Err(e) => {
println!("{}", format!("❌ User list update test failed: {}", e).red());
return Err(e);
}
}
Ok(())
}
pub async fn test_api() -> Result<()> { pub async fn test_api() -> Result<()> {
println!("{}", "🧪 Testing API access to comments collection...".cyan().bold()); println!("{}", "🧪 Testing API access to comments collection...".cyan().bold());
@@ -1606,9 +1356,8 @@ pub async fn test_api() -> Result<()> {
println!(" Created: {}", created_at); println!(" Created: {}", created_at);
} }
if let Some(text) = value.get("text").and_then(|v| v.as_str()) { if let Some(text) = value.get("text").and_then(|v| v.as_str()) {
let preview = if text.chars().count() > 50 { let preview = if text.len() > 50 {
let truncated: String = text.chars().take(50).collect(); format!("{}...", &text[..50])
format!("{}...", truncated)
} else { } else {
text.to_string() text.to_string()
}; };

View File

@@ -152,12 +152,6 @@ enum StreamCommands {
Status, Status,
/// Test API access to comments collection /// Test API access to comments collection
Test, Test,
/// Test user list update functionality
TestUserUpdate,
/// Test recent comment detection logic
TestRecentDetection,
/// Test complete polling cycle logic
TestPollingCycle,
} }
#[derive(Subcommand)] #[derive(Subcommand)]
@@ -241,15 +235,6 @@ async fn main() -> Result<()> {
StreamCommands::Test => { StreamCommands::Test => {
commands::stream::test_api().await?; commands::stream::test_api().await?;
} }
StreamCommands::TestUserUpdate => {
commands::stream::test_user_update().await?;
}
StreamCommands::TestRecentDetection => {
commands::stream::test_recent_detection().await?;
}
StreamCommands::TestPollingCycle => {
commands::stream::test_polling_cycle().await?;
}
} }
} }
Commands::Oauth { command } => { Commands::Oauth { command } => {