// Source file: cloudly/ts/manager.registry/classes.registrymanager.ts (TypeScript; originally listed as 637 lines, 21 KiB)

import { createHash, timingSafeEqual } from 'node:crypto';

import type { Cloudly } from '../classes.cloudly.js';
import { logger } from '../logger.js';
import * as plugins from '../plugins.js';
import type { Service } from '../manager.service/classes.service.js';
/**
 * Identity resolved from the HTTP Basic credentials of a registry request.
 */
type TAuthenticatedRegistryUser = {
// id of the matched Cloudly user; used as the subject when minting OCI tokens
userId: string;
username: string;
// true when the user's role is `admin` or the matched token carries the `admin` role
canWrite: boolean;
};
/** Maps an OCI tag name to the digest it points at, as read from a repository's `tags.json`. */
type TOciTags = Record<string, string>;
/**
 * The part of an OCI content descriptor this module reads.
 */
interface IOciDescriptor {
// typed `unknown` because it comes straight from parsed JSON; it is validated before use
digest?: unknown;
}
/**
 * The fields of a parsed OCI manifest (or index) document that are inspected
 * when collecting the objects a manifest references.
 */
interface IOciManifestDocument {
// config and layers are collected as blob digests
config?: IOciDescriptor;
layers?: IOciDescriptor[];
// child manifests; these are followed when walking references
manifests?: IOciDescriptor[];
}
/**
 * Hosts Cloudly's OCI registry on top of `smartregistry`.
 *
 * Responsibilities visible in this class:
 * - configure and start/stop the underlying `SmartRegistry`,
 * - answer `/v2/token` requests using Cloudly users (HTTP Basic credentials),
 * - forward every other request to the registry,
 * - react to `tags.json` writes by recording push events on the matching
 *   service and image,
 * - delete a service's repository, keeping blobs that other repositories
 *   still reference through their tags.
 */
export class CloudlyRegistryManager {
  private cloudlyRef: Cloudly;
  private smartRegistry!: plugins.smartregistry.SmartRegistry;
  /**
   * Last digest handled per `<repository>:<tag>`, used to skip unchanged tags.
   * In-memory only: it starts empty, so the first `tags.json` write seen for a
   * repository reports every tag contained in that document.
   */
  private recordedTagDigests = new Map<string, string>();
  private started = false;

  constructor(cloudlyRefArg: Cloudly) {
    this.cloudlyRef = cloudlyRefArg;
  }

  /**
   * Creates and initializes the registry.
   *
   * @throws Error when the configured S3 descriptor has no `bucketName`, or
   *   (via `getRegistryHost`) when `publicUrl` is not configured.
   */
  public async start() {
    const publicRegistryUrl = this.getPublicRegistryUrl();
    // the registry signs its tokens with Cloudly's JWT key pair, serialized as JSON
    const registryJwtSecret = JSON.stringify(this.cloudlyRef.authManager.smartjwtInstance.getKeyPairAsJson());
    const s3Descriptor = this.cloudlyRef.config.data.s3Descriptor;
    if (!s3Descriptor?.bucketName) {
      throw new Error('Cloudly registry requires an S3 bucketName');
    }
    this.smartRegistry = new plugins.smartregistry.SmartRegistry({
      storage: s3Descriptor as plugins.smartregistry.IStorageConfig,
      storageHooks: {
        afterPut: async (contextArg) => {
          await this.handleRegistryStorageAfterPut(contextArg);
        },
      },
      auth: {
        jwtSecret: registryJwtSecret,
        tokenStore: 'memory',
        npmTokens: { enabled: false },
        ociTokens: {
          enabled: true,
          realm: `${publicRegistryUrl}/v2/token`,
          service: this.cloudlyRef.config.data.publicUrl || 'cloudly',
        },
        pypiTokens: { enabled: false },
        rubygemsTokens: { enabled: false },
      },
      oci: {
        enabled: true,
        basePath: '/v2',
        registryUrl: publicRegistryUrl,
      },
    });
    await this.smartRegistry.init();
    this.started = true;
    logger.log('info', `Cloudly OCI registry available at ${publicRegistryUrl}/v2`);
  }

