134 lines
4.6 KiB
Diff
134 lines
4.6 KiB
Diff
diff --git a/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts b/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts
|
|
index c2418d343..f81789df0 100644
|
|
--- a/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts
|
|
+++ b/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts
|
|
@@ -5,68 +5,71 @@ import { httpLogger } from '../../../../logger'
|
|
import { Outbox } from '../../../../sequencer/outbox'
|
|
|
|
export default function (server: Server, ctx: AppContext) {
|
|
- server.com.atproto.sync.subscribeRepos(async function* ({ params, signal }) {
|
|
- const { cursor } = params
|
|
- const outbox = new Outbox(ctx.sequencer, {
|
|
- maxBufferSize: ctx.cfg.subscription.maxBuffer,
|
|
- })
|
|
- httpLogger.info({ cursor }, 'request to com.atproto.sync.subscribeRepos')
|
|
+ server.com.atproto.sync.subscribeRepos({
|
|
+ auth: undefined,
|
|
+ handler: async function* ({ params, signal }) {
|
|
+ const { cursor } = params
|
|
+ const outbox = new Outbox(ctx.sequencer, {
|
|
+ maxBufferSize: ctx.cfg.subscription.maxBuffer,
|
|
+ })
|
|
+ httpLogger.info({ cursor }, 'request to com.atproto.sync.subscribeRepos')
|
|
|
|
- const backfillTime = new Date(
|
|
- Date.now() - ctx.cfg.subscription.repoBackfillLimitMs,
|
|
- ).toISOString()
|
|
- let outboxCursor: number | undefined = undefined
|
|
- if (cursor !== undefined) {
|
|
- const [next, curr] = await Promise.all([
|
|
- ctx.sequencer.next(cursor),
|
|
- ctx.sequencer.curr(),
|
|
- ])
|
|
- if (cursor > (curr ?? 0)) {
|
|
- throw new InvalidRequestError('Cursor in the future.', 'FutureCursor')
|
|
- } else if (next && next.sequencedAt < backfillTime) {
|
|
- // if cursor is before backfill time, find earliest cursor from backfill window
|
|
- yield {
|
|
- $type: '#info',
|
|
- name: 'OutdatedCursor',
|
|
- message: 'Requested cursor exceeded limit. Possibly missing events',
|
|
+ const backfillTime = new Date(
|
|
+ Date.now() - ctx.cfg.subscription.repoBackfillLimitMs,
|
|
+ ).toISOString()
|
|
+ let outboxCursor: number | undefined = undefined
|
|
+ if (cursor !== undefined) {
|
|
+ const [next, curr] = await Promise.all([
|
|
+ ctx.sequencer.next(cursor),
|
|
+ ctx.sequencer.curr(),
|
|
+ ])
|
|
+ if (cursor > (curr ?? 0)) {
|
|
+ throw new InvalidRequestError('Cursor in the future.', 'FutureCursor')
|
|
+ } else if (next && next.sequencedAt < backfillTime) {
|
|
+ // if cursor is before backfill time, find earliest cursor from backfill window
|
|
+ yield {
|
|
+ $type: '#info',
|
|
+ name: 'OutdatedCursor',
|
|
+ message: 'Requested cursor exceeded limit. Possibly missing events',
|
|
+ }
|
|
+ const startEvt = await ctx.sequencer.earliestAfterTime(backfillTime)
|
|
+ outboxCursor = startEvt?.seq ? startEvt.seq - 1 : undefined
|
|
+ } else {
|
|
+ outboxCursor = cursor
|
|
}
|
|
- const startEvt = await ctx.sequencer.earliestAfterTime(backfillTime)
|
|
- outboxCursor = startEvt?.seq ? startEvt.seq - 1 : undefined
|
|
- } else {
|
|
- outboxCursor = cursor
|
|
}
|
|
- }
|
|
|
|
- for await (const evt of outbox.events(outboxCursor, signal)) {
|
|
- if (evt.type === 'commit') {
|
|
- yield {
|
|
- $type: '#commit',
|
|
- seq: evt.seq,
|
|
- time: evt.time,
|
|
- ...evt.evt,
|
|
- }
|
|
- } else if (evt.type === 'sync') {
|
|
- yield {
|
|
- $type: '#sync',
|
|
- seq: evt.seq,
|
|
- time: evt.time,
|
|
- ...evt.evt,
|
|
- }
|
|
- } else if (evt.type === 'identity') {
|
|
- yield {
|
|
- $type: '#identity',
|
|
- seq: evt.seq,
|
|
- time: evt.time,
|
|
- ...evt.evt,
|
|
- }
|
|
- } else if (evt.type === 'account') {
|
|
- yield {
|
|
- $type: '#account',
|
|
- seq: evt.seq,
|
|
- time: evt.time,
|
|
- ...evt.evt,
|
|
+ for await (const evt of outbox.events(outboxCursor, signal)) {
|
|
+ if (evt.type === 'commit') {
|
|
+ yield {
|
|
+ $type: '#commit',
|
|
+ seq: evt.seq,
|
|
+ time: evt.time,
|
|
+ ...evt.evt,
|
|
+ }
|
|
+ } else if (evt.type === 'sync') {
|
|
+ yield {
|
|
+ $type: '#sync',
|
|
+ seq: evt.seq,
|
|
+ time: evt.time,
|
|
+ ...evt.evt,
|
|
+ }
|
|
+ } else if (evt.type === 'identity') {
|
|
+ yield {
|
|
+ $type: '#identity',
|
|
+ seq: evt.seq,
|
|
+ time: evt.time,
|
|
+ ...evt.evt,
|
|
+ }
|
|
+ } else if (evt.type === 'account') {
|
|
+ yield {
|
|
+ $type: '#account',
|
|
+ seq: evt.seq,
|
|
+ time: evt.time,
|
|
+ ...evt.evt,
|
|
+ }
|
|
}
|
|
}
|
|
- }
|
|
+ },
|
|
})
|
|
}
|