Compare commits
3 Commits
20c2bc8e6a
...
c99a14b7a6
| Author | SHA1 | Date | |
|---|---|---|---|
|
c99a14b7a6
|
|||
|
d4473ca84f
|
|||
|
15ba3fa3e8
|
57
install.zsh
57
install.zsh
@@ -361,19 +361,18 @@ function at-repos-push-reset() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
function at-repos-push-docker() {
|
function at-repos-push-docker() {
|
||||||
if [ -z "$1" ];then
|
if [ -z "$1" ] || [ "$1" = "push" ]; then
|
||||||
for ((i=1; i<=${#services}; i++)); do
|
for service in "${services[@]}"; do
|
||||||
service=${services[$i]}
|
|
||||||
docker tag at-${service}:latest localhost:${dport}/${service}:latest
|
docker tag at-${service}:latest localhost:${dport}/${service}:latest
|
||||||
docker push localhost:${dport}/${service}:latest
|
docker push localhost:${dport}/${service}:latest
|
||||||
if [ "$service" == "ozone" ];then
|
if [ "$service" = "ozone" ]; then
|
||||||
docker tag at-${service}-web:latest localhost:${dport}/${service}-web:latest
|
docker tag at-"${service}"-web:latest localhost:"${dport}"/"${service}"-web:latest
|
||||||
docker push localhost:${dport}/${service}-web:latest
|
docker push localhost:"${dport}"/"${service}"-web:latest
|
||||||
fi
|
fi
|
||||||
done
|
done
|
||||||
else
|
else
|
||||||
docker tag at-${1}:latest localhost:${dport}/${1}:latest
|
docker tag at-"${1}":latest localhost:"${dport}"/"${1}":latest
|
||||||
docker push localhost:${dport}/${1}:latest
|
docker push localhost:"${dport}"/"${1}":latest
|
||||||
fi
|
fi
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -411,28 +410,54 @@ function at-repos-reset-bgs-db() {
|
|||||||
echo "⚙️ Updating Slurp Config..."
|
echo "⚙️ Updating Slurp Config..."
|
||||||
docker exec -i $dp psql -U postgres -d bgs -c "UPDATE slurp_configs SET new_subs_disabled = false, new_pds_per_day_limit = 1000 WHERE id = 1;"
|
docker exec -i $dp psql -U postgres -d bgs -c "UPDATE slurp_configs SET new_subs_disabled = false, new_pds_per_day_limit = 1000 WHERE id = 1;"
|
||||||
|
|
||||||
echo "🔗 Registering Trusted Domain & Resetting Repos..."
|
echo "🔗 Registering Trusted Domain..."
|
||||||
# Retry loop for addTrustedDomain as BGS might still be warming up
|
# Retry loop for addTrustedDomain as BGS might still be warming up
|
||||||
for i in {1..5}; do
|
for i in {1..5}; do
|
||||||
if curl -f -X POST "https://bgs.${host}/admin/pds/addTrustedDomain?domain=${host}" -H "Authorization: Bearer ${BGS_ADMIN_KEY}"; then
|
if curl -f -X POST "https://bgs.${host}/admin/pds/addTrustedDomain?domain=${host}" -H "Authorization: Bearer ${BGS_ADMIN_KEY}"; then
|
||||||
|
echo ""
|
||||||
echo "✅ Trusted domain registered"
|
echo "✅ Trusted domain registered"
|
||||||
break
|
break
|
||||||
fi
|
fi
|
||||||
echo "Bot failed to contact BGS (attempt $i/5)... waiting 5s"
|
echo "Failed to contact BGS (attempt $i/5)... waiting 5s"
|
||||||
sleep 5
|
sleep 5
|
||||||
done
|
done
|
||||||
|
|
||||||
|
echo "🔗 Requesting PDS Crawl..."
|
||||||
|
# Request BGS to crawl the PDS - this registers the PDS and starts subscription
|
||||||
|
for i in {1..5}; do
|
||||||
|
result=$(curl -s -X POST "https://bgs.${host}/admin/pds/requestCrawl" \
|
||||||
|
-H "Authorization: Bearer ${BGS_ADMIN_KEY}" \
|
||||||
|
-H "Content-Type: application/json" \
|
||||||
|
-d "{\"hostname\":\"${host}\"}" \
|
||||||
|
-w "%{http_code}" -o /dev/null)
|
||||||
|
if [ "$result" = "200" ]; then
|
||||||
|
echo "✅ PDS crawl requested successfully"
|
||||||
|
break
|
||||||
|
fi
|
||||||
|
echo "Failed to request crawl (attempt $i/5, status: $result)... waiting 5s"
|
||||||
|
sleep 5
|
||||||
|
done
|
||||||
|
|
||||||
|
echo "⏳ Waiting 5s for BGS to connect to PDS..."
|
||||||
|
sleep 5
|
||||||
|
|
||||||
|
echo "🔄 Triggering repo sync for existing users..."
|
||||||
for ((i=1; i<=${#handles}; i++)); do
|
for ((i=1; i<=${#handles}; i++)); do
|
||||||
handle=${handles[$i]}
|
handle=${handles[$i]}
|
||||||
did=`curl -sL "https://${host}/xrpc/com.atproto.repo.describeRepo?repo=${handle}" |jq -r .did`
|
did=$(curl -sL "https://${host}/xrpc/com.atproto.repo.describeRepo?repo=${handle}" | jq -r .did)
|
||||||
if [ ! -z "$did" ] && [ "$did" != "null" ]; then
|
if [ -n "$did" ] && [ "$did" != "null" ]; then
|
||||||
echo "Resetting repo: $handle ($did)"
|
echo " Syncing repo: $handle ($did)"
|
||||||
curl -X POST "https://bgs.${host}/admin/repo/reset?did=${did}" \
|
# Use takedown=false to trigger a resync without actually taking down
|
||||||
-H "Authorization: Bearer ${BGS_ADMIN_KEY}"
|
curl -s -X POST "https://bgs.${host}/admin/repo/takedown?did=${did}&takedown=false" \
|
||||||
|
-H "Authorization: Bearer ${BGS_ADMIN_KEY}" || true
|
||||||
else
|
else
|
||||||
echo "Skipping reset for $handle (DID not found)"
|
echo " Skipping $handle (DID not found)"
|
||||||
fi
|
fi
|
||||||
done
|
done
|
||||||
|
|
||||||
|
echo ""
|
||||||
|
echo "✅ BGS reset complete!"
|
||||||
|
echo " PDS should now be subscribed and syncing repos."
|
||||||
}
|
}
|
||||||
|
|
||||||
function at-repos-feed-generator-start-push() {
|
function at-repos-feed-generator-start-push() {
|
||||||
|
|||||||
133
patching/210-pds-subscriberepos-no-auth.patch
Normal file
133
patching/210-pds-subscriberepos-no-auth.patch
Normal file
@@ -0,0 +1,133 @@
|
|||||||
|
diff --git a/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts b/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts
|
||||||
|
index c2418d343..f81789df0 100644
|
||||||
|
--- a/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts
|
||||||
|
+++ b/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts
|
||||||
|
@@ -5,68 +5,71 @@ import { httpLogger } from '../../../../logger'
|
||||||
|
import { Outbox } from '../../../../sequencer/outbox'
|
||||||
|
|
||||||
|
export default function (server: Server, ctx: AppContext) {
|
||||||
|
- server.com.atproto.sync.subscribeRepos(async function* ({ params, signal }) {
|
||||||
|
- const { cursor } = params
|
||||||
|
- const outbox = new Outbox(ctx.sequencer, {
|
||||||
|
- maxBufferSize: ctx.cfg.subscription.maxBuffer,
|
||||||
|
- })
|
||||||
|
- httpLogger.info({ cursor }, 'request to com.atproto.sync.subscribeRepos')
|
||||||
|
+ server.com.atproto.sync.subscribeRepos({
|
||||||
|
+ auth: undefined,
|
||||||
|
+ handler: async function* ({ params, signal }) {
|
||||||
|
+ const { cursor } = params
|
||||||
|
+ const outbox = new Outbox(ctx.sequencer, {
|
||||||
|
+ maxBufferSize: ctx.cfg.subscription.maxBuffer,
|
||||||
|
+ })
|
||||||
|
+ httpLogger.info({ cursor }, 'request to com.atproto.sync.subscribeRepos')
|
||||||
|
|
||||||
|
- const backfillTime = new Date(
|
||||||
|
- Date.now() - ctx.cfg.subscription.repoBackfillLimitMs,
|
||||||
|
- ).toISOString()
|
||||||
|
- let outboxCursor: number | undefined = undefined
|
||||||
|
- if (cursor !== undefined) {
|
||||||
|
- const [next, curr] = await Promise.all([
|
||||||
|
- ctx.sequencer.next(cursor),
|
||||||
|
- ctx.sequencer.curr(),
|
||||||
|
- ])
|
||||||
|
- if (cursor > (curr ?? 0)) {
|
||||||
|
- throw new InvalidRequestError('Cursor in the future.', 'FutureCursor')
|
||||||
|
- } else if (next && next.sequencedAt < backfillTime) {
|
||||||
|
- // if cursor is before backfill time, find earliest cursor from backfill window
|
||||||
|
- yield {
|
||||||
|
- $type: '#info',
|
||||||
|
- name: 'OutdatedCursor',
|
||||||
|
- message: 'Requested cursor exceeded limit. Possibly missing events',
|
||||||
|
+ const backfillTime = new Date(
|
||||||
|
+ Date.now() - ctx.cfg.subscription.repoBackfillLimitMs,
|
||||||
|
+ ).toISOString()
|
||||||
|
+ let outboxCursor: number | undefined = undefined
|
||||||
|
+ if (cursor !== undefined) {
|
||||||
|
+ const [next, curr] = await Promise.all([
|
||||||
|
+ ctx.sequencer.next(cursor),
|
||||||
|
+ ctx.sequencer.curr(),
|
||||||
|
+ ])
|
||||||
|
+ if (cursor > (curr ?? 0)) {
|
||||||
|
+ throw new InvalidRequestError('Cursor in the future.', 'FutureCursor')
|
||||||
|
+ } else if (next && next.sequencedAt < backfillTime) {
|
||||||
|
+ // if cursor is before backfill time, find earliest cursor from backfill window
|
||||||
|
+ yield {
|
||||||
|
+ $type: '#info',
|
||||||
|
+ name: 'OutdatedCursor',
|
||||||
|
+ message: 'Requested cursor exceeded limit. Possibly missing events',
|
||||||
|
+ }
|
||||||
|
+ const startEvt = await ctx.sequencer.earliestAfterTime(backfillTime)
|
||||||
|
+ outboxCursor = startEvt?.seq ? startEvt.seq - 1 : undefined
|
||||||
|
+ } else {
|
||||||
|
+ outboxCursor = cursor
|
||||||
|
}
|
||||||
|
- const startEvt = await ctx.sequencer.earliestAfterTime(backfillTime)
|
||||||
|
- outboxCursor = startEvt?.seq ? startEvt.seq - 1 : undefined
|
||||||
|
- } else {
|
||||||
|
- outboxCursor = cursor
|
||||||
|
}
|
||||||
|
- }
|
||||||
|
|
||||||
|
- for await (const evt of outbox.events(outboxCursor, signal)) {
|
||||||
|
- if (evt.type === 'commit') {
|
||||||
|
- yield {
|
||||||
|
- $type: '#commit',
|
||||||
|
- seq: evt.seq,
|
||||||
|
- time: evt.time,
|
||||||
|
- ...evt.evt,
|
||||||
|
- }
|
||||||
|
- } else if (evt.type === 'sync') {
|
||||||
|
- yield {
|
||||||
|
- $type: '#sync',
|
||||||
|
- seq: evt.seq,
|
||||||
|
- time: evt.time,
|
||||||
|
- ...evt.evt,
|
||||||
|
- }
|
||||||
|
- } else if (evt.type === 'identity') {
|
||||||
|
- yield {
|
||||||
|
- $type: '#identity',
|
||||||
|
- seq: evt.seq,
|
||||||
|
- time: evt.time,
|
||||||
|
- ...evt.evt,
|
||||||
|
- }
|
||||||
|
- } else if (evt.type === 'account') {
|
||||||
|
- yield {
|
||||||
|
- $type: '#account',
|
||||||
|
- seq: evt.seq,
|
||||||
|
- time: evt.time,
|
||||||
|
- ...evt.evt,
|
||||||
|
+ for await (const evt of outbox.events(outboxCursor, signal)) {
|
||||||
|
+ if (evt.type === 'commit') {
|
||||||
|
+ yield {
|
||||||
|
+ $type: '#commit',
|
||||||
|
+ seq: evt.seq,
|
||||||
|
+ time: evt.time,
|
||||||
|
+ ...evt.evt,
|
||||||
|
+ }
|
||||||
|
+ } else if (evt.type === 'sync') {
|
||||||
|
+ yield {
|
||||||
|
+ $type: '#sync',
|
||||||
|
+ seq: evt.seq,
|
||||||
|
+ time: evt.time,
|
||||||
|
+ ...evt.evt,
|
||||||
|
+ }
|
||||||
|
+ } else if (evt.type === 'identity') {
|
||||||
|
+ yield {
|
||||||
|
+ $type: '#identity',
|
||||||
|
+ seq: evt.seq,
|
||||||
|
+ time: evt.time,
|
||||||
|
+ ...evt.evt,
|
||||||
|
+ }
|
||||||
|
+ } else if (evt.type === 'account') {
|
||||||
|
+ yield {
|
||||||
|
+ $type: '#account',
|
||||||
|
+ seq: evt.seq,
|
||||||
|
+ time: evt.time,
|
||||||
|
+ ...evt.evt,
|
||||||
|
+ }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
- }
|
||||||
|
+ },
|
||||||
|
})
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user