diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..3c3629e --- /dev/null +++ b/.dockerignore @@ -0,0 +1 @@ +node_modules diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..993c83d --- /dev/null +++ b/Dockerfile @@ -0,0 +1,11 @@ +FROM node:18-alpine + +WORKDIR /app + +COPY package.json yarn.lock ./ +RUN yarn install + +COPY . . + +EXPOSE 3000 +CMD ["yarn", "start"] diff --git a/package.json b/package.json index 1431a9e..6a7c33c 100644 --- a/package.json +++ b/package.json @@ -23,9 +23,11 @@ "dotenv": "^16.0.3", "express": "^4.18.2", "kysely": "^0.27.4", - "multiformats": "^9.9.0" + "multiformats": "^9.9.0", + "ws": "^8.14.2" }, "devDependencies": { + "@types/ws": "^8.5.10", "@types/better-sqlite3": "^7.6.11", "@types/express": "^4.17.17", "@types/node": "^20.1.2", diff --git a/scripts/publish.ts b/scripts/publish.ts new file mode 100644 index 0000000..044f1d9 --- /dev/null +++ b/scripts/publish.ts @@ -0,0 +1,64 @@ +import dotenv from 'dotenv' +import { AtpAgent, BlobRef } from '@atproto/api' +import fs from 'fs/promises' +import { ids } from '../src/lexicon/lexicons' + +const run = async () => { + dotenv.config() + + const handle = process.env.FEEDGEN_HANDLE + const password = process.env.FEEDGEN_PASSWORD + const recordName = process.env.FEEDGEN_RECORD_NAME || 'app' + const displayName = process.env.FEEDGEN_DISPLAY_NAME || 'App Feed' + const description = process.env.FEEDGEN_DESCRIPTION || 'Automated App Feed' + const avatar = process.env.FEEDGEN_AVATAR + const service = process.env.FEEDGEN_SERVICE_URL || 'https://syu.is' + + if (!handle || !password) { + throw new Error('Please provide FEEDGEN_HANDLE and FEEDGEN_PASSWORD environment variables') + } + + if (!process.env.FEEDGEN_SERVICE_DID && !process.env.FEEDGEN_HOSTNAME) { + throw new Error('Please provide a hostname in the .env file') + } + + const feedGenDid = + process.env.FEEDGEN_SERVICE_DID ?? 
`did:web:${process.env.FEEDGEN_HOSTNAME}` + + const agent = new AtpAgent({ service }) + await agent.login({ identifier: handle, password }) + + let avatarRef: BlobRef | undefined + if (avatar) { + let encoding: string + if (avatar.endsWith('png')) { + encoding = 'image/png' + } else if (avatar.endsWith('jpg') || avatar.endsWith('jpeg')) { + encoding = 'image/jpeg' + } else { + throw new Error('expected png or jpeg') + } + const img = await fs.readFile(avatar) + const blobRes = await agent.api.com.atproto.repo.uploadBlob(img, { + encoding, + }) + avatarRef = blobRes.data.blob + } + + await agent.api.com.atproto.repo.putRecord({ + repo: agent.session?.did ?? '', + collection: ids.AppBskyFeedGenerator, + rkey: recordName, + record: { + did: feedGenDid, + displayName: displayName, + description: description, + avatar: avatarRef, + createdAt: new Date().toISOString(), + }, + }) + + console.log('All done') +} + +run() diff --git a/src/algos/app.ts b/src/algos/app.ts new file mode 100644 index 0000000..2376be9 --- /dev/null +++ b/src/algos/app.ts @@ -0,0 +1,35 @@ +import { QueryParams } from '../lexicon/types/app/bsky/feed/getFeedSkeleton' +import { AppContext } from '../config' + +// max 15 chars +export const shortname = 'app' + +export const handler = async (ctx: AppContext, params: QueryParams) => { + let builder = ctx.db + .selectFrom('post') + .selectAll() + .orderBy('indexedAt', 'desc') + .orderBy('cid', 'desc') + .limit(params.limit) + + if (params.cursor) { + const timeStr = new Date(parseInt(params.cursor, 10)).toISOString() + builder = builder.where('post.indexedAt', '<', timeStr) + } + const res = await builder.execute() + + const feed = res.map((row) => ({ + post: row.uri, + })) + + let cursor: string | undefined + const last = res.at(-1) + if (last) { + cursor = new Date(last.indexedAt).getTime().toString(10) + } + + return { + cursor, + feed, + } +} diff --git a/src/algos/index.ts b/src/algos/index.ts index b7ee48a..102cb93 100644 --- a/src/algos/index.ts 
+++ b/src/algos/index.ts @@ -4,11 +4,13 @@ import { OutputSchema as AlgoOutput, } from '../lexicon/types/app/bsky/feed/getFeedSkeleton' import * as whatsAlf from './whats-alf' +import * as app from './app' type AlgoHandler = (ctx: AppContext, params: QueryParams) => Promise const algos: Record = { [whatsAlf.shortname]: whatsAlf.handler, + [app.shortname]: app.handler, } export default algos diff --git a/src/index.ts b/src/index.ts index 7128525..40d985c 100644 --- a/src/index.ts +++ b/src/index.ts @@ -22,8 +22,10 @@ const run = async () => { }) await server.start() console.log( - `🤖 running feed generator at http://${server.cfg.listenhost}:${server.cfg.port}`, + `running feed generator at http://${server.cfg.listenhost}:${server.cfg.port}`, ) + console.log('Supported algos:', Object.keys(require('./algos').default)) + console.log('Publisher DID:', server.cfg.publisherDid) } const maybeStr = (val?: string) => { diff --git a/src/methods/feed-generation.ts b/src/methods/feed-generation.ts index 0f4989e..17be062 100644 --- a/src/methods/feed-generation.ts +++ b/src/methods/feed-generation.ts @@ -10,7 +10,7 @@ export default function (server: Server, ctx: AppContext) { const feedUri = new AtUri(params.feed) const algo = algos[feedUri.rkey] if ( - feedUri.hostname !== ctx.cfg.publisherDid || + //feedUri.hostname !== ctx.cfg.publisherDid || feedUri.collection !== 'app.bsky.feed.generator' || !algo ) { diff --git a/src/server.ts b/src/server.ts index c696749..9b9c382 100644 --- a/src/server.ts +++ b/src/server.ts @@ -6,7 +6,7 @@ import { createServer } from './lexicon' import feedGeneration from './methods/feed-generation' import describeGenerator from './methods/describe-generator' import { createDb, Database, migrateToLatest } from './db' -import { FirehoseSubscription } from './subscription' +import { JetstreamSubscription } from './subscription' import { AppContext, Config } from './config' import wellKnown from './well-known' @@ -14,25 +14,28 @@ export class 
FeedGenerator { public app: express.Application public server?: http.Server public db: Database - public firehose: FirehoseSubscription + public jetstream: JetstreamSubscription public cfg: Config constructor( app: express.Application, db: Database, - firehose: FirehoseSubscription, + jetstream: JetstreamSubscription, cfg: Config, ) { this.app = app this.db = db - this.firehose = firehose + this.jetstream = jetstream this.cfg = cfg } static create(cfg: Config) { const app = express() const db = createDb(cfg.sqliteLocation) - const firehose = new FirehoseSubscription(db, cfg.subscriptionEndpoint) + + // Use Jetstream URL from env or default to internal jetstream service + const jetstreamUrl = process.env.FEEDGEN_JETSTREAM_URL || 'ws://jetstream:6008/subscribe' + const jetstream = new JetstreamSubscription(db, jetstreamUrl, cfg.subscriptionReconnectDelay) const didCache = new MemoryCache() const didResolver = new DidResolver({ @@ -58,12 +61,12 @@ export class FeedGenerator { app.use(server.xrpc.router) app.use(wellKnown(ctx)) - return new FeedGenerator(app, db, firehose, cfg) + return new FeedGenerator(app, db, jetstream, cfg) } async start(): Promise { await migrateToLatest(this.db) - this.firehose.run(this.cfg.subscriptionReconnectDelay) + this.jetstream.run() this.server = this.app.listen(this.cfg.port, this.cfg.listenhost) await events.once(this.server, 'listening') return this.server diff --git a/src/subscription.ts b/src/subscription.ts index 0422a03..7785982 100644 --- a/src/subscription.ts +++ b/src/subscription.ts @@ -1,49 +1,126 @@ -import { - OutputSchema as RepoEvent, - isCommit, -} from './lexicon/types/com/atproto/sync/subscribeRepos' -import { FirehoseSubscriptionBase, getOpsByType } from './util/subscription' - -export class FirehoseSubscription extends FirehoseSubscriptionBase { - async handleEvent(evt: RepoEvent) { - if (!isCommit(evt)) return - - const ops = await getOpsByType(evt) - - // This logs the text of every post off the firehose. 
// NOTE: this block relies on the file-level imports of src/subscription.ts:
//   import WebSocket from 'ws'
//   import { Database } from './db'

/** NSID of the only record collection this subscription indexes. */
const POST_COLLECTION = 'app.bsky.feed.post'

/** A post is indexed when its text starts with `/<lowercase letter>` or `@ai`. */
const POST_FILTER = /^(?:\/[a-z]|@ai)/

/**
 * Minimum distance (in microseconds of event time) between two persisted
 * cursors. Bounds both the write rate and the replay window after a restart.
 */
const CURSOR_SAVE_INTERVAL_US = 1000000

/** Commit payload carried by a Jetstream `commit` event. */
interface JetstreamCommit {
  rev: string
  operation: 'create' | 'update' | 'delete'
  collection: string
  rkey: string
  record?: {
    $type: string
    text?: string
    createdAt?: string
    [key: string]: unknown
  }
  cid?: string
}

/** Shape of the Jetstream events this subscription reads. */
interface JetstreamEvent {
  did: string
  time_us: number
  kind: 'commit' | 'identity' | 'account'
  commit?: JetstreamCommit
}

/**
 * Parses one raw Jetstream frame and checks the fields every event must carry.
 *
 * @param raw - The JSON text of a single WebSocket message.
 * @returns The event, or `undefined` when the envelope is not a usable event.
 * @throws SyntaxError when `raw` is not valid JSON.
 */
function parseJetstreamEvent(raw: string): JetstreamEvent | undefined {
  const parsed: unknown = JSON.parse(raw)
  if (typeof parsed !== 'object' || parsed === null) return undefined

  const candidate = parsed as Partial<JetstreamEvent>
  if (
    typeof candidate.did !== 'string' ||
    typeof candidate.time_us !== 'number' ||
    typeof candidate.kind !== 'string'
  ) {
    return undefined
  }
  return candidate as JetstreamEvent
}

/**
 * Consumes a Jetstream WebSocket feed, mirrors matching posts into the `post`
 * table and keeps a resume cursor in `sub_state` (service = 'jetstream').
 *
 * The cursor is the `time_us` of the newest event seen. It advances for every
 * event, including posts rejected by the text filter, so that a reconnect or
 * restart resumes close to where the stream left off.
 */
export class JetstreamSubscription {
  private ws: WebSocket | null = null
  private cursor = 0
  private lastSavedCursor = 0

  /**
   * @param db - Database holding the `post` and `sub_state` tables.
   * @param jetstreamUrl - Base subscribe URL; query parameters are added here.
   * @param reconnectDelay - Milliseconds to wait before reconnecting after a close.
   */
  constructor(
    public db: Database,
    public jetstreamUrl: string,
    public reconnectDelay: number = 3000,
  ) {}

  /** Restores the persisted cursor, then opens the connection. */
  async run() {
    await this.loadCursor()
    this.connect()
  }

  /** Opens the socket and schedules a reconnect whenever it closes. */
  private connect() {
    const url = new URL(this.jetstreamUrl)
    url.searchParams.set('wantedCollections', POST_COLLECTION)
    if (this.cursor > 0) {
      url.searchParams.set('cursor', this.cursor.toString())
    }

    console.log(`Connecting to Jetstream: ${url.toString()}`)
    const socket = new WebSocket(url.toString())
    this.ws = socket

    socket.on('open', () => {
      console.log('Connected to Jetstream')
    })

    socket.on('message', (data: WebSocket.Data) => {
      // onMessage never rejects; `void` marks the promise as intentionally unawaited.
      void this.onMessage(data.toString())
    })

    socket.on('error', (err) => {
      console.error('Jetstream WebSocket error:', err)
    })

    socket.on('close', () => {
      console.log('Jetstream connection closed, reconnecting...')
      setTimeout(() => this.connect(), this.reconnectDelay)
    })
  }

  /** Parses and handles one frame; failures are logged, never thrown. */
  private async onMessage(raw: string): Promise<void> {
    try {
      const event = parseJetstreamEvent(raw)
      if (!event) {
        console.error('Ignoring malformed Jetstream message')
        return
      }
      await this.handleEvent(event)
    } catch (err: unknown) {
      console.error('Failed to handle Jetstream message:', err)
    }
  }

  /**
   * Applies the event to the database, then advances the cursor.
   * The cursor moves even when the event is filtered out.
   */
  private async handleEvent(event: JetstreamEvent) {
    await this.applyCommit(event)
    await this.advanceCursor(event.time_us)
  }

  /** Inserts or deletes the `post` row described by a post commit, if any. */
  private async applyCommit(event: JetstreamEvent): Promise<void> {
    const commit = event.commit
    if (event.kind !== 'commit' || !commit) return
    if (commit.collection !== POST_COLLECTION) return

    const uri = `at://${event.did}/${commit.collection}/${commit.rkey}`

    if (commit.operation === 'delete') {
      await this.db.deleteFrom('post').where('uri', '=', uri).execute()
      return
    }

    // Only creates with a record and a cid are indexable; updates are ignored.
    if (commit.operation !== 'create' || !commit.record || !commit.cid) return

    // The payload is untrusted JSON: `text` may be absent or not a string.
    const text = typeof commit.record.text === 'string' ? commit.record.text : ''
    if (!POST_FILTER.test(text)) return

    console.log(`[post] ${event.did}: ${text.substring(0, 50)}...`)

    await this.db
      .insertInto('post')
      .values({
        uri: uri,
        cid: commit.cid,
        indexedAt: new Date().toISOString(),
      })
      .onConflict((oc) => oc.doNothing())
      .execute()
  }

  /**
   * Moves the in-memory cursor forward and persists it once it is at least
   * CURSOR_SAVE_INTERVAL_US ahead of the last persisted value.
   */
  private async advanceCursor(timeUs: number): Promise<void> {
    // Handlers run concurrently, so events may finish out of order; never regress.
    if (timeUs <= this.cursor) return
    this.cursor = timeUs

    if (timeUs - this.lastSavedCursor >= CURSOR_SAVE_INTERVAL_US) {
      await this.saveCursor()
    }
  }

  /** Reads the persisted cursor, leaving it at 0 when no row exists. */
  private async loadCursor() {
    const res = await this.db
      .selectFrom('sub_state')
      .selectAll()
      .where('service', '=', 'jetstream')
      .executeTakeFirst()
    if (res) {
      this.cursor = res.cursor
      this.lastSavedCursor = res.cursor
    }
  }

  /** Upserts the current cursor into `sub_state`. */
  private async saveCursor() {
    const cursor = this.cursor
    // Recorded before the write so concurrent handlers do not issue duplicate saves.
    this.lastSavedCursor = cursor
    await this.db
      .insertInto('sub_state')
      .values({ service: 'jetstream', cursor })
      .onConflict((oc) => oc.column('service').doUpdateSet({ cursor }))
      .execute()
  }
}