import * as plugins from '../../plugins.ts';
import type { OpsServer } from '../classes.opsserver.ts';
import * as interfaces from '../../../ts_interfaces/index.ts';
import { requireValidIdentity } from '../helpers/guards.ts';
import type { BaseProvider } from '../../providers/classes.baseprovider.ts';

/** Supported time-range keys mapped to the length of the window in milliseconds. */
const TIME_RANGE_MS: Record<string, number> = {
  '1h': 60 * 60 * 1000,
  '6h': 6 * 60 * 60 * 1000,
  '1d': 24 * 60 * 60 * 1000,
  '3d': 3 * 24 * 60 * 60 * 1000,
  '7d': 7 * 24 * 60 * 60 * 1000,
  '30d': 30 * 24 * 60 * 60 * 1000,
};

/** Rank used by the 'status' sort order; lower values sort first, unknown statuses sort last. */
const STATUS_PRIORITY: Record<string, number> = {
  running: 0,
  pending: 1,
  waiting: 2,
  manual: 3,
  failed: 4,
  canceled: 5,
  success: 6,
  skipped: 7,
};

/** Statuses that 'current' view mode always shows, regardless of the selected time range. */
const ACTIVE_STATUSES = new Set<string>(['running', 'pending', 'waiting']);

/** Upper bound on the number of pipelines returned by a single `getPipelines` request. */
const MAX_PIPELINES_RETURNED = 200;

/**
 * Creation time of a pipeline as epoch milliseconds.
 * A missing or unparseable `createdAt` yields 0 instead of NaN so that sort
 * comparators built on this value stay consistent; such pipelines end up last
 * in a newest-first ordering.
 */
function createdAtMs(pipeline: interfaces.data.IPipeline): number {
  const ms = new Date(pipeline.createdAt).getTime();
  return Number.isNaN(ms) ? 0 : ms;
}

/**
 * Registers the pipeline-related typed request handlers (list, jobs, retry,
 * cancel) on the ops server's typed router and implements the aggregation,
 * filtering and sorting behind the different pipeline view modes.
 */
export class PipelinesHandler {
  public typedrouter = new plugins.typedrequest.TypedRouter();

  constructor(private opsServerRef: OpsServer) {
    this.opsServerRef.typedrouter.addTypedRouter(this.typedrouter);
    this.registerHandlers();
  }

  /** Shortcut to the application-wide action log. */
  private get actionLog() {
    return this.opsServerRef.gitopsAppRef.actionLog;
  }

