// File: cloudly/ts/manager.coreflow/coreflowmanager.ts (TypeScript)

import * as plugins from '../plugins.js';
import { Cloudly } from '../classes.cloudly.js';
import type { Cluster } from '../manager.cluster/classes.cluster.js';
import { logger } from '../logger.js';
/**
 * Union of every deployment-scoped typed request this manager sends to coreflow.
 * The manager indexes these interfaces by `method`, `request` and `response`.
 */
type TCoreflowDeploymentRequest =
| plugins.servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_RestartDeployment
| plugins.servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_KillDeployment
| plugins.servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceReadFile
| plugins.servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceWriteFile
| plugins.servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceReadDir
| plugins.servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceMkdir
| plugins.servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceRm
| plugins.servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceExists
| plugins.servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_DeploymentWorkspaceExec;
/**
 * Method names treated as deployment runtime actions (restart / kill),
 * as opposed to the workspace methods below.
 */
type TCoreflowDeploymentActionMethod =
| 'coreflowRestartDeployment'
| 'coreflowKillDeployment';
/**
 * The members of TCoreflowDeploymentRequest whose `method` is one of the
 * runtime action methods.
 */
type TCoreflowDeploymentActionRequest = Extract<TCoreflowDeploymentRequest, {
method: TCoreflowDeploymentActionMethod;
}>;
/**
 * Every method name of TCoreflowDeploymentRequest that is not a runtime
 * action method, i.e. the remaining (workspace) requests of the union.
 */
export type TCoreflowDeploymentWorkspaceMethod = Exclude<
TCoreflowDeploymentRequest['method'],
TCoreflowDeploymentActionMethod
>;
/**
 * In charge of talking to coreflow services on clusters.
 * Coreflow runs on a server when ServerManager is done.
 *
 * The manager does two things:
 * - it registers typed handlers that coreflow calls (identity lookup by token,
 *   cluster config retrieval, certificate retrieval), and
 * - it fires typed requests at connected coreflow sockets (config pushes,
 *   deployment queries, deployment runtime actions and workspace requests).
 */
export class CloudlyCoreflowManager {
  /** Lifetime of an identity issued by `getIdentityByToken`: 365 days in milliseconds. */
  private static readonly identityLifetimeMs = 3600 * 1000 * 24 * 365;

  /**
   * Turns any caught value into a printable message.
   *
   * Caught values are not guaranteed to be `Error` instances; reading
   * `.message` unconditionally yields `undefined` for everything else.
   * NOTE(review): typedrequest appears to reject with objects that carry
   * `errorText` instead of `message` — confirm against the library.
   */
  private static describeError(errorArg: unknown): string {
    if (typeof errorArg === 'string') {
      return errorArg;
    }
    if (errorArg === null || errorArg === undefined) {
      return 'unknown error';
    }
    if (typeof errorArg === 'object') {
      const candidate = errorArg as { errorText?: unknown; message?: unknown };
      if (typeof candidate.errorText === 'string' && candidate.errorText) {
        return candidate.errorText;
      }
      if (typeof candidate.message === 'string' && candidate.message) {
        return candidate.message;
      }
    }
    return String(errorArg);
  }

  public cloudlyRef: Cloudly;
  public typedRouter = new plugins.typedrequest.TypedRouter();

  /**
   * Attaches this manager's router to the Cloudly router and registers all
   * handlers that coreflow may call.
   *
   * @param cloudlyRefArg the owning Cloudly instance
   */
  constructor(cloudlyRefArg: Cloudly) {
    this.cloudlyRef = cloudlyRefArg;
    this.cloudlyRef.typedrouter.addTypedRouter(this.typedRouter);

    // Registration order is kept as it was: identity, cluster config, certificates.
    this.registerGetIdentityByTokenHandler();
    this.registerGetClusterConfigHandler();
    this.registerGetCertificateForDomainHandler();
  }