  /** Destroys the registry (if one was created) and marks the manager as not started. */
  public async stop() {
    if (this.smartRegistry) {
      this.smartRegistry.destroy();
    }
    this.started = false;
  }

  /**
   * Entry point for all registry HTTP traffic.
   *
   * @returns 503 while the registry is not started, the token response for
   *   `/v2/token`, the registry's own response otherwise, and 500 when
   *   handling throws.
   */
  public async handleHttpRequest(
    ctx: plugins.typedserver.IRequestContext,
  ): Promise<Response> {
    try {
      // Checked before routing: the token endpoint also needs the initialized
      // registry (its auth manager mints the token).
      if (!this.started) {
        return new Response('registry is not ready', { status: 503 });
      }
      const requestUrl = ctx.url;
      if (requestUrl.pathname === '/v2/token') {
        return await this.handleTokenRequest(ctx, requestUrl);
      }
      const rawBody = Buffer.from(await ctx.request.arrayBuffer());
      const response = await this.smartRegistry.handleRequest({
        method: ctx.method || 'GET',
        path: requestUrl.pathname,
        query: Object.fromEntries(requestUrl.searchParams),
        headers: this.headersToRecord(ctx.headers),
        rawBody: rawBody.length > 0 ? rawBody : undefined,
      });
      return this.createRegistryResponse(response);
    } catch (error) {
      logger.log('error', `registry request failed: ${(error as Error).message}`);
      return new Response('registry request failed', { status: 500 });
    }
  }

  /**
   * Returns the registry host as used in image references.
   *
   * The port is appended only when `sslMode` is `'none'` and `publicPort` is
   * set to something other than `'80'` or `'443'`.
   *
   * @throws Error when `publicUrl` is not configured.
   */
  public getRegistryHost() {
    if (!this.cloudlyRef.config.data.publicUrl) {
      throw new Error('Cloudly registry requires publicUrl');
    }
    const publicPort = this.cloudlyRef.config.data.publicPort;
    const includePort =
      this.cloudlyRef.config.data.sslMode === 'none' && publicPort && !['80', '443'].includes(publicPort);
    return `${this.cloudlyRef.config.data.publicUrl}${includePort ? `:${publicPort}` : ''}`;
  }

  /**
   * Builds the registry target (host, repository, tag, image URL) for a service.
   *
   * @param serviceArg service the target is derived from
   * @param tagArg image tag, defaults to `latest`
   */
  public getServiceRegistryTarget(
    serviceArg: Service,
    tagArg = 'latest',
  ): plugins.servezoneInterfaces.data.IRegistryTarget {
    const registryHost = this.getRegistryHost();
    const repository = this.getServiceRepository(serviceArg);
    return {
      protocol: 'oci',
      registryHost,
      repository,
      tag: tagArg,
      imageUrl: `${registryHost}/${repository}:${tagArg}`,
      serviceId: serviceArg.id,
      imageId: serviceArg.data?.imageId,
    };
  }

  /**
   * Deletes the repository stored in the service's `registryTarget`, unless
   * another service points at the same repository. Also forgets the cached
   * tag digests of that repository.
   */
  public async deleteServiceRepository(serviceArg: Service): Promise<void> {
    const repository = serviceArg.data.registryTarget?.repository;
    if (!repository) return;
    const services = await this.cloudlyRef.serviceManager.CService.getInstances({});
    const referencedByOtherService = services.some((candidateArg) => {
      return candidateArg.id !== serviceArg.id && candidateArg.data.registryTarget?.repository === repository;
    });
    if (referencedByOtherService) return;
    await this.deleteOciRepository(repository);
    for (const tagKey of Array.from(this.recordedTagDigests.keys())) {
      if (tagKey.startsWith(`${repository}:`)) {
        this.recordedTagDigests.delete(tagKey);
      }
    }
  }

