fix
This commit is contained in:
@@ -77,7 +77,7 @@ PATCH_FILES=(
|
|||||||
"130-atproto-ozone-enable-daemon-v2.patch"
|
"130-atproto-ozone-enable-daemon-v2.patch"
|
||||||
"152-indigo-newpds-dayper-limit-pr707.diff"
|
"152-indigo-newpds-dayper-limit-pr707.diff"
|
||||||
"190-bgs-disable-ratelimit.patch"
|
"190-bgs-disable-ratelimit.patch"
|
||||||
"200-feed-generator-custom-existing.patch"
|
"200-feed-generator-custom.patch"
|
||||||
)
|
)
|
||||||
|
|
||||||
function at-repos-clone() {
|
function at-repos-clone() {
|
||||||
|
|||||||
@@ -1,64 +0,0 @@
|
|||||||
diff --git a/src/algos/index.ts b/src/algos/index.ts
|
|
||||||
index b7ee48a..102cb93 100644
|
|
||||||
--- a/src/algos/index.ts
|
|
||||||
+++ b/src/algos/index.ts
|
|
||||||
@@ -4,11 +4,13 @@ import {
|
|
||||||
OutputSchema as AlgoOutput,
|
|
||||||
} from '../lexicon/types/app/bsky/feed/getFeedSkeleton'
|
|
||||||
import * as whatsAlf from './whats-alf'
|
|
||||||
+import * as app from './app'
|
|
||||||
|
|
||||||
type AlgoHandler = (ctx: AppContext, params: QueryParams) => Promise<AlgoOutput>
|
|
||||||
|
|
||||||
const algos: Record<string, AlgoHandler> = {
|
|
||||||
[whatsAlf.shortname]: whatsAlf.handler,
|
|
||||||
+ [app.shortname]: app.handler,
|
|
||||||
}
|
|
||||||
|
|
||||||
export default algos
|
|
||||||
diff --git a/src/index.ts b/src/index.ts
|
|
||||||
index 7128525..40d985c 100644
|
|
||||||
--- a/src/index.ts
|
|
||||||
+++ b/src/index.ts
|
|
||||||
@@ -22,8 +22,10 @@ const run = async () => {
|
|
||||||
})
|
|
||||||
await server.start()
|
|
||||||
console.log(
|
|
||||||
- `🤖 running feed generator at http://${server.cfg.listenhost}:${server.cfg.port}`,
|
|
||||||
+ `running feed generator at http://${server.cfg.listenhost}:${server.cfg.port}`,
|
|
||||||
)
|
|
||||||
+ console.log('Supported algos:', Object.keys(require('./algos').default))
|
|
||||||
+ console.log('Publisher DID:', server.cfg.publisherDid)
|
|
||||||
}
|
|
||||||
|
|
||||||
const maybeStr = (val?: string) => {
|
|
||||||
diff --git a/src/methods/feed-generation.ts b/src/methods/feed-generation.ts
|
|
||||||
index 0f4989e..17be062 100644
|
|
||||||
--- a/src/methods/feed-generation.ts
|
|
||||||
+++ b/src/methods/feed-generation.ts
|
|
||||||
@@ -10,7 +10,7 @@ export default function (server: Server, ctx: AppContext) {
|
|
||||||
const feedUri = new AtUri(params.feed)
|
|
||||||
const algo = algos[feedUri.rkey]
|
|
||||||
if (
|
|
||||||
- feedUri.hostname !== ctx.cfg.publisherDid ||
|
|
||||||
+ //feedUri.hostname !== ctx.cfg.publisherDid ||
|
|
||||||
feedUri.collection !== 'app.bsky.feed.generator' ||
|
|
||||||
!algo
|
|
||||||
) {
|
|
||||||
diff --git a/package.json b/package.json
|
|
||||||
index 1431a9e..6a7c33c 100644
|
|
||||||
--- a/package.json
|
|
||||||
+++ b/package.json
|
|
||||||
@@ -23,9 +23,11 @@
|
|
||||||
"dotenv": "^16.0.3",
|
|
||||||
"express": "^4.18.2",
|
|
||||||
"kysely": "^0.27.4",
|
|
||||||
- "multiformats": "^9.9.0"
|
|
||||||
+ "multiformats": "^9.9.0",
|
|
||||||
+ "ws": "^8.14.2"
|
|
||||||
},
|
|
||||||
"devDependencies": {
|
|
||||||
+ "@types/ws": "^8.5.10",
|
|
||||||
"@types/better-sqlite3": "^7.6.11",
|
|
||||||
"@types/express": "^4.17.17",
|
|
||||||
"@types/node": "^20.1.2",
|
|
||||||
@@ -1,133 +0,0 @@
|
|||||||
diff --git a/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts b/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts
|
|
||||||
index c2418d343..f81789df0 100644
|
|
||||||
--- a/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts
|
|
||||||
+++ b/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts
|
|
||||||
@@ -5,68 +5,71 @@ import { httpLogger } from '../../../../logger'
|
|
||||||
import { Outbox } from '../../../../sequencer/outbox'
|
|
||||||
|
|
||||||
export default function (server: Server, ctx: AppContext) {
|
|
||||||
- server.com.atproto.sync.subscribeRepos(async function* ({ params, signal }) {
|
|
||||||
- const { cursor } = params
|
|
||||||
- const outbox = new Outbox(ctx.sequencer, {
|
|
||||||
- maxBufferSize: ctx.cfg.subscription.maxBuffer,
|
|
||||||
- })
|
|
||||||
- httpLogger.info({ cursor }, 'request to com.atproto.sync.subscribeRepos')
|
|
||||||
+ server.com.atproto.sync.subscribeRepos({
|
|
||||||
+ auth: undefined,
|
|
||||||
+ handler: async function* ({ params, signal }) {
|
|
||||||
+ const { cursor } = params
|
|
||||||
+ const outbox = new Outbox(ctx.sequencer, {
|
|
||||||
+ maxBufferSize: ctx.cfg.subscription.maxBuffer,
|
|
||||||
+ })
|
|
||||||
+ httpLogger.info({ cursor }, 'request to com.atproto.sync.subscribeRepos')
|
|
||||||
|
|
||||||
- const backfillTime = new Date(
|
|
||||||
- Date.now() - ctx.cfg.subscription.repoBackfillLimitMs,
|
|
||||||
- ).toISOString()
|
|
||||||
- let outboxCursor: number | undefined = undefined
|
|
||||||
- if (cursor !== undefined) {
|
|
||||||
- const [next, curr] = await Promise.all([
|
|
||||||
- ctx.sequencer.next(cursor),
|
|
||||||
- ctx.sequencer.curr(),
|
|
||||||
- ])
|
|
||||||
- if (cursor > (curr ?? 0)) {
|
|
||||||
- throw new InvalidRequestError('Cursor in the future.', 'FutureCursor')
|
|
||||||
- } else if (next && next.sequencedAt < backfillTime) {
|
|
||||||
- // if cursor is before backfill time, find earliest cursor from backfill window
|
|
||||||
- yield {
|
|
||||||
- $type: '#info',
|
|
||||||
- name: 'OutdatedCursor',
|
|
||||||
- message: 'Requested cursor exceeded limit. Possibly missing events',
|
|
||||||
+ const backfillTime = new Date(
|
|
||||||
+ Date.now() - ctx.cfg.subscription.repoBackfillLimitMs,
|
|
||||||
+ ).toISOString()
|
|
||||||
+ let outboxCursor: number | undefined = undefined
|
|
||||||
+ if (cursor !== undefined) {
|
|
||||||
+ const [next, curr] = await Promise.all([
|
|
||||||
+ ctx.sequencer.next(cursor),
|
|
||||||
+ ctx.sequencer.curr(),
|
|
||||||
+ ])
|
|
||||||
+ if (cursor > (curr ?? 0)) {
|
|
||||||
+ throw new InvalidRequestError('Cursor in the future.', 'FutureCursor')
|
|
||||||
+ } else if (next && next.sequencedAt < backfillTime) {
|
|
||||||
+ // if cursor is before backfill time, find earliest cursor from backfill window
|
|
||||||
+ yield {
|
|
||||||
+ $type: '#info',
|
|
||||||
+ name: 'OutdatedCursor',
|
|
||||||
+ message: 'Requested cursor exceeded limit. Possibly missing events',
|
|
||||||
+ }
|
|
||||||
+ const startEvt = await ctx.sequencer.earliestAfterTime(backfillTime)
|
|
||||||
+ outboxCursor = startEvt?.seq ? startEvt.seq - 1 : undefined
|
|
||||||
+ } else {
|
|
||||||
+ outboxCursor = cursor
|
|
||||||
}
|
|
||||||
- const startEvt = await ctx.sequencer.earliestAfterTime(backfillTime)
|
|
||||||
- outboxCursor = startEvt?.seq ? startEvt.seq - 1 : undefined
|
|
||||||
- } else {
|
|
||||||
- outboxCursor = cursor
|
|
||||||
}
|
|
||||||
- }
|
|
||||||
|
|
||||||
- for await (const evt of outbox.events(outboxCursor, signal)) {
|
|
||||||
- if (evt.type === 'commit') {
|
|
||||||
- yield {
|
|
||||||
- $type: '#commit',
|
|
||||||
- seq: evt.seq,
|
|
||||||
- time: evt.time,
|
|
||||||
- ...evt.evt,
|
|
||||||
- }
|
|
||||||
- } else if (evt.type === 'sync') {
|
|
||||||
- yield {
|
|
||||||
- $type: '#sync',
|
|
||||||
- seq: evt.seq,
|
|
||||||
- time: evt.time,
|
|
||||||
- ...evt.evt,
|
|
||||||
- }
|
|
||||||
- } else if (evt.type === 'identity') {
|
|
||||||
- yield {
|
|
||||||
- $type: '#identity',
|
|
||||||
- seq: evt.seq,
|
|
||||||
- time: evt.time,
|
|
||||||
- ...evt.evt,
|
|
||||||
- }
|
|
||||||
- } else if (evt.type === 'account') {
|
|
||||||
- yield {
|
|
||||||
- $type: '#account',
|
|
||||||
- seq: evt.seq,
|
|
||||||
- time: evt.time,
|
|
||||||
- ...evt.evt,
|
|
||||||
+ for await (const evt of outbox.events(outboxCursor, signal)) {
|
|
||||||
+ if (evt.type === 'commit') {
|
|
||||||
+ yield {
|
|
||||||
+ $type: '#commit',
|
|
||||||
+ seq: evt.seq,
|
|
||||||
+ time: evt.time,
|
|
||||||
+ ...evt.evt,
|
|
||||||
+ }
|
|
||||||
+ } else if (evt.type === 'sync') {
|
|
||||||
+ yield {
|
|
||||||
+ $type: '#sync',
|
|
||||||
+ seq: evt.seq,
|
|
||||||
+ time: evt.time,
|
|
||||||
+ ...evt.evt,
|
|
||||||
+ }
|
|
||||||
+ } else if (evt.type === 'identity') {
|
|
||||||
+ yield {
|
|
||||||
+ $type: '#identity',
|
|
||||||
+ seq: evt.seq,
|
|
||||||
+ time: evt.time,
|
|
||||||
+ ...evt.evt,
|
|
||||||
+ }
|
|
||||||
+ } else if (evt.type === 'account') {
|
|
||||||
+ yield {
|
|
||||||
+ $type: '#account',
|
|
||||||
+ seq: evt.seq,
|
|
||||||
+ time: evt.time,
|
|
||||||
+ ...evt.evt,
|
|
||||||
+ }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
- }
|
|
||||||
+ },
|
|
||||||
})
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user