  /**
   * Builds the cluster config payload for the cluster that belongs to the
   * given identity.
   *
   * @param identityArg identity used to look up the cluster
   * @returns the payload of a `pushClusterConfig` request; `externalGateway`
   *          is only attached when both gateway url and api token are set
   * @throws TypedResponseError when no cluster is found for the identity
   */
  public async getClusterConfigPayloadForIdentity(
    identityArg: plugins.servezoneInterfaces.data.IIdentity,
  ): Promise<plugins.servezoneInterfaces.requests.config.IRequest_Cloudly_Coreflow_PushClusterConfig['request']> {
    const cluster = await this.cloudlyRef.clusterManager.getClusterBy_Identity(identityArg);
    if (!cluster) {
      // Without this guard a missing cluster surfaces as an opaque TypeError below.
      throw new plugins.typedrequest.TypedResponseError(
        'No cluster found for the supplied identity.'
      );
    }

    // These three lookups do not depend on each other, so they run concurrently.
    const [services, platformDesiredState, settings] = await Promise.all([
      this.cloudlyRef.serviceManager.CService.getInstances({}),
      this.cloudlyRef.platformManager.getPlatformDesiredState(),
      this.cloudlyRef.settingsManager.getSettings(),
    ]);

    // Fall back to port 80 when the setting is absent or not a positive integer.
    const configuredPort = Number(settings.dcrouterTargetPort || '80');
    const externalGateway = settings.dcrouterGatewayUrl && settings.dcrouterGatewayApiToken
      ? {
          url: settings.dcrouterGatewayUrl,
          apiToken: settings.dcrouterGatewayApiToken,
          workHosterType: 'cloudly' as const,
          workHosterId: settings.dcrouterWorkHosterId || cluster.id,
          targetHost: settings.dcrouterTargetHost,
          targetPort: Number.isInteger(configuredPort) && configuredPort > 0 ? configuredPort : 80,
        }
      : undefined;

    const payload: plugins.servezoneInterfaces.requests.config.IRequest_Cloudly_Coreflow_PushClusterConfig['request'] & {
      externalGateway?: typeof externalGateway;
    } = {
      configData: await cluster.createSavableObject(),
      services: await Promise.all(services.map((service) => service.createSavableObject())),
      platformProviderConfigs: platformDesiredState.providerConfigs,
      platformBindings: platformDesiredState.bindings,
    };
    if (externalGateway) {
      payload.externalGateway = externalGateway;
    }
    return payload;
  }

  /**
   * Pushes the current cluster config to every connected coreflow.
   *
   * Pushing is best effort: a failure for one connection is logged and does
   * not affect the others.
   *
   * @returns the number of connections a push was attempted for (0 when no
   *          typedsocket is available), regardless of individual outcomes
   */
  public async pushClusterConfigToConnectedCoreflows(): Promise<number> {
    const typedsocket = this.getTypedSocket();
    if (!typedsocket) {
      return 0;
    }
    const connections = await this.getConnectedCoreflowConnections();
    await Promise.all(
      connections.map(async (connectionArg) => {
        let coreflowLabel = 'unknown';
        try {
          // Read the tag inside the try so that a single broken connection
          // cannot reject the whole Promise.all.
          const identityTag = await connectionArg.getTagById('identity');
          const identity = identityTag?.payload as plugins.servezoneInterfaces.data.IIdentity | undefined;
          if (!identity) {
            throw new Error('connection no longer carries an identity tag');
          }
          coreflowLabel = identity.userId ?? coreflowLabel;
          const pushClusterConfig = typedsocket.createTypedRequest<plugins.servezoneInterfaces.requests.config.IRequest_Cloudly_Coreflow_PushClusterConfig>(
            'pushClusterConfig',
            connectionArg,
          );
          await pushClusterConfig.fire(await this.getClusterConfigPayloadForIdentity(identity));
        } catch (error) {
          logger.log(
            'error',
            `failed to push cluster config to coreflow ${coreflowLabel}: ${CloudlyCoreflowManager.describeError(error)}`,
          );
        }
      }),
    );
    return connections.length;
  }

  /**
   * Asks every connected coreflow for the runtime deployments of a service
   * and concatenates the answers.
   *
   * Connections that fail to answer are logged with level `warn` and skipped.
   *
   * @param serviceArg the service whose deployments are requested
   * @returns all deployments reported by the connected coreflows
   */
  public async getRuntimeDeploymentsForService(
    serviceArg: plugins.servezoneInterfaces.data.IService,
  ): Promise<plugins.servezoneInterfaces.data.IDeployment[]> {
    const deployments: plugins.servezoneInterfaces.data.IDeployment[] = [];
    const typedsocket = this.getTypedSocket();
    if (!typedsocket) {
      return deployments;
    }
    const connections = await this.getConnectedCoreflowConnections();
    for (const connection of connections) {
      try {
        const request = typedsocket.createTypedRequest<plugins.servezoneInterfaces.requests.deployment.IReq_Cloudly_Coreflow_GetServiceDeployments>(
          'coreflowGetServiceDeployments',
          connection,
        );
        const response = await request.fire({ service: serviceArg });
        deployments.push(...(response.deployments || []));
      } catch (error) {
        logger.log(
          'warn',
          `failed to query coreflow deployments: ${CloudlyCoreflowManager.describeError(error)}`,
        );
      }
    }
    return deployments;
  }