  /**
   * Returns the registry's storage backend.
   *
   * @throws Error when the registry has not been started.
   */
  private getRegistryStorage(): any {
    if (!this.started || !this.smartRegistry) {
      throw new Error('Cloudly registry is not started');
    }
    return this.smartRegistry.getStorage();
  }

  /** Storage key of a repository's tag document. */
  private getOciTagsPath(repositoryArg: string): string {
    return `oci/tags/${repositoryArg}/tags.json`;
  }

  /**
   * Trims and lower-cases a digest.
   *
   * @returns the normalized digest, or `null` unless it is `sha256:` followed
   *   by exactly 64 hex characters.
   */
  private normalizeOciDigest(digestArg: string | null | undefined): string | null {
    if (typeof digestArg !== 'string') return null;
    const normalizedDigest = digestArg.trim().toLowerCase();
    return /^sha256:[a-f0-9]{64}$/.test(normalizedDigest) ? normalizedDigest : null;
  }

  /**
   * Returns the hex part of a `sha256:` digest.
   *
   * @throws Error when the digest is not a valid sha256 digest.
   */
  private getSha256HashFromDigest(digestArg: string): string {
    const normalizedDigest = this.normalizeOciDigest(digestArg);
    if (!normalizedDigest) {
      throw new Error(`Invalid OCI digest: ${digestArg}`);
    }
    return normalizedDigest.slice('sha256:'.length);
  }

  /** Storage key of a manifest; note that it is scoped to the repository. */
  private getOciManifestPath(repositoryArg: string, digestArg: string): string {
    return `oci/manifests/${repositoryArg}/${this.getSha256HashFromDigest(digestArg)}`;
  }

  /** Storage key of a blob; blobs are not scoped to a repository. */
  private getOciBlobPath(digestArg: string): string {
    return `oci/blobs/sha256/${this.getSha256HashFromDigest(digestArg)}`;
  }

  /**
   * Reads and validates a repository's tag document.
   *
   * @returns tag name to normalized digest; `{}` when no document exists.
   * @throws Error when the document is not a JSON object or a digest is invalid.
   */
  private async readOciTags(repositoryArg: string, storageArg = this.getRegistryStorage()): Promise<TOciTags> {
    const tagsBuffer = await storageArg.getObject(this.getOciTagsPath(repositoryArg));
    if (!tagsBuffer) return {};
    const parsedTags = JSON.parse(tagsBuffer.toString('utf8'));
    if (!parsedTags || typeof parsedTags !== 'object' || Array.isArray(parsedTags)) {
      throw new Error(`Invalid OCI tags document for ${repositoryArg}`);
    }
    const tags: TOciTags = {};
    for (const [tagName, digestValue] of Object.entries(parsedTags)) {
      const digest = typeof digestValue === 'string' ? this.normalizeOciDigest(digestValue) : null;
      if (!digest) {
        throw new Error(`Invalid OCI digest for ${repositoryArg}:${tagName}`);
      }
      tags[tagName] = digest;
    }
    return tags;
  }

  /**
   * Loads and parses a manifest.
   *
   * @returns the parsed document, or `null` when it is missing, not an
   *   object, or not valid JSON (the latter is logged as a warning).
   */
  private async readOciManifest(
    storageArg: any,
    repositoryArg: string,
    digestArg: string,
  ): Promise<IOciManifestDocument | null> {
    const manifestBuffer = await storageArg.getOciManifest(repositoryArg, digestArg);
    if (!manifestBuffer) return null;
    try {
      const manifest = JSON.parse(manifestBuffer.toString('utf8'));
      return manifest && typeof manifest === 'object' ? manifest : null;
    } catch (error) {
      logger.log('warn', `failed to parse OCI manifest ${repositoryArg}@${digestArg}: ${(error as Error).message}`);
      return null;
    }
  }

