From 5b68fa55d02060147392cc1bf8418c62afee396b Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Tue, 2 Jun 2026 13:38:48 +0000 Subject: [PATCH] fix(registry-copy): stream blob uploads in chunks --- changelog.md | 7 + pnpm-lock.yaml | 48 ++--- ts/classes.registrycopy.ts | 354 ++++++++++++++++++++++++++++++++----- 3 files changed, 326 insertions(+), 83 deletions(-) diff --git a/changelog.md b/changelog.md index be690b5..1e97687 100644 --- a/changelog.md +++ b/changelog.md @@ -3,6 +3,13 @@ ## Pending +### Fixes + +- stream blob uploads in chunks (registry-copy) + - Upload blobs via OCI chunked PATCH requests instead of buffering full layers in memory + - Retry chunk uploads with upload status checks to resume from accepted byte ranges + - Cancel incomplete uploads and handle registries that finalize blobs despite inconsistent responses + ## 2026-05-22 - 2.4.0 ### Features diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 949ce4b..cbb68dc 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -866,9 +866,6 @@ packages: engines: {node: '>=18'} hasBin: true - '@push.rocks/consolecolor@2.0.3': - resolution: {integrity: sha512-hA+m0BMqEwZNSAS7c2aQFfoPkpX/dNdsHzkdLdeERUOy7BLacb9ItTUofGtjtginP0yDj4NSpqSjNYyX3Y8Y/w==} - '@push.rocks/consolecolor@2.0.4': resolution: {integrity: sha512-rQJfuSJLzm117PBpsfyemX8Q/rpKh8ZVc2AqDVu6RXJMJkmGkKsADe0/rnttuHZYss8IP7yJIN9E6Vnx+jyy0A==} @@ -920,9 +917,6 @@ packages: '@push.rocks/smartdata@7.1.7': resolution: {integrity: sha512-HDI/Q9dKybfsJ68oCzlE+S63Xpij9qXnMfi28yznKP0Li1ECVZZMDDGIW5IjsXlHjO+Q+RJMcVd72Pjt3QLY5Q==} - '@push.rocks/smartdelay@3.0.5': - resolution: {integrity: sha512-mUuI7kj2f7ztjpic96FvRIlf2RsKBa5arw81AHNsndbxO6asRcxuWL8dTVxouEIK8YsBUlj0AsrCkHhMbLQdHw==} - '@push.rocks/smartdelay@3.1.0': resolution: {integrity: sha512-59xveBMbWmbFhh/rqhQnYG/klg/VONG9hV8+RQ7ftqsNRkcmUT+VM5etAbODgAUvsF4lxK+xVR0tbZOo0kGhRQ==} @@ -1007,9 +1001,6 @@ packages: '@push.rocks/smartpdf@4.2.2': resolution: {integrity: sha512-xQWRChCLcM/sUrRuanvIcND/dKrnCYfL8Rr3kzSIPgSoDSmdDbd4kz7lLAHEPTsCezIwg2VqxFidW+zMNZ5Z1Q==} - '@push.rocks/smartpromise@4.2.3': - resolution: {integrity: sha512-Ycg/TJR+tMt+S3wSFurOpEoW6nXv12QBtKXgBcjMZ4RsdO28geN46U09osPn9N9WuwQy1PkmTV5J/V4F9U8qEw==} - '@push.rocks/smartpromise@4.2.4': resolution: {integrity: sha512-8FUyYt94hOIY9mqHjitn4h69u0jbEtTF2RKKw2DpiTVFjpDTk9gXbVHZ/V+xEcBrN4mrzdQES0OiDmkNPoddEQ==} @@ -1049,9 +1040,6 @@ packages: '@push.rocks/smartstream@3.4.2': resolution: {integrity: sha512-JsjFjaNIlCBUglciM/IrXH0mH+oOQTLYQ6UMwqsew2XSUTXxER3ev2NeKMDBV6ONf2HF21EPnOZuKfgvtNGnUg==} - '@push.rocks/smartstring@4.1.0': - resolution: {integrity: sha512-Q4py/Nm3KTDhQ9EiC75yBtSTLR0KLMwhKM+8gGcutgKotZT6wJ3gncjmtD8LKFfNhb4lSaFMgPJgLrCHTOH6Iw==} - '@push.rocks/smartstring@4.1.1': resolution: {integrity: sha512-FlEpp2PcQ819ymmxjWb5/2gD8uPic/+IvOrSP2+KTdXLHOI4GSyK9YW/YBF541LVGl0GC3VGFmypcPNUzkPfYw==} @@ -4544,10 +4532,6 @@ snapshots: - react-native-b4a - supports-color - '@push.rocks/consolecolor@2.0.3': - dependencies: - ansi-256-colors: 1.1.0 - '@push.rocks/consolecolor@2.0.4': dependencies: ansi-256-colors: 1.1.0 @@ -4602,8 +4586,8 @@ snapshots: dependencies: '@push.rocks/smartfs': 1.5.1 '@push.rocks/smartpath': 6.0.0 - '@push.rocks/smartpromise': 4.2.3 - '@push.rocks/smartstring': 4.1.0 + '@push.rocks/smartpromise': 4.2.4 + '@push.rocks/smartstring': 4.1.1 '@push.rocks/qenv@6.1.4': dependencies: @@ -4740,10 +4724,6 @@ snapshots: - supports-color - vue - '@push.rocks/smartdelay@3.0.5': - dependencies: - '@push.rocks/smartpromise': 4.2.3 - '@push.rocks/smartdelay@3.1.0': dependencies: '@push.rocks/smartpromise': 4.2.4 @@ -4771,7 +4751,7 @@ snapshots: '@push.rocks/smartexit@2.0.3': dependencies: '@push.rocks/lik': 6.4.1 - '@push.rocks/smartpromise': 4.2.3 + '@push.rocks/smartpromise': 4.2.4 '@push.rocks/smartexpect@2.5.0': dependencies: @@ -4820,7 +4800,7 @@ snapshots: dependencies: '@push.rocks/lik': 6.4.1 '@push.rocks/smartobject': 1.0.12 - '@push.rocks/smartpromise': 4.2.3 + '@push.rocks/smartpromise': 4.2.4 inquirer: 11.1.0 '@push.rocks/smartjimp@1.2.1': @@ -4844,9 +4824,9 @@ snapshots: '@push.rocks/smartlog-destination-local@9.0.2': dependencies: - '@push.rocks/consolecolor': 2.0.3 + '@push.rocks/consolecolor': 2.0.4 '@push.rocks/smartlog-interfaces': 3.0.2 - '@push.rocks/smartpromise': 4.2.3 + '@push.rocks/smartpromise': 4.2.4 '@push.rocks/smartlog-interfaces@3.0.2': dependencies: @@ -5005,8 +4985,6 @@ snapshots: - typescript - utf-8-validate - '@push.rocks/smartpromise@4.2.3': {} - '@push.rocks/smartpromise@4.2.4': {} '@push.rocks/smartpuppeteer@2.0.6(typescript@6.0.3)': @@ -5052,7 +5030,7 @@ snapshots: '@push.rocks/smartrx@3.0.10': dependencies: - '@push.rocks/smartpromise': 4.2.3 + '@push.rocks/smartpromise': 4.2.4 rxjs: 7.8.2 '@push.rocks/smartserve@2.0.4': @@ -5070,9 +5048,9 @@ snapshots: '@push.rocks/smartshell@3.3.8': dependencies: - '@push.rocks/smartdelay': 3.0.5 + '@push.rocks/smartdelay': 3.1.0 '@push.rocks/smartexit': 2.0.3 - '@push.rocks/smartpromise': 4.2.3 + '@push.rocks/smartpromise': 4.2.4 '@types/which': 3.0.4 which: 6.0.1 @@ -5106,10 +5084,6 @@ snapshots: '@push.rocks/smartpromise': 4.2.4 '@push.rocks/smartrx': 3.0.10 - '@push.rocks/smartstring@4.1.0': - dependencies: - '@push.rocks/isounique': 1.0.5 - '@push.rocks/smartstring@4.1.1': dependencies: '@push.rocks/isounique': 1.0.5 @@ -5117,8 +5091,8 @@ snapshots: '@push.rocks/smarttime@4.2.3': dependencies: '@push.rocks/lik': 6.4.1 - '@push.rocks/smartdelay': 3.0.5 - '@push.rocks/smartpromise': 4.2.3 + '@push.rocks/smartdelay': 3.1.0 + '@push.rocks/smartpromise': 4.2.4 croner: 10.0.1 date-fns: 4.1.0 dayjs: 1.11.20 diff --git a/ts/classes.registrycopy.ts b/ts/classes.registrycopy.ts index 60655d4..42789c8 100644 --- a/ts/classes.registrycopy.ts +++ b/ts/classes.registrycopy.ts @@ -3,6 +3,8 @@ import * as os from 'os'; import * as path from 'path'; import { logger } from './tsdocker.logging.js'; +const blobUploadChunkSize = 32 * 1024 * 1024; + interface IRegistryCredentials { username: string; password: string; @@ -20,6 +22,20 @@ interface ITokenCache { export class RegistryCopy { private tokenCache: ITokenCache = {}; + private formatError(error: unknown): string { + const err = error as Error & { cause?: unknown }; + const cause = err.cause instanceof Error + ? `; cause: ${err.cause.message}` + : err.cause + ? `; cause: ${String(err.cause)}` + : ''; + return `${err.message}${cause}`; + } + + private async cancelResponseBody(resp: Response): Promise { + await resp.body?.cancel().catch(() => undefined); + } + /** * Wraps fetch() with timeout (via AbortSignal) and retry with exponential backoff. * Retries on network errors and 5xx; does NOT retry on 4xx client errors. @@ -46,6 +62,7 @@ export class RegistryCopy { if (resp.status >= 500 && attempt < maxRetries) { const delay = 1000 * Math.pow(2, attempt - 1); logger.log('warn', `${method} ${url} returned ${resp.status}, retrying in ${delay}ms (attempt ${attempt}/${maxRetries})...`); + await this.cancelResponseBody(resp); await new Promise(r => setTimeout(r, delay)); continue; } @@ -57,10 +74,10 @@ export class RegistryCopy { lastError = err as Error; if (attempt < maxRetries) { const delay = 1000 * Math.pow(2, attempt - 1); - logger.log('warn', `${method} ${url} failed (attempt ${attempt}/${maxRetries}): ${lastError.message}, retrying in ${delay}ms...`); + logger.log('warn', `${method} ${url} failed (attempt ${attempt}/${maxRetries}): ${this.formatError(err)}, retrying in ${delay}ms...`); await new Promise(r => setTimeout(r, delay)); } else { - logger.log('error', `${method} ${url} failed after ${maxRetries} attempts: ${lastError.message}`); + logger.log('error', `${method} ${url} failed after ${maxRetries} attempts: ${this.formatError(err)}`); } } } @@ -131,6 +148,230 @@ export class RegistryCopy { return `https://${registry}`; } + private normalizeUploadUrl(registry: string, location: string): string { + if (location.startsWith('/')) { + return `${this.getRegistryApiBase(registry)}${location}`; + } + return location; + } + + private parseUploadRange(rangeHeader: string | null, currentBytes: number): number | null { + if (!rangeHeader) { + return null; + } + + const rangeMatch = rangeHeader.match(/(?:bytes=)?(\d+)-(\d+)$/); + if (!rangeMatch) { + return null; + } + + const start = Number(rangeMatch[1]); + const end = Number(rangeMatch[2]); + if (!Number.isFinite(start) || !Number.isFinite(end) || start !== 0 || end < 0) { + return null; + } + + if (currentBytes === 0 && end === 0) { + return 0; + } + + return end + 1; + } + + private async fetchUploadUrl( + registry: string, + repo: string, + uploadUrl: string, + options: RequestInit & { duplex?: string }, + credentials?: IRegistryCredentials | null, + timeoutMs: number = 300_000, + maxRetries: number = 1, + ): Promise { + const token = await this.getToken(registry, repo, 'pull,push', credentials); + const headers: Record = { ...((options.headers || {}) as Record) }; + if (token) { + headers['Authorization'] = `Bearer ${token}`; + } + + const fetchOptions: RequestInit & { duplex?: string } = { + ...options, + headers, + }; + if (options.body) { + fetchOptions.duplex = 'half'; + } + + return this.fetchWithRetry(uploadUrl, fetchOptions, timeoutMs, maxRetries); + } + + private async getUploadStatus( + registry: string, + repo: string, + uploadUrl: string, + currentBytes: number, + credentials?: IRegistryCredentials | null, + ): Promise<{ uploadUrl: string; uploadedBytes: number | null }> { + const resp = await this.fetchUploadUrl(registry, repo, uploadUrl, { method: 'GET' }, credentials, 30_000, 3); + const nextLocation = resp.headers.get('location'); + const nextUploadUrl = nextLocation ? this.normalizeUploadUrl(registry, nextLocation) : uploadUrl; + + if (resp.status !== 204 && resp.status !== 202) { + const body = await resp.text(); + throw new Error(`Failed to get upload status for ${registry}/${repo}: ${resp.status} ${body}`); + } + + return { + uploadUrl: nextUploadUrl, + uploadedBytes: this.parseUploadRange(resp.headers.get('range'), currentBytes), + }; + } + + private async *readBlobChunks(resp: Response): AsyncGenerator { + if (!resp.body) { + yield Buffer.from(await resp.arrayBuffer()); + return; + } + + const reader = resp.body.getReader(); + const pendingBuffers: Buffer[] = []; + let pendingBytes = 0; + let fullyRead = false; + + const takeChunk = (size: number): Buffer => { + const chunk = Buffer.allocUnsafe(size); + let chunkOffset = 0; + + while (chunkOffset < size) { + const nextBuffer = pendingBuffers[0]; + const bytesToCopy = Math.min(nextBuffer.length, size - chunkOffset); + nextBuffer.copy(chunk, chunkOffset, 0, bytesToCopy); + chunkOffset += bytesToCopy; + pendingBytes -= bytesToCopy; + + if (bytesToCopy === nextBuffer.length) { + pendingBuffers.shift(); + } else { + pendingBuffers[0] = nextBuffer.subarray(bytesToCopy); + } + } + + return chunk; + }; + + try { + while (true) { + const { done, value } = await reader.read(); + if (done) { + fullyRead = true; + break; + } + + const nextBuffer = Buffer.from(value); + pendingBuffers.push(nextBuffer); + pendingBytes += nextBuffer.length; + + while (pendingBytes >= blobUploadChunkSize) { + yield takeChunk(blobUploadChunkSize); + } + } + + if (pendingBytes > 0) { + yield takeChunk(pendingBytes); + } + } finally { + if (!fullyRead) { + await reader.cancel().catch(() => undefined); + } + reader.releaseLock(); + } + } + + private async cancelUpload( + registry: string, + repo: string, + uploadUrl: string, + credentials?: IRegistryCredentials | null, + ): Promise { + try { + const resp = await this.fetchUploadUrl(registry, repo, uploadUrl, { method: 'DELETE' }, credentials, 30_000, 1); + if (!resp.ok && resp.status !== 404) { + logger.log('warn', `Failed to cancel upload for ${registry}/${repo}: ${resp.status} ${await resp.text()}`); + } + } catch (err) { + logger.log('warn', `Failed to cancel upload for ${registry}/${repo}: ${this.formatError(err)}`); + } + } + + private async patchUploadChunk( + registry: string, + repo: string, + uploadUrl: string, + chunk: Buffer, + uploadedBytes: number, + credentials?: IRegistryCredentials | null, + ): Promise<{ uploadUrl: string; uploadedBytes: number }> { + let remainingChunk = chunk; + let currentUploadUrl = uploadUrl; + let currentUploadedBytes = uploadedBytes; + let attempt = 1; + + while (remainingChunk.length > 0) { + try { + const resp = await this.fetchUploadUrl(registry, repo, currentUploadUrl, { + method: 'PATCH', + headers: { + 'Content-Type': 'application/octet-stream', + 'Content-Length': String(remainingChunk.length), + }, + body: remainingChunk as any, + }, credentials, 300_000, 1); + + if (resp.status !== 202) { + const body = await resp.text(); + throw new Error(`Chunk upload failed for ${registry}/${repo}: ${resp.status} ${body}`); + } + + const nextLocation = resp.headers.get('location'); + if (nextLocation) { + currentUploadUrl = this.normalizeUploadUrl(registry, nextLocation); + } + + const rangedBytes = this.parseUploadRange(resp.headers.get('range'), currentUploadedBytes); + currentUploadedBytes = rangedBytes && rangedBytes > currentUploadedBytes + ? rangedBytes + : currentUploadedBytes + remainingChunk.length; + remainingChunk = Buffer.alloc(0); + } catch (err) { + if (attempt >= 6) { + throw err; + } + + const delay = 1000 * Math.pow(2, attempt - 1); + logger.log('warn', `Chunk upload for ${registry}/${repo} failed: ${this.formatError(err)}, checking upload status before retrying in ${delay}ms...`); + + const status = await this.getUploadStatus(registry, repo, currentUploadUrl, currentUploadedBytes, credentials); + currentUploadUrl = status.uploadUrl; + if (status.uploadedBytes !== null) { + const acceptedBytes = status.uploadedBytes - currentUploadedBytes; + if (acceptedBytes >= remainingChunk.length) { + currentUploadedBytes = status.uploadedBytes; + remainingChunk = Buffer.alloc(0); + break; + } + if (acceptedBytes > 0) { + currentUploadedBytes = status.uploadedBytes; + remainingChunk = remainingChunk.subarray(acceptedBytes); + } + } + + await new Promise(r => setTimeout(r, delay)); + attempt++; + } + } + + return { uploadUrl: currentUploadUrl, uploadedBytes: currentUploadedBytes }; + } + /** * Obtains a Bearer token for registry operations. * Follows the standard Docker auth flow: @@ -304,7 +545,7 @@ export class RegistryCopy { /** * Copies a single blob from source to destination registry. - * Uses monolithic upload (POST initiate + PUT complete). + * Uses OCI chunked upload to avoid buffering multi-GB layers in memory. */ private async copyBlob( srcRegistry: string, @@ -322,20 +563,6 @@ export class RegistryCopy { return; } - // Download blob from source - const getResp = await this.registryFetch(srcRegistry, `/v2/${srcRepo}/blobs/${digest}`, { - repo: srcRepo, - actions: 'pull', - credentials: srcCredentials, - }); - - if (!getResp.ok) { - throw new Error(`Failed to get blob ${digest} from ${srcRegistry}/${srcRepo}: ${getResp.status}`); - } - - const blobData = Buffer.from(await getResp.arrayBuffer()); - const blobSize = blobData.length; - // Initiate upload at destination const postResp = await this.registryFetch(destRegistry, `/v2/${destRepo}/blobs/uploads/`, { method: 'POST', @@ -356,37 +583,72 @@ export class RegistryCopy { throw new Error(`No upload location returned from ${destRegistry}/${destRepo}`); } - // Make upload URL absolute if relative - if (uploadUrl.startsWith('/')) { - const apiBase = this.getRegistryApiBase(destRegistry); - uploadUrl = `${apiBase}${uploadUrl}`; - } - - // Complete upload with PUT (monolithic) - const separator = uploadUrl.includes('?') ? '&' : '?'; - const putUrl = `${uploadUrl}${separator}digest=${encodeURIComponent(digest)}`; - - // For PUT to the upload URL, we need auth - const token = await this.getToken(destRegistry, destRepo, 'pull,push', destCredentials); - const putHeaders: Record = { - 'Content-Type': 'application/octet-stream', - 'Content-Length': String(blobSize), - }; - if (token) { - putHeaders['Authorization'] = `Bearer ${token}`; - } - - const putResp = await this.fetchWithRetry(putUrl, { - method: 'PUT', - headers: putHeaders, - body: blobData, - }, 300_000); - - if (!putResp.ok) { - const body = await putResp.text(); - throw new Error(`Failed to upload blob ${digest} to ${destRegistry}/${destRepo}: ${putResp.status} ${body}`); + uploadUrl = this.normalizeUploadUrl(destRegistry, uploadUrl); + + let uploadedBytes = 0; + let uploadCompleted = false; + try { + // Download blob from source only after the destination upload can accept it. + const getResp = await this.registryFetch(srcRegistry, `/v2/${srcRepo}/blobs/${digest}`, { + repo: srcRepo, + actions: 'pull', + credentials: srcCredentials, + }); + + if (!getResp.ok) { + await this.cancelResponseBody(getResp); + throw new Error(`Failed to get blob ${digest} from ${srcRegistry}/${srcRepo}: ${getResp.status}`); + } + + for await (const chunk of this.readBlobChunks(getResp)) { + const result = await this.patchUploadChunk( + destRegistry, + destRepo, + uploadUrl, + chunk, + uploadedBytes, + destCredentials, + ); + uploadUrl = result.uploadUrl; + uploadedBytes = result.uploadedBytes; + } + + // Complete upload with a zero-length PUT and the expected digest. + const separator = uploadUrl.includes('?') ? '&' : '?'; + const putUrl = `${uploadUrl}${separator}digest=${encodeURIComponent(digest)}`; + + const putResp = await this.fetchUploadUrl(destRegistry, destRepo, putUrl, { + method: 'PUT', + headers: { + 'Content-Type': 'application/octet-stream', + 'Content-Length': '0', + }, + }, destCredentials, 300_000, 1).catch(async (err) => { + if (await this.blobExists(destRegistry, destRepo, digest, destCredentials)) { + return new Response(null, { status: 201 }); + } + throw err; + }); + + if (!putResp.ok) { + if (await this.blobExists(destRegistry, destRepo, digest, destCredentials)) { + logger.log('info', ` Blob ${digest.substring(0, 19)}... finalized despite response ${putResp.status}`); + uploadCompleted = true; + return; + } + const body = await putResp.text(); + throw new Error(`Failed to upload blob ${digest} to ${destRegistry}/${destRepo}: ${putResp.status} ${body}`); + } + + uploadCompleted = true; + } catch (err) { + if (!uploadCompleted) { + await this.cancelUpload(destRegistry, destRepo, uploadUrl, destCredentials); + } + throw err; } + const blobSize = uploadedBytes; const sizeStr = blobSize > 1048576 ? `${(blobSize / 1048576).toFixed(1)} MB` : `${(blobSize / 1024).toFixed(1)} KB`;