  /**
   * Fires a runtime action (restart / kill) for a deployment at the connected
   * coreflows until one of them reports that it found the deployment.
   *
   * @param methodArg the action method to fire
   * @param deploymentIdArg id of the targeted deployment
   * @returns the deployment data returned by coreflow
   * @throws TypedResponseError when no coreflow is connected, none found the
   *         deployment, or the response carries no deployment data
   */
  public async fireDeploymentRuntimeAction(
    methodArg: TCoreflowDeploymentActionMethod,
    deploymentIdArg: string,
  ): Promise<{ deployment: plugins.servezoneInterfaces.data.IDeployment }> {
    const response = await this.fireCoreflowRequestUntilFound<TCoreflowDeploymentActionRequest>(methodArg, {
      deploymentId: deploymentIdArg,
    });
    if (!response.deployment) {
      throw new plugins.typedrequest.TypedResponseError('Coreflow did not return deployment data');
    }
    return { deployment: response.deployment };
  }

  /**
   * Fires a deployment workspace request at the connected coreflows until one
   * of them reports that it found the deployment.
   *
   * @param methodArg the workspace method to fire
   * @param payloadArg the request payload for that method
   * @returns the response of the first coreflow that reports `found`
   * @throws TypedResponseError when no coreflow is connected or none found the deployment
   */
  public async fireDeploymentWorkspaceRequest(
    methodArg: TCoreflowDeploymentWorkspaceMethod,
    payloadArg: Extract<TCoreflowDeploymentRequest, { method: typeof methodArg }>['request'],
  ) {
    return await this.fireCoreflowRequestUntilFound(methodArg, payloadArg);
  }

  /** Registers the handler that resolves a machine token to an identity. */
  private registerGetIdentityByTokenHandler(): void {
    this.typedRouter.addTypedHandler<plugins.servezoneInterfaces.requests.identity.IRequest_Any_Cloudly_CoreflowManager_GetIdentityByToken>(
      new plugins.typedrequest.TypedHandler('getIdentityByToken', async (requestData) => {
        // The token comes from the caller: reject anything that is not a
        // non-empty string before it reaches the query filter, so a missing
        // value can never widen the $elemMatch below.
        if (typeof requestData.token !== 'string' || requestData.token.length === 0) {
          throw new plugins.typedrequest.TypedResponseError(
            'The supplied token is not valid. No token was supplied.'
          );
        }

        // Use getInstance with $elemMatch for querying nested arrays
        const user = await this.cloudlyRef.authManager.CUser.getInstance({
          data: {
            tokens: {
              $elemMatch: { token: requestData.token },
            },
          },
        });
        if (!user) {
          throw new plugins.typedrequest.TypedResponseError(
            'The supplied token is not valid. No matching user found.'
          );
        }
        if (user.data.type !== 'machine') {
          throw new plugins.typedrequest.TypedResponseError(
            'The supplied token is not valid. The user is not a machine.'
          );
        }

        // Only users with the cluster role get cluster details attached.
        let cluster: Cluster | undefined;
        if (user.data.role === 'cluster') {
          cluster = await this.cloudlyRef.clusterManager.getClusterBy_UserId(user.id);
        }

        const expiryTimestamp = Date.now() + CloudlyCoreflowManager.identityLifetimeMs;
        return {
          identity: {
            name: user.data.username || user.id,
            role: user.data.role,
            type: 'machine', // if someone authenticates by token, they are a machine, no matter what.
            userId: user.id,
            expiresAt: expiryTimestamp,
            ...(cluster
              ? {
                  clusterId: cluster.id,
                  clusterName: cluster.data.name,
                }
              : {}),
            jwt: await this.cloudlyRef.authManager.smartjwtInstance.createJWT({
              status: 'loggedIn',
              userId: user.id,
              expiresAt: expiryTimestamp,
            }),
          },
        };
      })
    );
  }