  /** Returns the descriptor's normalized digest, or `null` when absent or invalid. */
  private getDescriptorDigest(descriptorArg: IOciDescriptor | undefined): string | null {
    return typeof descriptorArg?.digest === 'string' ? this.normalizeOciDigest(descriptorArg.digest) : null;
  }

  /**
   * Extracts the valid digests a manifest points at: `config` and `layers`
   * as blobs, `manifests` as child manifests. Invalid digests are dropped.
   */
  private collectOciManifestReferences(manifestArg: IOciManifestDocument): {
    blobDigests: string[];
    manifestDigests: string[];
  } {
    const blobDigests = [
      this.getDescriptorDigest(manifestArg.config),
      ...(manifestArg.layers || []).map((descriptorArg) => this.getDescriptorDigest(descriptorArg)),
    ].filter((digestArg): digestArg is string => Boolean(digestArg));
    const manifestDigests = (manifestArg.manifests || [])
      .map((descriptorArg) => this.getDescriptorDigest(descriptorArg))
      .filter((digestArg): digestArg is string => Boolean(digestArg));
    return { blobDigests, manifestDigests };
  }

  /**
   * Walks the manifest graph breadth-first from the given roots inside one
   * repository and returns every manifest and blob digest reached.
   * Unreadable manifests are still reported as visited.
   */
  private async collectReferencedOciObjects(
    storageArg: any,
    repositoryArg: string,
    rootDigestsArg: string[],
  ): Promise<{ manifestDigests: Set<string>; blobDigests: Set<string> }> {
    const manifestDigests = new Set<string>();
    const blobDigests = new Set<string>();
    const pendingManifestDigests = rootDigestsArg
      .map((digestArg) => this.normalizeOciDigest(digestArg))
      .filter((digestArg): digestArg is string => Boolean(digestArg));
    while (pendingManifestDigests.length > 0) {
      const manifestDigest = pendingManifestDigests.shift()!;
      if (manifestDigests.has(manifestDigest)) continue;
      manifestDigests.add(manifestDigest);
      const manifest = await this.readOciManifest(storageArg, repositoryArg, manifestDigest);
      if (!manifest) continue;
      const references = this.collectOciManifestReferences(manifest);
      for (const blobDigest of references.blobDigests) {
        blobDigests.add(blobDigest);
      }
      for (const childManifestDigest of references.manifestDigests) {
        if (!manifestDigests.has(childManifestDigest)) {
          pendingManifestDigests.push(childManifestDigest);
        }
      }
    }
    return { manifestDigests, blobDigests };
  }

  /** Lists the digests of all manifests stored under a repository, ignoring `.type` side files. */
  private async listRepositoryManifestDigests(storageArg: any, repositoryArg: string): Promise<string[]> {
    const manifestPrefix = `oci/manifests/${repositoryArg}/`;
    const paths = await storageArg.listObjects(manifestPrefix);
    return paths
      .filter((pathArg: string) => !pathArg.endsWith('.type'))
      .map((pathArg: string) => pathArg.slice(manifestPrefix.length))
      .filter((hashArg: string) => /^[a-f0-9]{64}$/.test(hashArg))
      .map((hashArg: string) => `sha256:${hashArg}`);
  }

  /**
   * Collects every manifest and blob digest reachable from the tags of all
   * repositories other than `repositoryArg`.
   */
  private async collectAllTaggedOciObjectsExceptRepository(storageArg: any, repositoryArg: string) {
    const protectedObjects = {
      manifestDigests: new Set<string>(),
      blobDigests: new Set<string>(),
    };
    const tagPaths = await storageArg.listObjects('oci/tags/');
    for (const tagPath of tagPaths) {
      const match = tagPath.match(/^oci\/tags\/(.+)\/tags\.json$/);
      if (!match || match[1] === repositoryArg) continue;
      const tags = await this.readOciTags(match[1], storageArg);
      const references = await this.collectReferencedOciObjects(storageArg, match[1], Object.values(tags));
      for (const digest of references.manifestDigests) {
        protectedObjects.manifestDigests.add(digest);
      }
      for (const digest of references.blobDigests) {
        protectedObjects.blobDigests.add(digest);
      }
    }
    return protectedObjects;
  }

