// Source: tsdocker/ts/classes.registrycopy.ts (TypeScript, 568 lines, 19 KiB)

import * as crypto from 'crypto';
import * as fs from 'fs';
import * as os from 'os';
import * as path from 'path';
import { logger } from './tsdocker.logging.js';
/**
 * Username/password pair used to authenticate against a registry.
 * The pair is sent as an HTTP Basic `Authorization` header when requesting
 * a Bearer token from the registry's token endpoint.
 */
interface IRegistryCredentials {
// Account name (the part before the first ':' in a decoded Docker config "auth" value).
username: string;
// Password or access token (everything after the first ':' in the decoded "auth" value).
password: string;
}
/**
 * In-memory cache of Bearer tokens.
 * Although the index parameter is named `scope`, the class stores entries under
 * the key `${registry}/repository:${repo}:${actions}`.
 * `expiry` is an epoch-milliseconds timestamp that is compared against `Date.now()`.
 */
interface ITokenCache {
[scope: string]: { token: string; expiry: number };
}
/**
 * OCI Distribution API client for copying images between registries.
 * Supports manifest lists (multi-arch) and single-platform manifests.
 * Uses native fetch (Node 18+).
 *
 * Notes:
 * - Blobs are buffered completely in memory while they are copied.
 * - Bearer tokens are cached per registry + scope and honour the lifetime
 *   (`expires_in`) announced by the token endpoint.
 */
export class RegistryCopy {
  /** Registry names that all refer to Docker Hub. */
  private static readonly DOCKER_HUB_HOSTS: readonly string[] = [
    'docker.io',
    'index.docker.io',
    'registry-1.docker.io',
  ];

  /** Token lifetime assumed when the token endpoint omits `expires_in` (per the token spec). */
  private static readonly DEFAULT_TOKEN_TTL_SECONDS = 60;

  /** Tokens are considered expired this many seconds early to absorb clock skew and latency. */
  private static readonly TOKEN_REFRESH_MARGIN_SECONDS = 10;

  /** Cached Bearer tokens keyed by `${registry}/repository:${repo}:${actions}`. */
  private tokenCache: ITokenCache = {};

  /** True when the given registry name is one of the Docker Hub aliases. */
  private static isDockerHub(registry: string): boolean {
    return RegistryCopy.DOCKER_HUB_HOSTS.includes(registry);
  }

  /** True for registries on the local machine; those are reached via HTTP and without auth. */
  private static isLocalRegistry(registry: string): boolean {
    return registry.startsWith('localhost') || registry.startsWith('127.0.0.1');
  }

  /** True when the value is a stream, i.e. a request body that cannot be sent a second time. */
  private static isStreamBody(body: unknown): boolean {
    return typeof ReadableStream !== 'undefined' && body instanceof ReadableStream;
  }

  /**
   * Wraps fetch() with a timeout (via AbortSignal) and retry with exponential backoff.
   * Retries on network errors and 5xx responses; 4xx responses are returned immediately.
   * The response of the final attempt is returned even if it is a 5xx.
   *
   * @param url - Absolute URL to request.
   * @param options - fetch options; any `signal` is replaced by the timeout signal.
   * @param timeoutMs - Per-attempt timeout in milliseconds.
   * @param maxRetries - Total number of attempts (values below 1 are treated as 1).
   *   Stream bodies cannot be replayed, so such requests are attempted only once.
   * @throws The last network error when every attempt failed without a response.
   */
  private async fetchWithRetry(
    url: string,
    options: RequestInit & { duplex?: string },
    timeoutMs: number = 300_000,
    maxRetries: number = 6,
  ): Promise<Response> {
    const method = (options.method || 'GET').toUpperCase();
    const attempts = RegistryCopy.isStreamBody(options.body)
      ? 1
      : Math.max(1, Math.floor(maxRetries) || 1);
    let lastError = new Error(`${method} ${url} was not attempted`);

    for (let attempt = 1; attempt <= attempts; attempt++) {
      const isLastAttempt = attempt === attempts;
      const delay = 1000 * 2 ** (attempt - 1);
      if (attempt > 1) {
        logger.log('info', `Retry ${attempt}/${attempts} for ${method} ${url}`);
      }
      try {
        const resp = await fetch(url, {
          ...options,
          signal: AbortSignal.timeout(timeoutMs),
        });
        if (resp.status < 500) {
          return resp;
        }
        if (isLastAttempt) {
          logger.log('error', `${method} ${url} returned ${resp.status} after ${attempts} attempts, giving up`);
          return resp;
        }
        logger.log('warn', `${method} ${url} returned ${resp.status}, retrying in ${delay}ms (attempt ${attempt}/${attempts})...`);
        // Release the discarded response so its connection is not held until GC.
        await resp.body?.cancel().catch(() => undefined);
      } catch (err) {
        lastError = err instanceof Error ? err : new Error(String(err));
        if (isLastAttempt) {
          logger.log('error', `${method} ${url} failed after ${attempts} attempts: ${lastError.message}`);
          break;
        }
        logger.log('warn', `${method} ${url} failed (attempt ${attempt}/${attempts}): ${lastError.message}, retrying in ${delay}ms...`);
      }
      await new Promise<void>((resolve) => setTimeout(resolve, delay));
    }
    throw lastError;
  }

