feat: push config updates to coreflow

This commit is contained in:
2026-04-28 16:02:05 +00:00
parent ee6d4c3d04
commit 1bed907f53
5 changed files with 113 additions and 14 deletions
+56
View File
@@ -190,6 +190,62 @@ tap.test('should expose generated service registry targets', async () => {
expect(refreshedService.data.registryTarget?.imageUrl).toEqual(registryTarget.imageUrl); expect(refreshedService.data.registryTarget?.imageUrl).toEqual(registryTarget.imageUrl);
}); });
tap.test('should push service config updates to connected coreflows', async (toolsArg) => {
const cluster = await testClient.cluster.createCluster('Registry Config Push Test Cluster');
const persistedCluster = await testCloudly.clusterManager.getConfigBy_ConfigID(cluster.id);
const clusterUser = await testCloudly.authManager.CUser.getInstance({
id: persistedCluster.data.userId,
});
const clusterToken = clusterUser.data.tokens?.[0]?.token;
expect(clusterToken).toBeTruthy();
const coreflowClient = new cloudlyApiClient.CloudlyApiClient({
registerAs: 'coreflow',
cloudlyUrl: `http://${helpers.testCloudlyConfig.publicUrl}:${helpers.testCloudlyConfig.publicPort}`,
});
const configUpdates: any[] = [];
let subscription: { unsubscribe: () => void } | undefined;
try {
await coreflowClient.start();
await coreflowClient.getIdentityByToken(clusterToken!, {
statefullIdentity: true,
tagConnection: true,
});
subscription = coreflowClient.configUpdateSubject.subscribe((updateArg) => {
configUpdates.push(updateArg);
});
const image = await testClient.image.createImage({
name: 'Registry Config Push Test Image',
description: 'Image used by the config push test',
});
const service = await testClient.services.createService({
name: 'Registry Config Push Test Service',
description: 'Service used by the config push test',
imageId: image.id,
imageVersion: 'latest',
environment: {},
secretBundleId: '',
serviceCategory: 'workload',
deploymentStrategy: 'custom',
scaleFactor: 1,
balancingStrategy: 'round-robin',
ports: {
web: 3000,
},
domains: [],
deploymentIds: [],
});
await toolsArg.delayFor(100);
expect(configUpdates[0]?.configData.id).toEqual(cluster.id);
expect(configUpdates[0]?.services.find((serviceArg: any) => serviceArg.id === service.id)).toBeTruthy();
} finally {
subscription?.unsubscribe();
await coreflowClient.stop();
}
});
tap.test('should expose platform desired state', async () => { tap.test('should expose platform desired state', async () => {
const capabilitiesResponse = await testClient.platform.getPlatformCapabilities(); const capabilitiesResponse = await testClient.platform.getPlatformCapabilities();
expect(capabilitiesResponse.capabilities.find((capability) => capability.id === 'database')).toBeTruthy(); expect(capabilitiesResponse.capabilities.find((capability) => capability.id === 'database')).toBeTruthy();
+2 -3
View File
@@ -139,6 +139,7 @@ export class ClusterManager {
const clusterUser = new this.cloudlyRef.authManager.CUser({ const clusterUser = new this.cloudlyRef.authManager.CUser({
id: await this.cloudlyRef.authManager.CUser.getNewId(), id: await this.cloudlyRef.authManager.CUser.getNewId(),
data: { data: {
username: `cluster-${configObjectArg.id}`,
role: 'cluster', role: 'cluster',
type: 'machine', type: 'machine',
tokens: [ tokens: [
@@ -151,9 +152,7 @@ export class ClusterManager {
}, },
}); });
await clusterUser.save(); await clusterUser.save();
Object.assign(configObjectArg, { configObjectArg.data.userId = clusterUser.id;
userId: clusterUser.id,
});
const clusterInstance = await Cluster.fromConfigObject(configObjectArg); const clusterInstance = await Cluster.fromConfigObject(configObjectArg);
await clusterInstance.save(); await clusterInstance.save();
return clusterInstance; return clusterInstance;
+50 -11
View File
@@ -1,6 +1,7 @@
import * as plugins from '../plugins.js'; import * as plugins from '../plugins.js';
import { Cloudly } from '../classes.cloudly.js'; import { Cloudly } from '../classes.cloudly.js';
import type { Cluster } from '../manager.cluster/classes.cluster.js'; import type { Cluster } from '../manager.cluster/classes.cluster.js';
import { logger } from '../logger.js';
/** /**
* in charge of talking to coreflow services on clusters * in charge of talking to coreflow services on clusters
@@ -35,14 +36,14 @@ export class CloudlyCoreflowManager {
'The supplied token is not valid. The user is not a machine.' 'The supplied token is not valid. The user is not a machine.'
); );
} }
let cluster: Cluster; let cluster: Cluster | undefined;
if (user.data.role === 'cluster') { if (user.data.role === 'cluster') {
cluster = await this.cloudlyRef.clusterManager.getClusterBy_UserId(user.id); cluster = await this.cloudlyRef.clusterManager.getClusterBy_UserId(user.id);
} }
const expiryTimestamp = Date.now() + 3600 * 1000 * 24 * 365; const expiryTimestamp = Date.now() + 3600 * 1000 * 24 * 365;
return { return {
identity: { identity: {
name: user.data.username, name: user.data.username || user.id,
role: user.data.role, role: user.data.role,
type: 'machine', // if someone authenticates by token, they are a machine, no matter what. type: 'machine', // if someone authenticates by token, they are a machine, no matter what.
userId: user.id, userId: user.id,
@@ -71,16 +72,9 @@ export class CloudlyCoreflowManager {
const identity = dataArg.identity; const identity = dataArg.identity;
console.log('trying to get clusterConfigSet'); console.log('trying to get clusterConfigSet');
console.log(dataArg); console.log(dataArg);
const cluster = await this.cloudlyRef.clusterManager.getClusterBy_Identity(identity); const clusterConfig = await this.getClusterConfigPayloadForIdentity(identity);
const services = await this.cloudlyRef.serviceManager.CService.getInstances({});
const platformDesiredState = await this.cloudlyRef.platformManager.getPlatformDesiredState();
console.log('got cluster config and sending it back to coreflow'); console.log('got cluster config and sending it back to coreflow');
return { return clusterConfig;
configData: await cluster.createSavableObject(),
services: await Promise.all(services.map((service) => service.createSavableObject())),
platformProviderConfigs: platformDesiredState.providerConfigs,
platformBindings: platformDesiredState.bindings,
};
} }
) )
); );
@@ -102,4 +96,49 @@ export class CloudlyCoreflowManager {
) )
); );
} }
public async getClusterConfigPayloadForIdentity(
identityArg: plugins.servezoneInterfaces.data.IIdentity,
): Promise<plugins.servezoneInterfaces.requests.config.IRequest_Cloudly_Coreflow_PushClusterConfig['request']> {
const cluster = await this.cloudlyRef.clusterManager.getClusterBy_Identity(identityArg);
const services = await this.cloudlyRef.serviceManager.CService.getInstances({});
const platformDesiredState = await this.cloudlyRef.platformManager.getPlatformDesiredState();
return {
configData: await cluster.createSavableObject(),
services: await Promise.all(services.map((service) => service.createSavableObject())),
platformProviderConfigs: platformDesiredState.providerConfigs,
platformBindings: platformDesiredState.bindings,
};
}
public async pushClusterConfigToConnectedCoreflows() {
const typedsocket = this.cloudlyRef.server.typedServer?.typedsocket;
if (!typedsocket) {
return 0;
}
const connections = await typedsocket.findAllTargetConnections(async (connectionArg) => {
const identityTag = await connectionArg.getTagById('identity');
const identity = identityTag?.payload as plugins.servezoneInterfaces.data.IIdentity | undefined;
return identity?.role === 'cluster' && !!identity.userId;
});
await Promise.all(
connections.map(async (connectionArg) => {
const identityTag = await connectionArg.getTagById('identity');
const identity = identityTag?.payload as plugins.servezoneInterfaces.data.IIdentity;
try {
const pushClusterConfig = typedsocket.createTypedRequest<plugins.servezoneInterfaces.requests.config.IRequest_Cloudly_Coreflow_PushClusterConfig>(
'pushClusterConfig',
connectionArg,
);
await pushClusterConfig.fire(await this.getClusterConfigPayloadForIdentity(identity));
} catch (error) {
logger.log('error', `failed to push cluster config to coreflow ${identity.userId}: ${(error as Error).message}`);
}
}),
);
return connections.length;
}
} }
@@ -193,6 +193,9 @@ export class CloudlyRegistryManager {
await service.save(); await service.save();
await this.recordImagePushEvent(service, pushEvent); await this.recordImagePushEvent(service, pushEvent);
if (service.data.deployOnPush !== false) {
await this.cloudlyRef.coreflowManager.pushClusterConfigToConnectedCoreflows();
}
logger.log('info', `recorded registry push ${repositoryArg}:${tagArg} -> ${digestArg}`); logger.log('info', `recorded registry push ${repositoryArg}:${tagArg} -> ${digestArg}`);
} }
@@ -100,6 +100,7 @@ export class ServiceManager {
service.data.imageVersion || 'latest', service.data.imageVersion || 'latest',
); );
await service.save(); await service.save();
await this.cloudlyRef.coreflowManager.pushClusterConfigToConnectedCoreflows();
return { return {
service: await service.createSavableObject(), service: await service.createSavableObject(),
}; };
@@ -123,6 +124,7 @@ export class ServiceManager {
service.data.imageVersion || 'latest', service.data.imageVersion || 'latest',
); );
await service.save(); await service.save();
await this.cloudlyRef.coreflowManager.pushClusterConfigToConnectedCoreflows();
return { return {
service: await service.createSavableObject(), service: await service.createSavableObject(),
}; };