  /** Deletes an object; when the storage offers `objectExists`, missing objects are skipped. */
  private async deleteObjectIfExists(storageArg: any, pathArg: string): Promise<void> {
    if (typeof storageArg.objectExists === 'function' && !(await storageArg.objectExists(pathArg))) {
      return;
    }
    await storageArg.deleteObject(pathArg);
  }

  /**
   * Removes a repository: its manifests, its tag document, and every blob it
   * references that no other repository still reaches through a tag.
   */
  private async deleteOciRepository(repositoryArg: string): Promise<void> {
    const storage = this.getRegistryStorage();
    const tags = await this.readOciTags(repositoryArg, storage);
    const repositoryManifestDigests = await this.listRepositoryManifestDigests(storage, repositoryArg);
    // roots are tagged digests plus any untagged manifests still stored for the repository
    const rootDigests = Array.from(new Set([...Object.values(tags), ...repositoryManifestDigests]));
    if (rootDigests.length === 0) {
      await this.deleteObjectIfExists(storage, this.getOciTagsPath(repositoryArg));
      return;
    }
    const targetObjects = await this.collectReferencedOciObjects(storage, repositoryArg, rootDigests);
    const protectedObjects = await this.collectAllTaggedOciObjectsExceptRepository(storage, repositoryArg);
    // blob keys are shared across repositories, so keep what others still reference
    for (const blobDigest of targetObjects.blobDigests) {
      if (!protectedObjects.blobDigests.has(blobDigest)) {
        await this.deleteObjectIfExists(storage, this.getOciBlobPath(blobDigest));
      }
    }
    // Manifest keys contain the repository name, so these objects belong to
    // this repository only and are always removed, even when another
    // repository tags the same digest (it has its own copy under its own key).
    for (const manifestDigest of targetObjects.manifestDigests) {
      const manifestPath = this.getOciManifestPath(repositoryArg, manifestDigest);
      await this.deleteObjectIfExists(storage, manifestPath);
      await this.deleteObjectIfExists(storage, `${manifestPath}.type`);
    }
    await this.deleteObjectIfExists(storage, this.getOciTagsPath(repositoryArg));
    logger.log('info', `deleted Cloudly registry repository ${repositoryArg}`);
  }

  /**
   * Storage hook: when an OCI `tags.json` was written, records a push event
   * for every tag whose digest differs from the cached one. Errors are logged
   * and never propagated to the storage write.
   */
  private async handleRegistryStorageAfterPut(
    contextArg: plugins.smartregistry.IStorageHookContext,
  ) {
    try {
      if (contextArg.protocol !== 'oci') {
        return;
      }
      if (!contextArg.key.startsWith('oci/tags/') || !contextArg.key.endsWith('/tags.json')) {
        return;
      }
      const repository = contextArg.key.slice('oci/tags/'.length, -'/tags.json'.length);
      const tagsBuffer = await this.smartRegistry.getStorage().getObject(contextArg.key);
      if (!tagsBuffer) {
        return;
      }
      const tags: unknown = JSON.parse(tagsBuffer.toString('utf8'));
      if (!tags || typeof tags !== 'object' || Array.isArray(tags)) {
        logger.log('warn', `registry tags document ${contextArg.key} is not a JSON object`);
        return;
      }
      for (const [tag, digest] of Object.entries(tags)) {
        // the document is untrusted JSON; only string digests can be recorded
        if (typeof digest !== 'string') {
          continue;
        }
        const tagKey = `${repository}:${tag}`;
        if (this.recordedTagDigests.get(tagKey) === digest) {
          continue;
        }
        this.recordedTagDigests.set(tagKey, digest);
        await this.recordRegistryPushEvent(repository, tag, digest, contextArg.actor?.userId);
      }
    } catch (error) {
      logger.log('error', `registry push event handling failed: ${(error as Error).message}`);
    }
  }

