diff --git a/changelog.md b/changelog.md index 3bb51a2..1b1dd47 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,16 @@ # Changelog +## 2025-11-27 - 2.3.0 - feat(upstream) +Add upstream proxy/cache subsystem and integrate per-protocol upstreams + +- Introduce a complete upstream subsystem (BaseUpstream, UpstreamCache, CircuitBreaker) with caching, negative-cache, stale-while-revalidate, retries, exponential backoff and per-upstream circuit breakers. +- Add upstream interfaces and defaults (ts/upstream/interfaces.upstream.ts) and export upstream utilities from ts/upstream/index.ts and root ts/index.ts. +- Implement protocol-specific upstream clients for npm, pypi, maven, composer, cargo and rubygems (classes.*upstream.ts) to fetch metadata and artifacts from configured upstream registries. +- Integrate upstream usage into registries: registries now accept an upstream config, attempt to fetch missing metadata/artifacts from upstreams, cache results locally, and expose destroy() to stop upstream resources. +- Add SmartRequest and minimatch to dependencies and expose smartrequest/minimatch via ts/plugins.ts for HTTP requests and glob-based scope matching. +- Update package.json to add @push.rocks/smartrequest and minimatch dependencies. +- Various registry implementations updated to utilize upstreams (npm, pypi, maven, composer, cargo, rubygems, oci) including URL rewrites and caching behavior. 
+ ## 2025-11-27 - 2.2.3 - fix(tests) Use unique test run IDs and add S3 cleanup in test helpers to avoid cross-run conflicts diff --git a/package.json b/package.json index fb6e484..28eb5a8 100644 --- a/package.json +++ b/package.json @@ -50,8 +50,10 @@ "@push.rocks/smartbucket": "^4.3.0", "@push.rocks/smartlog": "^3.1.10", "@push.rocks/smartpath": "^6.0.0", + "@push.rocks/smartrequest": "^5.0.1", "@tsclass/tsclass": "^9.3.0", - "adm-zip": "^0.5.10" + "adm-zip": "^0.5.10", + "minimatch": "^10.1.1" }, "packageManager": "pnpm@10.18.1+sha512.77a884a165cbba2d8d1c19e3b4880eee6d2fcabd0d879121e282196b80042351d5eb3ca0935fa599da1dc51265cc68816ad2bddd2a2de5ea9fdf92adbec7cd34" } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index c9e0aac..e4f03af 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -20,12 +20,18 @@ importers: '@push.rocks/smartpath': specifier: ^6.0.0 version: 6.0.0 + '@push.rocks/smartrequest': + specifier: ^5.0.1 + version: 5.0.1 '@tsclass/tsclass': specifier: ^9.3.0 version: 9.3.0 adm-zip: specifier: ^0.5.10 version: 0.5.16 + minimatch: + specifier: ^10.1.1 + version: 10.1.1 devDependencies: '@git.zone/tsbuild': specifier: ^3.1.0 diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 5a0420f..a67cfa0 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartregistry', - version: '2.2.3', + version: '2.3.0', description: 'A composable TypeScript library implementing OCI, NPM, Maven, Cargo, Composer, PyPI, and RubyGems registries for building unified container and package registries' } diff --git a/ts/cargo/classes.cargoregistry.ts b/ts/cargo/classes.cargoregistry.ts index a0441fd..4e865bf 100644 --- a/ts/cargo/classes.cargoregistry.ts +++ b/ts/cargo/classes.cargoregistry.ts @@ -3,6 +3,7 @@ import { BaseRegistry } from '../core/classes.baseregistry.js'; import { RegistryStorage } from '../core/classes.registrystorage.js'; import { AuthManager } from 
'../core/classes.authmanager.js'; import type { IRequestContext, IResponse, IAuthToken } from '../core/interfaces.core.js'; +import type { IProtocolUpstreamConfig } from '../upstream/interfaces.upstream.js'; import type { ICargoIndexEntry, ICargoPublishMetadata, @@ -13,6 +14,7 @@ import type { ICargoSearchResponse, ICargoSearchResult, } from './interfaces.cargo.js'; +import { CargoUpstream } from './classes.cargoupstream.js'; /** * Cargo/crates.io registry implementation @@ -25,12 +27,14 @@ export class CargoRegistry extends BaseRegistry { private basePath: string = '/cargo'; private registryUrl: string; private logger: Smartlog; + private upstream: CargoUpstream | null = null; constructor( storage: RegistryStorage, authManager: AuthManager, basePath: string = '/cargo', - registryUrl: string = 'http://localhost:5000/cargo' + registryUrl: string = 'http://localhost:5000/cargo', + upstreamConfig?: IProtocolUpstreamConfig ) { super(); this.storage = storage; @@ -50,6 +54,20 @@ export class CargoRegistry extends BaseRegistry { } }); this.logger.enableConsole(); + + // Initialize upstream if configured + if (upstreamConfig?.enabled) { + this.upstream = new CargoUpstream(upstreamConfig, undefined, this.logger); + } + } + + /** + * Clean up resources (timers, connections, etc.) 
+ */ + public destroy(): void { + if (this.upstream) { + this.upstream.stop(); + } } public async init(): Promise { @@ -207,7 +225,25 @@ export class CargoRegistry extends BaseRegistry { * Serve index file for a crate */ private async handleIndexFile(crateName: string): Promise { - const index = await this.storage.getCargoIndex(crateName); + let index = await this.storage.getCargoIndex(crateName); + + // Try upstream if not found locally + if ((!index || index.length === 0) && this.upstream) { + const upstreamIndex = await this.upstream.fetchCrateIndex(crateName); + if (upstreamIndex) { + // Parse the newline-delimited JSON + const parsedIndex: ICargoIndexEntry[] = upstreamIndex + .split('\n') + .filter(line => line.trim()) + .map(line => JSON.parse(line)); + + if (parsedIndex.length > 0) { + // Cache locally + await this.storage.putCargoIndex(crateName, parsedIndex); + index = parsedIndex; + } + } + } if (!index || index.length === 0) { return { @@ -399,7 +435,16 @@ export class CargoRegistry extends BaseRegistry { ): Promise { this.logger.log('debug', 'handleDownload', { crate: crateName, version }); - const crateFile = await this.storage.getCargoCrate(crateName, version); + let crateFile = await this.storage.getCargoCrate(crateName, version); + + // Try upstream if not found locally + if (!crateFile && this.upstream) { + crateFile = await this.upstream.fetchCrate(crateName, version); + if (crateFile) { + // Cache locally + await this.storage.putCargoCrate(crateName, version, crateFile); + } + } if (!crateFile) { return { diff --git a/ts/cargo/classes.cargoupstream.ts b/ts/cargo/classes.cargoupstream.ts new file mode 100644 index 0000000..5539ca8 --- /dev/null +++ b/ts/cargo/classes.cargoupstream.ts @@ -0,0 +1,159 @@ +import * as plugins from '../plugins.js'; +import { BaseUpstream } from '../upstream/classes.baseupstream.js'; +import type { + IProtocolUpstreamConfig, + IUpstreamFetchContext, + IUpstreamRegistryConfig, +} from 
/**
 * Cargo-specific upstream implementation.
 *
 * Handles:
 * - Crate metadata (index) fetching using the sparse index path layout
 * - Crate file (.crate) downloading from a dedicated download base URL
 */
export class CargoUpstream extends BaseUpstream {
  protected readonly protocolName = 'cargo';

  /** Base URL for crate downloads (may differ from the index URL). */
  private readonly downloadUrl: string;

  /**
   * @param config - Upstream configuration, passed through to BaseUpstream.
   * @param downloadUrl - Base URL used for `.crate` downloads. Falls back to
   *   the crates.io static host when omitted or empty.
   * @param logger - Optional logger, passed through to BaseUpstream.
   */
  constructor(
    config: IProtocolUpstreamConfig,
    downloadUrl?: string,
    logger?: plugins.smartlog.Smartlog,
  ) {
    super(config, logger);
    this.downloadUrl = downloadUrl || 'https://static.crates.io/crates';
  }

  /**
   * Fetch crate metadata from the sparse index.
   *
   * @param crateName - Name of the crate to look up.
   * @returns The raw index text, or `null` when the name is empty, the fetch
   *   yields no successful result, or the body is neither a Buffer nor a string.
   */
  public async fetchCrateIndex(crateName: string): Promise<string | null> {
    // An empty name can only produce a nonsensical index path ("///").
    if (!crateName) {
      return null;
    }

    const context: IUpstreamFetchContext = {
      protocol: 'cargo',
      resource: crateName,
      resourceType: 'index',
      path: this.buildIndexPath(crateName),
      method: 'GET',
      headers: {
        'accept': 'text/plain',
      },
      query: {},
    };

    const result = await this.fetch(context);

    if (!result || !result.success) {
      return null;
    }

    if (Buffer.isBuffer(result.body)) {
      return result.body.toString('utf8');
    }

    return typeof result.body === 'string' ? result.body : null;
  }

  /**
   * Fetch a crate file from upstream.
   *
   * The request is tagged with resourceType 'crate', which makes
   * buildUpstreamUrl() resolve it against the download URL instead of the
   * configured index URL.
   *
   * @param crateName - Name of the crate.
   * @param version - Exact version to download.
   * @returns The crate bytes, or `null` when an argument is empty or the
   *   fetch yields no successful result.
   */
  public async fetchCrate(crateName: string, version: string): Promise<Buffer | null> {
    // Empty arguments would request a path such as "/-.crate".
    if (!crateName || !version) {
      return null;
    }

    const context: IUpstreamFetchContext = {
      protocol: 'cargo',
      resource: crateName,
      resourceType: 'crate',
      path: `/${crateName}/${crateName}-${version}.crate`,
      method: 'GET',
      headers: {
        'accept': 'application/octet-stream',
      },
      query: {},
    };

    const result = await this.fetch(context);

    if (!result || !result.success) {
      return null;
    }

    return Buffer.isBuffer(result.body) ? result.body : Buffer.from(result.body);
  }

  /**
   * Build the sparse index path for a crate.
   *
   * Path structure (name is lower-cased first):
   * - 1 char:   /1/{name}
   * - 2 chars:  /2/{name}
   * - 3 chars:  /3/{first char}/{name}
   * - 4+ chars: /{first 2}/{next 2}/{name}
   */
  private buildIndexPath(crateName: string): string {
    const lowerName = crateName.toLowerCase();

    switch (lowerName.length) {
      case 1:
        return `/1/${lowerName}`;
      case 2:
        return `/2/${lowerName}`;
      case 3:
        return `/3/${lowerName[0]}/${lowerName}`;
      default:
        return `/${lowerName.slice(0, 2)}/${lowerName.slice(2, 4)}/${lowerName}`;
    }
  }

  /**
   * Override URL building for Cargo-specific handling.
   *
   * Crate downloads are resolved against the download URL; everything else
   * uses the URL of the given upstream. Trailing slashes on the base are
   * removed so the joined URL never contains "//" before the path.
   */
  protected buildUpstreamUrl(
    upstream: IUpstreamRegistryConfig,
    context: IUpstreamFetchContext,
  ): string {
    const rawBase = context.resourceType === 'crate' ? this.downloadUrl : upstream.url;
    const baseUrl = rawBase.replace(/\/+$/, '');

    return `${baseUrl}${context.path}`;
  }
}
'/npm'; const registryUrl = `http://localhost:5000${npmBasePath}`; // TODO: Make configurable - const npmRegistry = new NpmRegistry(this.storage, this.authManager, npmBasePath, registryUrl); + const npmRegistry = new NpmRegistry( + this.storage, + this.authManager, + npmBasePath, + registryUrl, + this.config.npm.upstream + ); await npmRegistry.init(); this.registries.set('npm', npmRegistry); } @@ -64,7 +76,13 @@ export class SmartRegistry { if (this.config.maven?.enabled) { const mavenBasePath = this.config.maven.basePath ?? '/maven'; const registryUrl = `http://localhost:5000${mavenBasePath}`; // TODO: Make configurable - const mavenRegistry = new MavenRegistry(this.storage, this.authManager, mavenBasePath, registryUrl); + const mavenRegistry = new MavenRegistry( + this.storage, + this.authManager, + mavenBasePath, + registryUrl, + this.config.maven.upstream + ); await mavenRegistry.init(); this.registries.set('maven', mavenRegistry); } @@ -73,7 +91,13 @@ export class SmartRegistry { if (this.config.cargo?.enabled) { const cargoBasePath = this.config.cargo.basePath ?? '/cargo'; const registryUrl = `http://localhost:5000${cargoBasePath}`; // TODO: Make configurable - const cargoRegistry = new CargoRegistry(this.storage, this.authManager, cargoBasePath, registryUrl); + const cargoRegistry = new CargoRegistry( + this.storage, + this.authManager, + cargoBasePath, + registryUrl, + this.config.cargo.upstream + ); await cargoRegistry.init(); this.registries.set('cargo', cargoRegistry); } @@ -82,7 +106,13 @@ export class SmartRegistry { if (this.config.composer?.enabled) { const composerBasePath = this.config.composer.basePath ?? 
'/composer'; const registryUrl = `http://localhost:5000${composerBasePath}`; // TODO: Make configurable - const composerRegistry = new ComposerRegistry(this.storage, this.authManager, composerBasePath, registryUrl); + const composerRegistry = new ComposerRegistry( + this.storage, + this.authManager, + composerBasePath, + registryUrl, + this.config.composer.upstream + ); await composerRegistry.init(); this.registries.set('composer', composerRegistry); } @@ -91,7 +121,13 @@ export class SmartRegistry { if (this.config.pypi?.enabled) { const pypiBasePath = this.config.pypi.basePath ?? '/pypi'; const registryUrl = `http://localhost:5000`; // TODO: Make configurable - const pypiRegistry = new PypiRegistry(this.storage, this.authManager, pypiBasePath, registryUrl); + const pypiRegistry = new PypiRegistry( + this.storage, + this.authManager, + pypiBasePath, + registryUrl, + this.config.pypi.upstream + ); await pypiRegistry.init(); this.registries.set('pypi', pypiRegistry); } @@ -100,7 +136,13 @@ export class SmartRegistry { if (this.config.rubygems?.enabled) { const rubygemsBasePath = this.config.rubygems.basePath ?? 
'/rubygems'; const registryUrl = `http://localhost:5000${rubygemsBasePath}`; // TODO: Make configurable - const rubygemsRegistry = new RubyGemsRegistry(this.storage, this.authManager, rubygemsBasePath, registryUrl); + const rubygemsRegistry = new RubyGemsRegistry( + this.storage, + this.authManager, + rubygemsBasePath, + registryUrl, + this.config.rubygems.upstream + ); await rubygemsRegistry.init(); this.registries.set('rubygems', rubygemsRegistry); } diff --git a/ts/composer/classes.composerregistry.ts b/ts/composer/classes.composerregistry.ts index fdb657f..7b174ab 100644 --- a/ts/composer/classes.composerregistry.ts +++ b/ts/composer/classes.composerregistry.ts @@ -7,6 +7,7 @@ import { BaseRegistry } from '../core/classes.baseregistry.js'; import type { RegistryStorage } from '../core/classes.registrystorage.js'; import type { AuthManager } from '../core/classes.authmanager.js'; import type { IRequestContext, IResponse, IAuthToken } from '../core/interfaces.core.js'; +import type { IProtocolUpstreamConfig } from '../upstream/interfaces.upstream.js'; import { isBinaryData, toBuffer } from '../core/helpers.buffer.js'; import type { IComposerPackage, @@ -22,24 +23,41 @@ import { generatePackagesJson, sortVersions, } from './helpers.composer.js'; +import { ComposerUpstream } from './classes.composerupstream.js'; export class ComposerRegistry extends BaseRegistry { private storage: RegistryStorage; private authManager: AuthManager; private basePath: string = '/composer'; private registryUrl: string; + private upstream: ComposerUpstream | null = null; constructor( storage: RegistryStorage, authManager: AuthManager, basePath: string = '/composer', - registryUrl: string = 'http://localhost:5000/composer' + registryUrl: string = 'http://localhost:5000/composer', + upstreamConfig?: IProtocolUpstreamConfig ) { super(); this.storage = storage; this.authManager = authManager; this.basePath = basePath; this.registryUrl = registryUrl; + + // Initialize upstream if configured 
+ if (upstreamConfig?.enabled) { + this.upstream = new ComposerUpstream(upstreamConfig); + } + } + + /** + * Clean up resources (timers, connections, etc.) + */ + public destroy(): void { + if (this.upstream) { + this.upstream.stop(); + } } public async init(): Promise { @@ -161,7 +179,26 @@ export class ComposerRegistry extends BaseRegistry { token: IAuthToken | null ): Promise { // Read operations are public, no authentication required - const metadata = await this.storage.getComposerPackageMetadata(vendorPackage); + let metadata = await this.storage.getComposerPackageMetadata(vendorPackage); + + // Try upstream if not found locally + if (!metadata && this.upstream) { + const [vendor, packageName] = vendorPackage.split('/'); + if (vendor && packageName) { + const upstreamMetadata = includeDev + ? await this.upstream.fetchPackageDevMetadata(vendor, packageName) + : await this.upstream.fetchPackageMetadata(vendor, packageName); + + if (upstreamMetadata && upstreamMetadata.packages) { + // Store upstream metadata locally + metadata = { + packages: upstreamMetadata.packages, + lastModified: new Date().toUTCString(), + }; + await this.storage.putComposerPackageMetadata(vendorPackage, metadata); + } + } + } if (!metadata) { return { diff --git a/ts/composer/classes.composerupstream.ts b/ts/composer/classes.composerupstream.ts new file mode 100644 index 0000000..3295bd5 --- /dev/null +++ b/ts/composer/classes.composerupstream.ts @@ -0,0 +1,200 @@ +import * as plugins from '../plugins.js'; +import { BaseUpstream } from '../upstream/classes.baseupstream.js'; +import type { + IProtocolUpstreamConfig, + IUpstreamFetchContext, + IUpstreamRegistryConfig, +} from '../upstream/interfaces.upstream.js'; + +/** + * Composer-specific upstream implementation. 
/**
 * Composer-specific upstream implementation.
 *
 * Handles:
 * - Root packages.json and provider-includes files
 * - Package version metadata (p2/{vendor}/{package}.json and ~dev.json)
 * - Dist file (zip) fetching
 */
export class ComposerUpstream extends BaseUpstream {
  protected readonly protocolName = 'composer';

  /**
   * @param config - Upstream configuration, passed through to BaseUpstream.
   * @param logger - Optional logger, passed through to BaseUpstream.
   */
  constructor(
    config: IProtocolUpstreamConfig,
    logger?: plugins.smartlog.Smartlog,
  ) {
    super(config, logger);
  }

  /**
   * Fetch the root packages.json from upstream.
   *
   * @returns The parsed document, or `null` when the fetch is unsuccessful.
   */
  public async fetchPackagesJson(): Promise<any | null> {
    return this.fetchComposerJson('*', 'root', '/packages.json');
  }

  /**
   * Fetch package metadata using the v2 API (p2/{vendor}/{package}.json).
   *
   * @returns The parsed document, or `null` when the fetch is unsuccessful.
   */
  public async fetchPackageMetadata(vendor: string, packageName: string): Promise<any | null> {
    return this.fetchComposerJson(
      `${vendor}/${packageName}`,
      'metadata',
      `/p2/${vendor}/${packageName}.json`,
    );
  }

  /**
   * Fetch package metadata with dev versions (p2/{vendor}/{package}~dev.json).
   *
   * @returns The parsed document, or `null` when the fetch is unsuccessful.
   */
  public async fetchPackageDevMetadata(vendor: string, packageName: string): Promise<any | null> {
    return this.fetchComposerJson(
      `${vendor}/${packageName}`,
      'metadata-dev',
      `/p2/${vendor}/${packageName}~dev.json`,
    );
  }

  /**
   * Fetch a provider-includes file.
   *
   * @param path - Path of the file relative to the upstream root; a leading
   *   slash is added when missing.
   * @returns The parsed document, or `null` when the fetch is unsuccessful.
   */
  public async fetchProviderIncludes(path: string): Promise<any | null> {
    return this.fetchComposerJson('*', 'provider', this.withLeadingSlash(path));
  }

  /**
   * Fetch a dist file (zip) from upstream.
   *
   * Only the path and query string of `url` are used; the request itself is
   * resolved against the configured upstream base URL by buildUpstreamUrl().
   *
   * @param url - Absolute dist URL, or a bare path when it cannot be parsed
   *   as a URL.
   * @returns The file bytes, or `null` when the fetch is unsuccessful.
   */
  public async fetchDist(url: string): Promise<Buffer | null> {
    let path: string;
    try {
      const parsed = new URL(url);
      // Keep the query string: dropping it would request a different resource.
      path = `${parsed.pathname}${parsed.search}`;
    } catch {
      // Not an absolute URL: treat the input as a path.
      path = url;
    }

    const context: IUpstreamFetchContext = {
      protocol: 'composer',
      resource: '*',
      resourceType: 'dist',
      // Same normalisation as fetchProviderIncludes, so "a/b.zip" does not get
      // glued directly onto the host name.
      path: this.withLeadingSlash(path),
      method: 'GET',
      headers: {
        'accept': 'application/zip, application/octet-stream',
      },
      query: {},
    };

    const result = await this.fetch(context);

    if (!result || !result.success) {
      return null;
    }

    return Buffer.isBuffer(result.body) ? result.body : Buffer.from(result.body);
  }

  /**
   * Shared GET-and-parse routine behind all JSON endpoints of this class.
   *
   * @returns The body parsed from JSON when it arrives as a Buffer, the body
   *   unchanged otherwise, or `null` when the fetch is unsuccessful.
   * @throws SyntaxError when a Buffer body does not contain valid JSON.
   */
  private async fetchComposerJson(
    resource: string,
    resourceType: string,
    path: string,
  ): Promise<any | null> {
    const context: IUpstreamFetchContext = {
      protocol: 'composer',
      resource,
      resourceType,
      path,
      method: 'GET',
      headers: {
        'accept': 'application/json',
      },
      query: {},
    };

    const result = await this.fetch(context);

    if (!result || !result.success) {
      return null;
    }

    if (Buffer.isBuffer(result.body)) {
      return JSON.parse(result.body.toString('utf8'));
    }

    return result.body;
  }

  /** Prefix `path` with "/" unless it already starts with one. */
  private withLeadingSlash(path: string): string {
    return path.startsWith('/') ? path : `/${path}`;
  }

  /**
   * Override URL building for Composer-specific handling.
   *
   * Joins the upstream URL (without trailing slashes) and the context path.
   */
  protected buildUpstreamUrl(
    upstream: IUpstreamRegistryConfig,
    context: IUpstreamFetchContext,
  ): string {
    const baseUrl = upstream.url.replace(/\/+$/, '');

    return `${baseUrl}${context.path}`;
  }
}
import type { IRequestContext, IResponse, IAuthToken } from '../core/interfaces.core.js'; +import type { IProtocolUpstreamConfig } from '../upstream/interfaces.upstream.js'; import { toBuffer } from '../core/helpers.buffer.js'; import type { IMavenCoordinate, IMavenMetadata, IChecksums } from './interfaces.maven.js'; import { @@ -21,6 +22,7 @@ import { extractGAVFromPom, gavToPath, } from './helpers.maven.js'; +import { MavenUpstream } from './classes.mavenupstream.js'; /** * Maven Registry class @@ -31,18 +33,34 @@ export class MavenRegistry extends BaseRegistry { private authManager: AuthManager; private basePath: string = '/maven'; private registryUrl: string; + private upstream: MavenUpstream | null = null; constructor( storage: RegistryStorage, authManager: AuthManager, basePath: string, - registryUrl: string + registryUrl: string, + upstreamConfig?: IProtocolUpstreamConfig ) { super(); this.storage = storage; this.authManager = authManager; this.basePath = basePath; this.registryUrl = registryUrl; + + // Initialize upstream if configured + if (upstreamConfig?.enabled) { + this.upstream = new MavenUpstream(upstreamConfig); + } + } + + /** + * Clean up resources (timers, connections, etc.) 
+ */ + public destroy(): void { + if (this.upstream) { + this.upstream.stop(); + } } public async init(): Promise { @@ -234,7 +252,23 @@ export class MavenRegistry extends BaseRegistry { version: string, filename: string ): Promise { - const data = await this.storage.getMavenArtifact(groupId, artifactId, version, filename); + let data = await this.storage.getMavenArtifact(groupId, artifactId, version, filename); + + // Try upstream if not found locally + if (!data && this.upstream) { + // Parse the filename to extract extension and classifier + const { extension, classifier } = this.parseFilename(filename, artifactId, version); + if (extension) { + data = await this.upstream.fetchArtifact(groupId, artifactId, version, extension, classifier); + if (data) { + // Cache the artifact locally + await this.storage.putMavenArtifact(groupId, artifactId, version, filename, data); + // Generate and store checksums + const checksums = await calculateChecksums(data); + await this.storeChecksums(groupId, artifactId, version, filename, checksums); + } + } + } if (!data) { return { @@ -462,7 +496,17 @@ export class MavenRegistry extends BaseRegistry { // ======================================================================== private async getMetadata(groupId: string, artifactId: string): Promise { - const metadataBuffer = await this.storage.getMavenMetadata(groupId, artifactId); + let metadataBuffer = await this.storage.getMavenMetadata(groupId, artifactId); + + // Try upstream if not found locally + if (!metadataBuffer && this.upstream) { + const upstreamMetadata = await this.upstream.fetchMetadata(groupId, artifactId); + if (upstreamMetadata) { + metadataBuffer = Buffer.from(upstreamMetadata, 'utf-8'); + // Cache the metadata locally + await this.storage.putMavenMetadata(groupId, artifactId, metadataBuffer); + } + } if (!metadataBuffer) { // Generate empty metadata if none exists @@ -578,4 +622,41 @@ export class MavenRegistry extends BaseRegistry { return 
contentTypes[extension] || 'application/octet-stream'; } + + /** + * Parse a Maven filename to extract extension and classifier. + * Filename format: {artifactId}-{version}[-{classifier}].{extension} + */ + private parseFilename( + filename: string, + artifactId: string, + version: string + ): { extension: string; classifier?: string } { + const prefix = `${artifactId}-${version}`; + + if (!filename.startsWith(prefix)) { + // Fallback: just get the extension + const lastDot = filename.lastIndexOf('.'); + return { extension: lastDot > 0 ? filename.slice(lastDot + 1) : '' }; + } + + const remainder = filename.slice(prefix.length); + // remainder is either ".extension" or "-classifier.extension" + + if (remainder.startsWith('.')) { + return { extension: remainder.slice(1) }; + } + + if (remainder.startsWith('-')) { + const lastDot = remainder.lastIndexOf('.'); + if (lastDot > 1) { + return { + classifier: remainder.slice(1, lastDot), + extension: remainder.slice(lastDot + 1), + }; + } + } + + return { extension: '' }; + } } diff --git a/ts/maven/classes.mavenupstream.ts b/ts/maven/classes.mavenupstream.ts new file mode 100644 index 0000000..d193152 --- /dev/null +++ b/ts/maven/classes.mavenupstream.ts @@ -0,0 +1,220 @@ +import * as plugins from '../plugins.js'; +import { BaseUpstream } from '../upstream/classes.baseupstream.js'; +import type { + IProtocolUpstreamConfig, + IUpstreamFetchContext, + IUpstreamRegistryConfig, +} from '../upstream/interfaces.upstream.js'; +import type { IMavenCoordinate } from './interfaces.maven.js'; + +/** + * Maven-specific upstream implementation. + * + * Handles: + * - Artifact fetching (JAR, POM, WAR, etc.) 
/**
 * Upstream client for Maven repositories.
 *
 * Provides:
 * - Artifact download (any extension, optional classifier)
 * - maven-metadata.xml retrieval at artifact or version level
 * - Checksum file retrieval (.md5, .sha1, .sha256, .sha512)
 * - HEAD-based existence probing
 */
export class MavenUpstream extends BaseUpstream {
  protected readonly protocolName = 'maven';

  constructor(
    config: IProtocolUpstreamConfig,
    logger?: plugins.smartlog.Smartlog,
  ) {
    super(config, logger);
  }

  /**
   * Download an artifact from the upstream registries.
   *
   * @returns The artifact bytes, or `null` when the fetch is unsuccessful.
   */
  public async fetchArtifact(
    groupId: string,
    artifactId: string,
    version: string,
    extension: string,
    classifier?: string,
  ): Promise<Buffer | null> {
    const request: IUpstreamFetchContext = {
      protocol: 'maven',
      resource: `${groupId}:${artifactId}`,
      resourceType: 'artifact',
      path: this.buildArtifactPath(groupId, artifactId, version, extension, classifier),
      method: 'GET',
      headers: {},
      query: {},
    };

    const response = await this.fetch(request);
    if (!response || !response.success) {
      return null;
    }

    const payload = response.body;
    return Buffer.isBuffer(payload) ? payload : Buffer.from(payload);
  }

  /**
   * Retrieve maven-metadata.xml from upstream.
   *
   * With a `version` the version-level file is requested; without one, the
   * artifact-level file.
   *
   * @returns The XML text, or `null` when the fetch is unsuccessful or the
   *   body is neither a Buffer nor a string.
   */
  public async fetchMetadata(groupId: string, artifactId: string, version?: string): Promise<string | null> {
    const groupDir = groupId.replace(/\./g, '/');
    const metadataPath = version
      ? `/${groupDir}/${artifactId}/${version}/maven-metadata.xml`
      : `/${groupDir}/${artifactId}/maven-metadata.xml`;

    const request: IUpstreamFetchContext = {
      protocol: 'maven',
      resource: `${groupId}:${artifactId}`,
      resourceType: 'metadata',
      path: metadataPath,
      method: 'GET',
      headers: {
        'accept': 'application/xml, text/xml',
      },
      query: {},
    };

    const response = await this.fetch(request);
    if (!response || !response.success) {
      return null;
    }

    const payload = response.body;
    if (Buffer.isBuffer(payload)) {
      return payload.toString('utf8');
    }
    if (typeof payload === 'string') {
      return payload;
    }
    return null;
  }

  /**
   * Retrieve a checksum file for an artifact.
   *
   * The checksum path is the artifact path with ".{checksumType}" appended.
   *
   * @returns The trimmed checksum text, or `null` when the fetch is
   *   unsuccessful or the body is neither a Buffer nor a string.
   */
  public async fetchChecksum(
    groupId: string,
    artifactId: string,
    version: string,
    extension: string,
    checksumType: 'md5' | 'sha1' | 'sha256' | 'sha512',
    classifier?: string,
  ): Promise<string | null> {
    const artifactPath = this.buildArtifactPath(groupId, artifactId, version, extension, classifier);

    const request: IUpstreamFetchContext = {
      protocol: 'maven',
      resource: `${groupId}:${artifactId}`,
      resourceType: 'checksum',
      path: `${artifactPath}.${checksumType}`,
      method: 'GET',
      headers: {
        'accept': 'text/plain',
      },
      query: {},
    };

    const response = await this.fetch(request);
    if (!response || !response.success) {
      return null;
    }

    const payload = response.body;
    if (Buffer.isBuffer(payload)) {
      return payload.toString('utf8').trim();
    }
    if (typeof payload === 'string') {
      return payload.trim();
    }
    return null;
  }

  /**
   * Probe upstream for an artifact with a HEAD request.
   *
   * @returns `null` when the fetch yields no result, `{ exists: false }` when
   *   the result is unsuccessful, otherwise `exists: true` plus the size and
   *   last-modified value taken from the response headers when present.
   */
  public async headArtifact(
    groupId: string,
    artifactId: string,
    version: string,
    extension: string,
    classifier?: string,
  ): Promise<{ exists: boolean; size?: number; lastModified?: string } | null> {
    const request: IUpstreamFetchContext = {
      protocol: 'maven',
      resource: `${groupId}:${artifactId}`,
      resourceType: 'artifact',
      path: this.buildArtifactPath(groupId, artifactId, version, extension, classifier),
      method: 'HEAD',
      headers: {},
      query: {},
    };

    const response = await this.fetch(request);
    if (!response) {
      return null;
    }
    if (!response.success) {
      return { exists: false };
    }

    const rawLength = response.headers['content-length'];
    return {
      exists: true,
      size: rawLength ? parseInt(rawLength, 10) : undefined,
      lastModified: response.headers['last-modified'],
    };
  }

  /**
   * Compose the repository path of an artifact:
   * /{group as dirs}/{artifactId}/{version}/{artifactId}-{version}[-{classifier}].{extension}
   */
  private buildArtifactPath(
    groupId: string,
    artifactId: string,
    version: string,
    extension: string,
    classifier?: string,
  ): string {
    const groupDir = groupId.replace(/\./g, '/');
    const classifierPart = classifier ? `-${classifier}` : '';
    const filename = `${artifactId}-${version}${classifierPart}.${extension}`;

    return `/${groupDir}/${artifactId}/${version}/${filename}`;
  }

  /**
   * Maven-specific URL building: the upstream URL, minus one trailing slash
   * if present, followed by the context path.
   */
  protected buildUpstreamUrl(
    upstream: IUpstreamRegistryConfig,
    context: IUpstreamFetchContext,
  ): string {
    const configured = upstream.url;
    const base = configured.endsWith('/') ? configured.slice(0, -1) : configured;

    return `${base}${context.path}`;
  }
}
NpmUpstream(upstreamConfig, registryUrl, this.logger); + this.logger.log('info', 'NPM upstream initialized', { + upstreams: upstreamConfig.upstreams.map(u => u.name), + }); + } } public async init(): Promise { @@ -209,13 +221,28 @@ export class NpmRegistry extends BaseRegistry { token: IAuthToken | null, query: Record ): Promise { - const packument = await this.storage.getNpmPackument(packageName); + let packument = await this.storage.getNpmPackument(packageName); this.logger.log('debug', `getPackument: ${packageName}`, { packageName, found: !!packument, versions: packument ? Object.keys(packument.versions).length : 0 }); + // If not found locally, try upstream + if (!packument && this.upstream) { + this.logger.log('debug', `getPackument: fetching from upstream`, { packageName }); + const upstreamPackument = await this.upstream.fetchPackument(packageName); + if (upstreamPackument) { + this.logger.log('debug', `getPackument: found in upstream`, { + packageName, + versions: Object.keys(upstreamPackument.versions || {}).length + }); + packument = upstreamPackument; + // Optionally cache the packument locally (without tarballs) + // We don't store tarballs here - they'll be fetched on demand + } + } + if (!packument) { return { status: 404, @@ -255,11 +282,21 @@ export class NpmRegistry extends BaseRegistry { token: IAuthToken | null ): Promise { this.logger.log('debug', 'handlePackageVersion', { packageName, version }); - const packument = await this.storage.getNpmPackument(packageName); + let packument = await this.storage.getNpmPackument(packageName); this.logger.log('debug', 'handlePackageVersion packument', { found: !!packument }); if (packument) { this.logger.log('debug', 'handlePackageVersion versions', { versions: Object.keys(packument.versions || {}) }); } + + // If not found locally, try upstream + if (!packument && this.upstream) { + this.logger.log('debug', 'handlePackageVersion: fetching from upstream', { packageName }); + const upstreamPackument = await 
this.upstream.fetchPackument(packageName); + if (upstreamPackument) { + packument = upstreamPackument; + } + } + if (!packument) { return { status: 404, @@ -529,7 +566,7 @@ export class NpmRegistry extends BaseRegistry { token: IAuthToken | null ): Promise { // Extract version from filename: package-name-1.0.0.tgz - const versionMatch = filename.match(/-([\d.]+(?:-[a-z0-9.]+)?)\.tgz$/); + const versionMatch = filename.match(/-([\d.]+(?:-[a-z0-9.]+)?)\.tgz$/i); if (!versionMatch) { return { status: 400, @@ -539,7 +576,26 @@ export class NpmRegistry extends BaseRegistry { } const version = versionMatch[1]; - const tarball = await this.storage.getNpmTarball(packageName, version); + let tarball = await this.storage.getNpmTarball(packageName, version); + + // If not found locally, try upstream + if (!tarball && this.upstream) { + this.logger.log('debug', 'handleTarballDownload: fetching from upstream', { + packageName, + version, + }); + const upstreamTarball = await this.upstream.fetchTarball(packageName, version); + if (upstreamTarball) { + tarball = upstreamTarball; + // Cache the tarball locally for future requests + await this.storage.putNpmTarball(packageName, version, tarball); + this.logger.log('debug', 'handleTarballDownload: cached tarball locally', { + packageName, + version, + size: tarball.length, + }); + } + } if (!tarball) { return { diff --git a/ts/npm/classes.npmupstream.ts b/ts/npm/classes.npmupstream.ts new file mode 100644 index 0000000..22d2bab --- /dev/null +++ b/ts/npm/classes.npmupstream.ts @@ -0,0 +1,260 @@ +import * as plugins from '../plugins.js'; +import { BaseUpstream } from '../upstream/classes.baseupstream.js'; +import type { + IProtocolUpstreamConfig, + IUpstreamFetchContext, + IUpstreamResult, + IUpstreamRegistryConfig, +} from '../upstream/interfaces.upstream.js'; +import type { IPackument, INpmVersion } from './interfaces.npm.js'; + +/** + * NPM-specific upstream implementation. 
+ *
+ * Handles:
+ * - Package metadata (packument) fetching
+ * - Tarball proxying
+ * - Scoped package routing (@scope/* patterns)
+ * - NPM-specific URL rewriting
+ */
+export class NpmUpstream extends BaseUpstream {
+  protected readonly protocolName = 'npm';
+
+  /** Local registry URL for rewriting tarball URLs */
+  private readonly localRegistryUrl: string;
+
+  /**
+   * @param config Upstream configuration, passed through to the base class.
+   * @param localRegistryUrl Base URL that rewritten tarball URLs will point to.
+   * @param logger Optional logger, passed through to the base class.
+   */
+  constructor(
+    config: IProtocolUpstreamConfig,
+    localRegistryUrl: string,
+    logger?: plugins.smartlog.Smartlog,
+  ) {
+    super(config, logger);
+    this.localRegistryUrl = localRegistryUrl;
+  }
+
+  /**
+   * Fetch a packument from upstream registries.
+   *
+   * @param packageName Package name, optionally scoped (`@scope/name`).
+   * @returns The packument with every `dist.tarball` URL rewritten to the
+   *   local registry, or `null` when the upstream fetch fails.
+   */
+  public async fetchPackument(packageName: string): Promise<IPackument | null> {
+    const context: IUpstreamFetchContext = {
+      protocol: 'npm',
+      resource: packageName,
+      resourceType: 'packument',
+      // Encode the name as one path segment, but keep the leading '@' of a
+      // scoped package literal ("@scope%2Fname").
+      path: `/${encodeURIComponent(packageName).replace('%40', '@')}`,
+      method: 'GET',
+      headers: {
+        'accept': 'application/json',
+      },
+      query: {},
+    };
+
+    const result = await this.fetch(context);
+
+    if (!result || !result.success) {
+      return null;
+    }
+
+    // Parse and process packument. A Buffer body is raw JSON text; any other
+    // body is used as-is.
+    let packument: IPackument;
+    if (Buffer.isBuffer(result.body)) {
+      packument = JSON.parse(result.body.toString('utf8'));
+    } else {
+      packument = result.body;
+    }
+
+    // Rewrite tarball URLs to point to local registry
+    packument = this.rewriteTarballUrls(packument);
+
+    return packument;
+  }
+
+  /**
+   * Fetch a specific version from upstream registries.
+   */
+  public async fetchVersion(packageName: string, version: string): Promise<INpmVersion | null> {
+    const context: IUpstreamFetchContext = {
+      protocol: 'npm',
+      resource: packageName,
+      resourceType: 'version',
+      // Same name encoding as for packuments, followed by the version segment.
+      path: `/${encodeURIComponent(packageName).replace('%40', '@')}/${version}`,
+      method: 'GET',
+      headers: {
+        'accept': 'application/json',
+      },
+      query: {},
+    };
+
+    const result = await this.fetch(context);
+
+    if (!result || !result.success) {
+      return null;
+    }
+
+    let versionData: INpmVersion;
+    if (Buffer.isBuffer(result.body)) {
+      versionData = JSON.parse(result.body.toString('utf8'));
+    } else {
+      versionData = result.body;
+    }
+
+    // Rewrite tarball URL so clients download through the local registry.
+    if (versionData.dist?.tarball) {
+      versionData.dist.tarball = this.rewriteSingleTarballUrl(
+        packageName,
+        versionData.version,
+        versionData.dist.tarball,
+      );
+    }
+
+    return versionData;
+  }
+
+  /**
+   * Fetch a tarball from upstream registries.
+   *
+   * The upstream path is always derived from the standard NPM tarball layout.
+   * It must not be taken from `fetchPackument()`: that method returns tarball
+   * URLs already rewritten to the local registry, so their pathname is a
+   * local path (including the local base path and the local file naming),
+   * not a path that exists on the upstream.
+   *
+   * @param packageName Package name, optionally scoped (`@scope/name`).
+   * @param version Exact version of the tarball to download.
+   * @returns The tarball bytes, or `null` when the upstream fetch fails.
+   */
+  public async fetchTarball(packageName: string, version: string): Promise<Buffer | null> {
+    const tarballPath = this.buildTarballPath(packageName, version);
+
+    const context: IUpstreamFetchContext = {
+      protocol: 'npm',
+      resource: packageName,
+      resourceType: 'tarball',
+      path: tarballPath,
+      method: 'GET',
+      headers: {
+        'accept': 'application/octet-stream',
+      },
+      query: {},
+    };
+
+    const result = await this.fetch(context);
+
+    if (!result || !result.success) {
+      return null;
+    }
+
+    return Buffer.isBuffer(result.body) ? result.body : Buffer.from(result.body);
+  }
+
+  /**
+   * Search packages in upstream registries.
+   *
+   * @param text Search text, sent as the `text` query parameter.
+   * @param size Number of results to request (sent as `size`).
+   * @param from Result offset (sent as `from`).
+   * @returns The search response (parsed from JSON when the body is a Buffer),
+   *   or `null` when the upstream fetch fails.
+   */
+  public async search(text: string, size: number = 20, from: number = 0): Promise<any | null> {
+    const context: IUpstreamFetchContext = {
+      protocol: 'npm',
+      resource: '*',
+      resourceType: 'search',
+      path: '/-/v1/search',
+      method: 'GET',
+      headers: {
+        'accept': 'application/json',
+      },
+      query: {
+        text,
+        size: size.toString(),
+        from: from.toString(),
+      },
+    };
+
+    const result = await this.fetch(context);
+
+    if (!result || !result.success) {
+      return null;
+    }
+
+    if (Buffer.isBuffer(result.body)) {
+      return JSON.parse(result.body.toString('utf8'));
+    }
+
+    return result.body;
+  }
+
+  /**
+   * Build the standard NPM tarball path.
+   */
+  private buildTarballPath(packageName: string, version: string): string {
+    // NPM uses: /{package}/-/{package-name}-{version}.tgz
+    // For scoped packages: /@scope/name/-/name-version.tgz
+    if (packageName.startsWith('@')) {
+      const [scope, name] = packageName.split('/');
+      return `/${scope}/${name}/-/${name}-${version}.tgz`;
+    } else {
+      return `/${packageName}/-/${packageName}-${version}.tgz`;
+    }
+  }
+
+  /**
+   * Rewrite all tarball URLs in a packument to point to local registry.
+   *
+   * Returns a copy: the packument, each version entry and each rewritten
+   * `dist` object are shallow-copied, so the input is not mutated.
+   */
+  private rewriteTarballUrls(packument: IPackument): IPackument {
+    if (!packument.versions) {
+      return packument;
+    }
+
+    const rewritten = { ...packument };
+    rewritten.versions = {};
+
+    for (const [version, versionData] of Object.entries(packument.versions)) {
+      const newVersionData = { ...versionData };
+      if (newVersionData.dist?.tarball) {
+        newVersionData.dist = {
+          ...newVersionData.dist,
+          tarball: this.rewriteSingleTarballUrl(
+            packument.name,
+            version,
+            newVersionData.dist.tarball,
+          ),
+        };
+      }
+      rewritten.versions[version] = newVersionData;
+    }
+
+    return rewritten;
+  }
+
+  /**
+   * Rewrite a single tarball URL to point to local registry.
+ */ + private rewriteSingleTarballUrl( + packageName: string, + version: string, + _originalUrl: string, + ): string { + // Generate local tarball URL + // Format: {localRegistryUrl}/{package}/-/{package-name}-{version}.tgz + const safeName = packageName.replace('@', '').replace('/', '-'); + return `${this.localRegistryUrl}/${packageName}/-/${safeName}-${version}.tgz`; + } + + /** + * Override URL building for NPM-specific handling. + */ + protected buildUpstreamUrl( + upstream: IUpstreamRegistryConfig, + context: IUpstreamFetchContext, + ): string { + // NPM registries often don't have trailing slashes + let baseUrl = upstream.url; + if (baseUrl.endsWith('/')) { + baseUrl = baseUrl.slice(0, -1); + } + + return `${baseUrl}${context.path}`; + } +} diff --git a/ts/npm/index.ts b/ts/npm/index.ts index b7fd083..fb78c6b 100644 --- a/ts/npm/index.ts +++ b/ts/npm/index.ts @@ -3,4 +3,5 @@ */ export { NpmRegistry } from './classes.npmregistry.js'; +export { NpmUpstream } from './classes.npmupstream.js'; export * from './interfaces.npm.js'; diff --git a/ts/oci/classes.ociregistry.ts b/ts/oci/classes.ociregistry.ts index ac8d9ee..bc9c54e 100644 --- a/ts/oci/classes.ociregistry.ts +++ b/ts/oci/classes.ociregistry.ts @@ -1,7 +1,10 @@ +import { Smartlog } from '@push.rocks/smartlog'; import { BaseRegistry } from '../core/classes.baseregistry.js'; import { RegistryStorage } from '../core/classes.registrystorage.js'; import { AuthManager } from '../core/classes.authmanager.js'; import type { IRequestContext, IResponse, IAuthToken, IRegistryError } from '../core/interfaces.core.js'; +import type { IProtocolUpstreamConfig } from '../upstream/interfaces.upstream.js'; +import { OciUpstream } from './classes.ociupstream.js'; import type { IUploadSession, IOciManifest, @@ -21,18 +24,42 @@ export class OciRegistry extends BaseRegistry { private basePath: string = '/oci'; private cleanupInterval?: NodeJS.Timeout; private ociTokens?: { realm: string; service: string }; + private upstream: 
OciUpstream | null = null; + private logger: Smartlog; constructor( storage: RegistryStorage, authManager: AuthManager, basePath: string = '/oci', - ociTokens?: { realm: string; service: string } + ociTokens?: { realm: string; service: string }, + upstreamConfig?: IProtocolUpstreamConfig ) { super(); this.storage = storage; this.authManager = authManager; this.basePath = basePath; this.ociTokens = ociTokens; + + // Initialize logger + this.logger = new Smartlog({ + logContext: { + company: 'push.rocks', + companyunit: 'smartregistry', + containerName: 'oci-registry', + environment: (process.env.NODE_ENV as any) || 'development', + runtime: 'node', + zone: 'oci' + } + }); + this.logger.enableConsole(); + + // Initialize upstream if configured + if (upstreamConfig?.enabled) { + this.upstream = new OciUpstream(upstreamConfig, basePath, this.logger); + this.logger.log('info', 'OCI upstream initialized', { + upstreams: upstreamConfig.upstreams.map(u => u.name), + }); + } } public async init(): Promise { @@ -302,16 +329,50 @@ export class OciRegistry extends BaseRegistry { if (!reference.startsWith('sha256:')) { const tags = await this.getTagsData(repository); digest = tags[reference]; - if (!digest) { - return { - status: 404, - headers: {}, - body: this.createError('MANIFEST_UNKNOWN', 'Manifest not found'), - }; + } + + // Try local storage first (if we have a digest) + let manifestData: Buffer | null = null; + let contentType: string | null = null; + + if (digest) { + manifestData = await this.storage.getOciManifest(repository, digest); + if (manifestData) { + contentType = await this.storage.getOciManifestContentType(repository, digest); + if (!contentType) { + contentType = this.detectManifestContentType(manifestData); + } + } + } + + // If not found locally, try upstream + if (!manifestData && this.upstream) { + this.logger.log('debug', 'getManifest: fetching from upstream', { repository, reference }); + const upstreamResult = await 
this.upstream.fetchManifest(repository, reference); + if (upstreamResult) { + manifestData = Buffer.from(JSON.stringify(upstreamResult.manifest), 'utf8'); + contentType = upstreamResult.contentType; + digest = upstreamResult.digest; + + // Cache the manifest locally + await this.storage.putOciManifest(repository, digest, manifestData, contentType); + + // If reference is a tag, update tags mapping + if (!reference.startsWith('sha256:')) { + const tags = await this.getTagsData(repository); + tags[reference] = digest; + const tagsPath = `oci/tags/${repository}/tags.json`; + await this.storage.putObject(tagsPath, Buffer.from(JSON.stringify(tags), 'utf-8')); + } + + this.logger.log('debug', 'getManifest: cached manifest locally', { + repository, + reference, + digest, + }); } } - const manifestData = await this.storage.getOciManifest(repository, digest); if (!manifestData) { return { status: 404, @@ -320,17 +381,10 @@ export class OciRegistry extends BaseRegistry { }; } - // Get stored content type, falling back to detecting from manifest content - let contentType = await this.storage.getOciManifestContentType(repository, digest); - if (!contentType) { - // Fallback: detect content type from manifest content - contentType = this.detectManifestContentType(manifestData); - } - return { status: 200, headers: { - 'Content-Type': contentType, + 'Content-Type': contentType || 'application/vnd.oci.image.manifest.v1+json', 'Docker-Content-Digest': digest, }, body: manifestData, @@ -466,7 +520,25 @@ export class OciRegistry extends BaseRegistry { return this.createUnauthorizedResponse(repository, 'pull'); } - const data = await this.storage.getOciBlob(digest); + // Try local storage first + let data = await this.storage.getOciBlob(digest); + + // If not found locally, try upstream + if (!data && this.upstream) { + this.logger.log('debug', 'getBlob: fetching from upstream', { repository, digest }); + const upstreamBlob = await this.upstream.fetchBlob(repository, digest); + if 
(upstreamBlob) { + data = upstreamBlob; + // Cache the blob locally (blobs are content-addressable and immutable) + await this.storage.putOciBlob(digest, data); + this.logger.log('debug', 'getBlob: cached blob locally', { + repository, + digest, + size: data.length, + }); + } + } + if (!data) { return { status: 404, diff --git a/ts/oci/classes.ociupstream.ts b/ts/oci/classes.ociupstream.ts new file mode 100644 index 0000000..8efb3b1 --- /dev/null +++ b/ts/oci/classes.ociupstream.ts @@ -0,0 +1,263 @@ +import * as plugins from '../plugins.js'; +import { BaseUpstream } from '../upstream/classes.baseupstream.js'; +import type { + IProtocolUpstreamConfig, + IUpstreamFetchContext, + IUpstreamResult, + IUpstreamRegistryConfig, +} from '../upstream/interfaces.upstream.js'; +import type { IOciManifest, IOciImageIndex, ITagList } from './interfaces.oci.js'; + +/** + * OCI-specific upstream implementation. + * + * Handles: + * - Manifest fetching (image manifests and index manifests) + * - Blob proxying (layers, configs) + * - Tag list fetching + * - Content-addressable caching (blobs are immutable) + * - Docker Hub authentication flow + */ +export class OciUpstream extends BaseUpstream { + protected readonly protocolName = 'oci'; + + /** Local registry base path for URL building */ + private readonly localBasePath: string; + + constructor( + config: IProtocolUpstreamConfig, + localBasePath: string = '/oci', + logger?: plugins.smartlog.Smartlog, + ) { + super(config, logger); + this.localBasePath = localBasePath; + } + + /** + * Fetch a manifest from upstream registries. 
+   */
+  public async fetchManifest(
+    repository: string,
+    reference: string,
+  ): Promise<{ manifest: IOciManifest | IOciImageIndex; contentType: string; digest: string } | null> {
+    // Manifest media types advertised to the upstream, joined into one Accept header.
+    const acceptedMediaTypes = [
+      'application/vnd.oci.image.manifest.v1+json',
+      'application/vnd.oci.image.index.v1+json',
+      'application/vnd.docker.distribution.manifest.v2+json',
+      'application/vnd.docker.distribution.manifest.list.v2+json',
+      'application/vnd.docker.distribution.manifest.v1+json',
+    ];
+
+    const manifestRequest: IUpstreamFetchContext = {
+      protocol: 'oci',
+      resource: repository,
+      resourceType: 'manifest',
+      path: `/v2/${repository}/manifests/${reference}`,
+      method: 'GET',
+      headers: {
+        'accept': acceptedMediaTypes.join(', '),
+      },
+      query: {},
+    };
+
+    const response = await this.fetch(manifestRequest);
+    if (!response || !response.success) {
+      return null;
+    }
+
+    // A Buffer body is raw JSON text; any other body is used as-is.
+    const manifest: IOciManifest | IOciImageIndex = Buffer.isBuffer(response.body)
+      ? JSON.parse(response.body.toString('utf8'))
+      : response.body;
+
+    return {
+      manifest,
+      // Fall back to the OCI image manifest type when upstream sends no Content-Type.
+      contentType: response.headers['content-type'] || 'application/vnd.oci.image.manifest.v1+json',
+      // Empty string when upstream did not send a Docker-Content-Digest header.
+      digest: response.headers['docker-content-digest'] || '',
+    };
+  }
+
+  /**
+   * Check if a manifest exists in upstream (HEAD request).
+   */
+  public async headManifest(
+    repository: string,
+    reference: string,
+  ): Promise<{ exists: boolean; contentType?: string; digest?: string; size?: number } | null> {
+    const context: IUpstreamFetchContext = {
+      protocol: 'oci',
+      resource: repository,
+      resourceType: 'manifest',
+      path: `/v2/${repository}/manifests/${reference}`,
+      method: 'HEAD',
+      headers: {
+        'accept': [
+          'application/vnd.oci.image.manifest.v1+json',
+          'application/vnd.oci.image.index.v1+json',
+          'application/vnd.docker.distribution.manifest.v2+json',
+          'application/vnd.docker.distribution.manifest.list.v2+json',
+        ].join(', '),
+      },
+      query: {},
+    };
+
+    const result = await this.fetch(context);
+
+    // No result at all: existence is unknown.
+    if (!result) {
+      return null;
+    }
+
+    // A result that is not successful is reported as "does not exist".
+    if (!result.success) {
+      return { exists: false };
+    }
+
+    return {
+      exists: true,
+      contentType: result.headers['content-type'],
+      digest: result.headers['docker-content-digest'],
+      size: result.headers['content-length'] ? parseInt(result.headers['content-length'], 10) : undefined,
+    };
+  }
+
+  /**
+   * Fetch a blob from upstream registries.
+   *
+   * @param repository Repository name used in the `/v2/{repository}/blobs/` path.
+   * @param digest Blob digest used as the last path segment.
+   * @returns The blob bytes, or `null` when the upstream fetch fails.
+   */
+  public async fetchBlob(repository: string, digest: string): Promise<Buffer | null> {
+    const context: IUpstreamFetchContext = {
+      protocol: 'oci',
+      resource: repository,
+      resourceType: 'blob',
+      path: `/v2/${repository}/blobs/${digest}`,
+      method: 'GET',
+      headers: {
+        'accept': 'application/octet-stream',
+      },
+      query: {},
+    };
+
+    const result = await this.fetch(context);
+
+    if (!result || !result.success) {
+      return null;
+    }
+
+    return Buffer.isBuffer(result.body) ? result.body : Buffer.from(result.body);
+  }
+
+  /**
+   * Check if a blob exists in upstream (HEAD request).
+   */
+  public async headBlob(
+    repository: string,
+    digest: string,
+  ): Promise<{ exists: boolean; size?: number } | null> {
+    const context: IUpstreamFetchContext = {
+      protocol: 'oci',
+      resource: repository,
+      resourceType: 'blob',
+      path: `/v2/${repository}/blobs/${digest}`,
+      method: 'HEAD',
+      headers: {},
+      query: {},
+    };
+
+    const result = await this.fetch(context);
+
+    // No result at all: existence is unknown.
+    if (!result) {
+      return null;
+    }
+
+    // A result that is not successful is reported as "does not exist".
+    if (!result.success) {
+      return { exists: false };
+    }
+
+    return {
+      exists: true,
+      size: result.headers['content-length'] ? parseInt(result.headers['content-length'], 10) : undefined,
+    };
+  }
+
+  /**
+   * Fetch the tag list for a repository.
+   *
+   * @param repository Repository name used in the `/v2/{repository}/tags/list` path.
+   * @param n Optional page size, sent as the `n` query parameter when truthy.
+   * @param last Optional pagination marker, sent as the `last` query parameter when truthy.
+   * @returns The tag list, or `null` when the upstream fetch fails.
+   */
+  public async fetchTags(repository: string, n?: number, last?: string): Promise<ITagList | null> {
+    // Only the pagination parameters that were actually provided are sent.
+    const query: Record<string, string> = {};
+    if (n) query.n = n.toString();
+    if (last) query.last = last;
+
+    const context: IUpstreamFetchContext = {
+      protocol: 'oci',
+      resource: repository,
+      resourceType: 'tags',
+      path: `/v2/${repository}/tags/list`,
+      method: 'GET',
+      headers: {
+        'accept': 'application/json',
+      },
+      query,
+    };
+
+    const result = await this.fetch(context);
+
+    if (!result || !result.success) {
+      return null;
+    }
+
+    let tagList: ITagList;
+    if (Buffer.isBuffer(result.body)) {
+      tagList = JSON.parse(result.body.toString('utf8'));
+    } else {
+      tagList = result.body;
+    }
+
+    return tagList;
+  }
+
+  /**
+   * Override URL building for OCI-specific handling.
+   * OCI registries use /v2/ prefix and may require special handling for Docker Hub.
+   *
+   * For Docker Hub upstreams, a repository name without a namespace
+   * (e.g. "nginx") is expanded to "library/nginx". Namespaced repositories
+   * (e.g. "org/image") are passed through unchanged.
+   *
+   * @param upstream Upstream registry configuration; only `url` is read here.
+   * @param context Fetch context; only `path` is read here.
+   * @returns The absolute upstream URL for the request.
+   */
+  protected buildUpstreamUrl(
+    upstream: IUpstreamRegistryConfig,
+    context: IUpstreamFetchContext,
+  ): string {
+    let baseUrl = upstream.url;
+
+    // Remove trailing slash
+    if (baseUrl.endsWith('/')) {
+      baseUrl = baseUrl.slice(0, -1);
+    }
+
+    // Handle Docker Hub special case ("registry-1.docker.io" also contains "docker.io").
+    if (baseUrl.includes('docker.io')) {
+      // Split the path into the full repository name (which may contain
+      // slashes), the endpoint kind and the remainder. Matching only the
+      // first path segment would treat "org" of "org/image" as a
+      // namespace-less repository and wrongly prefix it with "library/".
+      const pathParts = context.path.match(/^\/v2\/(.+)\/(manifests|blobs|tags)\/(.+)$/);
+      if (pathParts) {
+        const [, repository, endpoint, rest] = pathParts;
+        // If repository doesn't contain a slash, it's a library image
+        if (!repository.includes('/')) {
+          return `${baseUrl}/v2/library/${repository}/${endpoint}/${rest}`;
+        }
+      }
+    }
+
+    return `${baseUrl}${context.path}`;
+  }
+
+  /**
+   * Override header building for OCI-specific authentication.
+   * OCI registries may require token-based auth obtained from a separate endpoint.
+ */ + protected buildHeaders( + upstream: IUpstreamRegistryConfig, + context: IUpstreamFetchContext, + ): Record { + const headers = super.buildHeaders(upstream, context); + + // OCI registries typically use Docker-Distribution-API-Version header + headers['docker-distribution-api-version'] = 'registry/2.0'; + + return headers; + } +} diff --git a/ts/oci/index.ts b/ts/oci/index.ts index e0d5585..68530de 100644 --- a/ts/oci/index.ts +++ b/ts/oci/index.ts @@ -3,4 +3,5 @@ */ export { OciRegistry } from './classes.ociregistry.js'; +export { OciUpstream } from './classes.ociupstream.js'; export * from './interfaces.oci.js'; diff --git a/ts/plugins.ts b/ts/plugins.ts index b4cea6a..9936898 100644 --- a/ts/plugins.ts +++ b/ts/plugins.ts @@ -8,10 +8,16 @@ import * as smartarchive from '@push.rocks/smartarchive'; import * as smartbucket from '@push.rocks/smartbucket'; import * as smartlog from '@push.rocks/smartlog'; import * as smartpath from '@push.rocks/smartpath'; +import * as smartrequest from '@push.rocks/smartrequest'; -export { smartarchive, smartbucket, smartlog, smartpath }; +export { smartarchive, smartbucket, smartlog, smartpath, smartrequest }; // @tsclass scope import * as tsclass from '@tsclass/tsclass'; export { tsclass }; + +// third party +import { minimatch } from 'minimatch'; + +export { minimatch }; diff --git a/ts/pypi/classes.pypiregistry.ts b/ts/pypi/classes.pypiregistry.ts index 94edf87..13d5f90 100644 --- a/ts/pypi/classes.pypiregistry.ts +++ b/ts/pypi/classes.pypiregistry.ts @@ -3,6 +3,7 @@ import { BaseRegistry } from '../core/classes.baseregistry.js'; import { RegistryStorage } from '../core/classes.registrystorage.js'; import { AuthManager } from '../core/classes.authmanager.js'; import type { IRequestContext, IResponse, IAuthToken } from '../core/interfaces.core.js'; +import type { IProtocolUpstreamConfig } from '../upstream/interfaces.upstream.js'; import { isBinaryData, toBuffer } from '../core/helpers.buffer.js'; import type { 
IPypiPackageMetadata, @@ -11,6 +12,7 @@ import type { IPypiUploadResponse, } from './interfaces.pypi.js'; import * as helpers from './helpers.pypi.js'; +import { PypiUpstream } from './classes.pypiupstream.js'; /** * PyPI registry implementation @@ -22,12 +24,14 @@ export class PypiRegistry extends BaseRegistry { private basePath: string = '/pypi'; private registryUrl: string; private logger: Smartlog; + private upstream: PypiUpstream | null = null; constructor( storage: RegistryStorage, authManager: AuthManager, basePath: string = '/pypi', - registryUrl: string = 'http://localhost:5000' + registryUrl: string = 'http://localhost:5000', + upstreamConfig?: IProtocolUpstreamConfig ) { super(); this.storage = storage; @@ -47,6 +51,20 @@ export class PypiRegistry extends BaseRegistry { } }); this.logger.enableConsole(); + + // Initialize upstream if configured + if (upstreamConfig?.enabled) { + this.upstream = new PypiUpstream(upstreamConfig, registryUrl, this.logger); + } + } + + /** + * Clean up resources (timers, connections, etc.) 
+ */ + public destroy(): void { + if (this.upstream) { + this.upstream.stop(); + } } public async init(): Promise { @@ -214,7 +232,45 @@ export class PypiRegistry extends BaseRegistry { const normalized = helpers.normalizePypiPackageName(packageName); // Get package metadata - const metadata = await this.storage.getPypiPackageMetadata(normalized); + let metadata = await this.storage.getPypiPackageMetadata(normalized); + + // Try upstream if not found locally + if (!metadata && this.upstream) { + const upstreamHtml = await this.upstream.fetchSimplePackage(normalized); + if (upstreamHtml) { + // Parse the HTML to extract file information and cache it + // For now, just return the upstream HTML directly (caching can be improved later) + const acceptHeader = context.headers['accept'] || context.headers['Accept'] || ''; + const preferJson = acceptHeader.includes('application/vnd.pypi.simple') && + acceptHeader.includes('json'); + + if (preferJson) { + // Try to get JSON format from upstream + const upstreamJson = await this.upstream.fetchPackageJson(normalized); + if (upstreamJson) { + return { + status: 200, + headers: { + 'Content-Type': 'application/vnd.pypi.simple.v1+json', + 'Cache-Control': 'public, max-age=300' + }, + body: upstreamJson, + }; + } + } + + // Return HTML format + return { + status: 200, + headers: { + 'Content-Type': 'text/html; charset=utf-8', + 'Cache-Control': 'public, max-age=300' + }, + body: upstreamHtml, + }; + } + } + if (!metadata) { return this.errorResponse(404, 'Package not found'); } @@ -449,7 +505,16 @@ export class PypiRegistry extends BaseRegistry { */ private async handleDownload(packageName: string, filename: string): Promise { const normalized = helpers.normalizePypiPackageName(packageName); - const fileData = await this.storage.getPypiPackageFile(normalized, filename); + let fileData = await this.storage.getPypiPackageFile(normalized, filename); + + // Try upstream if not found locally + if (!fileData && this.upstream) { + 
fileData = await this.upstream.fetchPackageFile(normalized, filename); + if (fileData) { + // Cache locally + await this.storage.putPypiPackageFile(normalized, filename, fileData); + } + } if (!fileData) { return { diff --git a/ts/pypi/classes.pypiupstream.ts b/ts/pypi/classes.pypiupstream.ts new file mode 100644 index 0000000..117748c --- /dev/null +++ b/ts/pypi/classes.pypiupstream.ts @@ -0,0 +1,211 @@ +import * as plugins from '../plugins.js'; +import { BaseUpstream } from '../upstream/classes.baseupstream.js'; +import type { + IProtocolUpstreamConfig, + IUpstreamFetchContext, + IUpstreamRegistryConfig, +} from '../upstream/interfaces.upstream.js'; + +/** + * PyPI-specific upstream implementation. + * + * Handles: + * - Simple API (HTML) - PEP 503 + * - JSON API - PEP 691 + * - Package file downloads (wheels, sdists) + * - Package name normalization + */ +export class PypiUpstream extends BaseUpstream { + protected readonly protocolName = 'pypi'; + + /** Local registry URL for rewriting download URLs */ + private readonly localRegistryUrl: string; + + constructor( + config: IProtocolUpstreamConfig, + localRegistryUrl: string, + logger?: plugins.smartlog.Smartlog, + ) { + super(config, logger); + this.localRegistryUrl = localRegistryUrl; + } + + /** + * Fetch Simple API index (list of all packages) in HTML format. + */ + public async fetchSimpleIndex(): Promise { + const context: IUpstreamFetchContext = { + protocol: 'pypi', + resource: '*', + resourceType: 'index', + path: '/simple/', + method: 'GET', + headers: { + 'accept': 'text/html', + }, + query: {}, + }; + + const result = await this.fetch(context); + + if (!result || !result.success) { + return null; + } + + if (Buffer.isBuffer(result.body)) { + return result.body.toString('utf8'); + } + + return typeof result.body === 'string' ? result.body : null; + } + + /** + * Fetch Simple API package page (list of files) in HTML format. 
+   */
+  public async fetchSimplePackage(packageName: string): Promise<string | null> {
+    const normalizedName = this.normalizePackageName(packageName);
+    const path = `/simple/${normalizedName}/`;
+
+    const context: IUpstreamFetchContext = {
+      protocol: 'pypi',
+      resource: packageName,
+      resourceType: 'simple',
+      path,
+      method: 'GET',
+      headers: {
+        'accept': 'text/html',
+      },
+      query: {},
+    };
+
+    const result = await this.fetch(context);
+
+    if (!result || !result.success) {
+      return null;
+    }
+
+    // The body may arrive as raw bytes or as an already-decoded string.
+    if (Buffer.isBuffer(result.body)) {
+      return result.body.toString('utf8');
+    }
+
+    // Any other body shape is not usable as an HTML page.
+    return typeof result.body === 'string' ? result.body : null;
+  }
+
+  /**
+   * Fetch package metadata using JSON API (PEP 691).
+   *
+   * Requests the same `/simple/{name}/` path as the HTML variant, but with the
+   * `application/vnd.pypi.simple.v1+json` Accept header.
+   *
+   * @param packageName Package name; normalized before building the path.
+   * @returns The response body (parsed from JSON when it is a Buffer),
+   *   or `null` when the upstream fetch fails.
+   */
+  public async fetchPackageJson(packageName: string): Promise<any | null> {
+    const normalizedName = this.normalizePackageName(packageName);
+    const path = `/simple/${normalizedName}/`;
+
+    const context: IUpstreamFetchContext = {
+      protocol: 'pypi',
+      resource: packageName,
+      resourceType: 'metadata',
+      path,
+      method: 'GET',
+      headers: {
+        'accept': 'application/vnd.pypi.simple.v1+json',
+      },
+      query: {},
+    };
+
+    const result = await this.fetch(context);
+
+    if (!result || !result.success) {
+      return null;
+    }
+
+    if (Buffer.isBuffer(result.body)) {
+      return JSON.parse(result.body.toString('utf8'));
+    }
+
+    return result.body;
+  }
+
+  /**
+   * Fetch full package info from PyPI JSON API (/pypi/{package}/json).
+ */ + public async fetchPypiJson(packageName: string): Promise { + const normalizedName = this.normalizePackageName(packageName); + const path = `/pypi/${normalizedName}/json`; + + const context: IUpstreamFetchContext = { + protocol: 'pypi', + resource: packageName, + resourceType: 'pypi-json', + path, + method: 'GET', + headers: { + 'accept': 'application/json', + }, + query: {}, + }; + + const result = await this.fetch(context); + + if (!result || !result.success) { + return null; + } + + if (Buffer.isBuffer(result.body)) { + return JSON.parse(result.body.toString('utf8')); + } + + return result.body; + } + + /** + * Fetch a package file (wheel or sdist) from upstream. + */ + public async fetchPackageFile(packageName: string, filename: string): Promise { + const normalizedName = this.normalizePackageName(packageName); + const path = `/packages/${normalizedName}/${filename}`; + + const context: IUpstreamFetchContext = { + protocol: 'pypi', + resource: packageName, + resourceType: 'package', + path, + method: 'GET', + headers: { + 'accept': 'application/octet-stream', + }, + query: {}, + }; + + const result = await this.fetch(context); + + if (!result || !result.success) { + return null; + } + + return Buffer.isBuffer(result.body) ? result.body : Buffer.from(result.body); + } + + /** + * Normalize a PyPI package name according to PEP 503. + * - Lowercase all characters + * - Replace runs of ., -, _ with single - + */ + private normalizePackageName(name: string): string { + return name.toLowerCase().replace(/[-_.]+/g, '-'); + } + + /** + * Override URL building for PyPI-specific handling. 
+ */ + protected buildUpstreamUrl( + upstream: IUpstreamRegistryConfig, + context: IUpstreamFetchContext, + ): string { + let baseUrl = upstream.url; + + // Remove trailing slash + if (baseUrl.endsWith('/')) { + baseUrl = baseUrl.slice(0, -1); + } + + return `${baseUrl}${context.path}`; + } +} diff --git a/ts/pypi/index.ts b/ts/pypi/index.ts index a2361e8..938dd0c 100644 --- a/ts/pypi/index.ts +++ b/ts/pypi/index.ts @@ -5,4 +5,5 @@ export * from './interfaces.pypi.js'; export * from './classes.pypiregistry.js'; +export { PypiUpstream } from './classes.pypiupstream.js'; export * as pypiHelpers from './helpers.pypi.js'; diff --git a/ts/rubygems/classes.rubygemsregistry.ts b/ts/rubygems/classes.rubygemsregistry.ts index 8a60d4d..8e8dc9c 100644 --- a/ts/rubygems/classes.rubygemsregistry.ts +++ b/ts/rubygems/classes.rubygemsregistry.ts @@ -3,6 +3,7 @@ import { BaseRegistry } from '../core/classes.baseregistry.js'; import { RegistryStorage } from '../core/classes.registrystorage.js'; import { AuthManager } from '../core/classes.authmanager.js'; import type { IRequestContext, IResponse, IAuthToken } from '../core/interfaces.core.js'; +import type { IProtocolUpstreamConfig } from '../upstream/interfaces.upstream.js'; import type { IRubyGemsMetadata, IRubyGemsVersionMetadata, @@ -12,6 +13,7 @@ import type { ICompactIndexInfoEntry, } from './interfaces.rubygems.js'; import * as helpers from './helpers.rubygems.js'; +import { RubygemsUpstream } from './classes.rubygemsupstream.js'; /** * RubyGems registry implementation @@ -23,12 +25,14 @@ export class RubyGemsRegistry extends BaseRegistry { private basePath: string = '/rubygems'; private registryUrl: string; private logger: Smartlog; + private upstream: RubygemsUpstream | null = null; constructor( storage: RegistryStorage, authManager: AuthManager, basePath: string = '/rubygems', - registryUrl: string = 'http://localhost:5000/rubygems' + registryUrl: string = 'http://localhost:5000/rubygems', + upstreamConfig?: 
IProtocolUpstreamConfig ) { super(); this.storage = storage; @@ -48,6 +52,20 @@ export class RubyGemsRegistry extends BaseRegistry { } }); this.logger.enableConsole(); + + // Initialize upstream if configured + if (upstreamConfig?.enabled) { + this.upstream = new RubygemsUpstream(upstreamConfig, this.logger); + } + } + + /** + * Clean up resources (timers, connections, etc.) + */ + public destroy(): void { + if (this.upstream) { + this.upstream.stop(); + } } public async init(): Promise { @@ -215,7 +233,17 @@ export class RubyGemsRegistry extends BaseRegistry { * Handle /info/{gem} endpoint (Compact Index) */ private async handleInfoFile(gemName: string): Promise { - const content = await this.storage.getRubyGemsInfo(gemName); + let content = await this.storage.getRubyGemsInfo(gemName); + + // Try upstream if not found locally + if (!content && this.upstream) { + const upstreamInfo = await this.upstream.fetchInfo(gemName); + if (upstreamInfo) { + // Cache locally + await this.storage.putRubyGemsInfo(gemName, upstreamInfo); + content = upstreamInfo; + } + } if (!content) { return { @@ -245,12 +273,21 @@ export class RubyGemsRegistry extends BaseRegistry { return this.errorResponse(400, 'Invalid gem filename'); } - const gemData = await this.storage.getRubyGemsGem( + let gemData = await this.storage.getRubyGemsGem( parsed.name, parsed.version, parsed.platform ); + // Try upstream if not found locally + if (!gemData && this.upstream) { + gemData = await this.upstream.fetchGem(parsed.name, parsed.version); + if (gemData) { + // Cache locally + await this.storage.putRubyGemsGem(parsed.name, parsed.version, gemData, parsed.platform); + } + } + if (!gemData) { return this.errorResponse(404, 'Gem not found'); } diff --git a/ts/rubygems/classes.rubygemsupstream.ts b/ts/rubygems/classes.rubygemsupstream.ts new file mode 100644 index 0000000..a58ccd3 --- /dev/null +++ b/ts/rubygems/classes.rubygemsupstream.ts @@ -0,0 +1,230 @@ +import * as plugins from '../plugins.js'; 
+import { BaseUpstream } from '../upstream/classes.baseupstream.js'; +import type { + IProtocolUpstreamConfig, + IUpstreamFetchContext, + IUpstreamRegistryConfig, +} from '../upstream/interfaces.upstream.js'; + +/** + * RubyGems-specific upstream implementation. + * + * Handles: + * - Compact Index format (/versions, /info/{gem}, /names) + * - Gem file (.gem) downloading + * - Gem spec fetching + * - HTTP Range requests for incremental updates + */ +export class RubygemsUpstream extends BaseUpstream { + protected readonly protocolName = 'rubygems'; + + constructor( + config: IProtocolUpstreamConfig, + logger?: plugins.smartlog.Smartlog, + ) { + super(config, logger); + } + + /** + * Fetch the /versions file (master list of all gems). + */ + public async fetchVersions(etag?: string): Promise<{ data: string; etag?: string } | null> { + const headers: Record = { + 'accept': 'text/plain', + }; + + if (etag) { + headers['if-none-match'] = etag; + } + + const context: IUpstreamFetchContext = { + protocol: 'rubygems', + resource: '*', + resourceType: 'versions', + path: '/versions', + method: 'GET', + headers, + query: {}, + }; + + const result = await this.fetch(context); + + if (!result || !result.success) { + return null; + } + + let data: string; + if (Buffer.isBuffer(result.body)) { + data = result.body.toString('utf8'); + } else if (typeof result.body === 'string') { + data = result.body; + } else { + return null; + } + + return { + data, + etag: result.headers['etag'], + }; + } + + /** + * Fetch gem info file (/info/{gemname}). 
+ */ + public async fetchInfo(gemName: string): Promise { + const context: IUpstreamFetchContext = { + protocol: 'rubygems', + resource: gemName, + resourceType: 'info', + path: `/info/${gemName}`, + method: 'GET', + headers: { + 'accept': 'text/plain', + }, + query: {}, + }; + + const result = await this.fetch(context); + + if (!result || !result.success) { + return null; + } + + if (Buffer.isBuffer(result.body)) { + return result.body.toString('utf8'); + } + + return typeof result.body === 'string' ? result.body : null; + } + + /** + * Fetch the /names file (list of all gem names). + */ + public async fetchNames(): Promise { + const context: IUpstreamFetchContext = { + protocol: 'rubygems', + resource: '*', + resourceType: 'names', + path: '/names', + method: 'GET', + headers: { + 'accept': 'text/plain', + }, + query: {}, + }; + + const result = await this.fetch(context); + + if (!result || !result.success) { + return null; + } + + if (Buffer.isBuffer(result.body)) { + return result.body.toString('utf8'); + } + + return typeof result.body === 'string' ? result.body : null; + } + + /** + * Fetch a gem file. + */ + public async fetchGem(gemName: string, version: string): Promise { + const path = `/gems/${gemName}-${version}.gem`; + + const context: IUpstreamFetchContext = { + protocol: 'rubygems', + resource: gemName, + resourceType: 'gem', + path, + method: 'GET', + headers: { + 'accept': 'application/octet-stream', + }, + query: {}, + }; + + const result = await this.fetch(context); + + if (!result || !result.success) { + return null; + } + + return Buffer.isBuffer(result.body) ? result.body : Buffer.from(result.body); + } + + /** + * Fetch gem spec (quick spec). 
+ */ + public async fetchQuickSpec(gemName: string, version: string): Promise { + const path = `/quick/Marshal.4.8/${gemName}-${version}.gemspec.rz`; + + const context: IUpstreamFetchContext = { + protocol: 'rubygems', + resource: gemName, + resourceType: 'spec', + path, + method: 'GET', + headers: { + 'accept': 'application/octet-stream', + }, + query: {}, + }; + + const result = await this.fetch(context); + + if (!result || !result.success) { + return null; + } + + return Buffer.isBuffer(result.body) ? result.body : Buffer.from(result.body); + } + + /** + * Fetch gem versions JSON from API. + */ + public async fetchVersionsJson(gemName: string): Promise { + const path = `/api/v1/versions/${gemName}.json`; + + const context: IUpstreamFetchContext = { + protocol: 'rubygems', + resource: gemName, + resourceType: 'versions-json', + path, + method: 'GET', + headers: { + 'accept': 'application/json', + }, + query: {}, + }; + + const result = await this.fetch(context); + + if (!result || !result.success) { + return null; + } + + if (Buffer.isBuffer(result.body)) { + return JSON.parse(result.body.toString('utf8')); + } + + return Array.isArray(result.body) ? result.body : null; + } + + /** + * Override URL building for RubyGems-specific handling. 
+ */ + protected buildUpstreamUrl( + upstream: IUpstreamRegistryConfig, + context: IUpstreamFetchContext, + ): string { + let baseUrl = upstream.url; + + // Remove trailing slash + if (baseUrl.endsWith('/')) { + baseUrl = baseUrl.slice(0, -1); + } + + return `${baseUrl}${context.path}`; + } +} diff --git a/ts/rubygems/index.ts b/ts/rubygems/index.ts index c801d2f..dfceb41 100644 --- a/ts/rubygems/index.ts +++ b/ts/rubygems/index.ts @@ -5,4 +5,5 @@ export * from './interfaces.rubygems.js'; export * from './classes.rubygemsregistry.js'; +export { RubygemsUpstream } from './classes.rubygemsupstream.js'; export * as rubygemsHelpers from './helpers.rubygems.js'; diff --git a/ts/upstream/classes.baseupstream.ts b/ts/upstream/classes.baseupstream.ts new file mode 100644 index 0000000..7b22c51 --- /dev/null +++ b/ts/upstream/classes.baseupstream.ts @@ -0,0 +1,521 @@ +import * as plugins from '../plugins.js'; +import type { + IUpstreamRegistryConfig, + IUpstreamAuthConfig, + IUpstreamCacheConfig, + IUpstreamResilienceConfig, + IUpstreamResult, + IUpstreamFetchContext, + IProtocolUpstreamConfig, + IUpstreamScopeRule, + TCircuitState, +} from './interfaces.upstream.js'; +import { + DEFAULT_CACHE_CONFIG, + DEFAULT_RESILIENCE_CONFIG, +} from './interfaces.upstream.js'; +import { CircuitBreaker, CircuitOpenError, withCircuitBreaker } from './classes.circuitbreaker.js'; +import { UpstreamCache } from './classes.upstreamcache.js'; + +/** + * Base class for protocol-specific upstream implementations. 
+ * + * Provides: + * - Multi-upstream routing with priority + * - Scope-based filtering (glob patterns) + * - Authentication handling + * - Circuit breaker per upstream + * - Caching with TTL + * - Retry with exponential backoff + * - 429 rate limit handling + */ +export abstract class BaseUpstream { + /** Protocol name for logging */ + protected abstract readonly protocolName: string; + + /** Upstream configuration */ + protected readonly config: IProtocolUpstreamConfig; + + /** Resolved cache configuration */ + protected readonly cacheConfig: IUpstreamCacheConfig; + + /** Resolved resilience configuration */ + protected readonly resilienceConfig: IUpstreamResilienceConfig; + + /** Circuit breakers per upstream */ + protected readonly circuitBreakers: Map = new Map(); + + /** Upstream cache */ + protected readonly cache: UpstreamCache; + + /** Logger instance */ + protected readonly logger: plugins.smartlog.Smartlog; + + constructor(config: IProtocolUpstreamConfig, logger?: plugins.smartlog.Smartlog) { + this.config = config; + this.cacheConfig = { ...DEFAULT_CACHE_CONFIG, ...config.cache }; + this.resilienceConfig = { ...DEFAULT_RESILIENCE_CONFIG, ...config.resilience }; + this.cache = new UpstreamCache(this.cacheConfig); + this.logger = logger || new plugins.smartlog.Smartlog({ + logContext: { + company: 'smartregistry', + companyunit: 'upstream', + environment: 'production', + runtime: 'node', + } + }); + + // Initialize circuit breakers for each upstream + for (const upstream of config.upstreams) { + const upstreamResilience = { ...this.resilienceConfig, ...upstream.resilience }; + this.circuitBreakers.set(upstream.id, new CircuitBreaker(upstream.id, upstreamResilience)); + } + } + + /** + * Check if upstream is enabled. + */ + public isEnabled(): boolean { + return this.config.enabled; + } + + /** + * Get all configured upstreams. 
+ */ + public getUpstreams(): IUpstreamRegistryConfig[] { + return this.config.upstreams; + } + + /** + * Get circuit breaker state for an upstream. + */ + public getCircuitState(upstreamId: string): TCircuitState | null { + const breaker = this.circuitBreakers.get(upstreamId); + return breaker ? breaker.getState() : null; + } + + /** + * Get cache statistics. + */ + public getCacheStats() { + return this.cache.getStats(); + } + + /** + * Fetch a resource from upstreams. + * Tries upstreams in priority order, respecting circuit breakers and scope rules. + */ + public async fetch(context: IUpstreamFetchContext): Promise { + if (!this.config.enabled) { + return null; + } + + // Check cache first + const cached = this.cache.get(context); + if (cached && !cached.stale) { + return { + success: true, + status: 200, + headers: cached.headers, + body: cached.data, + upstreamId: cached.upstreamId, + fromCache: true, + latencyMs: 0, + }; + } + + // Check for negative cache (recent 404) + if (this.cache.hasNegative(context)) { + return { + success: false, + status: 404, + headers: {}, + upstreamId: 'cache', + fromCache: true, + latencyMs: 0, + }; + } + + // Get applicable upstreams sorted by priority + const applicableUpstreams = this.getApplicableUpstreams(context.resource); + + if (applicableUpstreams.length === 0) { + return null; + } + + // If we have stale cache, return it immediately and revalidate in background + if (cached?.stale && this.cacheConfig.staleWhileRevalidate) { + // Fire and forget revalidation + this.revalidateInBackground(context, applicableUpstreams); + return { + success: true, + status: 200, + headers: cached.headers, + body: cached.data, + upstreamId: cached.upstreamId, + fromCache: true, + latencyMs: 0, + }; + } + + // Try each upstream in order + let lastError: Error | null = null; + + for (const upstream of applicableUpstreams) { + const breaker = this.circuitBreakers.get(upstream.id); + if (!breaker) continue; + + try { + const result = await 
withCircuitBreaker( + breaker, + () => this.fetchFromUpstream(upstream, context), + ); + + // Cache successful responses + if (result.success && result.body) { + this.cache.set( + context, + Buffer.isBuffer(result.body) ? result.body : Buffer.from(JSON.stringify(result.body)), + result.headers['content-type'] || 'application/octet-stream', + result.headers, + upstream.id, + ); + } + + // Cache 404 responses + if (result.status === 404) { + this.cache.setNegative(context, upstream.id); + } + + return result; + } catch (error) { + if (error instanceof CircuitOpenError) { + this.logger.log('debug', `Circuit open for upstream ${upstream.id}, trying next`); + } else { + this.logger.log('warn', `Upstream ${upstream.id} failed: ${(error as Error).message}`); + } + lastError = error as Error; + // Continue to next upstream + } + } + + // All upstreams failed + if (lastError) { + this.logger.log('error', `All upstreams failed for ${context.resource}: ${lastError.message}`); + } + + return null; + } + + /** + * Invalidate cache for a resource pattern. + */ + public invalidateCache(pattern: RegExp): number { + return this.cache.invalidatePattern(pattern); + } + + /** + * Clear all cache entries. + */ + public clearCache(): void { + this.cache.clear(); + } + + /** + * Stop the upstream (cleanup resources). + */ + public stop(): void { + this.cache.stop(); + } + + /** + * Get upstreams that apply to a resource, sorted by priority. + */ + protected getApplicableUpstreams(resource: string): IUpstreamRegistryConfig[] { + return this.config.upstreams + .filter(upstream => { + if (!upstream.enabled) return false; + + // Check circuit breaker + const breaker = this.circuitBreakers.get(upstream.id); + if (breaker && !breaker.canRequest()) return false; + + // Check scope rules + return this.matchesScopeRules(resource, upstream.scopeRules); + }) + .sort((a, b) => a.priority - b.priority); + } + + /** + * Check if a resource matches scope rules. + * Empty rules = match all. 
+ */ + protected matchesScopeRules(resource: string, rules?: IUpstreamScopeRule[]): boolean { + if (!rules || rules.length === 0) { + return true; + } + + // Process rules in order + // Start with default exclude (nothing matches) + let matched = false; + + for (const rule of rules) { + const isMatch = plugins.minimatch(resource, rule.pattern); + if (isMatch) { + matched = rule.action === 'include'; + } + } + + return matched; + } + + /** + * Fetch from a specific upstream with retry logic. + */ + protected async fetchFromUpstream( + upstream: IUpstreamRegistryConfig, + context: IUpstreamFetchContext, + ): Promise { + const upstreamResilience = { ...this.resilienceConfig, ...upstream.resilience }; + const startTime = Date.now(); + + let lastError: Error | null = null; + + for (let attempt = 0; attempt <= upstreamResilience.maxRetries; attempt++) { + try { + const result = await this.executeRequest(upstream, context, upstreamResilience.timeoutMs); + return { + ...result, + upstreamId: upstream.id, + fromCache: false, + latencyMs: Date.now() - startTime, + }; + } catch (error) { + lastError = error as Error; + + // Don't retry on 4xx errors (except 429) + if (this.isNonRetryableError(error)) { + break; + } + + // Calculate delay with exponential backoff and jitter + if (attempt < upstreamResilience.maxRetries) { + const delay = this.calculateBackoffDelay( + attempt, + upstreamResilience.retryDelayMs, + upstreamResilience.retryMaxDelayMs, + ); + await this.sleep(delay); + } + } + } + + throw lastError || new Error('Request failed'); + } + + /** + * Execute a single HTTP request to an upstream. 
+ */ + protected async executeRequest( + upstream: IUpstreamRegistryConfig, + context: IUpstreamFetchContext, + timeoutMs: number, + ): Promise> { + // Build the full URL + const url = this.buildUpstreamUrl(upstream, context); + + // Build headers with auth + const headers = this.buildHeaders(upstream, context); + + // Make the request using SmartRequest + const request = plugins.smartrequest.SmartRequest.create() + .url(url) + .method(context.method as any) + .headers(headers) + .timeout(timeoutMs) + .handle429Backoff({ maxRetries: 3, fallbackDelay: 1000, maxWaitTime: 30000 }); + + // Add query params if present + if (Object.keys(context.query).length > 0) { + request.query(context.query); + } + + let response: plugins.smartrequest.ICoreResponse; + + switch (context.method.toUpperCase()) { + case 'GET': + response = await request.get(); + break; + case 'HEAD': + // SmartRequest doesn't have head(), use options + response = await request.method('HEAD').get(); + break; + default: + response = await request.get(); + } + + // Parse response + const responseHeaders: Record = {}; + for (const [key, value] of Object.entries(response.headers)) { + responseHeaders[key.toLowerCase()] = Array.isArray(value) ? value[0] : value; + } + + let body: Buffer | any; + const contentType = responseHeaders['content-type'] || ''; + + if (response.ok) { + if (contentType.includes('application/json')) { + body = await response.json(); + } else { + const arrayBuffer = await response.arrayBuffer(); + body = Buffer.from(arrayBuffer); + } + } + + return { + success: response.ok, + status: response.status, + headers: responseHeaders, + body, + }; + } + + /** + * Build the full URL for an upstream request. + * Subclasses can override for protocol-specific URL building. 
+ */ + protected buildUpstreamUrl(upstream: IUpstreamRegistryConfig, context: IUpstreamFetchContext): string { + // Remove leading slash if URL already has trailing slash + let path = context.path; + if (upstream.url.endsWith('/') && path.startsWith('/')) { + path = path.slice(1); + } + return `${upstream.url}${path}`; + } + + /** + * Build headers including authentication. + */ + protected buildHeaders( + upstream: IUpstreamRegistryConfig, + context: IUpstreamFetchContext, + ): Record { + const headers: Record = { ...context.headers }; + + // Remove host header (will be set by HTTP client) + delete headers['host']; + + // Add authentication + this.addAuthHeaders(headers, upstream.auth); + + return headers; + } + + /** + * Add authentication headers based on auth config. + */ + protected addAuthHeaders(headers: Record, auth: IUpstreamAuthConfig): void { + switch (auth.type) { + case 'basic': + if (auth.username && auth.password) { + const credentials = Buffer.from(`${auth.username}:${auth.password}`).toString('base64'); + headers['authorization'] = `Basic ${credentials}`; + } + break; + case 'bearer': + if (auth.token) { + headers['authorization'] = `Bearer ${auth.token}`; + } + break; + case 'api-key': + if (auth.token) { + const headerName = auth.headerName || 'authorization'; + headers[headerName.toLowerCase()] = auth.token; + } + break; + case 'none': + default: + // No authentication + break; + } + } + + /** + * Check if an error should not be retried. + */ + protected isNonRetryableError(error: unknown): boolean { + // Check for HTTP status errors + if (error && typeof error === 'object' && 'status' in error) { + const status = (error as { status: number }).status; + // Don't retry 4xx errors except 429 (rate limited) + if (status >= 400 && status < 500 && status !== 429) { + return true; + } + } + return false; + } + + /** + * Calculate backoff delay with exponential backoff and jitter. 
+ */ + protected calculateBackoffDelay( + attempt: number, + baseDelayMs: number, + maxDelayMs: number, + ): number { + // Exponential backoff: delay = base * 2^attempt + const exponentialDelay = baseDelayMs * Math.pow(2, attempt); + + // Cap at max delay + const cappedDelay = Math.min(exponentialDelay, maxDelayMs); + + // Add jitter (±25%) + const jitter = cappedDelay * 0.25 * (Math.random() * 2 - 1); + + return Math.floor(cappedDelay + jitter); + } + + /** + * Sleep for a specified duration. + */ + protected sleep(ms: number): Promise { + return new Promise(resolve => setTimeout(resolve, ms)); + } + + /** + * Revalidate cache in background. + */ + protected async revalidateInBackground( + context: IUpstreamFetchContext, + upstreams: IUpstreamRegistryConfig[], + ): Promise { + try { + for (const upstream of upstreams) { + const breaker = this.circuitBreakers.get(upstream.id); + if (!breaker || !breaker.canRequest()) continue; + + try { + const result = await withCircuitBreaker( + breaker, + () => this.fetchFromUpstream(upstream, context), + ); + + if (result.success && result.body) { + this.cache.set( + context, + Buffer.isBuffer(result.body) ? result.body : Buffer.from(JSON.stringify(result.body)), + result.headers['content-type'] || 'application/octet-stream', + result.headers, + upstream.id, + ); + return; // Successfully revalidated + } + } catch { + // Continue to next upstream + } + } + } catch (error) { + this.logger.log('debug', `Background revalidation failed: ${(error as Error).message}`); + } + } +} diff --git a/ts/upstream/classes.circuitbreaker.ts b/ts/upstream/classes.circuitbreaker.ts new file mode 100644 index 0000000..b80943f --- /dev/null +++ b/ts/upstream/classes.circuitbreaker.ts @@ -0,0 +1,238 @@ +import type { TCircuitState, IUpstreamResilienceConfig } from './interfaces.upstream.js'; +import { DEFAULT_RESILIENCE_CONFIG } from './interfaces.upstream.js'; + +/** + * Circuit breaker implementation for upstream resilience. 
+ * + * States: + * - CLOSED: Normal operation, requests pass through + * - OPEN: Circuit is tripped, requests fail fast + * - HALF_OPEN: Testing if upstream has recovered + * + * Transitions: + * - CLOSED → OPEN: When failure count exceeds threshold + * - OPEN → HALF_OPEN: After reset timeout expires + * - HALF_OPEN → CLOSED: On successful request + * - HALF_OPEN → OPEN: On failed request + */ +export class CircuitBreaker { + /** Unique identifier for logging and metrics */ + public readonly id: string; + + /** Current circuit state */ + private state: TCircuitState = 'CLOSED'; + + /** Count of consecutive failures */ + private failureCount: number = 0; + + /** Timestamp when circuit was opened */ + private openedAt: number = 0; + + /** Number of successful requests in half-open state */ + private halfOpenSuccesses: number = 0; + + /** Configuration */ + private readonly config: IUpstreamResilienceConfig; + + /** Number of successes required to close circuit from half-open */ + private readonly halfOpenThreshold: number = 2; + + constructor(id: string, config?: Partial) { + this.id = id; + this.config = { ...DEFAULT_RESILIENCE_CONFIG, ...config }; + } + + /** + * Get current circuit state. + */ + public getState(): TCircuitState { + // Check if we should transition from OPEN to HALF_OPEN + if (this.state === 'OPEN') { + const elapsed = Date.now() - this.openedAt; + if (elapsed >= this.config.circuitBreakerResetMs) { + this.transitionTo('HALF_OPEN'); + } + } + return this.state; + } + + /** + * Check if circuit allows requests. + * Returns true if requests should be allowed. + */ + public canRequest(): boolean { + const currentState = this.getState(); + return currentState !== 'OPEN'; + } + + /** + * Record a successful request. + * May transition circuit from HALF_OPEN to CLOSED. 
+ */ + public recordSuccess(): void { + if (this.state === 'HALF_OPEN') { + this.halfOpenSuccesses++; + if (this.halfOpenSuccesses >= this.halfOpenThreshold) { + this.transitionTo('CLOSED'); + } + } else if (this.state === 'CLOSED') { + // Reset failure count on success + this.failureCount = 0; + } + } + + /** + * Record a failed request. + * May transition circuit from CLOSED/HALF_OPEN to OPEN. + */ + public recordFailure(): void { + if (this.state === 'HALF_OPEN') { + // Any failure in half-open immediately opens circuit + this.transitionTo('OPEN'); + } else if (this.state === 'CLOSED') { + this.failureCount++; + if (this.failureCount >= this.config.circuitBreakerThreshold) { + this.transitionTo('OPEN'); + } + } + } + + /** + * Force circuit to open state. + * Useful for manual intervention or external health checks. + */ + public forceOpen(): void { + this.transitionTo('OPEN'); + } + + /** + * Force circuit to closed state. + * Useful for manual intervention after fixing upstream issues. + */ + public forceClose(): void { + this.transitionTo('CLOSED'); + } + + /** + * Reset circuit to initial state. + */ + public reset(): void { + this.state = 'CLOSED'; + this.failureCount = 0; + this.openedAt = 0; + this.halfOpenSuccesses = 0; + } + + /** + * Get circuit metrics for monitoring. + */ + public getMetrics(): ICircuitBreakerMetrics { + return { + id: this.id, + state: this.getState(), + failureCount: this.failureCount, + openedAt: this.openedAt > 0 ? new Date(this.openedAt) : null, + timeUntilHalfOpen: this.state === 'OPEN' + ? Math.max(0, this.config.circuitBreakerResetMs - (Date.now() - this.openedAt)) + : 0, + halfOpenSuccesses: this.halfOpenSuccesses, + threshold: this.config.circuitBreakerThreshold, + resetMs: this.config.circuitBreakerResetMs, + }; + } + + /** + * Transition to a new state with proper cleanup. 
+ */ + private transitionTo(newState: TCircuitState): void { + const previousState = this.state; + this.state = newState; + + switch (newState) { + case 'OPEN': + this.openedAt = Date.now(); + this.halfOpenSuccesses = 0; + break; + case 'HALF_OPEN': + this.halfOpenSuccesses = 0; + break; + case 'CLOSED': + this.failureCount = 0; + this.openedAt = 0; + this.halfOpenSuccesses = 0; + break; + } + + // Log state transition (useful for debugging and monitoring) + // In production, this would emit events or metrics + if (previousState !== newState) { + // State changed - could emit event here + } + } +} + +/** + * Metrics for circuit breaker monitoring. + */ +export interface ICircuitBreakerMetrics { + /** Circuit breaker identifier */ + id: string; + /** Current state */ + state: TCircuitState; + /** Number of consecutive failures */ + failureCount: number; + /** When circuit was opened (null if never opened) */ + openedAt: Date | null; + /** Milliseconds until circuit transitions to half-open (0 if not open) */ + timeUntilHalfOpen: number; + /** Number of successes in half-open state */ + halfOpenSuccesses: number; + /** Failure threshold for opening circuit */ + threshold: number; + /** Reset timeout in milliseconds */ + resetMs: number; +} + +/** + * Execute a function with circuit breaker protection. 
+ * + * @param breaker The circuit breaker to use + * @param fn The async function to execute + * @param fallback Optional fallback function when circuit is open + * @returns The result of fn or fallback + * @throws CircuitOpenError if circuit is open and no fallback provided + */ +export async function withCircuitBreaker( + breaker: CircuitBreaker, + fn: () => Promise, + fallback?: () => Promise, +): Promise { + if (!breaker.canRequest()) { + if (fallback) { + return fallback(); + } + throw new CircuitOpenError(breaker.id); + } + + try { + const result = await fn(); + breaker.recordSuccess(); + return result; + } catch (error) { + breaker.recordFailure(); + throw error; + } +} + +/** + * Error thrown when circuit is open and no fallback is provided. + */ +export class CircuitOpenError extends Error { + public readonly circuitId: string; + + constructor(circuitId: string) { + super(`Circuit breaker '${circuitId}' is open`); + this.name = 'CircuitOpenError'; + this.circuitId = circuitId; + } +} diff --git a/ts/upstream/classes.upstreamcache.ts b/ts/upstream/classes.upstreamcache.ts new file mode 100644 index 0000000..8442b92 --- /dev/null +++ b/ts/upstream/classes.upstreamcache.ts @@ -0,0 +1,423 @@ +import type { + ICacheEntry, + IUpstreamCacheConfig, + IUpstreamFetchContext, +} from './interfaces.upstream.js'; +import { DEFAULT_CACHE_CONFIG } from './interfaces.upstream.js'; + +/** + * In-memory cache for upstream responses. + * + * Features: + * - TTL-based expiration + * - Stale-while-revalidate support + * - Negative caching (404s) + * - Content-type aware caching + * - ETag support for conditional requests + * + * Note: This is an in-memory implementation. For production with persistence, + * extend this class to use RegistryStorage for S3-backed caching. 
/**
 * In-memory cache for upstream responses.
 *
 * Features:
 * - TTL-based expiration (TTL taken from `Cache-Control`, else from the resource type)
 * - Stale-while-revalidate support (an expired entry is handed out at most once)
 * - Negative caching (404s), stored as entries with an empty payload
 * - ETag retention for conditional requests
 * - Bounded size: at most `maxEntries` entries, with batch eviction when full
 *
 * Note: This is an in-memory implementation. For production with persistence,
 * extend this class to use RegistryStorage for S3-backed caching.
 */
export class UpstreamCache {
  /** protocol -> resourceType pairs whose payload is treated as immutable once fetched. */
  private static readonly IMMUTABLE_RESOURCE_TYPES: ReadonlyMap<string, string> = new Map([
    ['oci', 'blob'], // OCI blobs are addressed by digest
    ['npm', 'tarball'], // versioned tarballs
    ['maven', 'artifact'], // versioned artifacts
    ['cargo', 'crate'],
    ['composer', 'dist'],
    ['pypi', 'package'],
    ['rubygems', 'gem'],
  ]);

  /** Extracts the `max-age` value from a lower-cased Cache-Control header. */
  private static readonly MAX_AGE_PATTERN = /max-age=(\d+)/;

  /** Cache storage, keyed by the string produced by `buildCacheKey()`. */
  private readonly cache: Map<string, ICacheEntry> = new Map();

  /** Effective configuration (defaults merged with the constructor overrides). */
  private readonly config: IUpstreamCacheConfig;

  /** Maximum cache entries (prevents memory bloat) */
  private readonly maxEntries: number;

  /** Cleanup interval handle; `null` when no timer is running. */
  private cleanupInterval: ReturnType<typeof setInterval> | null = null;

  /**
   * @param config Overrides applied on top of `DEFAULT_CACHE_CONFIG`.
   * @param maxEntries Upper bound on the number of stored entries.
   */
  constructor(config?: Partial<IUpstreamCacheConfig>, maxEntries: number = 10000) {
    this.config = { ...DEFAULT_CACHE_CONFIG, ...config };
    this.maxEntries = maxEntries;

    // The periodic sweep is only needed when something can actually be stored.
    if (this.config.enabled) {
      this.startCleanup();
    }
  }

  /**
   * Check if caching is enabled.
   */
  public isEnabled(): boolean {
    return this.config.enabled;
  }

  /**
   * Get cached entry for a request context.
   *
   * Returns `null` when caching is disabled, nothing is stored, or the entry is expired.
   * With stale-while-revalidate on, an expired entry that is at most `staleMaxAgeSeconds`
   * past its expiry is returned once with `stale` set to `true`; the next lookup of a
   * still-expired entry removes it.
   */
  public get(context: IUpstreamFetchContext): ICacheEntry | null {
    if (!this.config.enabled) {
      return null;
    }

    const key = this.buildCacheKey(context);
    const entry = this.cache.get(key);
    if (!entry) {
      return null;
    }

    const nowMs = Date.now();

    // Entries without an expiry never go stale.
    if (!entry.expiresAt || entry.expiresAt.getTime() >= nowMs) {
      return entry;
    }

    if (this.config.staleWhileRevalidate && !entry.stale) {
      const staleAgeSeconds = (nowMs - entry.expiresAt.getTime()) / 1000;
      if (staleAgeSeconds <= this.config.staleMaxAgeSeconds) {
        // Flag the shared entry so it is served stale only once.
        entry.stale = true;
        return entry;
      }
    }

    // Too old (or already served stale): drop it.
    this.cache.delete(key);
    return null;
  }

  /**
   * Store a response in the cache.
   *
   * The TTL is `options.ttlSeconds` when given, otherwise it is derived from the response
   * headers and the resource type. A derived TTL of zero or less (`no-store`, `no-cache`,
   * `max-age=0`) means the response must not be cached: nothing is stored and any entry
   * already held for the same key is removed. An explicit `options.ttlSeconds <= 0` stores
   * the entry without an expiry.
   *
   * @param context Request the response belongs to; determines the cache key.
   * @param data Response payload.
   * @param contentType Content type recorded on the entry.
   * @param headers Response headers; `cache-control` and `etag` are read by lower-case key.
   * @param upstreamId ID of the upstream that produced the response.
   * @param options Optional TTL override and fallback ETag.
   */
  public set(
    context: IUpstreamFetchContext,
    data: Buffer,
    contentType: string,
    headers: Record<string, string>,
    upstreamId: string,
    options?: ICacheSetOptions,
  ): void {
    if (!this.config.enabled) {
      return;
    }

    const key = this.buildCacheKey(context);
    const explicitTtl = options?.ttlSeconds;
    const ttlSeconds = explicitTtl ?? this.determineTtl(context, headers);

    // Without this guard an uncacheable response was stored with no expiry, i.e. forever.
    if (explicitTtl === undefined && ttlSeconds <= 0) {
      this.cache.delete(key);
      return;
    }

    const now = new Date();
    const entry: ICacheEntry = {
      data,
      contentType,
      headers,
      cachedAt: now,
      expiresAt: ttlSeconds > 0 ? new Date(now.getTime() + ttlSeconds * 1000) : undefined,
      etag: headers['etag'] || options?.etag,
      upstreamId,
      stale: false,
    };

    this.store(key, entry);
  }

  /**
   * Store a negative cache entry (404 response).
   *
   * No-op when caching is disabled or `negativeCacheTtlSeconds` is zero or less.
   * The entry has an empty payload, which is how `hasNegative()` recognises it.
   */
  public setNegative(context: IUpstreamFetchContext, upstreamId: string): void {
    if (!this.config.enabled || this.config.negativeCacheTtlSeconds <= 0) {
      return;
    }

    const now = new Date();
    const entry: ICacheEntry = {
      data: Buffer.from(''),
      contentType: 'application/octet-stream',
      headers: {},
      cachedAt: now,
      expiresAt: new Date(now.getTime() + this.config.negativeCacheTtlSeconds * 1000),
      upstreamId,
      stale: false,
    };

    // Same insert path as set(), so negative entries respect maxEntries too.
    this.store(this.buildCacheKey(context), entry);
  }

  /**
   * Check if there's a negative cache entry for this context.
   *
   * Goes through `get()`, so expiry and stale handling apply. Any usable entry with an
   * empty payload counts as negative.
   */
  public hasNegative(context: IUpstreamFetchContext): boolean {
    const entry = this.get(context);
    return entry !== null && entry.data.length === 0;
  }

  /**
   * Invalidate a specific cache entry.
   *
   * @returns `true` when an entry existed and was removed.
   */
  public invalidate(context: IUpstreamFetchContext): boolean {
    return this.cache.delete(this.buildCacheKey(context));
  }

  /**
   * Invalidate all entries whose cache key matches a pattern.
   * Useful for invalidating all versions of a package.
   *
   * Keys have the form `protocol:METHOD:path[?sorted-query]`.
   *
   * @returns Number of entries removed.
   */
  public invalidatePattern(pattern: RegExp): number {
    let removed = 0;
    for (const key of this.cache.keys()) {
      // test() on a global/sticky regex resumes from lastIndex; reset it so every key
      // is matched from its start.
      pattern.lastIndex = 0;
      if (pattern.test(key)) {
        this.cache.delete(key);
        removed++;
      }
    }
    pattern.lastIndex = 0;
    return removed;
  }

  /**
   * Invalidate all entries from a specific upstream.
   *
   * @returns Number of entries removed.
   */
  public invalidateUpstream(upstreamId: string): number {
    let removed = 0;
    for (const [key, entry] of this.cache.entries()) {
      if (entry.upstreamId === upstreamId) {
        this.cache.delete(key);
        removed++;
      }
    }
    return removed;
  }

  /**
   * Clear all cache entries.
   */
  public clear(): void {
    this.cache.clear();
  }

  /**
   * Get cache statistics.
   *
   * Each entry is counted in exactly one bucket: empty payload -> negative; otherwise
   * flagged stale or past its expiry -> stale; otherwise fresh.
   */
  public getStats(): ICacheStats {
    let freshCount = 0;
    let staleCount = 0;
    let negativeCount = 0;
    let totalSize = 0;
    const nowMs = Date.now();

    for (const entry of this.cache.values()) {
      totalSize += entry.data.length;

      if (entry.data.length === 0) {
        negativeCount++;
      } else if (entry.stale || (entry.expiresAt && entry.expiresAt.getTime() < nowMs)) {
        staleCount++;
      } else {
        freshCount++;
      }
    }

    return {
      totalEntries: this.cache.size,
      freshEntries: freshCount,
      staleEntries: staleCount,
      negativeEntries: negativeCount,
      totalSizeBytes: totalSize,
      maxEntries: this.maxEntries,
      enabled: this.config.enabled,
    };
  }

  /**
   * Stop the periodic cleanup timer. Stored entries are left untouched.
   */
  public stop(): void {
    if (this.cleanupInterval) {
      clearInterval(this.cleanupInterval);
      this.cleanupInterval = null;
    }
  }

  /**
   * Insert or overwrite an entry, evicting first when a NEW key would exceed `maxEntries`.
   * Overwriting an existing key does not grow the map, so it never triggers eviction.
   */
  private store(key: string, entry: ICacheEntry): void {
    if (!this.cache.has(key) && this.cache.size >= this.maxEntries) {
      this.evictOldest();
    }
    this.cache.set(key, entry);
  }

  /**
   * Build a unique cache key for a request context:
   * `protocol:method:path`, followed by `?k=v&...` with query keys sorted.
   */
  private buildCacheKey(context: IUpstreamFetchContext): string {
    const query = context.query ?? {};
    const queryString = Object.keys(query)
      .sort()
      .map((name) => `${name}=${query[name]}`)
      .join('&');

    return `${context.protocol}:${context.method}:${context.path}${queryString ? '?' + queryString : ''}`;
  }

  /**
   * Determine TTL (seconds) for a response.
   *
   * Order of precedence:
   * 1. `Cache-Control: no-store` / `no-cache` -> 0 (checked first so that
   *    `no-cache, max-age=N` is not cached),
   * 2. `Cache-Control: max-age=N` -> N,
   * 3. immutable resource type -> `immutableTtlSeconds`,
   * 4. otherwise `defaultTtlSeconds`.
   */
  private determineTtl(context: IUpstreamFetchContext, headers: Record<string, string>): number {
    const cacheControl = headers['cache-control'];
    if (cacheControl) {
      // Directive names are case-insensitive.
      const directives = cacheControl.toLowerCase();
      if (directives.includes('no-store') || directives.includes('no-cache')) {
        return 0;
      }
      const maxAge = UpstreamCache.MAX_AGE_PATTERN.exec(directives);
      if (maxAge) {
        return Number.parseInt(maxAge[1] ?? '0', 10);
      }
    }

    return this.isImmutableContent(context)
      ? this.config.immutableTtlSeconds
      : this.config.defaultTtlSeconds;
  }

  /**
   * Check if content is immutable, based on the protocol / resource type pair.
   */
  private isImmutableContent(context: IUpstreamFetchContext): boolean {
    return UpstreamCache.IMMUTABLE_RESOURCE_TYPES.get(context.protocol) === context.resourceType;
  }

  /**
   * Evict entries to make room for new ones: 10% of `maxEntries` (rounded up) per call.
   * Stale or expired entries go first; if that is not enough, the entries with the
   * oldest `cachedAt` follow.
   */
  private evictOldest(): void {
    const evictCount = Math.ceil(this.maxEntries * 0.1);
    let evicted = 0;

    const nowMs = Date.now();
    for (const [key, entry] of this.cache.entries()) {
      if (evicted >= evictCount) break;
      if (entry.stale || (entry.expiresAt && entry.expiresAt.getTime() < nowMs)) {
        this.cache.delete(key);
        evicted++;
      }
    }

    if (evicted >= evictCount) {
      return;
    }

    const oldestFirst = Array.from(this.cache.entries()).sort(
      (a, b) => a[1].cachedAt.getTime() - b[1].cachedAt.getTime(),
    );
    for (const [key] of oldestFirst) {
      if (evicted >= evictCount) break;
      this.cache.delete(key);
      evicted++;
    }
  }

  /**
   * Start periodic cleanup of expired entries (once per minute).
   */
  private startCleanup(): void {
    this.cleanupInterval = setInterval(() => {
      this.cleanup();
    }, 60000);

    // Don't keep the process alive just for cleanup
    if (this.cleanupInterval.unref) {
      this.cleanupInterval.unref();
    }
  }

  /**
   * Remove every entry whose expiry lies more than `staleMaxAgeSeconds` in the past.
   * Entries without an expiry are never removed here.
   */
  private cleanup(): void {
    const staleDeadlineMs = Date.now() - this.config.staleMaxAgeSeconds * 1000;

    for (const [key, entry] of this.cache.entries()) {
      if (entry.expiresAt && entry.expiresAt.getTime() < staleDeadlineMs) {
        this.cache.delete(key);
      }
    }
  }
}

/**
 * Options for cache set operation.
 */
export interface ICacheSetOptions {
  /** Override TTL in seconds; a value of zero or less stores the entry without an expiry */
  ttlSeconds?: number;
  /** ETag for conditional requests; used only when the response headers carry no `etag` */
  etag?: string;
}
+ */ +export interface ICacheStats { + /** Total number of cached entries (all buckets below combined) */ + totalEntries: number; + /** Number of non-empty entries that are neither flagged stale nor past their expiry */ + freshEntries: number; + /** Number of non-empty entries that are flagged stale or past their expiry (includes expired entries not yet removed by cleanup) */ + staleEntries: number; + /** Number of entries with an empty payload, which is how negative cache entries are stored */ + negativeEntries: number; + /** Sum of data.length over all cached entries, in bytes */ + totalSizeBytes: number; + /** Maximum allowed entries */ + maxEntries: number; + /** Whether caching is enabled */ + enabled: boolean; +} diff --git a/ts/upstream/index.ts b/ts/upstream/index.ts new file mode 100644 index 0000000..c5d24d3 --- /dev/null +++ b/ts/upstream/index.ts @@ -0,0 +1,11 @@ +// Interfaces and types +export * from './interfaces.upstream.js'; + +// Classes +export { CircuitBreaker, CircuitOpenError, withCircuitBreaker } from './classes.circuitbreaker.js'; +export type { ICircuitBreakerMetrics } from './classes.circuitbreaker.js'; + +export { UpstreamCache } from './classes.upstreamcache.js'; +export type { ICacheSetOptions, ICacheStats } from './classes.upstreamcache.js'; + +export { BaseUpstream } from './classes.baseupstream.js'; diff --git a/ts/upstream/interfaces.upstream.ts b/ts/upstream/interfaces.upstream.ts new file mode 100644 index 0000000..99f6e82 --- /dev/null +++ b/ts/upstream/interfaces.upstream.ts @@ -0,0 +1,195 @@ +import type { TRegistryProtocol } from '../core/interfaces.core.js'; + +/** + * Scope rule for routing requests to specific upstreams. + * Uses glob patterns for flexible matching. + */ +export interface IUpstreamScopeRule { + /** Glob pattern (e.g., "@company/*", "com.example.*", "library/*") */ + pattern: string; + /** Whether matching resources should be included or excluded */ + action: 'include' | 'exclude'; +} + +/** + * Authentication configuration for an upstream registry. + * Supports multiple auth strategies.
/**
 * How to authenticate against an upstream registry.
 * `type` selects the strategy; the remaining fields are only meaningful for
 * the strategy named in their description.
 */
export interface IUpstreamAuthConfig {
  /** Authentication type */
  type: 'none' | 'basic' | 'bearer' | 'api-key';
  /** Username for basic auth */
  username?: string;
  /** Password for basic auth */
  password?: string;
  /** Token for bearer or api-key auth */
  token?: string;
  /** Custom header name for api-key auth (default: 'Authorization') */
  headerName?: string;
}

/**
 * Cache configuration for upstream content.
 * Defaults quoted below are the values of `DEFAULT_CACHE_CONFIG`.
 */
export interface IUpstreamCacheConfig {
  /** Whether caching is enabled */
  enabled: boolean;
  /** Default TTL in seconds for mutable content (default: 300 = 5 min) */
  defaultTtlSeconds: number;
  /** TTL in seconds for immutable/content-addressable content (default: 2592000 = 30 days) */
  immutableTtlSeconds: number;
  /** Whether to serve stale content while revalidating in background */
  staleWhileRevalidate: boolean;
  /**
   * Maximum age in seconds for stale content, measured from the entry's expiry
   * (default: 3600 = 1 hour)
   */
  staleMaxAgeSeconds: number;
  /**
   * TTL in seconds for negative cache entries (404s) (default: 60 = 1 min).
   * A value of zero or less turns negative caching off.
   */
  negativeCacheTtlSeconds: number;
}

/**
 * Resilience configuration for upstream requests.
 * Defaults quoted below are the values of `DEFAULT_RESILIENCE_CONFIG`.
 */
export interface IUpstreamResilienceConfig {
  /** Request timeout in milliseconds (default: 30000) */
  timeoutMs: number;
  /** Maximum number of retry attempts (default: 3) */
  maxRetries: number;
  /** Initial retry delay in milliseconds (default: 1000) */
  retryDelayMs: number;
  /** Maximum retry delay in milliseconds (default: 30000) */
  retryMaxDelayMs: number;
  /** Number of failures before circuit breaker opens (default: 5) */
  circuitBreakerThreshold: number;
  /** Time in milliseconds before circuit breaker attempts reset (default: 30000) */
  circuitBreakerResetMs: number;
}
/**
 * Configuration for a single upstream registry.
 */
export interface IUpstreamRegistryConfig {
  /** Unique identifier for this upstream */
  id: string;
  /** Human-readable name */
  name: string;
  /** Base URL of the upstream registry (e.g., "https://registry.npmjs.org") */
  url: string;
  /** Priority for routing (lower = higher priority, 1 = first) */
  priority: number;
  /** Whether this upstream is enabled */
  enabled: boolean;
  /** Scope rules for routing (empty = match all) */
  scopeRules?: IUpstreamScopeRule[];
  /** Authentication configuration */
  auth: IUpstreamAuthConfig;
  /** Cache configuration overrides; only the fields given replace the defaults */
  cache?: Partial<IUpstreamCacheConfig>;
  /** Resilience configuration overrides; only the fields given replace the defaults */
  resilience?: Partial<IUpstreamResilienceConfig>;
}

/**
 * Protocol-level upstream configuration.
 * Configures upstream behavior for a specific protocol (npm, oci, etc.)
 */
export interface IProtocolUpstreamConfig {
  /** Whether upstream is enabled for this protocol */
  enabled: boolean;
  /** List of upstream registries, ordered by priority */
  upstreams: IUpstreamRegistryConfig[];
  /** Protocol-level cache configuration defaults */
  cache?: Partial<IUpstreamCacheConfig>;
  /** Protocol-level resilience configuration defaults */
  resilience?: Partial<IUpstreamResilienceConfig>;
}

/**
 * Result of an upstream fetch operation.
 */
export interface IUpstreamResult {
  /** Whether the fetch was successful (2xx status) */
  success: boolean;
  /** HTTP status code */
  status: number;
  /** Response headers */
  headers: Record<string, string>;
  /** Response body (Buffer for binary, object for JSON) */
  body?: Buffer | any;
  /** ID of the upstream that served the request */
  upstreamId: string;
  /** Whether the response was served from cache */
  fromCache: boolean;
  /** Request latency in milliseconds */
  latencyMs: number;
}

/**
 * Circuit breaker state.
 */
export type TCircuitState = 'CLOSED' | 'OPEN' | 'HALF_OPEN';
/**
 * Context for an upstream fetch request.
 */
export interface IUpstreamFetchContext {
  /** Protocol type */
  protocol: TRegistryProtocol;
  /** Resource identifier (package name, artifact name, etc.) */
  resource: string;
  /** Type of resource being fetched (packument, tarball, manifest, blob, etc.) */
  resourceType: string;
  /** Original request path */
  path: string;
  /** HTTP method */
  method: string;
  /** Request headers */
  headers: Record<string, string>;
  /** Query parameters */
  query: Record<string, string>;
}

/**
 * Cache entry stored in the upstream cache.
 */
export interface ICacheEntry {
  /** Cached data */
  data: Buffer;
  /** Content type of the cached data */
  contentType: string;
  /** Original response headers */
  headers: Record<string, string>;
  /** When the entry was cached */
  cachedAt: Date;
  /** When the entry expires; an entry without this field does not expire */
  expiresAt?: Date;
  /** ETag for conditional requests */
  etag?: string;
  /** ID of the upstream that provided the data */
  upstreamId: string;
  /** Whether the entry is stale but still usable */
  stale?: boolean;
}

/**
 * Default cache configuration values.
 */
export const DEFAULT_CACHE_CONFIG: IUpstreamCacheConfig = {
  enabled: true,
  defaultTtlSeconds: 300, // 5 minutes
  immutableTtlSeconds: 2592000, // 30 days
  staleWhileRevalidate: true,
  staleMaxAgeSeconds: 3600, // 1 hour
  negativeCacheTtlSeconds: 60, // 1 minute
};

/**
 * Default resilience configuration values.
 */
export const DEFAULT_RESILIENCE_CONFIG: IUpstreamResilienceConfig = {
  timeoutMs: 30000,
  maxRetries: 3,
  retryDelayMs: 1000,
  retryMaxDelayMs: 30000,
  circuitBreakerThreshold: 5,
  circuitBreakerResetMs: 30000,
};