feat(core,storage,oci,registry-config): add streaming response support and configurable registry URLs across protocols

This commit is contained in:
2026-03-24 22:59:37 +00:00
parent 1f0acf2825
commit 7da1a35efe
42 changed files with 4179 additions and 5396 deletions

View File

@@ -3,6 +3,7 @@ import { BaseRegistry } from '../core/classes.baseregistry.js';
import { RegistryStorage } from '../core/classes.registrystorage.js';
import { AuthManager } from '../core/classes.authmanager.js';
import type { IRequestContext, IResponse, IAuthToken, IRegistryError, IRequestActor } from '../core/interfaces.core.js';
import { createHashTransform, streamToBuffer } from '../core/helpers.stream.js';
import type { IUpstreamProvider } from '../upstream/interfaces.upstream.js';
import { OciUpstream } from './classes.ociupstream.js';
import type {
@@ -302,6 +303,8 @@ export class OciRegistry extends BaseRegistry {
uploadId,
repository,
chunks: [],
chunkPaths: [],
chunkIndex: 0,
totalSize: 0,
createdAt: new Date(),
lastActivity: new Date(),
@@ -571,25 +574,35 @@ export class OciRegistry extends BaseRegistry {
return this.createUnauthorizedResponse(repository, 'pull');
}
// Try local storage first
let data = await this.storage.getOciBlob(digest);
// Try local storage first (streaming)
const streamResult = await this.storage.getOciBlobStream(digest);
if (streamResult) {
return {
status: 200,
headers: {
'Content-Type': 'application/octet-stream',
'Content-Length': streamResult.size.toString(),
'Docker-Content-Digest': digest,
},
body: streamResult.stream,
};
}
// If not found locally, try upstream
if (!data) {
const upstream = await this.getUpstreamForRequest(repository, 'blob', 'GET', actor);
if (upstream) {
this.logger.log('debug', 'getBlob: fetching from upstream', { repository, digest });
const upstreamBlob = await upstream.fetchBlob(repository, digest);
if (upstreamBlob) {
data = upstreamBlob;
// Cache the blob locally (blobs are content-addressable and immutable)
await this.storage.putOciBlob(digest, data);
this.logger.log('debug', 'getBlob: cached blob locally', {
repository,
digest,
size: data.length,
});
}
let data: Buffer | null = null;
const upstream = await this.getUpstreamForRequest(repository, 'blob', 'GET', actor);
if (upstream) {
this.logger.log('debug', 'getBlob: fetching from upstream', { repository, digest });
const upstreamBlob = await upstream.fetchBlob(repository, digest);
if (upstreamBlob) {
data = upstreamBlob;
// Cache the blob locally (blobs are content-addressable and immutable)
await this.storage.putOciBlob(digest, data);
this.logger.log('debug', 'getBlob: cached blob locally', {
repository,
digest,
size: data.length,
});
}
}
@@ -620,17 +633,15 @@ export class OciRegistry extends BaseRegistry {
return this.createUnauthorizedHeadResponse(repository, 'pull');
}
const exists = await this.storage.ociBlobExists(digest);
if (!exists) {
const blobSize = await this.storage.getOciBlobSize(digest);
if (blobSize === null) {
return { status: 404, headers: {}, body: null };
}
const blob = await this.storage.getOciBlob(digest);
return {
status: 200,
headers: {
'Content-Length': blob ? blob.length.toString() : '0',
'Content-Length': blobSize.toString(),
'Docker-Content-Digest': digest,
},
body: null,
@@ -670,7 +681,12 @@ export class OciRegistry extends BaseRegistry {
}
const chunkData = this.toBuffer(data);
session.chunks.push(chunkData);
// Write chunk to temp S3 object instead of accumulating in memory
const chunkPath = `oci/uploads/${uploadId}/chunk-${session.chunkIndex}`;
await this.storage.putObject(chunkPath, chunkData);
session.chunkPaths.push(chunkPath);
session.chunkIndex++;
session.totalSize += chunkData.length;
session.lastActivity = new Date();
@@ -699,13 +715,52 @@ export class OciRegistry extends BaseRegistry {
};
}
const chunks = [...session.chunks];
if (finalData) chunks.push(this.toBuffer(finalData));
const blobData = Buffer.concat(chunks);
// If there's final data in the PUT body, write it as the last chunk
if (finalData) {
const buf = this.toBuffer(finalData);
const chunkPath = `oci/uploads/${uploadId}/chunk-${session.chunkIndex}`;
await this.storage.putObject(chunkPath, buf);
session.chunkPaths.push(chunkPath);
session.chunkIndex++;
session.totalSize += buf.length;
}
// Verify digest
const calculatedDigest = await this.calculateDigest(blobData);
// Create a ReadableStream that assembles all chunks from S3 sequentially
const chunkPaths = [...session.chunkPaths];
const storage = this.storage;
let chunkIdx = 0;
const assembledStream = new ReadableStream<Uint8Array>({
async pull(controller) {
if (chunkIdx >= chunkPaths.length) {
controller.close();
return;
}
const result = await storage.getObjectStream(chunkPaths[chunkIdx++]);
if (result) {
const reader = result.stream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
if (value) controller.enqueue(value);
}
}
},
});
// Pipe through hash transform for incremental digest verification
const { transform: hashTransform, getDigest } = createHashTransform('sha256');
const hashedStream = assembledStream.pipeThrough(hashTransform);
// Consume stream to buffer for S3 upload
// (AWS SDK PutObjectCommand requires known content-length for streams;
// the key win is chunks are NOT accumulated in memory during PATCH — they live in S3)
const blobData = await streamToBuffer(hashedStream);
// Verify digest before storing
const calculatedDigest = `sha256:${getDigest()}`;
if (calculatedDigest !== digest) {
await this.cleanupUploadChunks(session);
this.uploadSessions.delete(uploadId);
return {
status: 400,
headers: {},
@@ -713,7 +768,11 @@ export class OciRegistry extends BaseRegistry {
};
}
// Store verified blob
await this.storage.putOciBlob(digest, blobData);
// Cleanup temp chunks and session
await this.cleanupUploadChunks(session);
this.uploadSessions.delete(uploadId);
return {
@@ -726,6 +785,19 @@ export class OciRegistry extends BaseRegistry {
};
}
/**
* Delete all temp S3 chunk objects for an upload session.
*/
private async cleanupUploadChunks(session: IUploadSession): Promise<void> {
for (const chunkPath of session.chunkPaths) {
try {
await this.storage.deleteObject(chunkPath);
} catch {
// Best-effort cleanup
}
}
}
private async getUploadStatus(uploadId: string): Promise<IResponse> {
const session = this.uploadSessions.get(uploadId);
if (!session) {
@@ -917,6 +989,8 @@ export class OciRegistry extends BaseRegistry {
for (const [uploadId, session] of this.uploadSessions.entries()) {
if (now.getTime() - session.lastActivity.getTime() > maxAge) {
// Clean up temp S3 chunks for stale sessions
this.cleanupUploadChunks(session).catch(() => {});
this.uploadSessions.delete(uploadId);
}
}

View File

@@ -62,6 +62,10 @@ export interface IUploadSession {
uploadId: string;
repository: string;
chunks: Buffer[];
/** S3 paths to temp chunk objects (streaming mode) */
chunkPaths: string[];
/** Index counter for naming temp chunk objects */
chunkIndex: number;
totalSize: number;
createdAt: Date;
lastActivity: Date;