  /**
   * Applies a push to the service mapped to the repository: stores the
   * registry target, updates `imageVersion` unless `deployOnPush` is `false`,
   * records the event on the image, and pushes the cluster config to the
   * connected coreflows unless `deployOnPush` is `false`.
   */
  private async recordRegistryPushEvent(
    repositoryArg: string,
    tagArg: string,
    digestArg: string,
    actorUserIdArg?: string,
  ) {
    const service = await this.getServiceByRegistryRepository(repositoryArg);
    if (!service) {
      logger.log('info', `registry push for unmapped repository ${repositoryArg}:${tagArg}`);
      return;
    }
    const registryTarget = this.getServiceRegistryTarget(service, tagArg);
    const pushEvent: plugins.servezoneInterfaces.data.IRegistryPushEvent = {
      protocol: 'oci',
      registryHost: registryTarget.registryHost,
      repository: repositoryArg,
      tag: tagArg,
      digest: digestArg,
      imageUrl: registryTarget.imageUrl,
      pushedAt: Date.now(),
      serviceId: service.id,
      imageId: service.data.imageId,
      actorUserId: actorUserIdArg,
    };
    service.data = {
      ...service.data,
      ...(service.data.deployOnPush === false ? {} : { imageVersion: tagArg }),
      registryTarget,
    };
    await service.save();
    await this.recordImagePushEvent(service, pushEvent);
    if (service.data.deployOnPush !== false) {
      await this.cloudlyRef.coreflowManager.pushClusterConfigToConnectedCoreflows();
    }
    logger.log('info', `recorded registry push ${repositoryArg}:${tagArg} -> ${digestArg}`);
  }

  /**
   * Upserts the pushed tag as a version of the service's image and stores the
   * event as `lastPushEvent`. Does nothing when the service has no image or
   * the image cannot be loaded.
   */
  private async recordImagePushEvent(
    serviceArg: Service,
    pushEventArg: plugins.servezoneInterfaces.data.IRegistryPushEvent,
  ) {
    if (!serviceArg.data.imageId) {
      return;
    }
    const image = await this.cloudlyRef.imageManager.CImage.getInstance({
      id: serviceArg.data.imageId,
    }).catch(() => null);
    if (!image) {
      return;
    }
    image.data.versions = image.data.versions || [];
    const existingVersion = image.data.versions.find((versionArg) => {
      return versionArg.versionString === pushEventArg.tag;
    });
    const versionData = {
      versionString: pushEventArg.tag,
      digest: pushEventArg.digest,
      registryRepository: pushEventArg.repository,
      registryTag: pushEventArg.tag,
      source: 'registry' as const,
      // size and creation time of an already known version are preserved
      size: existingVersion?.size || 0,
      createdAt: existingVersion?.createdAt || pushEventArg.pushedAt,
    };
    if (existingVersion) {
      Object.assign(existingVersion, versionData);
    } else {
      image.data.versions.push(versionData);
    }
    image.data.lastPushEvent = pushEventArg;
    await image.save();
  }

  /** Finds the service whose derived repository name equals `repositoryArg`. */
  private async getServiceByRegistryRepository(repositoryArg: string) {
    const services = await this.cloudlyRef.serviceManager.CService.getInstances({});
    return services.find((serviceArg) => {
      return this.getServiceRepository(serviceArg) === repositoryArg;
    });
  }

  /** Derives `workloads/<name>-<first 12 chars of slugified id>` for a service. */
  private getServiceRepository(serviceArg: Service) {
    const serviceName = this.slugify(serviceArg.data?.name || serviceArg.id);
    const serviceId = this.slugify(serviceArg.id).slice(0, 12) || serviceArg.id;
    return `workloads/${this.slugify(`${serviceName}-${serviceId}`)}`;
  }

