This commit is contained in:
parent
3b2c9eb1a2
commit
38a14cd0d7
@ -13,3 +13,8 @@ uri=at://$did/$col/$cid
|
||||
echo $uri
|
||||
```
|
||||
|
||||
## bsky-feed
|
||||
|
||||
```sh
|
||||
$ git clone https://github.com/bluesky-social/feed-generator
|
||||
```
|
||||
|
43
at/feed-generator/src/algos/cmd.ts
Normal file
43
at/feed-generator/src/algos/cmd.ts
Normal file
@ -0,0 +1,43 @@
|
||||
import { InvalidRequestError } from '@atproto/xrpc-server'
|
||||
import { QueryParams } from '../lexicon/types/app/bsky/feed/getFeedSkeleton'
|
||||
import { AppContext } from '../config'
|
||||
|
||||
// max 15 chars
|
||||
export const shortname = 'cmd'
|
||||
|
||||
export const handler = async (ctx: AppContext, params: QueryParams) => {
|
||||
let builder = ctx.db
|
||||
.selectFrom('post')
|
||||
.selectAll()
|
||||
.orderBy('indexedAt', 'desc')
|
||||
.orderBy('cid', 'desc')
|
||||
.limit(params.limit)
|
||||
|
||||
if (params.cursor) {
|
||||
const [indexedAt, cid] = params.cursor.split('::')
|
||||
if (!indexedAt || !cid) {
|
||||
throw new InvalidRequestError('malformed cursor')
|
||||
}
|
||||
const timeStr = new Date(parseInt(indexedAt, 10)).toISOString()
|
||||
builder = builder
|
||||
.where('post.indexedAt', '<', timeStr)
|
||||
.orWhere((qb) => qb.where('post.indexedAt', '=', timeStr))
|
||||
.where('post.cid', '<', cid)
|
||||
}
|
||||
const res = await builder.execute()
|
||||
|
||||
const feed = res.map((row) => ({
|
||||
post: row.uri,
|
||||
}))
|
||||
|
||||
let cursor: string | undefined
|
||||
const last = res.at(-1)
|
||||
if (last) {
|
||||
cursor = `${new Date(last.indexedAt).getTime()}::${last.cid}`
|
||||
}
|
||||
|
||||
return {
|
||||
cursor,
|
||||
feed,
|
||||
}
|
||||
}
|
14
at/feed-generator/src/algos/index.ts
Normal file
14
at/feed-generator/src/algos/index.ts
Normal file
@ -0,0 +1,14 @@
|
||||
import { AppContext } from '../config'
|
||||
import {
|
||||
QueryParams,
|
||||
OutputSchema as AlgoOutput,
|
||||
} from '../lexicon/types/app/bsky/feed/getFeedSkeleton'
|
||||
import * as cmd from './cmd'
|
||||
|
||||
type AlgoHandler = (ctx: AppContext, params: QueryParams) => Promise<AlgoOutput>
|
||||
|
||||
const algos: Record<string, AlgoHandler> = {
|
||||
[cmd.shortname]: cmd.handler,
|
||||
}
|
||||
|
||||
export default algos
|
51
at/feed-generator/src/subscription.ts
Normal file
51
at/feed-generator/src/subscription.ts
Normal file
@ -0,0 +1,51 @@
|
||||
import {
|
||||
OutputSchema as RepoEvent,
|
||||
isCommit,
|
||||
} from './lexicon/types/com/atproto/sync/subscribeRepos'
|
||||
import { FirehoseSubscriptionBase, getOpsByType } from './util/subscription'
|
||||
|
||||
export class FirehoseSubscription extends FirehoseSubscriptionBase {
|
||||
async handleEvent(evt: RepoEvent) {
|
||||
if (!isCommit(evt)) return
|
||||
const ops = await getOpsByType(evt)
|
||||
|
||||
// This logs the text of every post off the firehose.
|
||||
// Just for fun :)
|
||||
// Delete before actually using
|
||||
for (const post of ops.posts.creates) {
|
||||
console.log(post.record.text)
|
||||
}
|
||||
|
||||
const postsToDelete = ops.posts.deletes.map((del) => del.uri)
|
||||
const postsToCreate = ops.posts.creates
|
||||
.filter((create) => {
|
||||
// only alf-related posts
|
||||
return create.record.text.match('^/[a-z]');
|
||||
//return create.record.text.toLowerCase().includes('alf')
|
||||
})
|
||||
.map((create) => {
|
||||
// map alf-related posts to a db row
|
||||
return {
|
||||
uri: create.uri,
|
||||
cid: create.cid,
|
||||
replyParent: create.record?.reply?.parent.uri ?? null,
|
||||
replyRoot: create.record?.reply?.root.uri ?? null,
|
||||
indexedAt: new Date().toISOString(),
|
||||
}
|
||||
})
|
||||
|
||||
if (postsToDelete.length > 0) {
|
||||
await this.db
|
||||
.deleteFrom('post')
|
||||
.where('uri', 'in', postsToDelete)
|
||||
.execute()
|
||||
}
|
||||
if (postsToCreate.length > 0) {
|
||||
await this.db
|
||||
.insertInto('post')
|
||||
.values(postsToCreate)
|
||||
.onConflict((oc) => oc.doNothing())
|
||||
.execute()
|
||||
}
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user