feat: provision corestore bindings
This commit is contained in:
@@ -2,6 +2,7 @@ import * as plugins from './coreflow.plugins.js';
|
||||
import { logger } from './coreflow.logging.js';
|
||||
import { Coreflow } from './coreflow.classes.coreflow.js';
|
||||
import type { IExternalGatewayConfig } from './coreflow.connector.externalgateway.js';
|
||||
import * as crypto from 'node:crypto';
|
||||
|
||||
export class ClusterManager {
|
||||
public coreflowRef: Coreflow;
|
||||
@@ -49,6 +50,7 @@ export class ClusterManager {
|
||||
private getWorkloadServiceDeploymentLabels(
|
||||
serviceArgFromCloudly: plugins.servezoneInterfaces.data.IService,
|
||||
containerImageFromCloudly: plugins.servezoneInterfaces.data.IImage,
|
||||
secretHashArg = '',
|
||||
) {
|
||||
const desiredImageVersion =
|
||||
serviceArgFromCloudly.data.imageVersion ||
|
||||
@@ -69,9 +71,27 @@ export class ClusterManager {
|
||||
'serve.zone.imageVersion': desiredImageVersion,
|
||||
'serve.zone.registryImageUrl': serviceArgFromCloudly.data.registryTarget?.imageUrl || '',
|
||||
'serve.zone.registryDigest': desiredRegistryDigest || '',
|
||||
'serve.zone.secretHash': secretHashArg,
|
||||
};
|
||||
}
|
||||
|
||||
private stableStringify(valueArg: unknown): string {
|
||||
if (Array.isArray(valueArg)) {
|
||||
return `[${valueArg.map((itemArg) => this.stableStringify(itemArg)).join(',')}]`;
|
||||
}
|
||||
if (valueArg && typeof valueArg === 'object') {
|
||||
return `{${Object.keys(valueArg as Record<string, unknown>)
|
||||
.sort()
|
||||
.map((keyArg) => `${JSON.stringify(keyArg)}:${this.stableStringify((valueArg as Record<string, unknown>)[keyArg])}`)
|
||||
.join(',')}}`;
|
||||
}
|
||||
return JSON.stringify(valueArg);
|
||||
}
|
||||
|
||||
private hashSecretObject(secretObjectArg: Record<string, string>) {
|
||||
return crypto.createHash('sha256').update(this.stableStringify(secretObjectArg)).digest('hex');
|
||||
}
|
||||
|
||||
private async pullRegistryTargetImage(
|
||||
registryTargetArg: plugins.servezoneInterfaces.data.IRegistryTarget,
|
||||
): Promise<plugins.docker.DockerImage> {
|
||||
@@ -103,6 +123,70 @@ export class ClusterManager {
|
||||
return localDockerImage;
|
||||
}
|
||||
|
||||
private async createCorestoreGlobalService(
|
||||
corestoreImageArg: plugins.docker.DockerImage,
|
||||
networksArg: Array<plugins.docker.DockerNetwork>,
|
||||
) {
|
||||
const corestoreImage = corestoreImageArg as unknown as {
|
||||
RepoTags?: string[];
|
||||
Labels?: Record<string, string>;
|
||||
};
|
||||
const imageRef = corestoreImage.RepoTags?.[0] || 'code.foss.global/serve.zone/corestore:latest';
|
||||
const corestoreEnv = [
|
||||
'CORESTORE_DATA_DIR=/data/corestore',
|
||||
'CORESTORE_PUBLIC_HOST=corestore',
|
||||
...(process.env.CORESTORE_API_TOKEN
|
||||
? [`CORESTORE_API_TOKEN=${process.env.CORESTORE_API_TOKEN}`]
|
||||
: []),
|
||||
];
|
||||
const response = await this.coreflowRef.dockerHost.request('POST', '/services/create', {
|
||||
Name: 'corestore',
|
||||
Labels: {
|
||||
version: corestoreImage.Labels?.version || '',
|
||||
'serve.zone.serviceCategory': 'base',
|
||||
'serve.zone.provides': 'database,objectstorage',
|
||||
},
|
||||
TaskTemplate: {
|
||||
ContainerSpec: {
|
||||
Image: imageRef,
|
||||
Labels: {
|
||||
'serve.zone.serviceCategory': 'base',
|
||||
'serve.zone.provides': 'database,objectstorage',
|
||||
},
|
||||
Env: corestoreEnv,
|
||||
Mounts: [
|
||||
{
|
||||
Target: '/data/corestore',
|
||||
Source: '/var/lib/serve.zone/corestore',
|
||||
Type: 'bind',
|
||||
ReadOnly: false,
|
||||
Consistency: 'default',
|
||||
},
|
||||
],
|
||||
},
|
||||
Networks: networksArg.map((networkArg) => ({
|
||||
Target: networkArg.Name,
|
||||
Aliases: ['corestore'],
|
||||
})),
|
||||
RestartPolicy: {
|
||||
Condition: 'any',
|
||||
},
|
||||
Resources: {
|
||||
Limits: {
|
||||
MemoryBytes: 700 * 1000000,
|
||||
},
|
||||
},
|
||||
},
|
||||
Mode: {
|
||||
Global: {},
|
||||
},
|
||||
});
|
||||
if (response.statusCode >= 300) {
|
||||
throw new Error(`Failed to create corestore service: ${JSON.stringify(response.body)}`);
|
||||
}
|
||||
return this.getDockerServiceByName('corestore');
|
||||
}
|
||||
|
||||
/**
|
||||
* starts the cluster manager
|
||||
*/
|
||||
@@ -181,6 +265,10 @@ export class ClusterManager {
|
||||
imageUrl: 'code.foss.global/serve.zone/corelog',
|
||||
});
|
||||
|
||||
const corestoreImage = await this.coreflowRef.dockerHost.createImageFromRegistry({
|
||||
imageUrl: 'code.foss.global/serve.zone/corestore',
|
||||
});
|
||||
|
||||
// SERVICES
|
||||
// lets deploy the base services
|
||||
// coretraffic
|
||||
@@ -249,6 +337,32 @@ export class ClusterManager {
|
||||
}
|
||||
logger.log('info', 'waiting for corelog to be up and running');
|
||||
await plugins.smartdelay.delayFor(10000);
|
||||
|
||||
// corestore
|
||||
let corestoreService: plugins.docker.DockerService | null;
|
||||
corestoreService = await this.getDockerServiceByName('corestore');
|
||||
|
||||
if (
|
||||
corestoreService &&
|
||||
(((corestoreService.Spec as any).Mode && !(corestoreService.Spec as any).Mode.Global) ||
|
||||
(await corestoreService.needsUpdate()))
|
||||
) {
|
||||
await corestoreService.remove();
|
||||
corestoreService = null;
|
||||
} else {
|
||||
logger.log('ok', `corestore service is up to date`);
|
||||
}
|
||||
|
||||
if (!corestoreService) {
|
||||
corestoreService = await this.createCorestoreGlobalService(corestoreImage, [
|
||||
sznCorechatNetwork,
|
||||
sznWebgatewayNetwork,
|
||||
]);
|
||||
} else {
|
||||
logger.log('ok', 'corestore service is already present');
|
||||
}
|
||||
logger.log('info', 'waiting for corestore to be up and running');
|
||||
await plugins.smartdelay.delayFor(10000);
|
||||
}
|
||||
|
||||
public async provisionWorkloadService(
|
||||
@@ -268,10 +382,6 @@ export class ClusterManager {
|
||||
await this.coreflowRef.cloudlyConnector.cloudlyApiClient.image.getImageById(
|
||||
serviceArgFromCloudly.data.imageId,
|
||||
);
|
||||
const deploymentLabels = this.getWorkloadServiceDeploymentLabels(
|
||||
serviceArgFromCloudly,
|
||||
containerImageFromCloudly,
|
||||
);
|
||||
let localDockerImage: plugins.docker.DockerImage;
|
||||
|
||||
// lets get the docker image for the service
|
||||
@@ -330,6 +440,24 @@ export class ClusterManager {
|
||||
dockerSecretName,
|
||||
);
|
||||
|
||||
const secretBundle =
|
||||
await this.coreflowRef.cloudlyConnector.cloudlyApiClient.secretbundle.getSecretBundleById(
|
||||
serviceArgFromCloudly.data.secretBundleId,
|
||||
);
|
||||
const platformEnvObject = await this.coreflowRef.platformManager.provisionBindingsForService(
|
||||
serviceArgFromCloudly,
|
||||
);
|
||||
const secretObject = {
|
||||
...platformEnvObject,
|
||||
...(await secretBundle.getFlatKeyValueObjectForEnvironment()),
|
||||
};
|
||||
const secretHash = this.hashSecretObject(secretObject);
|
||||
const deploymentLabels = this.getWorkloadServiceDeploymentLabels(
|
||||
serviceArgFromCloudly,
|
||||
containerImageFromCloudly,
|
||||
secretHash,
|
||||
);
|
||||
|
||||
// existing network to connect to
|
||||
const webGatewayNetwork = await this.coreflowRef.dockerHost.getNetworkByName(
|
||||
this.commonDockerData.networkNames.sznWebgateway,
|
||||
@@ -366,15 +494,10 @@ export class ClusterManager {
|
||||
await containerSecret.remove();
|
||||
}
|
||||
|
||||
const secretBundle =
|
||||
await this.coreflowRef.cloudlyConnector.cloudlyApiClient.secretbundle.getSecretBundleById(
|
||||
serviceArgFromCloudly.data.secretBundleId,
|
||||
);
|
||||
|
||||
// lets create the relevant stuff on the docker side
|
||||
containerSecret = await this.coreflowRef.dockerHost.createSecret({
|
||||
name: dockerSecretName,
|
||||
contentArg: JSON.stringify(await secretBundle.getFlatKeyValueObjectForEnvironment()),
|
||||
contentArg: JSON.stringify(secretObject),
|
||||
labels: {},
|
||||
version: serviceArgFromCloudly.data.imageVersion,
|
||||
});
|
||||
|
||||
@@ -9,6 +9,18 @@ type TPlatformDesiredState = {
|
||||
services?: plugins.servezoneInterfaces.data.IService[];
|
||||
};
|
||||
|
||||
type TCoreStoreProvisionResponse = {
|
||||
serviceId: string;
|
||||
serviceName?: string;
|
||||
resources: Array<{
|
||||
capability: 'database' | 'objectstorage';
|
||||
provider: 'smartdb' | 'smartstorage';
|
||||
resourceName: string;
|
||||
env: Record<string, string>;
|
||||
}>;
|
||||
env: Record<string, string>;
|
||||
};
|
||||
|
||||
export class PlatformManager {
|
||||
public coreflowRef: Coreflow;
|
||||
private configSubscription?: { unsubscribe: () => void };
|
||||
@@ -52,6 +64,28 @@ export class PlatformManager {
|
||||
logger.log('info', `Platform service reconciliation completed for ${desiredState.bindings.length} bindings`);
|
||||
}
|
||||
|
||||
public async provisionBindingsForService(
|
||||
serviceArg: plugins.servezoneInterfaces.data.IService,
|
||||
): Promise<Record<string, string>> {
|
||||
const desiredState = this.currentDesiredState || (await this.getDesiredState());
|
||||
this.currentDesiredState = desiredState;
|
||||
const bindings = desiredState.bindings.filter((bindingArg) => {
|
||||
return (
|
||||
bindingArg.desiredState !== 'disabled' &&
|
||||
this.bindingMatchesService(bindingArg, serviceArg) &&
|
||||
this.isCoreStoreCapability(bindingArg.capability)
|
||||
);
|
||||
});
|
||||
|
||||
const env: Record<string, string> = {};
|
||||
for (const binding of bindings) {
|
||||
const providerConfig = this.getProviderConfig(binding, desiredState.providerConfigs);
|
||||
const provisionedEnv = await this.provisionCoreStoreBinding(binding, serviceArg, providerConfig);
|
||||
Object.assign(env, provisionedEnv);
|
||||
}
|
||||
return env;
|
||||
}
|
||||
|
||||
private async getDesiredState(
|
||||
desiredStateArg: Partial<TPlatformDesiredState> = {},
|
||||
): Promise<TPlatformDesiredState> {
|
||||
@@ -107,6 +141,15 @@ export class PlatformManager {
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.isCoreStoreCapability(bindingArg.capability)) {
|
||||
try {
|
||||
await this.provisionCoreStoreBinding(bindingArg, service, providerConfig);
|
||||
} catch (error) {
|
||||
await this.failBinding(bindingArg, `CoreStore provisioning failed: ${(error as Error).message}`);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (!providerConfig) {
|
||||
await this.failBinding(bindingArg, `No enabled provider config found for ${bindingArg.capability}`);
|
||||
return;
|
||||
@@ -138,6 +181,109 @@ export class PlatformManager {
|
||||
);
|
||||
}
|
||||
|
||||
private bindingMatchesService(
|
||||
bindingArg: plugins.servezoneInterfaces.platform.IPlatformBinding,
|
||||
serviceArg: plugins.servezoneInterfaces.data.IService,
|
||||
) {
|
||||
return bindingArg.serviceId === serviceArg.id || bindingArg.serviceId === serviceArg.data.name;
|
||||
}
|
||||
|
||||
private isCoreStoreCapability(
|
||||
capabilityArg: plugins.servezoneInterfaces.platform.TPlatformCapability,
|
||||
): capabilityArg is 'database' | 'objectstorage' {
|
||||
return capabilityArg === 'database' || capabilityArg === 'objectstorage';
|
||||
}
|
||||
|
||||
private getCoreStoreControlUrl(
|
||||
providerConfigArg?: plugins.servezoneInterfaces.platform.IPlatformProviderConfig,
|
||||
) {
|
||||
const configuredUrl = this.getStringConfigValue(providerConfigArg?.config || {}, 'controlUrl');
|
||||
return configuredUrl || process.env.CORESTORE_CONTROL_URL || 'http://corestore:3000';
|
||||
}
|
||||
|
||||
private getCoreStoreApiToken(
|
||||
providerConfigArg?: plugins.servezoneInterfaces.platform.IPlatformProviderConfig,
|
||||
) {
|
||||
return this.getStringConfigValue(providerConfigArg?.config || {}, 'apiToken') || process.env.CORESTORE_API_TOKEN;
|
||||
}
|
||||
|
||||
private async provisionCoreStoreBinding(
|
||||
bindingArg: plugins.servezoneInterfaces.platform.IPlatformBinding,
|
||||
serviceArg: plugins.servezoneInterfaces.data.IService,
|
||||
providerConfigArg?: plugins.servezoneInterfaces.platform.IPlatformProviderConfig,
|
||||
): Promise<Record<string, string>> {
|
||||
if (!this.isCoreStoreCapability(bindingArg.capability)) {
|
||||
throw new Error(`CoreStore cannot provision ${bindingArg.capability}`);
|
||||
}
|
||||
const capability = bindingArg.capability;
|
||||
const controlUrl = this.getCoreStoreControlUrl(providerConfigArg);
|
||||
const response = await this.postCoreStore<TCoreStoreProvisionResponse>(
|
||||
`${controlUrl.replace(/\/+$/, '')}/resources/provision`,
|
||||
{
|
||||
serviceId: serviceArg.id,
|
||||
serviceName: serviceArg.data.name,
|
||||
capabilities: [capability],
|
||||
},
|
||||
providerConfigArg,
|
||||
);
|
||||
const resource = response.resources.find((resourceArg) => resourceArg.capability === capability);
|
||||
if (!resource) {
|
||||
throw new Error(`CoreStore did not return a ${capability} resource`);
|
||||
}
|
||||
await this.updateBindingStatus(bindingArg, {
|
||||
status: 'ready',
|
||||
endpoints: [this.getCoreStoreEndpoint(capability, resource.env)],
|
||||
credentials: [{ env: resource.env }],
|
||||
errorText: '',
|
||||
});
|
||||
return resource.env;
|
||||
}
|
||||
|
||||
private getCoreStoreEndpoint(
|
||||
capabilityArg: 'database' | 'objectstorage',
|
||||
envArg: Record<string, string>,
|
||||
): plugins.servezoneInterfaces.platform.IPlatformServiceEndpoint {
|
||||
if (capabilityArg === 'database') {
|
||||
return {
|
||||
name: 'corestore-smartdb',
|
||||
capability: 'database',
|
||||
protocol: 'mongodb',
|
||||
internalUrl: envArg.MONGODB_URI,
|
||||
networkAlias: envArg.MONGODB_HOST || 'corestore',
|
||||
port: Number(envArg.MONGODB_PORT || '27017'),
|
||||
};
|
||||
}
|
||||
return {
|
||||
name: 'corestore-smartstorage',
|
||||
capability: 'objectstorage',
|
||||
protocol: 's3',
|
||||
internalUrl: envArg.AWS_ENDPOINT_URL || envArg.S3_ENDPOINT,
|
||||
networkAlias: envArg.S3_ENDPOINT_HOST || 'corestore',
|
||||
port: Number(envArg.S3_PORT || '9000'),
|
||||
};
|
||||
}
|
||||
|
||||
private async postCoreStore<T>(
|
||||
urlArg: string,
|
||||
bodyArg: unknown,
|
||||
providerConfigArg?: plugins.servezoneInterfaces.platform.IPlatformProviderConfig,
|
||||
): Promise<T> {
|
||||
const token = this.getCoreStoreApiToken(providerConfigArg);
|
||||
const response = await fetch(urlArg, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'content-type': 'application/json',
|
||||
...(token ? { authorization: `Bearer ${token}` } : {}),
|
||||
},
|
||||
body: JSON.stringify(bodyArg),
|
||||
});
|
||||
const responseText = await response.text();
|
||||
if (!response.ok) {
|
||||
throw new Error(`CoreStore request failed ${response.status}: ${responseText}`);
|
||||
}
|
||||
return responseText ? JSON.parse(responseText) as T : ({} as T);
|
||||
}
|
||||
|
||||
private getEndpointsForBinding(
|
||||
bindingArg: plugins.servezoneInterfaces.platform.IPlatformBinding,
|
||||
providerConfigArg: plugins.servezoneInterfaces.platform.IPlatformProviderConfig,
|
||||
|
||||
Reference in New Issue
Block a user