  /**
   * Lower-cases the value, replaces runs of non `[a-z0-9]` characters with a
   * single `-`, and trims dashes at both ends; falls back to `service` when
   * nothing remains.
   */
  private slugify(valueArg: string) {
    return valueArg
      .toLowerCase()
      .replace(/[^a-z0-9]+/g, '-')
      .replace(/-+/g, '-')
      .replace(/^-+|-+$/g, '')
      || 'service';
  }

  /**
   * Handles `/v2/token`: authenticates the caller and issues a one hour OCI
   * token for the requested scopes.
   *
   * @returns 401 without valid credentials, 403 when a user without write
   *   permission requests anything beyond `pull`, otherwise the token JSON.
   */
  private async handleTokenRequest(
    ctx: plugins.typedserver.IRequestContext,
    requestUrl: URL,
  ): Promise<Response> {
    const user = await this.authenticateRequest(ctx);
    if (!user) {
      return new Response('authentication required', {
        status: 401,
        headers: {
          'WWW-Authenticate': 'Basic realm="Cloudly Registry"',
        },
      });
    }
    const requestedScopes = this.getRequestedOciScopes(requestUrl.searchParams);
    if (this.scopesRequireWriteAccess(requestedScopes) && !user.canWrite) {
      return new Response('registry write access denied', { status: 403 });
    }
    const token = await this.smartRegistry.getAuthManager().createOciToken(
      user.userId,
      requestedScopes,
      3600,
    );
    return new Response(
      JSON.stringify({
        token,
        access_token: token,
        expires_in: 3600,
        issued_at: new Date().toISOString(),
      }),
      {
        status: 200,
        headers: {
          'Content-Type': 'application/json',
        },
      },
    );
  }

  /**
   * Tells whether any scope asks for more than read access.
   *
   * Deny-by-default: every action other than `pull` counts as write, so
   * wildcard (`*`) or unknown actions cannot slip past the permission check.
   * NOTE(review): assumes the registry's scope matcher may honour `*` - confirm.
   */
  private scopesRequireWriteAccess(scopesArg: readonly string[]): boolean {
    return scopesArg.some((scopeArg) => {
      const action = scopeArg.slice(scopeArg.lastIndexOf(':') + 1);
      return action !== 'pull';
    });
  }

  /**
   * Compares a stored secret with a presented one without leaking, through
   * timing, how many leading characters match. Both sides are hashed first so
   * the compared buffers always have equal length.
   *
   * @returns `false` when the stored value is not a string.
   */
  private secretsMatch(expectedArg: unknown, providedArg: string): boolean {
    if (typeof expectedArg !== 'string') return false;
    const expectedHash = createHash('sha256').update(expectedArg, 'utf8').digest();
    const providedHash = createHash('sha256').update(providedArg, 'utf8').digest();
    return timingSafeEqual(expectedHash, providedHash);
  }

  /**
   * Resolves the Basic credentials of a request to a Cloudly user. The
   * presented secret may be the user's password or one of the user's
   * unexpired tokens.
   *
   * @returns the user, or `null` when credentials are missing or do not match.
   */
  private async authenticateRequest(
    ctx: plugins.typedserver.IRequestContext,
  ): Promise<TAuthenticatedRegistryUser | null> {
    const credentials = this.getBasicCredentials(ctx);
    if (!credentials) {
      return null;
    }
    const users = await this.cloudlyRef.authManager.CUser.getInstances({});
    for (const user of users) {
      if (user.data?.username !== credentials.username) {
        continue;
      }
      const passwordMatches = this.secretsMatch(user.data.password, credentials.password);
      const matchingToken = user.data.tokens?.find((tokenArg) => {
        return this.secretsMatch(tokenArg.token, credentials.password) && tokenArg.expiresAt > Date.now();
      });
      if (!passwordMatches && !matchingToken) {
        continue;
      }
      const assignedRoles = matchingToken?.assignedRoles || [];
      return {
        userId: user.id,
        username: user.data.username,
        canWrite: user.data.role === 'admin' || assignedRoles.includes('admin'),
      };
    }
    return null;
  }

