fix(registry-copy): stream blob uploads in chunks

This commit is contained in:
2026-06-02 13:38:48 +00:00
parent b3cabba0e9
commit 5b68fa55d0
3 changed files with 326 additions and 83 deletions
+7
View File
@@ -3,6 +3,13 @@
## Pending
### Fixes
- stream blob uploads in chunks (registry-copy)
- Upload blobs via OCI chunked PATCH requests instead of buffering full layers in memory
- Retry chunk uploads with upload status checks to resume from accepted byte ranges
- Cancel incomplete uploads and handle registries that finalize blobs despite inconsistent responses
## 2026-05-22 - 2.4.0
### Features
+11 -37
View File
@@ -866,9 +866,6 @@ packages:
engines: {node: '>=18'}
hasBin: true
'@push.rocks/consolecolor@2.0.3':
resolution: {integrity: sha512-hA+m0BMqEwZNSAS7c2aQFfoPkpX/dNdsHzkdLdeERUOy7BLacb9ItTUofGtjtginP0yDj4NSpqSjNYyX3Y8Y/w==}
'@push.rocks/consolecolor@2.0.4':
resolution: {integrity: sha512-rQJfuSJLzm117PBpsfyemX8Q/rpKh8ZVc2AqDVu6RXJMJkmGkKsADe0/rnttuHZYss8IP7yJIN9E6Vnx+jyy0A==}
@@ -920,9 +917,6 @@ packages:
'@push.rocks/smartdata@7.1.7':
resolution: {integrity: sha512-HDI/Q9dKybfsJ68oCzlE+S63Xpij9qXnMfi28yznKP0Li1ECVZZMDDGIW5IjsXlHjO+Q+RJMcVd72Pjt3QLY5Q==}
'@push.rocks/smartdelay@3.0.5':
resolution: {integrity: sha512-mUuI7kj2f7ztjpic96FvRIlf2RsKBa5arw81AHNsndbxO6asRcxuWL8dTVxouEIK8YsBUlj0AsrCkHhMbLQdHw==}
'@push.rocks/smartdelay@3.1.0':
resolution: {integrity: sha512-59xveBMbWmbFhh/rqhQnYG/klg/VONG9hV8+RQ7ftqsNRkcmUT+VM5etAbODgAUvsF4lxK+xVR0tbZOo0kGhRQ==}
@@ -1007,9 +1001,6 @@ packages:
'@push.rocks/smartpdf@4.2.2':
resolution: {integrity: sha512-xQWRChCLcM/sUrRuanvIcND/dKrnCYfL8Rr3kzSIPgSoDSmdDbd4kz7lLAHEPTsCezIwg2VqxFidW+zMNZ5Z1Q==}
'@push.rocks/smartpromise@4.2.3':
resolution: {integrity: sha512-Ycg/TJR+tMt+S3wSFurOpEoW6nXv12QBtKXgBcjMZ4RsdO28geN46U09osPn9N9WuwQy1PkmTV5J/V4F9U8qEw==}
'@push.rocks/smartpromise@4.2.4':
resolution: {integrity: sha512-8FUyYt94hOIY9mqHjitn4h69u0jbEtTF2RKKw2DpiTVFjpDTk9gXbVHZ/V+xEcBrN4mrzdQES0OiDmkNPoddEQ==}
@@ -1049,9 +1040,6 @@ packages:
'@push.rocks/smartstream@3.4.2':
resolution: {integrity: sha512-JsjFjaNIlCBUglciM/IrXH0mH+oOQTLYQ6UMwqsew2XSUTXxER3ev2NeKMDBV6ONf2HF21EPnOZuKfgvtNGnUg==}
'@push.rocks/smartstring@4.1.0':
resolution: {integrity: sha512-Q4py/Nm3KTDhQ9EiC75yBtSTLR0KLMwhKM+8gGcutgKotZT6wJ3gncjmtD8LKFfNhb4lSaFMgPJgLrCHTOH6Iw==}
'@push.rocks/smartstring@4.1.1':
resolution: {integrity: sha512-FlEpp2PcQ819ymmxjWb5/2gD8uPic/+IvOrSP2+KTdXLHOI4GSyK9YW/YBF541LVGl0GC3VGFmypcPNUzkPfYw==}
@@ -4544,10 +4532,6 @@ snapshots:
- react-native-b4a
- supports-color
'@push.rocks/consolecolor@2.0.3':
dependencies:
ansi-256-colors: 1.1.0
'@push.rocks/consolecolor@2.0.4':
dependencies:
ansi-256-colors: 1.1.0
@@ -4602,8 +4586,8 @@ snapshots:
dependencies:
'@push.rocks/smartfs': 1.5.1
'@push.rocks/smartpath': 6.0.0
'@push.rocks/smartpromise': 4.2.3
'@push.rocks/smartstring': 4.1.0
'@push.rocks/smartpromise': 4.2.4
'@push.rocks/smartstring': 4.1.1
'@push.rocks/qenv@6.1.4':
dependencies:
@@ -4740,10 +4724,6 @@ snapshots:
- supports-color
- vue
'@push.rocks/smartdelay@3.0.5':
dependencies:
'@push.rocks/smartpromise': 4.2.3
'@push.rocks/smartdelay@3.1.0':
dependencies:
'@push.rocks/smartpromise': 4.2.4
@@ -4771,7 +4751,7 @@ snapshots:
'@push.rocks/smartexit@2.0.3':
dependencies:
'@push.rocks/lik': 6.4.1
'@push.rocks/smartpromise': 4.2.3
'@push.rocks/smartpromise': 4.2.4
'@push.rocks/smartexpect@2.5.0':
dependencies:
@@ -4820,7 +4800,7 @@ snapshots:
dependencies:
'@push.rocks/lik': 6.4.1
'@push.rocks/smartobject': 1.0.12
'@push.rocks/smartpromise': 4.2.3
'@push.rocks/smartpromise': 4.2.4
inquirer: 11.1.0
'@push.rocks/smartjimp@1.2.1':
@@ -4844,9 +4824,9 @@ snapshots:
'@push.rocks/smartlog-destination-local@9.0.2':
dependencies:
'@push.rocks/consolecolor': 2.0.3
'@push.rocks/consolecolor': 2.0.4
'@push.rocks/smartlog-interfaces': 3.0.2
'@push.rocks/smartpromise': 4.2.3
'@push.rocks/smartpromise': 4.2.4
'@push.rocks/smartlog-interfaces@3.0.2':
dependencies:
@@ -5005,8 +4985,6 @@ snapshots:
- typescript
- utf-8-validate
'@push.rocks/smartpromise@4.2.3': {}
'@push.rocks/smartpromise@4.2.4': {}
'@push.rocks/smartpuppeteer@2.0.6(typescript@6.0.3)':
@@ -5052,7 +5030,7 @@ snapshots:
'@push.rocks/smartrx@3.0.10':
dependencies:
'@push.rocks/smartpromise': 4.2.3
'@push.rocks/smartpromise': 4.2.4
rxjs: 7.8.2
'@push.rocks/smartserve@2.0.4':
@@ -5070,9 +5048,9 @@ snapshots:
'@push.rocks/smartshell@3.3.8':
dependencies:
'@push.rocks/smartdelay': 3.0.5
'@push.rocks/smartdelay': 3.1.0
'@push.rocks/smartexit': 2.0.3
'@push.rocks/smartpromise': 4.2.3
'@push.rocks/smartpromise': 4.2.4
'@types/which': 3.0.4
which: 6.0.1
@@ -5106,10 +5084,6 @@ snapshots:
'@push.rocks/smartpromise': 4.2.4
'@push.rocks/smartrx': 3.0.10
'@push.rocks/smartstring@4.1.0':
dependencies:
'@push.rocks/isounique': 1.0.5
'@push.rocks/smartstring@4.1.1':
dependencies:
'@push.rocks/isounique': 1.0.5
@@ -5117,8 +5091,8 @@ snapshots:
'@push.rocks/smarttime@4.2.3':
dependencies:
'@push.rocks/lik': 6.4.1
'@push.rocks/smartdelay': 3.0.5
'@push.rocks/smartpromise': 4.2.3
'@push.rocks/smartdelay': 3.1.0
'@push.rocks/smartpromise': 4.2.4
croner: 10.0.1
date-fns: 4.1.0
dayjs: 1.11.20
+308 -46
View File
@@ -3,6 +3,8 @@ import * as os from 'os';
import * as path from 'path';
import { logger } from './tsdocker.logging.js';
const blobUploadChunkSize = 32 * 1024 * 1024;
interface IRegistryCredentials {
username: string;
password: string;
@@ -20,6 +22,20 @@ interface ITokenCache {
export class RegistryCopy {
private tokenCache: ITokenCache = {};
private formatError(error: unknown): string {
const err = error as Error & { cause?: unknown };
const cause = err.cause instanceof Error
? `; cause: ${err.cause.message}`
: err.cause
? `; cause: ${String(err.cause)}`
: '';
return `${err.message}${cause}`;
}
private async cancelResponseBody(resp: Response): Promise<void> {
await resp.body?.cancel().catch(() => undefined);
}
/**
* Wraps fetch() with timeout (via AbortSignal) and retry with exponential backoff.
* Retries on network errors and 5xx; does NOT retry on 4xx client errors.
@@ -46,6 +62,7 @@ export class RegistryCopy {
if (resp.status >= 500 && attempt < maxRetries) {
const delay = 1000 * Math.pow(2, attempt - 1);
logger.log('warn', `${method} ${url} returned ${resp.status}, retrying in ${delay}ms (attempt ${attempt}/${maxRetries})...`);
await this.cancelResponseBody(resp);
await new Promise(r => setTimeout(r, delay));
continue;
}
@@ -57,10 +74,10 @@ export class RegistryCopy {
lastError = err as Error;
if (attempt < maxRetries) {
const delay = 1000 * Math.pow(2, attempt - 1);
logger.log('warn', `${method} ${url} failed (attempt ${attempt}/${maxRetries}): ${lastError.message}, retrying in ${delay}ms...`);
logger.log('warn', `${method} ${url} failed (attempt ${attempt}/${maxRetries}): ${this.formatError(err)}, retrying in ${delay}ms...`);
await new Promise(r => setTimeout(r, delay));
} else {
logger.log('error', `${method} ${url} failed after ${maxRetries} attempts: ${lastError.message}`);
logger.log('error', `${method} ${url} failed after ${maxRetries} attempts: ${this.formatError(err)}`);
}
}
}
@@ -131,6 +148,230 @@ export class RegistryCopy {
return `https://${registry}`;
}
private normalizeUploadUrl(registry: string, location: string): string {
if (location.startsWith('/')) {
return `${this.getRegistryApiBase(registry)}${location}`;
}
return location;
}
private parseUploadRange(rangeHeader: string | null, currentBytes: number): number | null {
if (!rangeHeader) {
return null;
}
const rangeMatch = rangeHeader.match(/(?:bytes=)?(\d+)-(\d+)$/);
if (!rangeMatch) {
return null;
}
const start = Number(rangeMatch[1]);
const end = Number(rangeMatch[2]);
if (!Number.isFinite(start) || !Number.isFinite(end) || start !== 0 || end < 0) {
return null;
}
if (currentBytes === 0 && end === 0) {
return 0;
}
return end + 1;
}
private async fetchUploadUrl(
registry: string,
repo: string,
uploadUrl: string,
options: RequestInit & { duplex?: string },
credentials?: IRegistryCredentials | null,
timeoutMs: number = 300_000,
maxRetries: number = 1,
): Promise<Response> {
const token = await this.getToken(registry, repo, 'pull,push', credentials);
const headers: Record<string, string> = { ...((options.headers || {}) as Record<string, string>) };
if (token) {
headers['Authorization'] = `Bearer ${token}`;
}
const fetchOptions: RequestInit & { duplex?: string } = {
...options,
headers,
};
if (options.body) {
fetchOptions.duplex = 'half';
}
return this.fetchWithRetry(uploadUrl, fetchOptions, timeoutMs, maxRetries);
}
private async getUploadStatus(
registry: string,
repo: string,
uploadUrl: string,
currentBytes: number,
credentials?: IRegistryCredentials | null,
): Promise<{ uploadUrl: string; uploadedBytes: number | null }> {
const resp = await this.fetchUploadUrl(registry, repo, uploadUrl, { method: 'GET' }, credentials, 30_000, 3);
const nextLocation = resp.headers.get('location');
const nextUploadUrl = nextLocation ? this.normalizeUploadUrl(registry, nextLocation) : uploadUrl;
if (resp.status !== 204 && resp.status !== 202) {
const body = await resp.text();
throw new Error(`Failed to get upload status for ${registry}/${repo}: ${resp.status} ${body}`);
}
return {
uploadUrl: nextUploadUrl,
uploadedBytes: this.parseUploadRange(resp.headers.get('range'), currentBytes),
};
}
private async *readBlobChunks(resp: Response): AsyncGenerator<Buffer> {
if (!resp.body) {
yield Buffer.from(await resp.arrayBuffer());
return;
}
const reader = resp.body.getReader();
const pendingBuffers: Buffer[] = [];
let pendingBytes = 0;
let fullyRead = false;
const takeChunk = (size: number): Buffer => {
const chunk = Buffer.allocUnsafe(size);
let chunkOffset = 0;
while (chunkOffset < size) {
const nextBuffer = pendingBuffers[0];
const bytesToCopy = Math.min(nextBuffer.length, size - chunkOffset);
nextBuffer.copy(chunk, chunkOffset, 0, bytesToCopy);
chunkOffset += bytesToCopy;
pendingBytes -= bytesToCopy;
if (bytesToCopy === nextBuffer.length) {
pendingBuffers.shift();
} else {
pendingBuffers[0] = nextBuffer.subarray(bytesToCopy);
}
}
return chunk;
};
try {
while (true) {
const { done, value } = await reader.read();
if (done) {
fullyRead = true;
break;
}
const nextBuffer = Buffer.from(value);
pendingBuffers.push(nextBuffer);
pendingBytes += nextBuffer.length;
while (pendingBytes >= blobUploadChunkSize) {
yield takeChunk(blobUploadChunkSize);
}
}
if (pendingBytes > 0) {
yield takeChunk(pendingBytes);
}
} finally {
if (!fullyRead) {
await reader.cancel().catch(() => undefined);
}
reader.releaseLock();
}
}
private async cancelUpload(
registry: string,
repo: string,
uploadUrl: string,
credentials?: IRegistryCredentials | null,
): Promise<void> {
try {
const resp = await this.fetchUploadUrl(registry, repo, uploadUrl, { method: 'DELETE' }, credentials, 30_000, 1);
if (!resp.ok && resp.status !== 404) {
logger.log('warn', `Failed to cancel upload for ${registry}/${repo}: ${resp.status} ${await resp.text()}`);
}
} catch (err) {
logger.log('warn', `Failed to cancel upload for ${registry}/${repo}: ${this.formatError(err)}`);
}
}
private async patchUploadChunk(
registry: string,
repo: string,
uploadUrl: string,
chunk: Buffer,
uploadedBytes: number,
credentials?: IRegistryCredentials | null,
): Promise<{ uploadUrl: string; uploadedBytes: number }> {
let remainingChunk = chunk;
let currentUploadUrl = uploadUrl;
let currentUploadedBytes = uploadedBytes;
let attempt = 1;
while (remainingChunk.length > 0) {
try {
const resp = await this.fetchUploadUrl(registry, repo, currentUploadUrl, {
method: 'PATCH',
headers: {
'Content-Type': 'application/octet-stream',
'Content-Length': String(remainingChunk.length),
},
body: remainingChunk as any,
}, credentials, 300_000, 1);
if (resp.status !== 202) {
const body = await resp.text();
throw new Error(`Chunk upload failed for ${registry}/${repo}: ${resp.status} ${body}`);
}
const nextLocation = resp.headers.get('location');
if (nextLocation) {
currentUploadUrl = this.normalizeUploadUrl(registry, nextLocation);
}
const rangedBytes = this.parseUploadRange(resp.headers.get('range'), currentUploadedBytes);
currentUploadedBytes = rangedBytes && rangedBytes > currentUploadedBytes
? rangedBytes
: currentUploadedBytes + remainingChunk.length;
remainingChunk = Buffer.alloc(0);
} catch (err) {
if (attempt >= 6) {
throw err;
}
const delay = 1000 * Math.pow(2, attempt - 1);
logger.log('warn', `Chunk upload for ${registry}/${repo} failed: ${this.formatError(err)}, checking upload status before retrying in ${delay}ms...`);
const status = await this.getUploadStatus(registry, repo, currentUploadUrl, currentUploadedBytes, credentials);
currentUploadUrl = status.uploadUrl;
if (status.uploadedBytes !== null) {
const acceptedBytes = status.uploadedBytes - currentUploadedBytes;
if (acceptedBytes >= remainingChunk.length) {
currentUploadedBytes = status.uploadedBytes;
remainingChunk = Buffer.alloc(0);
break;
}
if (acceptedBytes > 0) {
currentUploadedBytes = status.uploadedBytes;
remainingChunk = remainingChunk.subarray(acceptedBytes);
}
}
await new Promise(r => setTimeout(r, delay));
attempt++;
}
}
return { uploadUrl: currentUploadUrl, uploadedBytes: currentUploadedBytes };
}
/**
* Obtains a Bearer token for registry operations.
* Follows the standard Docker auth flow:
@@ -304,7 +545,7 @@ export class RegistryCopy {
/**
* Copies a single blob from source to destination registry.
* Uses monolithic upload (POST initiate + PUT complete).
* Uses OCI chunked upload to avoid buffering multi-GB layers in memory.
*/
private async copyBlob(
srcRegistry: string,
@@ -322,20 +563,6 @@ export class RegistryCopy {
return;
}
// Download blob from source
const getResp = await this.registryFetch(srcRegistry, `/v2/${srcRepo}/blobs/${digest}`, {
repo: srcRepo,
actions: 'pull',
credentials: srcCredentials,
});
if (!getResp.ok) {
throw new Error(`Failed to get blob ${digest} from ${srcRegistry}/${srcRepo}: ${getResp.status}`);
}
const blobData = Buffer.from(await getResp.arrayBuffer());
const blobSize = blobData.length;
// Initiate upload at destination
const postResp = await this.registryFetch(destRegistry, `/v2/${destRepo}/blobs/uploads/`, {
method: 'POST',
@@ -356,37 +583,72 @@ export class RegistryCopy {
throw new Error(`No upload location returned from ${destRegistry}/${destRepo}`);
}
// Make upload URL absolute if relative
if (uploadUrl.startsWith('/')) {
const apiBase = this.getRegistryApiBase(destRegistry);
uploadUrl = `${apiBase}${uploadUrl}`;
}
// Complete upload with PUT (monolithic)
const separator = uploadUrl.includes('?') ? '&' : '?';
const putUrl = `${uploadUrl}${separator}digest=${encodeURIComponent(digest)}`;
// For PUT to the upload URL, we need auth
const token = await this.getToken(destRegistry, destRepo, 'pull,push', destCredentials);
const putHeaders: Record<string, string> = {
'Content-Type': 'application/octet-stream',
'Content-Length': String(blobSize),
};
if (token) {
putHeaders['Authorization'] = `Bearer ${token}`;
}
const putResp = await this.fetchWithRetry(putUrl, {
method: 'PUT',
headers: putHeaders,
body: blobData,
}, 300_000);
if (!putResp.ok) {
const body = await putResp.text();
throw new Error(`Failed to upload blob ${digest} to ${destRegistry}/${destRepo}: ${putResp.status} ${body}`);
uploadUrl = this.normalizeUploadUrl(destRegistry, uploadUrl);
let uploadedBytes = 0;
let uploadCompleted = false;
try {
// Download blob from source only after the destination upload can accept it.
const getResp = await this.registryFetch(srcRegistry, `/v2/${srcRepo}/blobs/${digest}`, {
repo: srcRepo,
actions: 'pull',
credentials: srcCredentials,
});
if (!getResp.ok) {
await this.cancelResponseBody(getResp);
throw new Error(`Failed to get blob ${digest} from ${srcRegistry}/${srcRepo}: ${getResp.status}`);
}
for await (const chunk of this.readBlobChunks(getResp)) {
const result = await this.patchUploadChunk(
destRegistry,
destRepo,
uploadUrl,
chunk,
uploadedBytes,
destCredentials,
);
uploadUrl = result.uploadUrl;
uploadedBytes = result.uploadedBytes;
}
// Complete upload with a zero-length PUT and the expected digest.
const separator = uploadUrl.includes('?') ? '&' : '?';
const putUrl = `${uploadUrl}${separator}digest=${encodeURIComponent(digest)}`;
const putResp = await this.fetchUploadUrl(destRegistry, destRepo, putUrl, {
method: 'PUT',
headers: {
'Content-Type': 'application/octet-stream',
'Content-Length': '0',
},
}, destCredentials, 300_000, 1).catch(async (err) => {
if (await this.blobExists(destRegistry, destRepo, digest, destCredentials)) {
return new Response(null, { status: 201 });
}
throw err;
});
if (!putResp.ok) {
if (await this.blobExists(destRegistry, destRepo, digest, destCredentials)) {
logger.log('info', ` Blob ${digest.substring(0, 19)}... finalized despite response ${putResp.status}`);
uploadCompleted = true;
return;
}
const body = await putResp.text();
throw new Error(`Failed to upload blob ${digest} to ${destRegistry}/${destRepo}: ${putResp.status} ${body}`);
}
uploadCompleted = true;
} catch (err) {
if (!uploadCompleted) {
await this.cancelUpload(destRegistry, destRepo, uploadUrl, destCredentials);
}
throw err;
}
const blobSize = uploadedBytes;
const sizeStr = blobSize > 1048576
? `${(blobSize / 1048576).toFixed(1)} MB`
: `${(blobSize / 1024).toFixed(1)} KB`;