diff --git a/envs/feed b/envs/feed index 60f82c3..1a2464e 100644 --- a/envs/feed +++ b/envs/feed @@ -3,5 +3,5 @@ FEEDGEN_LISTENHOST=0.0.0.0 FEEDGEN_SQLITE_LOCATION=/data/db.sqlite FEEDGEN_HOSTNAME=feed.syu.is FEEDGEN_PUBLISHER_DID=did:plc:6qyecktefllvenje24fcxnie -FEEDGEN_SUBSCRIPTION_ENDPOINT=ws://bgs:2470 FEEDGEN_SERVICE_DID=did:web:feed.syu.is +FEEDGEN_JETSTREAM_URL=ws://jetstream:6008/subscribe diff --git a/envs/jetstream b/envs/jetstream index 324bbd7..950a3c9 100644 --- a/envs/jetstream +++ b/envs/jetstream @@ -1,4 +1,4 @@ -JETSTREAM_WS_URL=wss://bgs.${host}/xrpc/com.atproto.sync.subscribeRepos +JETSTREAM_WS_URL=ws://bgs.${host}/xrpc/com.atproto.sync.subscribeRepos JETSTREAM_DATA_DIR=/data JETSTREAM_LISTEN_ADDR=:6008 JETSTREAM_METRICS_LISTEN_ADDR=:6009 diff --git a/patching/200-feed-generator-custom.patch b/patching/200-feed-generator-custom.patch index 8c68f4c..3ee2a6a 100644 --- a/patching/200-feed-generator-custom.patch +++ b/patching/200-feed-generator-custom.patch @@ -22,14 +22,31 @@ index 0000000..993c83d + +EXPOSE 3000 +CMD ["yarn", "start"] +diff --git a/package.json b/package.json +index 1431a9e..6a7c33c 100644 +--- a/package.json ++++ b/package.json +@@ -23,9 +23,11 @@ + "dotenv": "^16.0.3", + "express": "^4.18.2", + "kysely": "^0.27.4", +- "multiformats": "^9.9.0" ++ "multiformats": "^9.9.0", ++ "ws": "^8.14.2" + }, + "devDependencies": { ++ "@types/ws": "^8.5.10", + "@types/better-sqlite3": "^7.6.11", + "@types/express": "^4.17.17", + "@types/node": "^20.1.2", diff --git a/scripts/publish.ts b/scripts/publish.ts new file mode 100644 -index 0000000..966edcf +index 0000000..044f1d9 --- /dev/null +++ b/scripts/publish.ts @@ -0,0 +1,64 @@ +import dotenv from 'dotenv' -+import { AtpAgent, BlobRef, AppBskyFeedDefs } from '@atproto/api' ++import { AtpAgent, BlobRef } from '@atproto/api' +import fs from 'fs/promises' +import { ids } from '../src/lexicon/lexicons' + @@ -88,7 +105,7 @@ index 0000000..966edcf + }, + }) + -+ console.log('All done 🎉') ++ console.log('All done') +} + +run() @@ -152,12 +169,15 @@ index b7ee48a..102cb93 100644 export default algos diff --git a/src/index.ts b/src/index.ts -index c3bd006..1e7f0b5 100644 +index 7128525..40d985c 100644 --- a/src/index.ts +++ b/src/index.ts -@@ -24,6 +24,8 @@ const run = async () => { +@@ -22,8 +22,10 @@ const run = async () => { + }) + await server.start() console.log( - `🤖 running feed generator at http://${server.cfg.listenhost}:${server.cfg.port}`, +- `🤖 running feed generator at http://${server.cfg.listenhost}:${server.cfg.port}`, ++ `running feed generator at http://${server.cfg.listenhost}:${server.cfg.port}`, ) + console.log('Supported algos:', Object.keys(require('./algos').default)) + console.log('Publisher DID:', server.cfg.publisherDid) @@ -165,7 +185,7 @@ index c3bd006..1e7f0b5 100644 const maybeStr = (val?: string) => { diff --git a/src/methods/feed-generation.ts b/src/methods/feed-generation.ts -index b887413..34c5148 100644 +index 0f4989e..17be062 100644 --- a/src/methods/feed-generation.ts +++ b/src/methods/feed-generation.ts @@ -10,7 +10,7 @@ export default function (server: Server, ctx: AppContext) { @@ -177,18 +197,233 @@ index b887413..34c5148 100644 feedUri.collection !== 'app.bsky.feed.generator' || !algo ) { +diff --git a/src/server.ts b/src/server.ts +index c696749..9b9c382 100644 +--- a/src/server.ts ++++ b/src/server.ts +@@ -6,7 +6,7 @@ import { createServer } from './lexicon' + import feedGeneration from './methods/feed-generation' + import describeGenerator from './methods/describe-generator' + import { createDb, Database, migrateToLatest } from './db' +-import { FirehoseSubscription } from './subscription' ++import { JetstreamSubscription } from './subscription' + import { AppContext, Config } from './config' + import wellKnown from './well-known' + +@@ -14,25 +14,28 @@ export class FeedGenerator { + public app: express.Application + public server?: http.Server + public db: Database +- public firehose: FirehoseSubscription ++ public jetstream: JetstreamSubscription + public cfg: Config + + constructor( + app: express.Application, + db: Database, +- firehose: FirehoseSubscription, ++ jetstream: JetstreamSubscription, + cfg: Config, + ) { + this.app = app + this.db = db +- this.firehose = firehose ++ this.jetstream = jetstream + this.cfg = cfg + } + + static create(cfg: Config) { + const app = express() + const db = createDb(cfg.sqliteLocation) +- const firehose = new FirehoseSubscription(db, cfg.subscriptionEndpoint) ++ ++ // Use Jetstream URL from env or default to internal jetstream service ++ const jetstreamUrl = process.env.FEEDGEN_JETSTREAM_URL || 'ws://jetstream:6008/subscribe' ++ const jetstream = new JetstreamSubscription(db, jetstreamUrl, cfg.subscriptionReconnectDelay) + + const didCache = new MemoryCache() + const didResolver = new DidResolver({ +@@ -58,12 +61,12 @@ export class FeedGenerator { + app.use(server.xrpc.router) + app.use(wellKnown(ctx)) + +- return new FeedGenerator(app, db, firehose, cfg) ++ return new FeedGenerator(app, db, jetstream, cfg) + } + + async start(): Promise { + await migrateToLatest(this.db) +- this.firehose.run(this.cfg.subscriptionReconnectDelay) ++ this.jetstream.run() + this.server = this.app.listen(this.cfg.port, this.cfg.listenhost) + await events.once(this.server, 'listening') + return this.server diff --git a/src/subscription.ts b/src/subscription.ts -index 0422a03..d591ef9 100644 +index 0422a03..7785982 100644 --- a/src/subscription.ts +++ b/src/subscription.ts -@@ -19,10 +19,6 @@ export class FirehoseSubscription extends FirehoseSubscriptionBase { +@@ -1,49 +1,126 @@ +-import { +- OutputSchema as RepoEvent, +- isCommit, +-} from './lexicon/types/com/atproto/sync/subscribeRepos' +-import { FirehoseSubscriptionBase, getOpsByType } from './util/subscription' +- +-export class FirehoseSubscription extends FirehoseSubscriptionBase { +- async handleEvent(evt: RepoEvent) { +- if (!isCommit(evt)) return +- +- const ops = await getOpsByType(evt) +- +- // This logs the text of every post off the firehose. +- // Just for fun :) +- // Delete before actually using +- for (const post of ops.posts.creates) { +- console.log(post.record.text) ++import WebSocket from 'ws' ++import { Database } from './db' ++ ++// Jetstream event types ++interface JetstreamEvent { ++ did: string ++ time_us: number ++ kind: 'commit' | 'identity' | 'account' ++ commit?: { ++ rev: string ++ operation: 'create' | 'update' | 'delete' ++ collection: string ++ rkey: string ++ record?: { ++ $type: string ++ text?: string ++ createdAt?: string ++ [key: string]: unknown + } ++ cid?: string ++ } ++} - const postsToDelete = ops.posts.deletes.map((del) => del.uri) - const postsToCreate = ops.posts.creates +- const postsToDelete = ops.posts.deletes.map((del) => del.uri) +- const postsToCreate = ops.posts.creates - .filter((create) => { - // only alf-related posts - return create.record.text.toLowerCase().includes('alf') - }) - .map((create) => { - // map alf-related posts to a db row - return { +- .map((create) => { +- // map alf-related posts to a db row +- return { +- uri: create.uri, +- cid: create.cid, +- indexedAt: new Date().toISOString(), +- } +- }) ++export class JetstreamSubscription { ++ private ws: WebSocket | null = null ++ private cursor: number = 0 + +- if (postsToDelete.length > 0) { +- await this.db +- .deleteFrom('post') +- .where('uri', 'in', postsToDelete) +- .execute() ++ constructor( ++ public db: Database, ++ public jetstreamUrl: string, ++ public reconnectDelay: number = 3000 ++ ) {} ++ ++ async run() { ++ await this.loadCursor() ++ this.connect() ++ } ++ ++ private connect() { ++ const url = new URL(this.jetstreamUrl) ++ url.searchParams.set('wantedCollections', 'app.bsky.feed.post') ++ if (this.cursor > 0) { ++ url.searchParams.set('cursor', this.cursor.toString()) + } +- if (postsToCreate.length > 0) { ++ ++ console.log(`Connecting to Jetstream: ${url.toString()}`) ++ this.ws = new WebSocket(url.toString()) ++ ++ this.ws.on('open', () => { ++ console.log('Connected to Jetstream') ++ }) ++ ++ this.ws.on('message', async (data: WebSocket.Data) => { ++ try { ++ const event: JetstreamEvent = JSON.parse(data.toString()) ++ await this.handleEvent(event) ++ } catch (err) { ++ console.error('Failed to handle Jetstream message:', err) ++ } ++ }) ++ ++ this.ws.on('error', (err) => { ++ console.error('Jetstream WebSocket error:', err) ++ }) ++ ++ this.ws.on('close', () => { ++ console.log('Jetstream connection closed, reconnecting...') ++ setTimeout(() => this.connect(), this.reconnectDelay) ++ }) ++ } ++ ++ private async handleEvent(event: JetstreamEvent) { ++ if (event.kind !== 'commit' || !event.commit) return ++ if (event.commit.collection !== 'app.bsky.feed.post') return ++ ++ const uri = `at://${event.did}/${event.commit.collection}/${event.commit.rkey}` ++ ++ if (event.commit.operation === 'delete') { ++ await this.db.deleteFrom('post').where('uri', '=', uri).execute() ++ } else if (event.commit.operation === 'create' && event.commit.record) { ++ const text = event.commit.record.text || '' ++ ++ // Filter: posts starting with / or @ai ++ if (!text.match(/^\/[a-z]/) && !text.match(/^@ai/)) { ++ return ++ } ++ ++ console.log(`[post] ${event.did}: ${text.substring(0, 50)}...`) ++ + await this.db + .insertInto('post') +- .values(postsToCreate) ++ .values({ ++ uri: uri, ++ cid: event.commit.cid || '', ++ indexedAt: new Date().toISOString(), ++ }) + .onConflict((oc) => oc.doNothing()) + .execute() + } ++ ++ // Update cursor periodically ++ this.cursor = event.time_us ++ if (event.time_us % 20 === 0) { ++ await this.saveCursor() ++ } ++ } ++ ++ private async loadCursor() { ++ const res = await this.db ++ .selectFrom('sub_state') ++ .selectAll() ++ .where('service', '=', 'jetstream') ++ .executeTakeFirst() ++ if (res) { ++ this.cursor = res.cursor ++ } ++ } ++ ++ private async saveCursor() { ++ await this.db ++ .insertInto('sub_state') ++ .values({ service: 'jetstream', cursor: this.cursor }) ++ .onConflict((oc) => oc.column('service').doUpdateSet({ cursor: this.cursor })) ++ .execute() + } + }