  /**
   * Parses an HTTP Basic `authorization` header.
   *
   * @returns username and password, or `null` when the header is missing,
   *   uses another scheme, or has no non-empty username before the first `:`.
   */
  private getBasicCredentials(ctx: plugins.typedserver.IRequestContext) {
    const authHeader = ctx.headers.get('authorization');
    // the scheme name is matched case-insensitively
    const schemeMatch = authHeader ? /^basic +/i.exec(authHeader) : null;
    if (!authHeader || !schemeMatch) {
      return null;
    }
    const decoded = Buffer.from(authHeader.slice(schemeMatch[0].length), 'base64').toString('utf8');
    const separatorIndex = decoded.indexOf(':');
    if (separatorIndex <= 0) {
      return null;
    }
    return {
      username: decoded.slice(0, separatorIndex),
      password: decoded.slice(separatorIndex + 1),
    };
  }

  /**
   * Converts `scope` query values of the form `repository:<name>:<actions>`
   * into one `oci:repository:<name>:<action>` entry per action.
   *
   * The type ends at the first colon and the actions start after the last
   * one, so names containing `:` (for example `host:5000/app`) stay intact.
   * A single value may hold several whitespace-separated scopes. Entries that
   * are not repository scopes or are incomplete are ignored.
   */
  private getRequestedOciScopes(searchParamsArg: URLSearchParams) {
    const scopes: string[] = [];
    const rawScopes = searchParamsArg
      .getAll('scope')
      .flatMap((valueArg) => valueArg.split(/\s+/))
      .filter((scopeArg) => scopeArg.length > 0);
    for (const scope of rawScopes) {
      const typeEnd = scope.indexOf(':');
      const actionsStart = scope.lastIndexOf(':');
      // fewer than two separators: no room for both a name and actions
      if (typeEnd <= 0 || actionsStart <= typeEnd) {
        continue;
      }
      const scopeType = scope.slice(0, typeEnd);
      const scopeName = scope.slice(typeEnd + 1, actionsStart);
      const actionsString = scope.slice(actionsStart + 1);
      if (scopeType !== 'repository' || !scopeName || !actionsString) {
        continue;
      }
      for (const action of actionsString.split(',')) {
        if (action) {
          scopes.push(`oci:${scopeType}:${scopeName}:${action}`);
        }
      }
    }
    return scopes;
  }

  /** Registry base URL: `http` when `sslMode` is `'none'`, `https` otherwise. */
  private getPublicRegistryUrl() {
    return `${this.cloudlyRef.config.data.sslMode === 'none' ? 'http' : 'https'}://${this.getRegistryHost()}`;
  }

  /** Copies `Headers` into a plain object with lower-cased keys. */
  private headersToRecord(headersArg: Headers) {
    const headers: Record<string, string> = {};
    headersArg.forEach((value, key) => {
      headers[key.toLowerCase()] = value;
    });
    return headers;
  }

  /**
   * Converts a registry response into a fetch `Response`. Streams, buffers and
   * strings are passed through; any other non-empty body is JSON-encoded and
   * gets a JSON `Content-Type` unless one is already set.
   */
  private createRegistryResponse(
    responseArg: plugins.smartregistry.IResponse,
  ): Response {
    const headers = new Headers();
    for (const [key, value] of Object.entries(responseArg.headers)) {
      headers.set(key, value);
    }
    if (!responseArg.body) {
      return new Response(null, {
        status: responseArg.status,
        headers,
      });
    }
    if (responseArg.body instanceof ReadableStream) {
      return new Response(responseArg.body, {
        status: responseArg.status,
        headers,
      });
    }
    if (Buffer.isBuffer(responseArg.body) || typeof responseArg.body === 'string') {
      return new Response(responseArg.body as BodyInit, {
        status: responseArg.status,
        headers,
      });
    }
    if (!headers.has('Content-Type')) {
      headers.set('Content-Type', 'application/json');
    }
    return new Response(JSON.stringify(responseArg.body), {
      status: responseArg.status,
      headers,
    });
  }
}