From 37b5919a7fa4ec7c759e6ea8b01f1b0d713775a9 Mon Sep 17 00:00:00 2001 From: syui Date: Tue, 2 Dec 2025 21:37:41 +0900 Subject: [PATCH] fix service pds, bsky --- compose.yml | 40 ++++++----- patching/api.js | 167 ++++++++++++++++++++++++++++++++++++++++++++++ patching/index.js | 32 +++++++++ 3 files changed, 221 insertions(+), 18 deletions(-) create mode 100644 patching/api.js create mode 100644 patching/index.js diff --git a/compose.yml b/compose.yml index 813393a..f2efcb5 100644 --- a/compose.yml +++ b/compose.yml @@ -48,10 +48,32 @@ services: - ./envs/pds volumes: - ./data/pds/:/data/ + - ./repos/atproto/services/pds/index.js:/app/run-pds.js + command: ['node', '--enable-source-maps', '/app/run-pds.js'] depends_on: database: condition: service_healthy + bsky: + ports: + - 2584:2584 + build: + context: ./repos/atproto/ + dockerfile: services/bsky/Dockerfile + restart: always + env_file: + - ./envs/bsky + user: root + volumes: + - ./data/bsky/:/data/ + - ./repos/atproto/services/bsky/api.js:/app/run-bsky.js + command: ['node', '--enable-source-maps', '/app/run-bsky.js'] + depends_on: + database: + condition: service_healthy + redis: + condition: service_healthy + bgs: ports: - 2470:2470 @@ -78,24 +100,6 @@ services: - ./envs/social-app command: "/usr/bin/bskyweb serve" - bsky: - ports: - - 2584:2584 - build: - context: ./repos/atproto/ - dockerfile: services/bsky/Dockerfile - restart: always - env_file: - - ./envs/bsky - user: root - volumes: - - ./data/bsky/:/data/ - command: node --enable-source-maps api.js - depends_on: - database: - condition: service_healthy - redis: - condition: service_healthy jetstream: build: diff --git a/patching/api.js b/patching/api.js new file mode 100644 index 0000000..74a1875 --- /dev/null +++ b/patching/api.js @@ -0,0 +1,167 @@ +/* eslint-env node */ +/* eslint-disable import/order */ +// https://github.com/bluesky-social/atproto/blob/main/services/bsky/api.js +'use strict' + +//const dd = require('dd-trace') +// +//dd.tracer +// .init() +// .use('http2', { +// client: true, // calls into dataplane +// server: false, +// }) +// .use('express', { +// hooks: { +// request: (span, req) => { +// maintainXrpcResource(span, req) +// }, +// }, +// }) + +// modify tracer in order to track calls to dataplane as a service with proper resource names +const DATAPLANE_PREFIX = '/bsky.Service/' +//const origStartSpan = dd.tracer._tracer.startSpan +//dd.tracer._tracer.startSpan = function (name, options) { +// if ( +// name !== 'http.request' || +// options?.tags?.component !== 'http2' || +// !options?.tags?.['http.url'] +// ) { +// return origStartSpan.call(this, name, options) +// } +// const uri = new URL(options.tags['http.url']) +// if (!uri.pathname.startsWith(DATAPLANE_PREFIX)) { +// return origStartSpan.call(this, name, options) +// } +// options.tags['service.name'] = 'dataplane-bsky' +// options.tags['resource.name'] = uri.pathname.slice(DATAPLANE_PREFIX.length) +// return origStartSpan.call(this, name, options) +//} + +// Tracer code above must come before anything else +const assert = require('node:assert') +const cluster = require('node:cluster') +const path = require('node:path') + +const bsky = require('/app/packages/bsky') // import all bsky features +const { Secp256k1Keypair } = require('/app/packages/crypto') + +const main = async () => { + const env = getEnv() + const config = bsky.ServerConfig.readEnv() + assert(env.serviceSigningKey, 'must set BSKY_SERVICE_SIGNING_KEY') + const signingKey = await Secp256k1Keypair.import(env.serviceSigningKey) + +// starts: involve logics in packages/dev-env/src/bsky.ts >>>>>>>>>>>>> +// Separate migration db in case migration changes some connection state that we need in the tests, e.g. "alter database ... set ..." + const migrationDb = new bsky.Database({ + url: env.dbPostgresUrl, + schema: env.dbPostgresSchema, + }) + if (env.migration) { + await migrationDb.migrateToOrThrow(env.migration) + } else { + await migrationDb.migrateToLatestOrThrow() + } + await migrationDb.close() + + const db = new bsky.Database({ + url: env.dbPostgresUrl, + schema: env.dbPostgresSchema, + poolSize: 10, + }) + + const dataplane = await bsky.DataPlaneServer.create( + db, + env.dataplanePort, + config.didPlcUrl + ) + + const bsync = await bsky.MockBsync.create(db, env.bsyncPort) + +// ends: involve logics in packages/dev-env/src/bsky.ts <<<<<<<<<<<<< + + const server = bsky.BskyAppView.create({ config, signingKey }) +// starts: involve logics in packages/dev-env/src/bsky.ts >>>>>>>>>>>>> + const sub = new bsky.RepoSubscription({ + service: env.repoProvider, + db, + idResolver: dataplane.idResolver, + background: new bsky.BackgroundQueue(db), + }) +// ends: involve logics in packages/dev-env/src/bsky.ts <<<<<<<<<<<<< + await server.start() + sub.start() // involve logics in packages/dev-env/src/bsky.ts + // Graceful shutdown (see also https://aws.amazon.com/blogs/containers/graceful-shutdowns-with-ecs/) + const shutdown = async () => { + await server.destroy() + await bsync.destroy() + await dataplane.destroy() + await sub.destroy() + await db.close() + } + process.on('SIGTERM', shutdown) + process.on('disconnect', shutdown) // when clustering +} + +const getEnv = () => ({ + serviceSigningKey: process.env.BSKY_SERVICE_SIGNING_KEY || undefined, + dbPostgresUrl: process.env.BSKY_DB_POSTGRES_URL || undefined, + dbPostgresSchema: process.env.BSKY_DB_POSTGRES_SCHEMA || undefined, + dataplanePort : maybeParseInt(process.env.BSKY_DATAPLANE_PORT) || undefined, + bsyncPort : maybeParseInt(process.env.BSKY_BSYNC_PORT) || undefined, + migration: process.env.ENABLE_MIGRATIONS === 'true' || undefined, + repoProvider: process.env.BSKY_REPO_PROVIDER || undefined +}) + +const maybeParseInt = (str) => { + if (!str) return + const int = parseInt(str, 10) + if (isNaN(int)) return + return int +} + +const maintainXrpcResource = (span, req) => { + // Show actual xrpc method as resource rather than the route pattern + if (span && req.originalUrl?.startsWith('/xrpc/')) { + span.setTag( + 'resource.name', + [ + req.method, + path.posix.join(req.baseUrl || '', req.path || '', '/').slice(0, -1), // Ensures no trailing slash + ] + .filter(Boolean) + .join(' '), + ) + } +} + +const workerCount = maybeParseInt(process.env.CLUSTER_WORKER_COUNT) + +if (workerCount) { + if (cluster.isPrimary) { + console.log(`primary ${process.pid} is running`) + const workers = new Set() + for (let i = 0; i < workerCount; ++i) { + workers.add(cluster.fork()) + } + let teardown = false + cluster.on('exit', (worker) => { + workers.delete(worker) + if (!teardown) { + workers.add(cluster.fork()) // restart on crash + } + }) + process.on('SIGTERM', () => { + teardown = true + console.log('disconnecting workers') + workers.forEach((w) => w.disconnect()) + }) + } else { + console.log(`worker ${process.pid} is running`) + main() + } +} else { + main() // non-clustering +} diff --git a/patching/index.js b/patching/index.js new file mode 100644 index 0000000..2ce3602 --- /dev/null +++ b/patching/index.js @@ -0,0 +1,32 @@ +/* eslint-env node */ +// https://github.com/bluesky-social/atproto/blob/main/services/pds/index.js +'use strict' + +const { + PDS, + envToCfg, + envToSecrets, + httpLogger, + readEnv, +} = require('/app/packages/pds') +const pkg = require('/app/packages/pds/package.json') + +const main = async () => { + const env = readEnv() + env.version ??= pkg.version + const cfg = envToCfg(env) + const secrets = envToSecrets(env) + const pds = await PDS.create(cfg, secrets) + + await pds.start() + + httpLogger.info('pds is running') + // Graceful shutdown (see also https://aws.amazon.com/blogs/containers/graceful-shutdowns-with-ecs/) + process.on('SIGTERM', async () => { + httpLogger.info('pds is stopping') + await pds.destroy() + httpLogger.info('pds is stopped') + }) +} + +main()