  /**
   * Reads Docker credentials for a given registry from the Docker client config.
   * The config is looked up in `$DOCKER_CONFIG/config.json` when that variable is set,
   * otherwise in `~/.docker/config.json`.
   * Only the base64-encoded "auth" field is supported (no credential helpers/stores).
   *
   * @param registryUrl - Registry host, e.g. "registry.gitlab.com" or "docker.io".
   * @returns The decoded credentials, or null when none are configured or readable.
   */
  public static getDockerConfigCredentials(registryUrl: string): IRegistryCredentials | null {
    try {
      const configDir = process.env.DOCKER_CONFIG || path.join(os.homedir(), '.docker');
      const configPath = path.join(configDir, 'config.json');
      if (!fs.existsSync(configPath)) return null;
      const config = JSON.parse(fs.readFileSync(configPath, 'utf-8'));
      const auths = config.auths || {};

      // Try exact match first, then common variations
      const keys = [
        registryUrl,
        `https://${registryUrl}`,
        `http://${registryUrl}`,
      ];
      // Docker Hub is stored under several legacy keys
      if (RegistryCopy.isDockerHub(registryUrl)) {
        keys.push(
          'https://index.docker.io/v1/',
          'https://index.docker.io/v2/',
          'index.docker.io',
          'docker.io',
          'registry-1.docker.io',
        );
      }

      for (const key of keys) {
        const encoded = auths[key]?.auth;
        if (typeof encoded !== 'string' || encoded.length === 0) continue;
        const decoded = Buffer.from(encoded, 'base64').toString('utf-8');
        // Split on the FIRST colon only: passwords may contain colons.
        const colonIndex = decoded.indexOf(':');
        if (colonIndex > 0) {
          return {
            username: decoded.substring(0, colonIndex),
            password: decoded.substring(colonIndex + 1),
          };
        }
      }
      return null;
    } catch (err) {
      // Best effort: an unreadable/malformed config means "no credentials", but say so.
      const message = err instanceof Error ? err.message : String(err);
      logger.log('warn', `Could not read Docker config credentials: ${message}`);
      return null;
    }
  }

  /**
   * Returns the API base URL for a registry.
   * Docker Hub uses registry-1.docker.io as API endpoint; local registries use plain HTTP.
   */
  private getRegistryApiBase(registry: string): string {
    if (RegistryCopy.isDockerHub(registry)) {
      return 'https://registry-1.docker.io';
    }
    if (RegistryCopy.isLocalRegistry(registry)) {
      return `http://${registry}`;
    }
    return `https://${registry}`;
  }

