feat(upstream): Add dynamic per-request upstream provider and integrate into registries

This commit is contained in:
2025-12-03 22:16:40 +00:00
parent 351680159b
commit e9af3f8328
14 changed files with 1117 additions and 287 deletions

View File

@@ -2,8 +2,8 @@ import { Smartlog } from '@push.rocks/smartlog';
import { BaseRegistry } from '../core/classes.baseregistry.js';
import { RegistryStorage } from '../core/classes.registrystorage.js';
import { AuthManager } from '../core/classes.authmanager.js';
import type { IRequestContext, IResponse, IAuthToken, IRegistryError } from '../core/interfaces.core.js';
import type { IProtocolUpstreamConfig } from '../upstream/interfaces.upstream.js';
import type { IRequestContext, IResponse, IAuthToken, IRegistryError, IRequestActor } from '../core/interfaces.core.js';
import type { IUpstreamProvider } from '../upstream/interfaces.upstream.js';
import { OciUpstream } from './classes.ociupstream.js';
import type {
IUploadSession,
@@ -24,7 +24,7 @@ export class OciRegistry extends BaseRegistry {
private basePath: string = '/oci';
private cleanupInterval?: NodeJS.Timeout;
private ociTokens?: { realm: string; service: string };
private upstream: OciUpstream | null = null;
private upstreamProvider: IUpstreamProvider | null = null;
private logger: Smartlog;
constructor(
@@ -32,13 +32,14 @@ export class OciRegistry extends BaseRegistry {
authManager: AuthManager,
basePath: string = '/oci',
ociTokens?: { realm: string; service: string },
upstreamConfig?: IProtocolUpstreamConfig
upstreamProvider?: IUpstreamProvider
) {
super();
this.storage = storage;
this.authManager = authManager;
this.basePath = basePath;
this.ociTokens = ociTokens;
this.upstreamProvider = upstreamProvider || null;
// Initialize logger
this.logger = new Smartlog({
@@ -53,15 +54,50 @@ export class OciRegistry extends BaseRegistry {
});
this.logger.enableConsole();
// Initialize upstream if configured
if (upstreamConfig?.enabled) {
this.upstream = new OciUpstream(upstreamConfig, basePath, this.logger);
this.logger.log('info', 'OCI upstream initialized', {
upstreams: upstreamConfig.upstreams.map(u => u.name),
});
if (upstreamProvider) {
this.logger.log('info', 'OCI upstream provider configured');
}
}
/**
* Extract scope from OCI repository name.
* @example "myorg/myimage" -> "myorg"
* @example "library/nginx" -> "library"
* @example "nginx" -> null
*/
private extractScope(repository: string): string | null {
const slashIndex = repository.indexOf('/');
if (slashIndex > 0) {
return repository.substring(0, slashIndex);
}
return null;
}
/**
* Get upstream for a specific request.
* Calls the provider to resolve upstream config dynamically.
*/
private async getUpstreamForRequest(
resource: string,
resourceType: string,
method: string,
actor?: IRequestActor
): Promise<OciUpstream | null> {
if (!this.upstreamProvider) return null;
const config = await this.upstreamProvider.resolveUpstreamConfig({
protocol: 'oci',
resource,
scope: this.extractScope(resource),
actor,
method,
resourceType,
});
if (!config?.enabled) return null;
return new OciUpstream(config, this.basePath, this.logger);
}
public async init(): Promise<void> {
// Start cleanup of stale upload sessions
this.startUploadSessionCleanup();
@@ -80,6 +116,14 @@ export class OciRegistry extends BaseRegistry {
const tokenString = authHeader?.replace(/^Bearer\s+/i, '');
const token = tokenString ? await this.authManager.validateToken(tokenString, 'oci') : null;
// Build actor from context and validated token
const actor: IRequestActor = {
...context.actor,
userId: token?.userId,
ip: context.headers['x-forwarded-for'] || context.headers['X-Forwarded-For'],
userAgent: context.headers['user-agent'] || context.headers['User-Agent'],
};
// Route to appropriate handler
if (path === '/v2/' || path === '/v2') {
return this.handleVersionCheck();
@@ -91,14 +135,14 @@ export class OciRegistry extends BaseRegistry {
const [, name, reference] = manifestMatch;
// Prefer rawBody for content-addressable operations to preserve exact bytes
const bodyData = context.rawBody || context.body;
return this.handleManifestRequest(context.method, name, reference, token, bodyData, context.headers);
return this.handleManifestRequest(context.method, name, reference, token, bodyData, context.headers, actor);
}
// Blob operations: /v2/{name}/blobs/{digest}
const blobMatch = path.match(/^\/v2\/([^\/]+(?:\/[^\/]+)*)\/blobs\/(sha256:[a-f0-9]{64})$/);
if (blobMatch) {
const [, name, digest] = blobMatch;
return this.handleBlobRequest(context.method, name, digest, token, context.headers);
return this.handleBlobRequest(context.method, name, digest, token, context.headers, actor);
}
// Blob upload operations: /v2/{name}/blobs/uploads/
@@ -168,11 +212,12 @@ export class OciRegistry extends BaseRegistry {
reference: string,
token: IAuthToken | null,
body?: Buffer | any,
headers?: Record<string, string>
headers?: Record<string, string>,
actor?: IRequestActor
): Promise<IResponse> {
switch (method) {
case 'GET':
return this.getManifest(repository, reference, token, headers);
return this.getManifest(repository, reference, token, headers, actor);
case 'HEAD':
return this.headManifest(repository, reference, token);
case 'PUT':
@@ -193,11 +238,12 @@ export class OciRegistry extends BaseRegistry {
repository: string,
digest: string,
token: IAuthToken | null,
headers: Record<string, string>
headers: Record<string, string>,
actor?: IRequestActor
): Promise<IResponse> {
switch (method) {
case 'GET':
return this.getBlob(repository, digest, token, headers['range'] || headers['Range']);
return this.getBlob(repository, digest, token, headers['range'] || headers['Range'], actor);
case 'HEAD':
return this.headBlob(repository, digest, token);
case 'DELETE':
@@ -318,7 +364,8 @@ export class OciRegistry extends BaseRegistry {
repository: string,
reference: string,
token: IAuthToken | null,
headers?: Record<string, string>
headers?: Record<string, string>,
actor?: IRequestActor
): Promise<IResponse> {
if (!await this.checkPermission(token, repository, 'pull')) {
return this.createUnauthorizedResponse(repository, 'pull');
@@ -346,30 +393,33 @@ export class OciRegistry extends BaseRegistry {
}
// If not found locally, try upstream
if (!manifestData && this.upstream) {
this.logger.log('debug', 'getManifest: fetching from upstream', { repository, reference });
const upstreamResult = await this.upstream.fetchManifest(repository, reference);
if (upstreamResult) {
manifestData = Buffer.from(JSON.stringify(upstreamResult.manifest), 'utf8');
contentType = upstreamResult.contentType;
digest = upstreamResult.digest;
if (!manifestData) {
const upstream = await this.getUpstreamForRequest(repository, 'manifest', 'GET', actor);
if (upstream) {
this.logger.log('debug', 'getManifest: fetching from upstream', { repository, reference });
const upstreamResult = await upstream.fetchManifest(repository, reference);
if (upstreamResult) {
manifestData = Buffer.from(JSON.stringify(upstreamResult.manifest), 'utf8');
contentType = upstreamResult.contentType;
digest = upstreamResult.digest;
// Cache the manifest locally
await this.storage.putOciManifest(repository, digest, manifestData, contentType);
// Cache the manifest locally
await this.storage.putOciManifest(repository, digest, manifestData, contentType);
// If reference is a tag, update tags mapping
if (!reference.startsWith('sha256:')) {
const tags = await this.getTagsData(repository);
tags[reference] = digest;
const tagsPath = `oci/tags/${repository}/tags.json`;
await this.storage.putObject(tagsPath, Buffer.from(JSON.stringify(tags), 'utf-8'));
// If reference is a tag, update tags mapping
if (!reference.startsWith('sha256:')) {
const tags = await this.getTagsData(repository);
tags[reference] = digest;
const tagsPath = `oci/tags/${repository}/tags.json`;
await this.storage.putObject(tagsPath, Buffer.from(JSON.stringify(tags), 'utf-8'));
}
this.logger.log('debug', 'getManifest: cached manifest locally', {
repository,
reference,
digest,
});
}
this.logger.log('debug', 'getManifest: cached manifest locally', {
repository,
reference,
digest,
});
}
}
@@ -514,7 +564,8 @@ export class OciRegistry extends BaseRegistry {
repository: string,
digest: string,
token: IAuthToken | null,
range?: string
range?: string,
actor?: IRequestActor
): Promise<IResponse> {
if (!await this.checkPermission(token, repository, 'pull')) {
return this.createUnauthorizedResponse(repository, 'pull');
@@ -524,18 +575,21 @@ export class OciRegistry extends BaseRegistry {
let data = await this.storage.getOciBlob(digest);
// If not found locally, try upstream
if (!data && this.upstream) {
this.logger.log('debug', 'getBlob: fetching from upstream', { repository, digest });
const upstreamBlob = await this.upstream.fetchBlob(repository, digest);
if (upstreamBlob) {
data = upstreamBlob;
// Cache the blob locally (blobs are content-addressable and immutable)
await this.storage.putOciBlob(digest, data);
this.logger.log('debug', 'getBlob: cached blob locally', {
repository,
digest,
size: data.length,
});
if (!data) {
const upstream = await this.getUpstreamForRequest(repository, 'blob', 'GET', actor);
if (upstream) {
this.logger.log('debug', 'getBlob: fetching from upstream', { repository, digest });
const upstreamBlob = await upstream.fetchBlob(repository, digest);
if (upstreamBlob) {
data = upstreamBlob;
// Cache the blob locally (blobs are content-addressable and immutable)
await this.storage.putOciBlob(digest, data);
this.logger.log('debug', 'getBlob: cached blob locally', {
repository,
digest,
size: data.length,
});
}
}
}