import * as plugins from '../plugins.js';
import type {
  IUpstreamRegistryConfig,
  IUpstreamAuthConfig,
  IUpstreamCacheConfig,
  IUpstreamResilienceConfig,
  IUpstreamResult,
  IUpstreamFetchContext,
  IProtocolUpstreamConfig,
  IUpstreamScopeRule,
  TCircuitState,
} from './interfaces.upstream.js';
import {
  DEFAULT_CACHE_CONFIG,
  DEFAULT_RESILIENCE_CONFIG,
} from './interfaces.upstream.js';
import { CircuitBreaker, CircuitOpenError, withCircuitBreaker } from './classes.circuitbreaker.js';
import { UpstreamCache } from './classes.upstreamcache.js';

/**
 * Base class for protocol-specific upstream implementations.
 *
 * Provides:
 * - Multi-upstream routing with priority
 * - Scope-based filtering (glob patterns)
 * - Authentication handling
 * - Circuit breaker per upstream
 * - Caching with TTL
 * - Retry with exponential backoff
 * - 429 rate limit handling
 */
export abstract class BaseUpstream {
  /** Protocol name for logging */
  protected abstract readonly protocolName: string;

  /** Upstream configuration */
  protected readonly config: IProtocolUpstreamConfig;

  /** Resolved cache configuration (defaults overlaid with `config.cache`) */
  protected readonly cacheConfig: IUpstreamCacheConfig;

  /** Resolved resilience configuration (defaults overlaid with `config.resilience`) */
  protected readonly resilienceConfig: IUpstreamResilienceConfig;

  /** Circuit breakers keyed by upstream id */
  protected readonly circuitBreakers: Map<string, CircuitBreaker> = new Map();

  /** Upstream cache */
  protected readonly cache: UpstreamCache;

  /** Logger instance */
  protected readonly logger: plugins.smartlog.Smartlog;

  /**
   * @param config Protocol-level upstream configuration (upstream list, cache and resilience overrides).
   * @param logger Optional logger; when omitted a Smartlog instance with a fixed log context is created.
   */
  constructor(config: IProtocolUpstreamConfig, logger?: plugins.smartlog.Smartlog) {
    this.config = config;
    this.cacheConfig = { ...DEFAULT_CACHE_CONFIG, ...config.cache };
    this.resilienceConfig = { ...DEFAULT_RESILIENCE_CONFIG, ...config.resilience };
    this.cache = new UpstreamCache(this.cacheConfig);
    this.logger =
      logger ||
      new plugins.smartlog.Smartlog({
        logContext: {
          company: 'smartregistry',
          companyunit: 'upstream',
          environment: 'production',
          runtime: 'node',
        },
      });

    // One circuit breaker per configured upstream; per-upstream resilience
    // settings take precedence over the protocol-level ones.
    for (const upstream of config.upstreams) {
      const upstreamResilience = { ...this.resilienceConfig, ...upstream.resilience };
      this.circuitBreakers.set(upstream.id, new CircuitBreaker(upstream.id, upstreamResilience));
    }
  }

  /**
   * Check if upstream is enabled.
   */
  public isEnabled(): boolean {
    return this.config.enabled;
  }

  /**
   * Get all configured upstreams.
   */
  public getUpstreams(): IUpstreamRegistryConfig[] {
    return this.config.upstreams;
  }

  /**
   * Get circuit breaker state for an upstream.
   *
   * @returns The breaker state, or `null` when no breaker exists for `upstreamId`.
   */
  public getCircuitState(upstreamId: string): TCircuitState | null {
    const breaker = this.circuitBreakers.get(upstreamId);
    return breaker ? breaker.getState() : null;
  }

  /**
   * Get cache statistics.
   */
  public getCacheStats() {
    return this.cache.getStats();
  }

