430 lines
13 KiB
Diff
430 lines
13 KiB
Diff
diff --git a/.dockerignore b/.dockerignore
|
|
new file mode 100644
|
|
index 0000000..3c3629e
|
|
--- /dev/null
|
|
+++ b/.dockerignore
|
|
@@ -0,0 +1 @@
|
|
+node_modules
|
|
diff --git a/Dockerfile b/Dockerfile
|
|
new file mode 100644
|
|
index 0000000..993c83d
|
|
--- /dev/null
|
|
+++ b/Dockerfile
|
|
@@ -0,0 +1,11 @@
|
|
+FROM node:18-alpine
|
|
+
|
|
+WORKDIR /app
|
|
+
|
|
+COPY package.json yarn.lock ./
|
|
+RUN yarn install
|
|
+
|
|
+COPY . .
|
|
+
|
|
+EXPOSE 3000
|
|
+CMD ["yarn", "start"]
|
|
diff --git a/package.json b/package.json
|
|
index 1431a9e..6a7c33c 100644
|
|
--- a/package.json
|
|
+++ b/package.json
|
|
@@ -23,9 +23,11 @@
|
|
"dotenv": "^16.0.3",
|
|
"express": "^4.18.2",
|
|
"kysely": "^0.27.4",
|
|
- "multiformats": "^9.9.0"
|
|
+ "multiformats": "^9.9.0",
|
|
+ "ws": "^8.14.2"
|
|
},
|
|
"devDependencies": {
|
|
+ "@types/ws": "^8.5.10",
|
|
"@types/better-sqlite3": "^7.6.11",
|
|
"@types/express": "^4.17.17",
|
|
"@types/node": "^20.1.2",
|
|
diff --git a/scripts/publish.ts b/scripts/publish.ts
|
|
new file mode 100644
|
|
index 0000000..044f1d9
|
|
--- /dev/null
|
|
+++ b/scripts/publish.ts
|
|
@@ -0,0 +1,64 @@
|
|
+import dotenv from 'dotenv'
|
|
+import { AtpAgent, BlobRef } from '@atproto/api'
|
|
+import fs from 'fs/promises'
|
|
+import { ids } from '../src/lexicon/lexicons'
|
|
+
|
|
+const run = async () => {
|
|
+ dotenv.config()
|
|
+
|
|
+ const handle = process.env.FEEDGEN_HANDLE
|
|
+ const password = process.env.FEEDGEN_PASSWORD
|
|
+ const recordName = process.env.FEEDGEN_RECORD_NAME || 'app'
|
|
+ const displayName = process.env.FEEDGEN_DISPLAY_NAME || 'App Feed'
|
|
+ const description = process.env.FEEDGEN_DESCRIPTION || 'Automated App Feed'
|
|
+ const avatar = process.env.FEEDGEN_AVATAR
|
|
+ const service = process.env.FEEDGEN_SERVICE_URL || 'https://syu.is'
|
|
+
|
|
+ if (!handle || !password) {
|
|
+ throw new Error('Please provide FEEDGEN_HANDLE and FEEDGEN_PASSWORD environment variables')
|
|
+ }
|
|
+
|
|
+ if (!process.env.FEEDGEN_SERVICE_DID && !process.env.FEEDGEN_HOSTNAME) {
|
|
+ throw new Error('Please provide a hostname in the .env file')
|
|
+ }
|
|
+
|
|
+ const feedGenDid =
|
|
+ process.env.FEEDGEN_SERVICE_DID ?? `did:web:${process.env.FEEDGEN_HOSTNAME}`
|
|
+
|
|
+ const agent = new AtpAgent({ service })
|
|
+ await agent.login({ identifier: handle, password })
|
|
+
|
|
+ let avatarRef: BlobRef | undefined
|
|
+ if (avatar) {
|
|
+ let encoding: string
|
|
+ if (avatar.endsWith('png')) {
|
|
+ encoding = 'image/png'
|
|
+ } else if (avatar.endsWith('jpg') || avatar.endsWith('jpeg')) {
|
|
+ encoding = 'image/jpeg'
|
|
+ } else {
|
|
+ throw new Error('expected png or jpeg')
|
|
+ }
|
|
+ const img = await fs.readFile(avatar)
|
|
+ const blobRes = await agent.api.com.atproto.repo.uploadBlob(img, {
|
|
+ encoding,
|
|
+ })
|
|
+ avatarRef = blobRes.data.blob
|
|
+ }
|
|
+
|
|
+ await agent.api.com.atproto.repo.putRecord({
|
|
+ repo: agent.session?.did ?? '',
|
|
+ collection: ids.AppBskyFeedGenerator,
|
|
+ rkey: recordName,
|
|
+ record: {
|
|
+ did: feedGenDid,
|
|
+ displayName: displayName,
|
|
+ description: description,
|
|
+ avatar: avatarRef,
|
|
+ createdAt: new Date().toISOString(),
|
|
+ },
|
|
+ })
|
|
+
|
|
+ console.log('All done')
|
|
+}
|
|
+
|
|
+run()
|
|
diff --git a/src/algos/app.ts b/src/algos/app.ts
|
|
new file mode 100644
|
|
index 0000000..2376be9
|
|
--- /dev/null
|
|
+++ b/src/algos/app.ts
|
|
@@ -0,0 +1,35 @@
|
|
+import { QueryParams } from '../lexicon/types/app/bsky/feed/getFeedSkeleton'
|
|
+import { AppContext } from '../config'
|
|
+
|
|
+// max 15 chars
|
|
+export const shortname = 'app'
|
|
+
|
|
+export const handler = async (ctx: AppContext, params: QueryParams) => {
|
|
+ let builder = ctx.db
|
|
+ .selectFrom('post')
|
|
+ .selectAll()
|
|
+ .orderBy('indexedAt', 'desc')
|
|
+ .orderBy('cid', 'desc')
|
|
+ .limit(params.limit)
|
|
+
|
|
+ if (params.cursor) {
|
|
+ const timeStr = new Date(parseInt(params.cursor, 10)).toISOString()
|
|
+ builder = builder.where('post.indexedAt', '<', timeStr)
|
|
+ }
|
|
+ const res = await builder.execute()
|
|
+
|
|
+ const feed = res.map((row) => ({
|
|
+ post: row.uri,
|
|
+ }))
|
|
+
|
|
+ let cursor: string | undefined
|
|
+ const last = res.at(-1)
|
|
+ if (last) {
|
|
+ cursor = new Date(last.indexedAt).getTime().toString(10)
|
|
+ }
|
|
+
|
|
+ return {
|
|
+ cursor,
|
|
+ feed,
|
|
+ }
|
|
+}
|
|
diff --git a/src/algos/index.ts b/src/algos/index.ts
|
|
index b7ee48a..102cb93 100644
|
|
--- a/src/algos/index.ts
|
|
+++ b/src/algos/index.ts
|
|
@@ -4,11 +4,13 @@ import {
|
|
OutputSchema as AlgoOutput,
|
|
} from '../lexicon/types/app/bsky/feed/getFeedSkeleton'
|
|
import * as whatsAlf from './whats-alf'
|
|
+import * as app from './app'
|
|
|
|
type AlgoHandler = (ctx: AppContext, params: QueryParams) => Promise<AlgoOutput>
|
|
|
|
const algos: Record<string, AlgoHandler> = {
|
|
[whatsAlf.shortname]: whatsAlf.handler,
|
|
+ [app.shortname]: app.handler,
|
|
}
|
|
|
|
export default algos
|
|
diff --git a/src/index.ts b/src/index.ts
|
|
index 7128525..40d985c 100644
|
|
--- a/src/index.ts
|
|
+++ b/src/index.ts
|
|
@@ -22,8 +22,10 @@ const run = async () => {
|
|
})
|
|
await server.start()
|
|
console.log(
|
|
- `🤖 running feed generator at http://${server.cfg.listenhost}:${server.cfg.port}`,
|
|
+ `running feed generator at http://${server.cfg.listenhost}:${server.cfg.port}`,
|
|
)
|
|
+ console.log('Supported algos:', Object.keys(require('./algos').default))
|
|
+ console.log('Publisher DID:', server.cfg.publisherDid)
|
|
}
|
|
|
|
const maybeStr = (val?: string) => {
|
|
diff --git a/src/methods/feed-generation.ts b/src/methods/feed-generation.ts
|
|
index 0f4989e..17be062 100644
|
|
--- a/src/methods/feed-generation.ts
|
|
+++ b/src/methods/feed-generation.ts
|
|
@@ -10,7 +10,7 @@ export default function (server: Server, ctx: AppContext) {
|
|
const feedUri = new AtUri(params.feed)
|
|
const algo = algos[feedUri.rkey]
|
|
if (
|
|
- feedUri.hostname !== ctx.cfg.publisherDid ||
|
|
+      // NOTE(review): publisher DID check disabled, so the feed URI's hostname is not validated here: feedUri.hostname !== ctx.cfg.publisherDid ||
|
|
feedUri.collection !== 'app.bsky.feed.generator' ||
|
|
!algo
|
|
) {
|
|
diff --git a/src/server.ts b/src/server.ts
|
|
index c696749..9b9c382 100644
|
|
--- a/src/server.ts
|
|
+++ b/src/server.ts
|
|
@@ -6,7 +6,7 @@ import { createServer } from './lexicon'
|
|
import feedGeneration from './methods/feed-generation'
|
|
import describeGenerator from './methods/describe-generator'
|
|
import { createDb, Database, migrateToLatest } from './db'
|
|
-import { FirehoseSubscription } from './subscription'
|
|
+import { JetstreamSubscription } from './subscription'
|
|
import { AppContext, Config } from './config'
|
|
import wellKnown from './well-known'
|
|
|
|
@@ -14,25 +14,28 @@ export class FeedGenerator {
|
|
public app: express.Application
|
|
public server?: http.Server
|
|
public db: Database
|
|
- public firehose: FirehoseSubscription
|
|
+ public jetstream: JetstreamSubscription
|
|
public cfg: Config
|
|
|
|
constructor(
|
|
app: express.Application,
|
|
db: Database,
|
|
- firehose: FirehoseSubscription,
|
|
+ jetstream: JetstreamSubscription,
|
|
cfg: Config,
|
|
) {
|
|
this.app = app
|
|
this.db = db
|
|
- this.firehose = firehose
|
|
+ this.jetstream = jetstream
|
|
this.cfg = cfg
|
|
}
|
|
|
|
static create(cfg: Config) {
|
|
const app = express()
|
|
const db = createDb(cfg.sqliteLocation)
|
|
- const firehose = new FirehoseSubscription(db, cfg.subscriptionEndpoint)
|
|
+
|
|
+    // Use the Jetstream URL from FEEDGEN_JETSTREAM_URL, or default to the internal jetstream service
|
|
+ const jetstreamUrl = process.env.FEEDGEN_JETSTREAM_URL || 'ws://jetstream:6008/subscribe'
|
|
+ const jetstream = new JetstreamSubscription(db, jetstreamUrl, cfg.subscriptionReconnectDelay)
|
|
|
|
const didCache = new MemoryCache()
|
|
const didResolver = new DidResolver({
|
|
@@ -58,12 +61,12 @@ export class FeedGenerator {
|
|
app.use(server.xrpc.router)
|
|
app.use(wellKnown(ctx))
|
|
|
|
- return new FeedGenerator(app, db, firehose, cfg)
|
|
+ return new FeedGenerator(app, db, jetstream, cfg)
|
|
}
|
|
|
|
async start(): Promise<http.Server> {
|
|
await migrateToLatest(this.db)
|
|
- this.firehose.run(this.cfg.subscriptionReconnectDelay)
|
|
+ this.jetstream.run()
|
|
this.server = this.app.listen(this.cfg.port, this.cfg.listenhost)
|
|
await events.once(this.server, 'listening')
|
|
return this.server
|
|
diff --git a/src/subscription.ts b/src/subscription.ts
|
|
index 0422a03..7785982 100644
|
|
--- a/src/subscription.ts
|
|
+++ b/src/subscription.ts
|
|
@@ -1,49 +1,126 @@
|
|
-import {
|
|
- OutputSchema as RepoEvent,
|
|
- isCommit,
|
|
-} from './lexicon/types/com/atproto/sync/subscribeRepos'
|
|
-import { FirehoseSubscriptionBase, getOpsByType } from './util/subscription'
|
|
-
|
|
-export class FirehoseSubscription extends FirehoseSubscriptionBase {
|
|
- async handleEvent(evt: RepoEvent) {
|
|
- if (!isCommit(evt)) return
|
|
-
|
|
- const ops = await getOpsByType(evt)
|
|
-
|
|
- // This logs the text of every post off the firehose.
|
|
- // Just for fun :)
|
|
- // Delete before actually using
|
|
- for (const post of ops.posts.creates) {
|
|
- console.log(post.record.text)
|
|
+import WebSocket from 'ws'
|
|
+import { Database } from './db'
|
|
+
|
|
+// Jetstream event types
|
|
+interface JetstreamEvent {
|
|
+ did: string
|
|
+ time_us: number
|
|
+ kind: 'commit' | 'identity' | 'account'
|
|
+ commit?: {
|
|
+ rev: string
|
|
+ operation: 'create' | 'update' | 'delete'
|
|
+ collection: string
|
|
+ rkey: string
|
|
+ record?: {
|
|
+ $type: string
|
|
+ text?: string
|
|
+ createdAt?: string
|
|
+ [key: string]: unknown
|
|
}
|
|
+ cid?: string
|
|
+ }
|
|
+}
|
|
|
|
- const postsToDelete = ops.posts.deletes.map((del) => del.uri)
|
|
- const postsToCreate = ops.posts.creates
|
|
- .filter((create) => {
|
|
- // only alf-related posts
|
|
- return create.record.text.toLowerCase().includes('alf')
|
|
- })
|
|
- .map((create) => {
|
|
- // map alf-related posts to a db row
|
|
- return {
|
|
- uri: create.uri,
|
|
- cid: create.cid,
|
|
- indexedAt: new Date().toISOString(),
|
|
- }
|
|
- })
|
|
+export class JetstreamSubscription {
|
|
+ private ws: WebSocket | null = null
|
|
+ private cursor: number = 0
|
|
|
|
- if (postsToDelete.length > 0) {
|
|
- await this.db
|
|
- .deleteFrom('post')
|
|
- .where('uri', 'in', postsToDelete)
|
|
- .execute()
|
|
+ constructor(
|
|
+ public db: Database,
|
|
+ public jetstreamUrl: string,
|
|
+ public reconnectDelay: number = 3000
|
|
+ ) {}
|
|
+
|
|
+ async run() {
|
|
+ await this.loadCursor()
|
|
+ this.connect()
|
|
+ }
|
|
+
|
|
+ private connect() {
|
|
+ const url = new URL(this.jetstreamUrl)
|
|
+ url.searchParams.set('wantedCollections', 'app.bsky.feed.post')
|
|
+ if (this.cursor > 0) {
|
|
+ url.searchParams.set('cursor', this.cursor.toString())
|
|
}
|
|
- if (postsToCreate.length > 0) {
|
|
+
|
|
+ console.log(`Connecting to Jetstream: ${url.toString()}`)
|
|
+ this.ws = new WebSocket(url.toString())
|
|
+
|
|
+ this.ws.on('open', () => {
|
|
+ console.log('Connected to Jetstream')
|
|
+ })
|
|
+
|
|
+ this.ws.on('message', async (data: WebSocket.Data) => {
|
|
+ try {
|
|
+ const event: JetstreamEvent = JSON.parse(data.toString())
|
|
+ await this.handleEvent(event)
|
|
+ } catch (err) {
|
|
+ console.error('Failed to handle Jetstream message:', err)
|
|
+ }
|
|
+ })
|
|
+
|
|
+ this.ws.on('error', (err) => {
|
|
+ console.error('Jetstream WebSocket error:', err)
|
|
+ })
|
|
+
|
|
+ this.ws.on('close', () => {
|
|
+ console.log('Jetstream connection closed, reconnecting...')
|
|
+ setTimeout(() => this.connect(), this.reconnectDelay)
|
|
+ })
|
|
+ }
|
|
+
|
|
+ private async handleEvent(event: JetstreamEvent) {
|
|
+ if (event.kind !== 'commit' || !event.commit) return
|
|
+ if (event.commit.collection !== 'app.bsky.feed.post') return
|
|
+
|
|
+ const uri = `at://${event.did}/${event.commit.collection}/${event.commit.rkey}`
|
|
+
|
|
+ if (event.commit.operation === 'delete') {
|
|
+ await this.db.deleteFrom('post').where('uri', '=', uri).execute()
|
|
+ } else if (event.commit.operation === 'create' && event.commit.record) {
|
|
+ const text = event.commit.record.text || ''
|
|
+
|
|
+      // Filter: keep only posts whose text starts with "/" followed by a lowercase a-z letter, or with "@ai"
|
|
+ if (!text.match(/^\/[a-z]/) && !text.match(/^@ai/)) {
|
|
+ return
|
|
+ }
|
|
+
|
|
+ console.log(`[post] ${event.did}: ${text.substring(0, 50)}...`)
|
|
+
|
|
await this.db
|
|
.insertInto('post')
|
|
- .values(postsToCreate)
|
|
+ .values({
|
|
+ uri: uri,
|
|
+ cid: event.commit.cid || '',
|
|
+ indexedAt: new Date().toISOString(),
|
|
+ })
|
|
.onConflict((oc) => oc.doNothing())
|
|
.execute()
|
|
}
|
|
+
|
|
+    // Keep the latest event time_us as the in-memory cursor; persist it only when time_us is divisible by 20
|
|
+ this.cursor = event.time_us
|
|
+ if (event.time_us % 20 === 0) {
|
|
+ await this.saveCursor()
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private async loadCursor() {
|
|
+ const res = await this.db
|
|
+ .selectFrom('sub_state')
|
|
+ .selectAll()
|
|
+ .where('service', '=', 'jetstream')
|
|
+ .executeTakeFirst()
|
|
+ if (res) {
|
|
+ this.cursor = res.cursor
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private async saveCursor() {
|
|
+ await this.db
|
|
+ .insertInto('sub_state')
|
|
+ .values({ service: 'jetstream', cursor: this.cursor })
|
|
+ .onConflict((oc) => oc.column('service').doUpdateSet({ cursor: this.cursor }))
|
|
+ .execute()
|
|
}
|
|
}
|