  /**
   * Obtains a Bearer token for registry operations.
   * Follows the standard Docker auth flow:
   * GET /v2/ → 401 with Www-Authenticate → request token
   *
   * @param registry - Registry host.
   * @param repo - Repository the token is scoped to.
   * @param actions - Comma-separated actions, e.g. "pull" or "pull,push".
   * @param credentials - Explicit credentials; falls back to the Docker config.
   * @returns The token, or null when no auth is needed or it could not be obtained
   *   (failures are logged as warnings, never thrown).
   */
  private async getToken(
    registry: string,
    repo: string,
    actions: string,
    credentials?: IRegistryCredentials | null,
  ): Promise<string | null> {
    const scope = `repository:${repo}:${actions}`;
    const cacheKey = `${registry}/${scope}`;
    const cached = this.tokenCache[cacheKey];
    if (cached && cached.expiry > Date.now()) {
      return cached.token;
    }

    // Local registries typically don't need auth
    if (RegistryCopy.isLocalRegistry(registry)) {
      return null;
    }

    const apiBase = this.getRegistryApiBase(registry);
    try {
      const checkResp = await this.fetchWithRetry(`${apiBase}/v2/`, { method: 'GET' }, 30_000);
      const wwwAuth = checkResp.headers.get('www-authenticate') || '';
      // Only the headers matter here; release the body right away.
      await checkResp.body?.cancel().catch(() => undefined);
      if (checkResp.ok) return null; // No auth needed

      const realmMatch = wwwAuth.match(/realm="([^"]+)"/);
      const serviceMatch = wwwAuth.match(/service="([^"]+)"/);
      if (!realmMatch) return null;

      const tokenUrl = new URL(realmMatch[1]);
      tokenUrl.searchParams.set('scope', scope);
      if (serviceMatch) tokenUrl.searchParams.set('service', serviceMatch[1]);

      const headers: Record<string, string> = {};
      const creds = credentials || RegistryCopy.getDockerConfigCredentials(registry);
      if (creds) {
        headers['Authorization'] = 'Basic ' + Buffer.from(`${creds.username}:${creds.password}`).toString('base64');
      }

      const tokenResp = await this.fetchWithRetry(tokenUrl.toString(), { headers }, 30_000);
      if (!tokenResp.ok) {
        const body = await tokenResp.text();
        throw new Error(`Token request failed (${tokenResp.status}): ${body}`);
      }

      const tokenData = await tokenResp.json() as any;
      const token = tokenData.token || tokenData.access_token;
      if (!token) return null;