  /**
   * Wires up the four handlers. Every handler validates the caller's identity
   * first and then resolves the provider for the requested connection.
   *
   * NOTE(review): the request-interface type argument of each TypedHandler is
   * not visible in this file as reviewed — confirm it is present upstream.
   */
  private registerHandlers(): void {
    // Get pipelines — supports view modes
    this.typedrouter.addTypedHandler(
      new plugins.typedrequest.TypedHandler(
        'getPipelines',
        async (dataArg) => {
          await requireValidIdentity(this.opsServerRef.adminHandler, dataArg);
          const provider = this.opsServerRef.gitopsAppRef.connectionManager.getProvider(
            dataArg.connectionId,
          );

          const viewMode = dataArg.viewMode || 'project';
          const timeRange = dataArg.timeRange || '1d';
          const sortBy = dataArg.sortBy || 'created';

          let pipelines: interfaces.data.IPipeline[];
          switch (viewMode) {
            case 'project': {
              // Without a project there is nothing to list.
              if (!dataArg.projectId) return { pipelines: [] };
              const fetched = await provider.getPipelines(dataArg.projectId, {
                page: dataArg.page,
                status: dataArg.status,
              });
              pipelines = this.filterByTimeRange(fetched, timeRange);
              break;
            }
            case 'current':
              pipelines = await this.fetchCurrentPipelines(provider, timeRange);
              break;
            case 'group':
              // Without a group there is nothing to list.
              if (!dataArg.groupId) return { pipelines: [] };
              pipelines = await this.fetchGroupPipelines(provider, dataArg.groupId, timeRange);
              break;
            case 'error':
              pipelines = await this.fetchErrorPipelines(provider, timeRange);
              break;
            default:
              // Unknown view modes yield an empty list rather than an error.
              pipelines = [];
              break;
          }

          return {
            pipelines: this.sortPipelines(pipelines, sortBy).slice(0, MAX_PIPELINES_RETURNED),
          };
        },
      ),
    );

    // Get pipeline jobs
    this.typedrouter.addTypedHandler(
      new plugins.typedrequest.TypedHandler(
        'getPipelineJobs',
        async (dataArg) => {
          await requireValidIdentity(this.opsServerRef.adminHandler, dataArg);
          const provider = this.opsServerRef.gitopsAppRef.connectionManager.getProvider(
            dataArg.connectionId,
          );
          const jobs = await provider.getPipelineJobs(dataArg.projectId, dataArg.pipelineId);
          return { jobs };
        },
      ),
    );

    // Retry pipeline
    this.typedrouter.addTypedHandler(
      new plugins.typedrequest.TypedHandler(
        'retryPipeline',
        async (dataArg) => {
          await requireValidIdentity(this.opsServerRef.adminHandler, dataArg);
          const provider = this.opsServerRef.gitopsAppRef.connectionManager.getProvider(
            dataArg.connectionId,
          );
          await provider.retryPipeline(dataArg.projectId, dataArg.pipelineId);
          // Logged only after the provider call succeeded.
          this.actionLog.append({
            actionType: 'update',
            entityType: 'pipeline',
            entityId: dataArg.pipelineId,
            entityName: `Pipeline #${dataArg.pipelineId}`,
            details: `Retried pipeline #${dataArg.pipelineId} in project ${dataArg.projectId}`,
            username: dataArg.identity.username,
          });
          return { ok: true };
        },
      ),
    );

    // Cancel pipeline
    this.typedrouter.addTypedHandler(
      new plugins.typedrequest.TypedHandler(
        'cancelPipeline',
        async (dataArg) => {
          await requireValidIdentity(this.opsServerRef.adminHandler, dataArg);
          const provider = this.opsServerRef.gitopsAppRef.connectionManager.getProvider(
            dataArg.connectionId,
          );
          await provider.cancelPipeline(dataArg.projectId, dataArg.pipelineId);
          // Logged only after the provider call succeeded.
          this.actionLog.append({
            actionType: 'delete',
            entityType: 'pipeline',
            entityId: dataArg.pipelineId,
            entityName: `Pipeline #${dataArg.pipelineId}`,
            details: `Cancelled pipeline #${dataArg.pipelineId} in project ${dataArg.projectId}`,
            username: dataArg.identity.username,
          });
          return { ok: true };
        },
      ),
    );
  }

  // ---------------------------------------------------------------------------
  // View mode helpers
  // ---------------------------------------------------------------------------

  /**
   * Current mode: running/pending/waiting pipelines are always shown, plus any
   * other pipelines created within `timeRange`.
   *
   * Two aggregation passes run in parallel over all projects:
   *   1. Recent pipelines (no status filter) — for the time-range display
   *   2. Pipelines with status 'running' — so running ones are not lost when
   *      they fall outside the most recent page
   *
   * Only 'running' is requested explicitly; pending/waiting pipelines are
   * included only when they appear in the recent page.
   *
   * @returns Active pipelines first, followed by the time-filtered remainder.
   */
  private async fetchCurrentPipelines(
    provider: BaseProvider,
    timeRange: string,
  ): Promise<interfaces.data.IPipeline[]> {
    const projects = await provider.getProjects();

    const [recentPipelines, activePipelines] = await Promise.all([
      this.fetchAggregatedPipelines(provider, projects, { perPage: 50 }),
      this.fetchAggregatedPipelines(provider, projects, { status: 'running', perPage: 50 }),
    ]);

    // Single pass: deduplicate (explicitly-active results come first, so they
    // take precedence) and split into always-shown vs. time-filtered pipelines.
    // Both partitions are disjoint by construction, so no second dedup is needed.
    const seenKeys = new Set<string>();
    const active: interfaces.data.IPipeline[] = [];
    const inactive: interfaces.data.IPipeline[] = [];
    for (const pipeline of [...activePipelines, ...recentPipelines]) {
      const key = `${pipeline.connectionId}:${pipeline.projectId}:${pipeline.id}`;
      if (seenKeys.has(key)) continue;
      seenKeys.add(key);
      if (ACTIVE_STATUSES.has(pipeline.status)) {
        active.push(pipeline);
      } else {
        inactive.push(pipeline);
      }
    }

    return [...active, ...this.filterByTimeRange(inactive, timeRange)];
  }

  /**
   * Group mode: pipelines from all projects in a group, limited to `timeRange`.
   */
  private async fetchGroupPipelines(
    provider: BaseProvider,
    groupId: string,
    timeRange: string,
  ): Promise<interfaces.data.IPipeline[]> {
    const projects = await provider.getGroupProjects(groupId);
    const allPipelines = await this.fetchAggregatedPipelines(provider, projects);
    return this.filterByTimeRange(allPipelines, timeRange);
  }