  /** Registers the handler that lets coreflow fetch its cluster config. */
  private registerGetClusterConfigHandler(): void {
    this.typedRouter.addTypedHandler(
      new plugins.typedrequest.TypedHandler<plugins.servezoneInterfaces.requests.config.IRequest_Any_Cloudly_GetClusterConfig>(
        'getClusterConfig',
        async (dataArg) => {
          const identity = dataArg.identity;
          // The request itself is deliberately not logged: its identity can
          // carry the JWT issued by getIdentityByToken.
          logger.log(
            'info',
            `coreflow ${identity?.userId ?? 'unknown'} is trying to get its cluster config`,
          );
          // NOTE(review): the identity is passed on as received; whether its jwt
          // is validated depends on getClusterBy_Identity — confirm.
          const clusterConfig = await this.getClusterConfigPayloadForIdentity(identity);
          logger.log('info', 'got cluster config and sending it back to coreflow');
          return clusterConfig;
        }
      )
    );
  }

  /** Registers the handler that lets callers fetch a certificate for a domain. */
  private registerGetCertificateForDomainHandler(): void {
    this.typedRouter.addTypedHandler(
      new plugins.typedrequest.TypedHandler<plugins.servezoneInterfaces.requests.certificate.IRequest_Any_Cloudly_GetCertificateForDomain>(
        'getCertificateForDomain',
        async (dataArg) => {
          // NOTE(review): no caller authentication is visible in this handler —
          // confirm that access is restricted elsewhere.
          logger.log('info', `incoming API request for certificate ${dataArg.domainName}`);
          const cert = await this.cloudlyRef.letsencryptConnector.getCertificateForDomain(
            dataArg.domainName
          );
          logger.log('info', `got certificate ready for response ${dataArg.domainName}`);
          return {
            certificate: cert,
          };
        }
      )
    );
  }

  /**
   * Fires the given request at each connected coreflow in turn and returns the
   * first response whose `found` flag is truthy.
   *
   * @param methodArg typed request method name
   * @param payloadArg typed request payload
   * @returns the first response that reports `found`
   * @throws TypedResponseError when no coreflow is connected; otherwise, when
   *         no coreflow reports `found`, with the message of the last request
   *         failure if there was one
   */
  private async fireCoreflowRequestUntilFound<TRequest extends TCoreflowDeploymentRequest>(
    methodArg: TRequest['method'],
    payloadArg: TRequest['request'],
  ): Promise<TRequest['response']> {
    const typedsocket = this.getTypedSocket();
    const connections = typedsocket ? await this.getConnectedCoreflowConnections() : [];
    if (!typedsocket || connections.length === 0) {
      throw new plugins.typedrequest.TypedResponseError('No connected coreflow');
    }

    let lastErrorMessage: string | undefined;
    for (const connection of connections) {
      try {
        const request = typedsocket.createTypedRequest<TRequest>(
          methodArg,
          connection,
        );
        const response = await request.fire(payloadArg);
        if (response?.found) {
          return response;
        }
      } catch (error) {
        // Keep going: another coreflow may still know the deployment.
        lastErrorMessage = CloudlyCoreflowManager.describeError(error);
      }
    }
    throw new plugins.typedrequest.TypedResponseError(
      lastErrorMessage || 'No connected coreflow found the requested deployment',
    );
  }

  /** Returns the typedsocket of the Cloudly server, or undefined when there is no typed server. */
  private getTypedSocket() {
    return this.cloudlyRef.server.typedServer?.typedsocket;
  }

  /**
   * Finds all socket connections whose `identity` tag has the `cluster` role
   * and a user id.
   *
   * @returns the matching connections, or an empty array when no typedsocket is available
   */
  private async getConnectedCoreflowConnections() {
    const typedsocket = this.getTypedSocket();
    if (!typedsocket) {
      return [];
    }
    return await typedsocket.findAllTargetConnections(async (connectionArg) => {
      const identityTag = await connectionArg.getTagById('identity');
      const identity = identityTag?.payload as plugins.servezoneInterfaces.data.IIdentity | undefined;
      return identity?.role === 'cluster' && !!identity.userId;
    });
  }
}