ai/at
1
0
This commit is contained in:
2025-12-11 09:38:33 +09:00
parent 3b83ae1e8a
commit 744f1d2806
3 changed files with 251 additions and 16 deletions

View File

@@ -22,14 +22,31 @@ index 0000000..993c83d
+
+EXPOSE 3000
+CMD ["yarn", "start"]
diff --git a/package.json b/package.json
index 1431a9e..6a7c33c 100644
--- a/package.json
+++ b/package.json
@@ -23,9 +23,11 @@
"dotenv": "^16.0.3",
"express": "^4.18.2",
"kysely": "^0.27.4",
- "multiformats": "^9.9.0"
+ "multiformats": "^9.9.0",
+ "ws": "^8.14.2"
},
"devDependencies": {
+ "@types/ws": "^8.5.10",
"@types/better-sqlite3": "^7.6.11",
"@types/express": "^4.17.17",
"@types/node": "^20.1.2",
diff --git a/scripts/publish.ts b/scripts/publish.ts
new file mode 100644
index 0000000..966edcf
index 0000000..044f1d9
--- /dev/null
+++ b/scripts/publish.ts
@@ -0,0 +1,64 @@
+import dotenv from 'dotenv'
+import { AtpAgent, BlobRef, AppBskyFeedDefs } from '@atproto/api'
+import { AtpAgent, BlobRef } from '@atproto/api'
+import fs from 'fs/promises'
+import { ids } from '../src/lexicon/lexicons'
+
@@ -88,7 +105,7 @@ index 0000000..966edcf
+ },
+ })
+
+ console.log('All done 🎉')
+ console.log('All done')
+}
+
+run()
@@ -152,12 +169,15 @@ index b7ee48a..102cb93 100644
export default algos
diff --git a/src/index.ts b/src/index.ts
index c3bd006..1e7f0b5 100644
index 7128525..40d985c 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -24,6 +24,8 @@ const run = async () => {
@@ -22,8 +22,10 @@ const run = async () => {
})
await server.start()
console.log(
`🤖 running feed generator at http://${server.cfg.listenhost}:${server.cfg.port}`,
- `🤖 running feed generator at http://${server.cfg.listenhost}:${server.cfg.port}`,
+ `running feed generator at http://${server.cfg.listenhost}:${server.cfg.port}`,
)
+ console.log('Supported algos:', Object.keys(require('./algos').default))
+ console.log('Publisher DID:', server.cfg.publisherDid)
@@ -165,7 +185,7 @@ index c3bd006..1e7f0b5 100644
const maybeStr = (val?: string) => {
diff --git a/src/methods/feed-generation.ts b/src/methods/feed-generation.ts
index b887413..34c5148 100644
index 0f4989e..17be062 100644
--- a/src/methods/feed-generation.ts
+++ b/src/methods/feed-generation.ts
@@ -10,7 +10,7 @@ export default function (server: Server, ctx: AppContext) {
@@ -177,18 +197,233 @@ index b887413..34c5148 100644
feedUri.collection !== 'app.bsky.feed.generator' ||
!algo
) {
diff --git a/src/server.ts b/src/server.ts
index c696749..9b9c382 100644
--- a/src/server.ts
+++ b/src/server.ts
@@ -6,7 +6,7 @@ import { createServer } from './lexicon'
import feedGeneration from './methods/feed-generation'
import describeGenerator from './methods/describe-generator'
import { createDb, Database, migrateToLatest } from './db'
-import { FirehoseSubscription } from './subscription'
+import { JetstreamSubscription } from './subscription'
import { AppContext, Config } from './config'
import wellKnown from './well-known'
@@ -14,25 +14,28 @@ export class FeedGenerator {
public app: express.Application
public server?: http.Server
public db: Database
- public firehose: FirehoseSubscription
+ public jetstream: JetstreamSubscription
public cfg: Config
constructor(
app: express.Application,
db: Database,
- firehose: FirehoseSubscription,
+ jetstream: JetstreamSubscription,
cfg: Config,
) {
this.app = app
this.db = db
- this.firehose = firehose
+ this.jetstream = jetstream
this.cfg = cfg
}
static create(cfg: Config) {
const app = express()
const db = createDb(cfg.sqliteLocation)
- const firehose = new FirehoseSubscription(db, cfg.subscriptionEndpoint)
+
+ // Use Jetstream URL from env or default to internal jetstream service
+ const jetstreamUrl = process.env.FEEDGEN_JETSTREAM_URL || 'ws://jetstream:6008/subscribe'
+ const jetstream = new JetstreamSubscription(db, jetstreamUrl, cfg.subscriptionReconnectDelay)
const didCache = new MemoryCache()
const didResolver = new DidResolver({
@@ -58,12 +61,12 @@ export class FeedGenerator {
app.use(server.xrpc.router)
app.use(wellKnown(ctx))
- return new FeedGenerator(app, db, firehose, cfg)
+ return new FeedGenerator(app, db, jetstream, cfg)
}
async start(): Promise<http.Server> {
await migrateToLatest(this.db)
- this.firehose.run(this.cfg.subscriptionReconnectDelay)
+ this.jetstream.run()
this.server = this.app.listen(this.cfg.port, this.cfg.listenhost)
await events.once(this.server, 'listening')
return this.server
diff --git a/src/subscription.ts b/src/subscription.ts
index 0422a03..d591ef9 100644
index 0422a03..7785982 100644
--- a/src/subscription.ts
+++ b/src/subscription.ts
@@ -19,10 +19,6 @@ export class FirehoseSubscription extends FirehoseSubscriptionBase {
@@ -1,49 +1,126 @@
-import {
- OutputSchema as RepoEvent,
- isCommit,
-} from './lexicon/types/com/atproto/sync/subscribeRepos'
-import { FirehoseSubscriptionBase, getOpsByType } from './util/subscription'
-
-export class FirehoseSubscription extends FirehoseSubscriptionBase {
- async handleEvent(evt: RepoEvent) {
- if (!isCommit(evt)) return
-
- const ops = await getOpsByType(evt)
-
- // This logs the text of every post off the firehose.
- // Just for fun :)
- // Delete before actually using
- for (const post of ops.posts.creates) {
- console.log(post.record.text)
+import WebSocket from 'ws'
+import { Database } from './db'
+
+// Jetstream event types
+interface JetstreamEvent {
+ did: string
+ time_us: number
+ kind: 'commit' | 'identity' | 'account'
+ commit?: {
+ rev: string
+ operation: 'create' | 'update' | 'delete'
+ collection: string
+ rkey: string
+ record?: {
+ $type: string
+ text?: string
+ createdAt?: string
+ [key: string]: unknown
}
+ cid?: string
+ }
+}
const postsToDelete = ops.posts.deletes.map((del) => del.uri)
const postsToCreate = ops.posts.creates
- const postsToDelete = ops.posts.deletes.map((del) => del.uri)
- const postsToCreate = ops.posts.creates
- .filter((create) => {
- // only alf-related posts
- return create.record.text.toLowerCase().includes('alf')
- })
.map((create) => {
// map alf-related posts to a db row
return {
- .map((create) => {
- // map alf-related posts to a db row
- return {
- uri: create.uri,
- cid: create.cid,
- indexedAt: new Date().toISOString(),
- }
- })
+export class JetstreamSubscription {
+ private ws: WebSocket | null = null
+ private cursor: number = 0
- if (postsToDelete.length > 0) {
- await this.db
- .deleteFrom('post')
- .where('uri', 'in', postsToDelete)
- .execute()
+ constructor(
+ public db: Database,
+ public jetstreamUrl: string,
+ public reconnectDelay: number = 3000
+ ) {}
+
+ async run() {
+ await this.loadCursor()
+ this.connect()
+ }
+
+ private connect() {
+ const url = new URL(this.jetstreamUrl)
+ url.searchParams.set('wantedCollections', 'app.bsky.feed.post')
+ if (this.cursor > 0) {
+ url.searchParams.set('cursor', this.cursor.toString())
}
- if (postsToCreate.length > 0) {
+
+ console.log(`Connecting to Jetstream: ${url.toString()}`)
+ this.ws = new WebSocket(url.toString())
+
+ this.ws.on('open', () => {
+ console.log('Connected to Jetstream')
+ })
+
+ this.ws.on('message', async (data: WebSocket.Data) => {
+ try {
+ const event: JetstreamEvent = JSON.parse(data.toString())
+ await this.handleEvent(event)
+ } catch (err) {
+ console.error('Failed to handle Jetstream message:', err)
+ }
+ })
+
+ this.ws.on('error', (err) => {
+ console.error('Jetstream WebSocket error:', err)
+ })
+
+ this.ws.on('close', () => {
+ console.log('Jetstream connection closed, reconnecting...')
+ setTimeout(() => this.connect(), this.reconnectDelay)
+ })
+ }
+
+ private async handleEvent(event: JetstreamEvent) {
+ if (event.kind !== 'commit' || !event.commit) return
+ if (event.commit.collection !== 'app.bsky.feed.post') return
+
+ const uri = `at://${event.did}/${event.commit.collection}/${event.commit.rkey}`
+
+ if (event.commit.operation === 'delete') {
+ await this.db.deleteFrom('post').where('uri', '=', uri).execute()
+ } else if (event.commit.operation === 'create' && event.commit.record) {
+ const text = event.commit.record.text || ''
+
+ // Filter: posts starting with / or @ai
+ if (!text.match(/^\/[a-z]/) && !text.match(/^@ai/)) {
+ return
+ }
+
+ console.log(`[post] ${event.did}: ${text.substring(0, 50)}...`)
+
await this.db
.insertInto('post')
- .values(postsToCreate)
+ .values({
+ uri: uri,
+ cid: event.commit.cid || '',
+ indexedAt: new Date().toISOString(),
+ })
.onConflict((oc) => oc.doNothing())
.execute()
}
+
+ // Update cursor periodically
+ this.cursor = event.time_us
+ if (event.time_us % 20 === 0) {
+ await this.saveCursor()
+ }
+ }
+
+ private async loadCursor() {
+ const res = await this.db
+ .selectFrom('sub_state')
+ .selectAll()
+ .where('service', '=', 'jetstream')
+ .executeTakeFirst()
+ if (res) {
+ this.cursor = res.cursor
+ }
+ }
+
+ private async saveCursor() {
+ await this.db
+ .insertInto('sub_state')
+ .values({ service: 'jetstream', cursor: this.cursor })
+ .onConflict((oc) => oc.column('service').doUpdateSet({ cursor: this.cursor }))
+ .execute()
}
}