ai/at
1
0
Files
at/patching/200-feed-generator-custom.patch
2025-12-11 10:26:26 +09:00

430 lines
13 KiB
Diff

diff --git a/.dockerignore b/.dockerignore
new file mode 100644
index 0000000..3c3629e
--- /dev/null
+++ b/.dockerignore
@@ -0,0 +1 @@
+node_modules
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..993c83d
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,11 @@
+FROM node:18-alpine
+
+WORKDIR /app
+
+COPY package.json yarn.lock ./
+RUN yarn install
+
+COPY . .
+
+EXPOSE 3000
+CMD ["yarn", "start"]
diff --git a/package.json b/package.json
index 1431a9e..6a7c33c 100644
--- a/package.json
+++ b/package.json
@@ -23,9 +23,11 @@
"dotenv": "^16.0.3",
"express": "^4.18.2",
"kysely": "^0.27.4",
- "multiformats": "^9.9.0"
+ "multiformats": "^9.9.0",
+ "ws": "^8.14.2"
},
"devDependencies": {
+ "@types/ws": "^8.5.10",
"@types/better-sqlite3": "^7.6.11",
"@types/express": "^4.17.17",
"@types/node": "^20.1.2",
diff --git a/scripts/publish.ts b/scripts/publish.ts
new file mode 100644
index 0000000..044f1d9
--- /dev/null
+++ b/scripts/publish.ts
@@ -0,0 +1,90 @@
+import dotenv from 'dotenv'
+import { AtpAgent, BlobRef } from '@atproto/api'
+import fs from 'fs/promises'
+import { ids } from '../src/lexicon/lexicons'
+
+/**
+ * Publishes the feed generator record to the logged-in account's repo.
+ *
+ * Loads `.env`, logs in with FEEDGEN_HANDLE / FEEDGEN_PASSWORD against
+ * FEEDGEN_SERVICE_URL, optionally uploads FEEDGEN_AVATAR as a blob, and then
+ * writes an `ids.AppBskyFeedGenerator` record under the key FEEDGEN_RECORD_NAME
+ * via `putRecord`.
+ *
+ * @throws Error if credentials or the service DID/hostname are missing, if the
+ *   avatar is not a png/jpeg file, or if no session DID is available after login.
+ */
+const run = async () => {
+  dotenv.config()
+
+  const handle = process.env.FEEDGEN_HANDLE
+  const password = process.env.FEEDGEN_PASSWORD
+  const recordName = process.env.FEEDGEN_RECORD_NAME || 'app'
+  const displayName = process.env.FEEDGEN_DISPLAY_NAME || 'App Feed'
+  const description = process.env.FEEDGEN_DESCRIPTION || 'Automated App Feed'
+  const avatar = process.env.FEEDGEN_AVATAR
+  const service = process.env.FEEDGEN_SERVICE_URL || 'https://syu.is'
+
+  if (!handle || !password) {
+    throw new Error('Please provide FEEDGEN_HANDLE and FEEDGEN_PASSWORD environment variables')
+  }
+
+  if (!process.env.FEEDGEN_SERVICE_DID && !process.env.FEEDGEN_HOSTNAME) {
+    throw new Error('Please provide a hostname in the .env file')
+  }
+
+  // `||` rather than `??`: an empty FEEDGEN_SERVICE_DID passes the check above
+  // when a hostname is set, and must fall back to the did:web form instead of
+  // publishing an empty DID.
+  const feedGenDid =
+    process.env.FEEDGEN_SERVICE_DID || `did:web:${process.env.FEEDGEN_HOSTNAME}`
+
+  const agent = new AtpAgent({ service })
+  await agent.login({ identifier: handle, password })
+
+  // Fail loudly instead of sending putRecord with an empty repo identifier.
+  const repo = agent.session?.did
+  if (!repo) {
+    throw new Error('No session DID available after login')
+  }
+
+  let avatarRef: BlobRef | undefined
+  if (avatar) {
+    // Compare case-insensitively so e.g. "AVATAR.PNG" is accepted too.
+    const avatarPath = avatar.toLowerCase()
+    let encoding: string
+    if (avatarPath.endsWith('png')) {
+      encoding = 'image/png'
+    } else if (avatarPath.endsWith('jpg') || avatarPath.endsWith('jpeg')) {
+      encoding = 'image/jpeg'
+    } else {
+      throw new Error('expected png or jpeg')
+    }
+    const img = await fs.readFile(avatar)
+    const blobRes = await agent.api.com.atproto.repo.uploadBlob(img, {
+      encoding,
+    })
+    avatarRef = blobRes.data.blob
+  }
+
+  await agent.api.com.atproto.repo.putRecord({
+    repo,
+    collection: ids.AppBskyFeedGenerator,
+    rkey: recordName,
+    record: {
+      did: feedGenDid,
+      displayName: displayName,
+      description: description,
+      avatar: avatarRef,
+      createdAt: new Date().toISOString(),
+    },
+  })
+
+  console.log('All done')
+}
+
+// Report a failure and exit non-zero rather than leaving the promise floating.
+run().catch((err: unknown) => {
+  console.error('Failed to publish feed generator:', err)
+  process.exitCode = 1
+})
diff --git a/src/algos/app.ts b/src/algos/app.ts
new file mode 100644
index 0000000..2376be9
--- /dev/null
+++ b/src/algos/app.ts
@@ -0,0 +1,41 @@
+import { QueryParams } from '../lexicon/types/app/bsky/feed/getFeedSkeleton'
+import { AppContext } from '../config'
+
+// max 15 chars
+export const shortname = 'app'
+
+/**
+ * Feed skeleton handler for the `app` feed.
+ *
+ * Returns up to `params.limit` rows of the `post` table, newest `indexedAt`
+ * first (ties ordered by `cid`, descending). `params.cursor`, when present, is
+ * a millisecond timestamp in base 10; only posts indexed strictly before it
+ * are returned. The returned cursor is the `indexedAt` time of the last row in
+ * the page, or `undefined` when the page is empty.
+ */
+export const handler = async (ctx: AppContext, params: QueryParams) => {
+  const { cursor: before, limit } = params
+
+  let query = ctx.db
+    .selectFrom('post')
+    .selectAll()
+    .orderBy('indexedAt', 'desc')
+    .orderBy('cid', 'desc')
+    .limit(limit)
+
+  if (before) {
+    // The cursor is epoch milliseconds; it is compared as an ISO-8601 string.
+    const beforeIso = new Date(parseInt(before, 10)).toISOString()
+    query = query.where('post.indexedAt', '<', beforeIso)
+  }
+
+  const rows = await query.execute()
+  const oldest = rows.at(-1)
+
+  return {
+    cursor: oldest
+      ? new Date(oldest.indexedAt).getTime().toString(10)
+      : undefined,
+    feed: rows.map(({ uri }) => ({ post: uri })),
+  }
+}
diff --git a/src/algos/index.ts b/src/algos/index.ts
index b7ee48a..102cb93 100644
--- a/src/algos/index.ts
+++ b/src/algos/index.ts
@@ -4,11 +4,13 @@ import {
OutputSchema as AlgoOutput,
} from '../lexicon/types/app/bsky/feed/getFeedSkeleton'
import * as whatsAlf from './whats-alf'
+import * as app from './app'
type AlgoHandler = (ctx: AppContext, params: QueryParams) => Promise<AlgoOutput>
const algos: Record<string, AlgoHandler> = {
[whatsAlf.shortname]: whatsAlf.handler,
+ [app.shortname]: app.handler,
}
export default algos
diff --git a/src/index.ts b/src/index.ts
index 7128525..40d985c 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -22,8 +22,10 @@ const run = async () => {
})
await server.start()
console.log(
- `🤖 running feed generator at http://${server.cfg.listenhost}:${server.cfg.port}`,
+ `running feed generator at http://${server.cfg.listenhost}:${server.cfg.port}`,
)
+ console.log('Supported algos:', Object.keys(require('./algos').default))
+ console.log('Publisher DID:', server.cfg.publisherDid)
}
const maybeStr = (val?: string) => {
diff --git a/src/methods/feed-generation.ts b/src/methods/feed-generation.ts
index 0f4989e..17be062 100644
--- a/src/methods/feed-generation.ts
+++ b/src/methods/feed-generation.ts
@@ -10,7 +10,7 @@ export default function (server: Server, ctx: AppContext) {
const feedUri = new AtUri(params.feed)
const algo = algos[feedUri.rkey]
if (
- feedUri.hostname !== ctx.cfg.publisherDid ||
+      // NOTE(review): publisher DID check disabled, so feed URIs under any DID are accepted: feedUri.hostname !== ctx.cfg.publisherDid ||
feedUri.collection !== 'app.bsky.feed.generator' ||
!algo
) {
diff --git a/src/server.ts b/src/server.ts
index c696749..9b9c382 100644
--- a/src/server.ts
+++ b/src/server.ts
@@ -6,7 +6,7 @@ import { createServer } from './lexicon'
import feedGeneration from './methods/feed-generation'
import describeGenerator from './methods/describe-generator'
import { createDb, Database, migrateToLatest } from './db'
-import { FirehoseSubscription } from './subscription'
+import { JetstreamSubscription } from './subscription'
import { AppContext, Config } from './config'
import wellKnown from './well-known'
@@ -14,25 +14,28 @@ export class FeedGenerator {
public app: express.Application
public server?: http.Server
public db: Database
- public firehose: FirehoseSubscription
+ public jetstream: JetstreamSubscription
public cfg: Config
constructor(
app: express.Application,
db: Database,
- firehose: FirehoseSubscription,
+ jetstream: JetstreamSubscription,
cfg: Config,
) {
this.app = app
this.db = db
- this.firehose = firehose
+ this.jetstream = jetstream
this.cfg = cfg
}
static create(cfg: Config) {
const app = express()
const db = createDb(cfg.sqliteLocation)
- const firehose = new FirehoseSubscription(db, cfg.subscriptionEndpoint)
+
+ // Use Jetstream URL from env or default to internal jetstream service
+ const jetstreamUrl = process.env.FEEDGEN_JETSTREAM_URL || 'ws://jetstream:6008/subscribe'
+ const jetstream = new JetstreamSubscription(db, jetstreamUrl, cfg.subscriptionReconnectDelay)
const didCache = new MemoryCache()
const didResolver = new DidResolver({
@@ -58,12 +61,12 @@ export class FeedGenerator {
app.use(server.xrpc.router)
app.use(wellKnown(ctx))
- return new FeedGenerator(app, db, firehose, cfg)
+ return new FeedGenerator(app, db, jetstream, cfg)
}
async start(): Promise<http.Server> {
await migrateToLatest(this.db)
- this.firehose.run(this.cfg.subscriptionReconnectDelay)
+ this.jetstream.run()
this.server = this.app.listen(this.cfg.port, this.cfg.listenhost)
await events.once(this.server, 'listening')
return this.server
diff --git a/src/subscription.ts b/src/subscription.ts
index 0422a03..7785982 100644
--- a/src/subscription.ts
+++ b/src/subscription.ts
@@ -1,49 +1,179 @@
-import {
-  OutputSchema as RepoEvent,
-  isCommit,
-} from './lexicon/types/com/atproto/sync/subscribeRepos'
-import { FirehoseSubscriptionBase, getOpsByType } from './util/subscription'
-
-export class FirehoseSubscription extends FirehoseSubscriptionBase {
-  async handleEvent(evt: RepoEvent) {
-    if (!isCommit(evt)) return
-
-    const ops = await getOpsByType(evt)
-
-    // This logs the text of every post off the firehose.
-    // Just for fun :)
-    // Delete before actually using
-    for (const post of ops.posts.creates) {
-      console.log(post.record.text)
-    }
-
-    const postsToDelete = ops.posts.deletes.map((del) => del.uri)
-    const postsToCreate = ops.posts.creates
-      .filter((create) => {
-        // only alf-related posts
-        return create.record.text.toLowerCase().includes('alf')
-      })
-      .map((create) => {
-        // map alf-related posts to a db row
-        return {
-          uri: create.uri,
-          cid: create.cid,
-          indexedAt: new Date().toISOString(),
-        }
-      })
-
-    if (postsToDelete.length > 0) {
-      await this.db
-        .deleteFrom('post')
-        .where('uri', 'in', postsToDelete)
-        .execute()
-    }
-    if (postsToCreate.length > 0) {
-      await this.db
-        .insertInto('post')
-        .values(postsToCreate)
-        .onConflict((oc) => oc.doNothing())
-        .execute()
-    }
-  }
-}
+import WebSocket from 'ws'
+import { Database } from './db'
+
+// Collection requested from Jetstream and indexed by this subscription.
+const POST_COLLECTION = 'app.bsky.feed.post'
+
+// Only posts whose text starts with "/" plus a lowercase letter, or with "@ai",
+// are indexed.
+const POST_FILTER = /^(?:\/[a-z]|@ai)/
+
+// The cursor is written to the database once per this many handled events.
+const CURSOR_SAVE_INTERVAL = 20
+
+// Jetstream event types
+interface JetstreamEvent {
+  did: string
+  time_us: number
+  kind: 'commit' | 'identity' | 'account'
+  commit?: {
+    rev: string
+    operation: 'create' | 'update' | 'delete'
+    collection: string
+    rkey: string
+    record?: {
+      $type: string
+      text?: string
+      createdAt?: string
+      [key: string]: unknown
+    }
+    cid?: string
+  }
+}
+
+/**
+ * Consumes post events from a Jetstream WebSocket endpoint and mirrors the
+ * matching ones into the `post` table.
+ *
+ * The `time_us` of the last handled event is kept as a cursor, persisted in
+ * `sub_state` under the service name `jetstream`, and sent back as the
+ * `cursor` query parameter whenever a connection is opened.
+ */
+export class JetstreamSubscription {
+  private ws: WebSocket | null = null
+  private cursor: number = 0
+  // Events handled since the cursor was last written to the database.
+  private eventsSinceSave = 0
+
+  /**
+   * @param db Database holding the `post` and `sub_state` tables.
+   * @param jetstreamUrl Base WebSocket URL of the Jetstream subscribe endpoint.
+   * @param reconnectDelay Milliseconds to wait before reconnecting after a close.
+   */
+  constructor(
+    public db: Database,
+    public jetstreamUrl: string,
+    public reconnectDelay: number = 3000
+  ) {}
+
+  /** Restores the saved cursor, then opens the connection. */
+  async run() {
+    await this.loadCursor()
+    this.connect()
+  }
+
+  /**
+   * Opens the WebSocket and wires up its handlers. A closed connection is
+   * reopened after `reconnectDelay`, resuming from the in-memory cursor.
+   */
+  private connect() {
+    const url = new URL(this.jetstreamUrl)
+    url.searchParams.set('wantedCollections', POST_COLLECTION)
+    if (this.cursor > 0) {
+      url.searchParams.set('cursor', this.cursor.toString())
+    }
+
+    console.log(`Connecting to Jetstream: ${url.toString()}`)
+    this.ws = new WebSocket(url.toString())
+
+    this.ws.on('open', () => {
+      console.log('Connected to Jetstream')
+    })
+
+    this.ws.on('message', async (data: WebSocket.Data) => {
+      // A bad message is logged and dropped so the stream keeps flowing.
+      try {
+        const event: JetstreamEvent = JSON.parse(data.toString())
+        await this.handleEvent(event)
+      } catch (err) {
+        console.error('Failed to handle Jetstream message:', err)
+      }
+    })
+
+    this.ws.on('error', (err) => {
+      console.error('Jetstream WebSocket error:', err)
+    })
+
+    this.ws.on('close', () => {
+      console.log('Jetstream connection closed, reconnecting...')
+      setTimeout(() => this.connect(), this.reconnectDelay)
+    })
+  }
+
+  /**
+   * Applies one event to the database and then advances the cursor.
+   *
+   * The cursor moves for every event, including those that are filtered out,
+   * so the resume point keeps up with the stream even when no post matches.
+   * If the database write throws, the cursor is left untouched.
+   */
+  private async handleEvent(event: JetstreamEvent) {
+    await this.indexEvent(event)
+    await this.advanceCursor(event.time_us)
+  }
+
+  /** Deletes or inserts the `post` row for a post commit; ignores the rest. */
+  private async indexEvent(event: JetstreamEvent) {
+    if (event.kind !== 'commit' || !event.commit) return
+    if (event.commit.collection !== POST_COLLECTION) return
+
+    const uri = `at://${event.did}/${event.commit.collection}/${event.commit.rkey}`
+
+    if (event.commit.operation === 'delete') {
+      await this.db.deleteFrom('post').where('uri', '=', uri).execute()
+    } else if (event.commit.operation === 'create' && event.commit.record) {
+      const text = event.commit.record.text || ''
+
+      // Filter: posts starting with / or @ai
+      if (!POST_FILTER.test(text)) {
+        return
+      }
+
+      console.log(`[post] ${event.did}: ${text.substring(0, 50)}...`)
+
+      await this.db
+        .insertInto('post')
+        .values({
+          uri: uri,
+          cid: event.commit.cid || '',
+          indexedAt: new Date().toISOString(),
+        })
+        .onConflict((oc) => oc.doNothing())
+        .execute()
+    }
+  }
+
+  /** Records `timeUs` as the cursor and saves it every CURSOR_SAVE_INTERVAL events. */
+  private async advanceCursor(timeUs: number) {
+    // Ignore events without a usable timestamp instead of corrupting the cursor.
+    if (!Number.isFinite(timeUs)) return
+
+    this.cursor = timeUs
+    this.eventsSinceSave += 1
+    if (this.eventsSinceSave >= CURSOR_SAVE_INTERVAL) {
+      this.eventsSinceSave = 0
+      await this.saveCursor()
+    }
+  }
+
+  /** Loads the saved cursor for service `jetstream`, if one exists. */
+  private async loadCursor() {
+    const res = await this.db
+      .selectFrom('sub_state')
+      .selectAll()
+      .where('service', '=', 'jetstream')
+      .executeTakeFirst()
+    if (res) {
+      this.cursor = res.cursor
+    }
+  }
+
+  /** Upserts the current cursor for service `jetstream`. */
+  private async saveCursor() {
+    await this.db
+      .insertInto('sub_state')
+      .values({ service: 'jetstream', cursor: this.cursor })
+      .onConflict((oc) => oc.column('service').doUpdateSet({ cursor: this.cursor }))
+      .execute()
+  }
+}