  /**
   * Fetch a resource from upstreams.
   * Tries upstreams in priority order, respecting circuit breakers and scope rules.
   *
   * Lookup order: fresh cache entry, negative (404) cache entry, stale cache
   * entry (only with `staleWhileRevalidate`), then the upstreams themselves.
   *
   * @returns The first result an upstream produces (including non-success
   *   statuses), a cached result, or `null` when upstreams are disabled, none
   *   apply to the resource, or every attempt threw.
   */
  public async fetch(context: IUpstreamFetchContext): Promise<IUpstreamResult | null> {
    if (!this.config.enabled) {
      return null;
    }

    // Get applicable upstreams sorted by priority
    const applicableUpstreams = this.getApplicableUpstreams(context.resource);
    if (applicableUpstreams.length === 0) {
      return null;
    }

    // Use the first applicable upstream's URL for cache key.
    // NOTE(review): entries are written under the URL of the upstream that
    // actually served the response, so a response from a lower-priority
    // upstream may not be found by this lookup — confirm how UpstreamCache
    // keys its entries.
    const primaryUpstreamUrl = applicableUpstreams[0]?.url;

    const cached = await this.cache.get(context, primaryUpstreamUrl);
    const asCacheHit = (entry: NonNullable<typeof cached>): IUpstreamResult => ({
      success: true,
      status: 200,
      headers: entry.headers,
      body: entry.data,
      upstreamId: entry.upstreamId,
      fromCache: true,
      latencyMs: 0,
    });

    // Fresh cache entry wins outright
    if (cached && !cached.stale) {
      return asCacheHit(cached);
    }

    // Check for negative cache (recent 404)
    if (await this.cache.hasNegative(context, primaryUpstreamUrl)) {
      return {
        success: false,
        status: 404,
        headers: {},
        upstreamId: 'cache',
        fromCache: true,
        latencyMs: 0,
      };
    }

    // If we have stale cache, return it immediately and revalidate in background
    if (cached?.stale && this.cacheConfig.staleWhileRevalidate) {
      // Fire and forget: revalidateInBackground handles its own errors.
      void this.revalidateInBackground(context, applicableUpstreams);
      return asCacheHit(cached);
    }

    // Try each upstream in order
    let lastFailure: string | null = null;
    for (const upstream of applicableUpstreams) {
      const breaker = this.circuitBreakers.get(upstream.id);
      if (!breaker) continue;

      try {
        const result = await withCircuitBreaker(
          breaker,
          () => this.fetchFromUpstream(upstream, context),
        );

        // Cache successful responses
        await this.cacheSuccessfulResult(context, upstream, result);

        // Cache 404 responses
        if (result.status === 404) {
          await this.cache.setNegative(context, upstream.id, upstream.url);
        }

        return result;
      } catch (error) {
        const message = this.describeError(error);
        if (error instanceof CircuitOpenError) {
          this.logger.log('debug', `Circuit open for upstream ${upstream.id}, trying next`);
        } else {
          this.logger.log('warn', `Upstream ${upstream.id} failed: ${message}`);
        }
        lastFailure = message;
        // Continue to next upstream
      }
    }

    // All upstreams failed
    if (lastFailure !== null) {
      this.logger.log('error', `All upstreams failed for ${context.resource}: ${lastFailure}`);
    }
    return null;
  }

  /**
   * Invalidate cache for a resource pattern.
   *
   * @returns Whatever `UpstreamCache.invalidatePattern` resolves to.
   */
  public async invalidateCache(
    pattern: RegExp,
  ): Promise<Awaited<ReturnType<UpstreamCache['invalidatePattern']>>> {
    return this.cache.invalidatePattern(pattern);
  }

  /**
   * Clear all cache entries.
   */
  public async clearCache(): Promise<void> {
    await this.cache.clear();
  }

  /**
   * Stop the upstream (cleanup resources).
   */
  public stop(): void {
    this.cache.stop();
  }

  /**
   * Get upstreams that apply to a resource, sorted by priority.
   *
   * An upstream applies when it is enabled, its circuit breaker (if any)
   * currently allows requests, and the resource matches its scope rules.
   * Lower `priority` values come first.
   */
  protected getApplicableUpstreams(resource: string): IUpstreamRegistryConfig[] {
    return this.config.upstreams
      .filter(upstream => {
        if (!upstream.enabled) return false;

        // Check circuit breaker
        const breaker = this.circuitBreakers.get(upstream.id);
        if (breaker && !breaker.canRequest()) return false;

        // Check scope rules
        return this.matchesScopeRules(resource, upstream.scopeRules);
      })
      // filter() returned a fresh array, so sorting in place leaves the config untouched.
      .sort((a, b) => a.priority - b.priority);
  }

  /**
   * Check if a resource matches scope rules.
   * Empty rules = match all.
   *
   * Rules are evaluated in order and the last rule whose pattern matches
   * decides the outcome; a resource that matches no rule is excluded.
   */
  protected matchesScopeRules(resource: string, rules?: IUpstreamScopeRule[]): boolean {
    if (!rules || rules.length === 0) {
      return true;
    }

    // Start with default exclude (nothing matches)
    let matched = false;
    for (const rule of rules) {
      if (plugins.minimatch(resource, rule.pattern)) {
        matched = rule.action === 'include';
      }
    }
    return matched;
  }