  /**
   * Error mode: only pipelines with status 'failed', across all projects,
   * limited to `timeRange`.
   */
  private async fetchErrorPipelines(
    provider: BaseProvider,
    timeRange: string,
  ): Promise<interfaces.data.IPipeline[]> {
    const projects = await provider.getProjects();
    const allPipelines = await this.fetchAggregatedPipelines(provider, projects, {
      status: 'failed',
    });
    return this.filterByTimeRange(allPipelines, timeRange);
  }

  /**
   * Fetch pipelines from multiple projects, ten projects at a time.
   * Projects inside a batch are queried in parallel; batches run sequentially.
   * A failing project is logged and skipped so that one broken project does
   * not hide the pipelines of all others.
   *
   * @param opts.status  Optional status filter forwarded to the provider.
   * @param opts.perPage Page size forwarded to the provider (defaults to 50).
   */
  private async fetchAggregatedPipelines(
    provider: BaseProvider,
    projects: interfaces.data.IProject[],
    opts?: { status?: string; perPage?: number },
  ): Promise<interfaces.data.IPipeline[]> {
    const BATCH_SIZE = 10;
    const perPage = opts?.perPage || 50;
    const allPipelines: interfaces.data.IPipeline[] = [];

    for (let offset = 0; offset < projects.length; offset += BATCH_SIZE) {
      const batch = projects.slice(offset, offset + BATCH_SIZE);
      const results = await Promise.allSettled(
        batch.map(async (project) => {
          const pipelines = await provider.getPipelines(project.id, {
            perPage,
            status: opts?.status,
          });
          // Enrich with proper project name
          return pipelines.map((p) => ({
            ...p,
            projectName: project.fullPath || project.name || p.projectId,
          }));
        }),
      );

      // allSettled preserves input order, so results[index] belongs to batch[index].
      results.forEach((result, index) => {
        if (result.status === 'fulfilled') {
          allPipelines.push(...result.value);
          return;
        }
        const projectId = batch[index]?.id || 'unknown';
        console.warn(`[PipelinesHandler] Failed to fetch pipelines for project ${projectId}: ${result.reason}`);
      });
    }

    return allPipelines;
  }

  // ---------------------------------------------------------------------------
  // Filtering and sorting
  // ---------------------------------------------------------------------------

  /**
   * Keeps only pipelines created within the given time range, counted back
   * from now. Pipelines without a usable `createdAt` are dropped.
   *
   * @param timeRange Key of TIME_RANGE_MS; anything else falls back to '1d'.
   */
  private filterByTimeRange(
    pipelines: interfaces.data.IPipeline[],
    timeRange: string,
  ): interfaces.data.IPipeline[] {
    // The typeof guard also rejects inherited keys such as 'constructor',
    // which would otherwise produce a NaN cutoff and drop every pipeline.
    const configured: unknown = TIME_RANGE_MS[timeRange];
    const windowMs =
      typeof configured === 'number' && configured > 0 ? configured : TIME_RANGE_MS['1d'];
    const cutoff = Date.now() - windowMs;

    return pipelines.filter((p) => {
      if (!p.createdAt) return false;
      // An unparseable date yields NaN, which fails the comparison and is dropped.
      return new Date(p.createdAt).getTime() >= cutoff;
    });
  }

  /**
   * Returns a sorted copy of `pipelines`; the input array is not modified.
   *
   * - 'duration': longest first (missing duration counts as 0)
   * - 'status':   by STATUS_PRIORITY, unknown statuses last
   * - 'created' or anything else: newest first; pipelines with a missing or
   *   unparseable `createdAt` sort last
   */
  private sortPipelines(
    pipelines: interfaces.data.IPipeline[],
    sortBy: string,
  ): interfaces.data.IPipeline[] {
    const sorted = [...pipelines];
    switch (sortBy) {
      case 'duration':
        sorted.sort((a, b) => (b.duration || 0) - (a.duration || 0));
        break;
      case 'status':
        sorted.sort(
          (a, b) => (STATUS_PRIORITY[a.status] ?? 99) - (STATUS_PRIORITY[b.status] ?? 99),
        );
        break;
      case 'created':
      default:
        // createdAtMs never returns NaN, which keeps the comparator consistent.
        sorted.sort((a, b) => createdAtMs(b) - createdAtMs(a));
        break;
    }
    return sorted;
  }
}