diff --git a/install.zsh b/install.zsh index 07a2275..0f48e73 100755 --- a/install.zsh +++ b/install.zsh @@ -78,6 +78,7 @@ PATCH_FILES=( "152-indigo-newpds-dayper-limit-pr707.diff" "190-bgs-disable-ratelimit.patch" "200-feed-generator-custom-existing.patch" + "210-pds-subscriberepos-no-auth.patch" ) function at-repos-clone() { diff --git a/patching/210-pds-subscriberepos-no-auth.patch b/patching/210-pds-subscriberepos-no-auth.patch new file mode 100644 index 0000000..2783588 --- /dev/null +++ b/patching/210-pds-subscriberepos-no-auth.patch @@ -0,0 +1,133 @@ +diff --git a/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts b/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts +index c2418d343..f81789df0 100644 +--- a/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts ++++ b/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts +@@ -5,68 +5,71 @@ import { httpLogger } from '../../../../logger' + import { Outbox } from '../../../../sequencer/outbox' + + export default function (server: Server, ctx: AppContext) { +- server.com.atproto.sync.subscribeRepos(async function* ({ params, signal }) { +- const { cursor } = params +- const outbox = new Outbox(ctx.sequencer, { +- maxBufferSize: ctx.cfg.subscription.maxBuffer, +- }) +- httpLogger.info({ cursor }, 'request to com.atproto.sync.subscribeRepos') ++ server.com.atproto.sync.subscribeRepos({ ++ auth: undefined, ++ handler: async function* ({ params, signal }) { ++ const { cursor } = params ++ const outbox = new Outbox(ctx.sequencer, { ++ maxBufferSize: ctx.cfg.subscription.maxBuffer, ++ }) ++ httpLogger.info({ cursor }, 'request to com.atproto.sync.subscribeRepos') + +- const backfillTime = new Date( +- Date.now() - ctx.cfg.subscription.repoBackfillLimitMs, +- ).toISOString() +- let outboxCursor: number | undefined = undefined +- if (cursor !== undefined) { +- const [next, curr] = await Promise.all([ +- ctx.sequencer.next(cursor), +- ctx.sequencer.curr(), +- ]) +- if (cursor > (curr ?? 0)) { +- throw new InvalidRequestError('Cursor in the future.', 'FutureCursor') +- } else if (next && next.sequencedAt < backfillTime) { +- // if cursor is before backfill time, find earliest cursor from backfill window +- yield { +- $type: '#info', +- name: 'OutdatedCursor', +- message: 'Requested cursor exceeded limit. Possibly missing events', ++ const backfillTime = new Date( ++ Date.now() - ctx.cfg.subscription.repoBackfillLimitMs, ++ ).toISOString() ++ let outboxCursor: number | undefined = undefined ++ if (cursor !== undefined) { ++ const [next, curr] = await Promise.all([ ++ ctx.sequencer.next(cursor), ++ ctx.sequencer.curr(), ++ ]) ++ if (cursor > (curr ?? 0)) { ++ throw new InvalidRequestError('Cursor in the future.', 'FutureCursor') ++ } else if (next && next.sequencedAt < backfillTime) { ++ // if cursor is before backfill time, find earliest cursor from backfill window ++ yield { ++ $type: '#info', ++ name: 'OutdatedCursor', ++ message: 'Requested cursor exceeded limit. Possibly missing events', ++ } ++ const startEvt = await ctx.sequencer.earliestAfterTime(backfillTime) ++ outboxCursor = startEvt?.seq ? startEvt.seq - 1 : undefined ++ } else { ++ outboxCursor = cursor + } +- const startEvt = await ctx.sequencer.earliestAfterTime(backfillTime) +- outboxCursor = startEvt?.seq ? startEvt.seq - 1 : undefined +- } else { +- outboxCursor = cursor + } +- } + +- for await (const evt of outbox.events(outboxCursor, signal)) { +- if (evt.type === 'commit') { +- yield { +- $type: '#commit', +- seq: evt.seq, +- time: evt.time, +- ...evt.evt, +- } +- } else if (evt.type === 'sync') { +- yield { +- $type: '#sync', +- seq: evt.seq, +- time: evt.time, +- ...evt.evt, +- } +- } else if (evt.type === 'identity') { +- yield { +- $type: '#identity', +- seq: evt.seq, +- time: evt.time, +- ...evt.evt, +- } +- } else if (evt.type === 'account') { +- yield { +- $type: '#account', +- seq: evt.seq, +- time: evt.time, +- ...evt.evt, ++ for await (const evt of outbox.events(outboxCursor, signal)) { ++ if (evt.type === 'commit') { ++ yield { ++ $type: '#commit', ++ seq: evt.seq, ++ time: evt.time, ++ ...evt.evt, ++ } ++ } else if (evt.type === 'sync') { ++ yield { ++ $type: '#sync', ++ seq: evt.seq, ++ time: evt.time, ++ ...evt.evt, ++ } ++ } else if (evt.type === 'identity') { ++ yield { ++ $type: '#identity', ++ seq: evt.seq, ++ time: evt.time, ++ ...evt.evt, ++ } ++ } else if (evt.type === 'account') { ++ yield { ++ $type: '#account', ++ seq: evt.seq, ++ time: evt.time, ++ ...evt.evt, ++ } + } + } +- } ++ }, + }) + }