  /**
   * Fetch from a specific upstream with retry logic.
   *
   * Makes up to `maxRetries + 1` attempts. Only thrown errors trigger a retry;
   * a response that resolves with a non-success status is returned as-is.
   *
   * @throws The last error thrown by `executeRequest` once attempts are
   *   exhausted or the error is classified as non-retryable.
   */
  protected async fetchFromUpstream(
    upstream: IUpstreamRegistryConfig,
    context: IUpstreamFetchContext,
  ): Promise<IUpstreamResult> {
    const upstreamResilience = { ...this.resilienceConfig, ...upstream.resilience };
    const startTime = Date.now();
    let lastError: unknown = null;

    for (let attempt = 0; attempt <= upstreamResilience.maxRetries; attempt++) {
      try {
        const result = await this.executeRequest(upstream, context, upstreamResilience.timeoutMs);
        return {
          ...result,
          upstreamId: upstream.id,
          fromCache: false,
          // Measured from the first attempt, so it includes any backoff sleeps.
          latencyMs: Date.now() - startTime,
        };
      } catch (error) {
        lastError = error;

        // Don't retry on 4xx errors (except 429)
        if (this.isNonRetryableError(error)) {
          break;
        }

        // Back off before the next attempt; no sleep after the final one.
        if (attempt < upstreamResilience.maxRetries) {
          const delay = this.calculateBackoffDelay(
            attempt,
            upstreamResilience.retryDelayMs,
            upstreamResilience.retryMaxDelayMs,
          );
          await this.sleep(delay);
        }
      }
    }

    throw lastError || new Error('Request failed');
  }

  /**
   * Execute a single HTTP request to an upstream.
   *
   * @returns Status, lower-cased response headers and, for ok responses only,
   *   the body: parsed JSON when the content type contains `application/json`,
   *   otherwise a `Buffer`.
   */
  protected async executeRequest(
    upstream: IUpstreamRegistryConfig,
    context: IUpstreamFetchContext,
    timeoutMs: number,
  ): Promise<Omit<IUpstreamResult, 'upstreamId' | 'fromCache' | 'latencyMs'>> {
    // Build the full URL
    const url = this.buildUpstreamUrl(upstream, context);

    // Build headers with auth
    const headers = this.buildHeaders(upstream, context);

    // Make the request using SmartRequest
    const request = plugins.smartrequest.SmartRequest.create()
      .url(url)
      .method(context.method as any)
      .headers(headers)
      .timeout(timeoutMs)
      .handle429Backoff({ maxRetries: 3, fallbackDelay: 1000, maxWaitTime: 30000 });

    // Add query params if present
    if (context.query && Object.keys(context.query).length > 0) {
      request.query(context.query);
    }

    // NOTE(review): every method is dispatched through get(); HEAD only
    // re-applies the method first. Confirm against the SmartRequest API that
    // get() honours the configured method for HEAD and other verbs.
    let response: plugins.smartrequest.ICoreResponse;
    if (context.method.toUpperCase() === 'HEAD') {
      response = await request.method('HEAD').get();
    } else {
      response = await request.get();
    }

    // Normalise header names to lower case; multi-value headers keep their first value.
    const responseHeaders: Record<string, string> = {};
    for (const [key, value] of Object.entries(response.headers)) {
      responseHeaders[key.toLowerCase()] = Array.isArray(value) ? value[0] : value;
    }

    let body: Buffer | any;
    const contentType = responseHeaders['content-type'] || '';

    // The body is only read for ok responses; otherwise it stays undefined.
    if (response.ok) {
      if (contentType.includes('application/json')) {
        body = await response.json();
      } else {
        const arrayBuffer = await response.arrayBuffer();
        body = Buffer.from(arrayBuffer);
      }
    }

    return {
      success: response.ok,
      status: response.status,
      headers: responseHeaders,
      body,
    };
  }

  /**
   * Build the full URL for an upstream request.
   * Subclasses can override for protocol-specific URL building.
   */
  protected buildUpstreamUrl(upstream: IUpstreamRegistryConfig, context: IUpstreamFetchContext): string {
    // Remove leading slash if URL already has trailing slash
    let path = context.path;
    if (upstream.url.endsWith('/') && path.startsWith('/')) {
      path = path.slice(1);
    }
    return `${upstream.url}${path}`;
  }

  /**
   * Build headers including authentication.
   *
   * Copies the incoming request headers, drops any Host header and then adds
   * the upstream's authentication header.
   */
  protected buildHeaders(
    upstream: IUpstreamRegistryConfig,
    context: IUpstreamFetchContext,
  ): Record<string, string> {
    const headers: Record<string, string> = { ...context.headers };

    // Remove host header (will be set by HTTP client), whatever its casing.
    for (const name of Object.keys(headers)) {
      if (name.toLowerCase() === 'host') {
        delete headers[name];
      }
    }

    // Add authentication
    this.addAuthHeaders(headers, upstream.auth);

    return headers;
  }