      // Cache for as long as the registry says the token is valid (minus a safety
      // margin) instead of a fixed duration that may outlive the token.
      const announced = Number(tokenData.expires_in);
      const ttlSeconds = Number.isFinite(announced) && announced > 0
        ? announced
        : RegistryCopy.DEFAULT_TOKEN_TTL_SECONDS;
      const marginSeconds = Math.min(RegistryCopy.TOKEN_REFRESH_MARGIN_SECONDS, ttlSeconds / 2);
      this.tokenCache[cacheKey] = {
        token,
        expiry: Date.now() + (ttlSeconds - marginSeconds) * 1000,
      };
      return token;
    } catch (err) {
      logger.log('warn', `Auth for ${registry}: ${(err as Error).message}`);
      return null;
    }
  }

  /**
   * Makes an authenticated request to a registry.
   * When a request made with a token is answered with 401, the cached token is dropped
   * and — unless the body is a non-replayable stream — the request is repeated once
   * with a freshly obtained token.
   *
   * @param registry - Registry host.
   * @param path - Request path starting with "/v2/..." (shadows the `path` module here).
   * @param options - Method, headers, body and the auth scope (repo/actions/credentials).
   */
  private async registryFetch(
    registry: string,
    path: string,
    options: {
      method?: string;
      headers?: Record<string, string>;
      body?: Buffer | ReadableStream | null;
      repo?: string;
      actions?: string;
      credentials?: IRegistryCredentials | null;
    } = {},
  ): Promise<Response> {
    const url = `${this.getRegistryApiBase(registry)}${path}`;
    const method = options.method || 'GET';
    const repo = options.repo || '';
    const actions = options.actions || 'pull';
    const maxAuthAttempts = RegistryCopy.isStreamBody(options.body) ? 1 : 2;

    for (let authAttempt = 1; ; authAttempt++) {
      const headers: Record<string, string> = { ...(options.headers || {}) };
      const token = await this.getToken(registry, repo, actions, options.credentials);
      if (token) {
        headers['Authorization'] = `Bearer ${token}`;
      }

      const fetchOptions: any = { method, headers };
      if (options.body) {
        fetchOptions.body = options.body;
        fetchOptions.duplex = 'half'; // Required for streaming body in Node
      }

      const resp = await this.fetchWithRetry(url, fetchOptions, 300_000);
      if (resp.status !== 401 || !token) {
        return resp;
      }

      // Token expired or was rejected — drop it so the next request re-authenticates
      const cacheKey = `${registry}/${`repository:${repo}:${actions}`}`;
      logger.log('warn', `Got 401 for ${registry}${path} — clearing cached token for ${cacheKey}`);
      delete this.tokenCache[cacheKey];

      if (authAttempt >= maxAuthAttempts) {
        return resp;
      }
      await resp.body?.cancel().catch(() => undefined);
    }
  }

  /**
   * Gets a manifest from a registry (supports both manifest lists and single manifests).
   *
   * @returns The parsed manifest, its raw bytes, its content type and its digest
   *   (taken from the `Docker-Content-Digest` header or computed from the bytes).
   * @throws Error when the registry does not answer with a 2xx status.
   */
  private async getManifest(
    registry: string,
    repo: string,
    reference: string,
    credentials?: IRegistryCredentials | null,
  ): Promise<{ contentType: string; body: any; digest: string; raw: Buffer }> {
    const accept = [
      'application/vnd.oci.image.index.v1+json',
      'application/vnd.docker.distribution.manifest.list.v2+json',
      'application/vnd.oci.image.manifest.v1+json',
      'application/vnd.docker.distribution.manifest.v2+json',
    ].join(', ');

    const resp = await this.registryFetch(registry, `/v2/${repo}/manifests/${reference}`, {
      headers: { 'Accept': accept },
      repo,
      actions: 'pull',
      credentials,
    });
    if (!resp.ok) {
      const body = await resp.text();
      throw new Error(`Failed to get manifest ${registry}/${repo}:${reference} (${resp.status}): ${body}`);
    }

    const raw = Buffer.from(await resp.arrayBuffer());
    const contentType = resp.headers.get('content-type') || '';
    const digest = resp.headers.get('docker-content-digest') || this.computeDigest(raw);
    const body = JSON.parse(raw.toString('utf-8'));
    return { contentType, body, digest, raw };
  }

  /**
   * Checks if a blob exists in the destination registry (HEAD request).
   * Any non-2xx answer is reported as "does not exist".
   */
  private async blobExists(
    registry: string,
    repo: string,
    digest: string,
    credentials?: IRegistryCredentials | null,
  ): Promise<boolean> {
    const resp = await this.registryFetch(registry, `/v2/${repo}/blobs/${digest}`, {
      method: 'HEAD',
      repo,
      actions: 'pull,push',
      credentials,
    });
    return resp.ok;
  }

  /**
   * Copies a single blob from source to destination registry.
   * Uses monolithic upload (POST initiate + PUT complete). The blob is skipped when
   * the destination already has it.
   *
   * @throws Error when the download, the upload initiation or the upload itself fails.
   */
  private async copyBlob(
    srcRegistry: string,
    srcRepo: string,
    destRegistry: string,
    destRepo: string,
    digest: string,
    srcCredentials?: IRegistryCredentials | null,
    destCredentials?: IRegistryCredentials | null,
  ): Promise<void> {
    // Check if blob already exists at destination
    if (await this.blobExists(destRegistry, destRepo, digest, destCredentials)) {
      logger.log('info', ` Blob ${digest.substring(0, 19)}... already exists, skipping`);
      return;
    }

    // Download blob from source
    const getResp = await this.registryFetch(srcRegistry, `/v2/${srcRepo}/blobs/${digest}`, {
      repo: srcRepo,
      actions: 'pull',
      credentials: srcCredentials,
    });
    if (!getResp.ok) {
      throw new Error(`Failed to get blob ${digest} from ${srcRegistry}/${srcRepo}: ${getResp.status}`);
    }
    const blobData = Buffer.from(await getResp.arrayBuffer());
    const blobSize = blobData.length;

    // Initiate upload at destination
    const postResp = await this.registryFetch(destRegistry, `/v2/${destRepo}/blobs/uploads/`, {
      method: 'POST',
      headers: { 'Content-Length': '0' },
      repo: destRepo,
      actions: 'pull,push',
      credentials: destCredentials,
    });
    if (!postResp.ok) {
      const body = await postResp.text();
      throw new Error(`Failed to initiate upload at ${destRegistry}/${destRepo}: ${postResp.status} ${body}`);
    }

    // Get upload URL from Location header
    const location = postResp.headers.get('location') || '';
    await postResp.body?.cancel().catch(() => undefined);
    if (!location) {
      throw new Error(`No upload location returned from ${destRegistry}/${destRepo}`);
    }
    // Absolute locations are used verbatim; any relative form is resolved against the API base.
    const uploadUrl = /^[a-z][a-z0-9+.-]*:\/\//i.test(location)
      ? location
      : new URL(location, `${this.getRegistryApiBase(destRegistry)}/`).toString();

    // Complete upload with PUT (monolithic). The digest is appended by hand so that
    // existing (possibly signed) query parameters are not re-encoded.
    const separator = uploadUrl.includes('?') ? '&' : '?';
    const putUrl = `${uploadUrl}${separator}digest=${encodeURIComponent(digest)}`;

    // The upload URL may point anywhere, so it is requested directly with our own auth header
    const token = await this.getToken(destRegistry, destRepo, 'pull,push', destCredentials);
    const putHeaders: Record<string, string> = {
      'Content-Type': 'application/octet-stream',
      'Content-Length': String(blobSize),
    };
    if (token) {
      putHeaders['Authorization'] = `Bearer ${token}`;
    }

    const putResp = await this.fetchWithRetry(putUrl, {
      method: 'PUT',
      headers: putHeaders,
      body: blobData,
    }, 300_000);
    if (!putResp.ok) {
      const body = await putResp.text();
      throw new Error(`Failed to upload blob ${digest} to ${destRegistry}/${destRepo}: ${putResp.status} ${body}`);
    }

    const sizeStr = blobSize > 1048576
      ? `${(blobSize / 1048576).toFixed(1)} MB`
      : `${(blobSize / 1024).toFixed(1)} KB`;
    logger.log('info', ` Copied blob ${digest.substring(0, 19)}... (${sizeStr})`);
  }

  /**
   * Pushes a manifest to a registry.
   *
   * @param reference - Tag or digest to store the manifest under.
   * @param manifest - Raw manifest bytes (pushed unmodified so the digest is preserved).
   * @returns The manifest digest reported by the registry, or computed locally.
   * @throws Error when the registry does not answer with a 2xx status.
   */
  private async putManifest(
    registry: string,
    repo: string,
    reference: string,
    manifest: Buffer,
    contentType: string,
    credentials?: IRegistryCredentials | null,
  ): Promise<string> {
    const resp = await this.registryFetch(registry, `/v2/${repo}/manifests/${reference}`, {
      method: 'PUT',
      headers: {
        'Content-Type': contentType,
        'Content-Length': String(manifest.length),
      },
      body: manifest,
      repo,
      actions: 'pull,push',
      credentials,
    });
    if (!resp.ok) {
      const body = await resp.text();
      throw new Error(`Failed to put manifest ${registry}/${repo}:${reference} (${resp.status}): ${body}`);
    }
    return resp.headers.get('docker-content-digest') || this.computeDigest(manifest);
  }

  /**
   * Copies the config blob and every layer blob referenced by an image manifest.
   * Blobs are copied one after another, config first.
   */
  private async copyManifestBlobs(
    manifest: any,
    srcRegistry: string,
    srcRepo: string,
    destRegistry: string,
    destRepo: string,
    srcCredentials?: IRegistryCredentials | null,
    destCredentials?: IRegistryCredentials | null,
  ): Promise<void> {
    if (manifest.config?.digest) {
      logger.log('info', ' Copying config blob...');
      await this.copyBlob(
        srcRegistry, srcRepo, destRegistry, destRepo,
        manifest.config.digest, srcCredentials, destCredentials,
      );
    }

    const layers = manifest.layers || [];
    for (let i = 0; i < layers.length; i++) {
      logger.log('info', ` Copying layer ${i + 1}/${layers.length}...`);
      await this.copyBlob(
        srcRegistry, srcRepo, destRegistry, destRepo,
        layers[i].digest, srcCredentials, destCredentials,
      );
    }
  }

  /**
   * Copies a single-platform manifest and all its blobs from source to destination.
   * The manifest is pushed by digest (untagged) after its blobs are in place.
   */
  private async copySingleManifest(
    srcRegistry: string,
    srcRepo: string,
    destRegistry: string,
    destRepo: string,
    manifestDigest: string,
    srcCredentials?: IRegistryCredentials | null,
    destCredentials?: IRegistryCredentials | null,
  ): Promise<void> {
    // Get the platform manifest
    const { body: manifest, contentType, raw } = await this.getManifest(
      srcRegistry, srcRepo, manifestDigest, srcCredentials,
    );

    await this.copyManifestBlobs(
      manifest, srcRegistry, srcRepo, destRegistry, destRepo, srcCredentials, destCredentials,
    );

    // Push the platform manifest by digest
    await this.putManifest(
      destRegistry, destRepo, manifestDigest, raw, contentType, destCredentials,
    );
  }

  /**
   * Copies a complete image (single or multi-arch) from source to destination registry.
   *
   * @param srcRegistry - Source registry host (e.g., "localhost:5234")
   * @param srcRepo - Source repository (e.g., "myapp")
   * @param srcTag - Source tag (e.g., "v1.0.0")
   * @param destRegistry - Destination registry host (e.g., "registry.gitlab.com")
   * @param destRepo - Destination repository (e.g., "org/myapp")
   * @param destTag - Destination tag (e.g., "v1.0.0" or "v1.0.0_arm64")
   * @param credentials - Optional credentials for destination registry; when omitted
   *   the Docker client config is consulted.
   * @throws Error when any manifest or blob transfer fails.
   */
  public async copyImage(
    srcRegistry: string,
    srcRepo: string,
    srcTag: string,
    destRegistry: string,
    destRepo: string,
    destTag: string,
    credentials?: IRegistryCredentials | null,
  ): Promise<void> {
    logger.log('info', `Copying ${srcRegistry}/${srcRepo}:${srcTag} -> ${destRegistry}/${destRepo}:${destTag}`);

    // Source is always the local registry (no credentials needed)
    const srcCredentials: IRegistryCredentials | null = null;
    const destCredentials = credentials || RegistryCopy.getDockerConfigCredentials(destRegistry);

    // Get the top-level manifest
    const { body, contentType, raw } = await this.getManifest(srcRegistry, srcRepo, srcTag, srcCredentials);
    const isManifestList =
      contentType.includes('manifest.list') ||
      contentType.includes('image.index') ||
      body.manifests !== undefined;

    if (isManifestList) {
      // Multi-arch: copy each platform manifest + blobs, then push the manifest list
      const platforms = (body.manifests || []) as any[];
      logger.log('info', `Multi-arch manifest with ${platforms.length} platform(s)`);
      for (const platformEntry of platforms) {
        const platDesc = platformEntry.platform
          ? `${platformEntry.platform.os}/${platformEntry.platform.architecture}`
          : platformEntry.digest;
        logger.log('info', `Copying platform: ${platDesc}`);
        await this.copySingleManifest(
          srcRegistry, srcRepo, destRegistry, destRepo,
          platformEntry.digest, srcCredentials, destCredentials,
        );
      }

      // Push the manifest list/index with the destination tag
      const digest = await this.putManifest(
        destRegistry, destRepo, destTag, raw, contentType, destCredentials,
      );
      logger.log('ok', `Pushed manifest list to ${destRegistry}/${destRepo}:${destTag} (${digest.substring(0, 19)}...)`);
      return;
    }

    // Single-platform manifest: copy blobs + push manifest
    logger.log('info', 'Single-platform manifest');
    await this.copyManifestBlobs(
      body, srcRegistry, srcRepo, destRegistry, destRepo, srcCredentials, destCredentials,
    );

    // Push the manifest with the destination tag
    const digest = await this.putManifest(
      destRegistry, destRepo, destTag, raw, contentType, destCredentials,
    );
    logger.log('ok', `Pushed manifest to ${destRegistry}/${destRepo}:${destTag} (${digest.substring(0, 19)}...)`);
  }

  /**
   * Computes the sha256 digest of a buffer in the registry's `sha256:<hex>` notation.
   */
  private computeDigest(data: Buffer): string {
    const hash = crypto.createHash('sha256').update(data).digest('hex');
    return `sha256:${hash}`;
  }
}