diff --git a/at/feed-generator/readme.md b/at/feed-generator/readme.md index 1554a05..17854e5 100644 --- a/at/feed-generator/readme.md +++ b/at/feed-generator/readme.md @@ -13,3 +13,8 @@ uri=at://$did/$col/$cid echo $uri ``` +## bsky-feed + +```sh +$ git clone https://github.com/bluesky-social/feed-generator +``` diff --git a/at/feed-generator/src/algos/cmd.ts b/at/feed-generator/src/algos/cmd.ts new file mode 100644 index 0000000..bac1f4a --- /dev/null +++ b/at/feed-generator/src/algos/cmd.ts @@ -0,0 +1,43 @@ +import { InvalidRequestError } from '@atproto/xrpc-server' +import { QueryParams } from '../lexicon/types/app/bsky/feed/getFeedSkeleton' +import { AppContext } from '../config' + +// max 15 chars +export const shortname = 'cmd' + +export const handler = async (ctx: AppContext, params: QueryParams) => { + let builder = ctx.db + .selectFrom('post') + .selectAll() + .orderBy('indexedAt', 'desc') + .orderBy('cid', 'desc') + .limit(params.limit) + + if (params.cursor) { + const [indexedAt, cid] = params.cursor.split('::') + if (!indexedAt || !cid) { + throw new InvalidRequestError('malformed cursor') + } + const timeStr = new Date(parseInt(indexedAt, 10)).toISOString() + builder = builder + .where('post.indexedAt', '<', timeStr) + .orWhere((qb) => qb.where('post.indexedAt', '=', timeStr)) + .where('post.cid', '<', cid) + } + const res = await builder.execute() + + const feed = res.map((row) => ({ + post: row.uri, + })) + + let cursor: string | undefined + const last = res.at(-1) + if (last) { + cursor = `${new Date(last.indexedAt).getTime()}::${last.cid}` + } + + return { + cursor, + feed, + } +} diff --git a/at/feed-generator/src/algos/index.ts b/at/feed-generator/src/algos/index.ts new file mode 100644 index 0000000..4490a2d --- /dev/null +++ b/at/feed-generator/src/algos/index.ts @@ -0,0 +1,14 @@ +import { AppContext } from '../config' +import { + QueryParams, + OutputSchema as AlgoOutput, +} from '../lexicon/types/app/bsky/feed/getFeedSkeleton' +import * as cmd from './cmd' + +type AlgoHandler = (ctx: AppContext, params: QueryParams) => Promise + +const algos: Record = { + [cmd.shortname]: cmd.handler, +} + +export default algos diff --git a/at/feed-generator/src/subscription.ts b/at/feed-generator/src/subscription.ts new file mode 100644 index 0000000..bfec084 --- /dev/null +++ b/at/feed-generator/src/subscription.ts @@ -0,0 +1,51 @@ +import { + OutputSchema as RepoEvent, + isCommit, +} from './lexicon/types/com/atproto/sync/subscribeRepos' +import { FirehoseSubscriptionBase, getOpsByType } from './util/subscription' + +export class FirehoseSubscription extends FirehoseSubscriptionBase { + async handleEvent(evt: RepoEvent) { + if (!isCommit(evt)) return + const ops = await getOpsByType(evt) + + // This logs the text of every post off the firehose. + // Just for fun :) + // Delete before actually using + for (const post of ops.posts.creates) { + console.log(post.record.text) + } + + const postsToDelete = ops.posts.deletes.map((del) => del.uri) + const postsToCreate = ops.posts.creates + .filter((create) => { + // only alf-related posts + return create.record.text.match('^/[a-z]'); + //return create.record.text.toLowerCase().includes('alf') + }) + .map((create) => { + // map alf-related posts to a db row + return { + uri: create.uri, + cid: create.cid, + replyParent: create.record?.reply?.parent.uri ?? null, + replyRoot: create.record?.reply?.root.uri ?? null, + indexedAt: new Date().toISOString(), + } + }) + + if (postsToDelete.length > 0) { + await this.db + .deleteFrom('post') + .where('uri', 'in', postsToDelete) + .execute() + } + if (postsToCreate.length > 0) { + await this.db + .insertInto('post') + .values(postsToCreate) + .onConflict((oc) => oc.doNothing()) + .execute() + } + } +}