  /**
   * Add authentication headers based on auth config.
   *
   * Mutates `headers` in place. Incomplete credentials (e.g. a missing
   * password or token) leave the headers untouched.
   */
  protected addAuthHeaders(headers: Record<string, string>, auth: IUpstreamAuthConfig): void {
    // Tolerate a missing auth block in loosely-typed configuration.
    if (!auth) {
      return;
    }

    switch (auth.type) {
      case 'basic':
        if (auth.username && auth.password) {
          const credentials = Buffer.from(`${auth.username}:${auth.password}`).toString('base64');
          headers['authorization'] = `Basic ${credentials}`;
        }
        break;
      case 'bearer':
        if (auth.token) {
          headers['authorization'] = `Bearer ${auth.token}`;
        }
        break;
      case 'api-key':
        if (auth.token) {
          const headerName = auth.headerName || 'authorization';
          headers[headerName.toLowerCase()] = auth.token;
        }
        break;
      case 'none':
      default:
        // No authentication
        break;
    }
  }

  /**
   * Check if an error should not be retried.
   *
   * Only errors carrying a `status` property are inspected: 4xx statuses other
   * than 429 are non-retryable. Everything else is considered retryable.
   */
  protected isNonRetryableError(error: unknown): boolean {
    if (error && typeof error === 'object' && 'status' in error) {
      const status = (error as { status: number }).status;
      // Don't retry 4xx errors except 429 (rate limited)
      if (status >= 400 && status < 500 && status !== 429) {
        return true;
      }
    }
    return false;
  }

  /**
   * Calculate backoff delay with exponential backoff and jitter.
   *
   * @param attempt Zero-based attempt index.
   * @param baseDelayMs Delay used for the first retry.
   * @param maxDelayMs Cap applied before jitter is added.
   * @returns Delay in milliseconds, within ±25% of the capped exponential delay.
   */
  protected calculateBackoffDelay(
    attempt: number,
    baseDelayMs: number,
    maxDelayMs: number,
  ): number {
    // Exponential backoff: delay = base * 2^attempt
    const exponentialDelay = baseDelayMs * Math.pow(2, attempt);

    // Cap at max delay
    const cappedDelay = Math.min(exponentialDelay, maxDelayMs);

    // Add jitter (±25%)
    const jitter = cappedDelay * 0.25 * (Math.random() * 2 - 1);

    return Math.floor(cappedDelay + jitter);
  }

  /**
   * Sleep for a specified duration.
   */
  protected sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }

  /**
   * Revalidate cache in background.
   *
   * Walks the given upstreams in order and stops at the first one whose
   * response gets cached. Never rejects: failures are logged at debug level.
   */
  protected async revalidateInBackground(
    context: IUpstreamFetchContext,
    upstreams: IUpstreamRegistryConfig[],
  ): Promise<void> {
    try {
      for (const upstream of upstreams) {
        const breaker = this.circuitBreakers.get(upstream.id);
        if (!breaker || !breaker.canRequest()) continue;

        try {
          const result = await withCircuitBreaker(
            breaker,
            () => this.fetchFromUpstream(upstream, context),
          );

          if (await this.cacheSuccessfulResult(context, upstream, result)) {
            return; // Successfully revalidated
          }
        } catch (error) {
          // Best effort: record why this upstream was skipped, then try the next one.
          this.logger.log(
            'debug',
            `Background revalidation via upstream ${upstream.id} failed: ${this.describeError(error)}`,
          );
        }
      }
    } catch (error) {
      this.logger.log('debug', `Background revalidation failed: ${this.describeError(error)}`);
    }
  }

  /**
   * Store a successful upstream response that has a body in the cache.
   * Non-Buffer bodies are serialised with JSON.stringify before storing.
   *
   * @returns `true` when the result was written to the cache, `false` when it was skipped.
   */
  private async cacheSuccessfulResult(
    context: IUpstreamFetchContext,
    upstream: IUpstreamRegistryConfig,
    result: IUpstreamResult,
  ): Promise<boolean> {
    if (!(result.success && result.body)) {
      return false;
    }

    await this.cache.set(
      context,
      Buffer.isBuffer(result.body) ? result.body : Buffer.from(JSON.stringify(result.body)),
      result.headers['content-type'] || 'application/octet-stream',
      result.headers,
      upstream.id,
      upstream.url,
    );
    return true;
  }

  /** Produce a loggable message from any thrown value, not just Error instances. */
  private describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }
}