--- a/services/bsky/api.js 2025-12-03 11:04:54 +++ b/services/bsky/api.js 2025-12-03 11:00:02 @@ -1,62 +1,105 @@ /* eslint-env node */ /* eslint-disable import/order */ - +// https://github.com/bluesky-social/atproto/blob/main/services/bsky/api.js 'use strict' -const dd = require('dd-trace') +//const dd = require('dd-trace') +// +//dd.tracer +// .init() +// .use('http2', { +// client: true, // calls into dataplane +// server: false, +// }) +// .use('express', { +// hooks: { +// request: (span, req) => { +// maintainXrpcResource(span, req) +// }, +// }, +// }) -dd.tracer - .init() - .use('http2', { - client: true, // calls into dataplane - server: false, - }) - .use('express', { - hooks: { - request: (span, req) => { - maintainXrpcResource(span, req) - }, - }, - }) - // modify tracer in order to track calls to dataplane as a service with proper resource names const DATAPLANE_PREFIX = '/bsky.Service/' -const origStartSpan = dd.tracer._tracer.startSpan -dd.tracer._tracer.startSpan = function (name, options) { - if ( - name !== 'http.request' || - options?.tags?.component !== 'http2' || - !options?.tags?.['http.url'] - ) { - return origStartSpan.call(this, name, options) - } - const uri = new URL(options.tags['http.url']) - if (!uri.pathname.startsWith(DATAPLANE_PREFIX)) { - return origStartSpan.call(this, name, options) - } - options.tags['service.name'] = 'dataplane-bsky' - options.tags['resource.name'] = uri.pathname.slice(DATAPLANE_PREFIX.length) - return origStartSpan.call(this, name, options) -} +//const origStartSpan = dd.tracer._tracer.startSpan +//dd.tracer._tracer.startSpan = function (name, options) { +// if ( +// name !== 'http.request' || +// options?.tags?.component !== 'http2' || +// !options?.tags?.['http.url'] +// ) { +// return origStartSpan.call(this, name, options) +// } +// const uri = new URL(options.tags['http.url']) +// if (!uri.pathname.startsWith(DATAPLANE_PREFIX)) { +// return origStartSpan.call(this, name, options) +// } +// options.tags['service.name'] = 'dataplane-bsky' +// options.tags['resource.name'] = uri.pathname.slice(DATAPLANE_PREFIX.length) +// return origStartSpan.call(this, name, options) +//} // Tracer code above must come before anything else const assert = require('node:assert') const cluster = require('node:cluster') const path = require('node:path') -const { BskyAppView, ServerConfig } = require('@atproto/bsky') -const { Secp256k1Keypair } = require('@atproto/crypto') +const bsky = require('/app/packages/bsky') // import all bsky features +const { Secp256k1Keypair } = require('/app/packages/crypto') const main = async () => { const env = getEnv() - const config = ServerConfig.readEnv() + const config = bsky.ServerConfig.readEnv() assert(env.serviceSigningKey, 'must set BSKY_SERVICE_SIGNING_KEY') const signingKey = await Secp256k1Keypair.import(env.serviceSigningKey) - const bsky = BskyAppView.create({ config, signingKey }) - await bsky.start() + +// starts: involve logics in packages/dev-env/src/bsky.ts >>>>>>>>>>>>> +// Separate migration db in case migration changes some connection state that we need in the tests, e.g. "alter database ... set ..." + const migrationDb = new bsky.Database({ + url: env.dbPostgresUrl, + schema: env.dbPostgresSchema, + }) + if (env.migration) { + await migrationDb.migrateToOrThrow(env.migration) + } else { + await migrationDb.migrateToLatestOrThrow() + } + await migrationDb.close() + + const db = new bsky.Database({ + url: env.dbPostgresUrl, + schema: env.dbPostgresSchema, + poolSize: 10, + }) + + const dataplane = await bsky.DataPlaneServer.create( + db, + env.dataplanePort, + config.didPlcUrl + ) + + const bsync = await bsky.MockBsync.create(db, env.bsyncPort) + +// ends: involve logics in packages/dev-env/src/bsky.ts <<<<<<<<<<<<< + + const server = bsky.BskyAppView.create({ config, signingKey }) +// starts: involve logics in packages/dev-env/src/bsky.ts >>>>>>>>>>>>> + const sub = new bsky.RepoSubscription({ + service: env.repoProvider, + db, + idResolver: dataplane.idResolver, + background: new bsky.BackgroundQueue(db), + }) +// ends: involve logics in packages/dev-env/src/bsky.ts <<<<<<<<<<<<< + await server.start() + sub.start() // involve logics in packages/dev-env/src/bsky.ts // Graceful shutdown (see also https://aws.amazon.com/blogs/containers/graceful-shutdowns-with-ecs/) const shutdown = async () => { - await bsky.destroy() + await server.destroy() + await bsync.destroy() + await dataplane.destroy() + await sub.destroy() + await db.close() } process.on('SIGTERM', shutdown) process.on('disconnect', shutdown) // when clustering @@ -64,6 +107,12 @@ const getEnv = () => ({ serviceSigningKey: process.env.BSKY_SERVICE_SIGNING_KEY || undefined, + dbPostgresUrl: process.env.BSKY_DB_POSTGRES_URL || undefined, + dbPostgresSchema: process.env.BSKY_DB_POSTGRES_SCHEMA || undefined, + dataplanePort : maybeParseInt(process.env.BSKY_DATAPLANE_PORT) || undefined, + bsyncPort : maybeParseInt(process.env.BSKY_BSYNC_PORT) || undefined, + migration: process.env.ENABLE_MIGRATIONS === 'true' || undefined, + repoProvider: process.env.BSKY_REPO_PROVIDER || undefined }) const maybeParseInt = (str) => {