Files
smartregistry/ts/upstream/classes.baseupstream.ts

522 lines
14 KiB
TypeScript

import * as plugins from '../plugins.js';
import type {
IUpstreamRegistryConfig,
IUpstreamAuthConfig,
IUpstreamCacheConfig,
IUpstreamResilienceConfig,
IUpstreamResult,
IUpstreamFetchContext,
IProtocolUpstreamConfig,
IUpstreamScopeRule,
TCircuitState,
} from './interfaces.upstream.js';
import {
DEFAULT_CACHE_CONFIG,
DEFAULT_RESILIENCE_CONFIG,
} from './interfaces.upstream.js';
import { CircuitBreaker, CircuitOpenError, withCircuitBreaker } from './classes.circuitbreaker.js';
import { UpstreamCache } from './classes.upstreamcache.js';
/**
 * Base class for protocol-specific upstream implementations.
 *
 * Provides:
 * - Multi-upstream routing with priority
 * - Scope-based filtering (glob patterns)
 * - Authentication handling
 * - Circuit breaker per upstream
 * - Caching with TTL
 * - Retry with exponential backoff
 * - 429 rate limit handling
 */
export abstract class BaseUpstream {
  /** Protocol name for logging */
  protected abstract readonly protocolName: string;

  /** Upstream configuration */
  protected readonly config: IProtocolUpstreamConfig;

  /** Resolved cache configuration (defaults overlaid with `config.cache`) */
  protected readonly cacheConfig: IUpstreamCacheConfig;

  /** Resolved resilience configuration (defaults overlaid with `config.resilience`) */
  protected readonly resilienceConfig: IUpstreamResilienceConfig;

  /** Circuit breakers keyed by upstream id */
  protected readonly circuitBreakers: Map<string, CircuitBreaker> = new Map();

  /** Upstream cache */
  protected readonly cache: UpstreamCache;

  /** Logger instance */
  protected readonly logger: plugins.smartlog.Smartlog;

  /**
   * @param config Protocol-level upstream configuration.
   * @param logger Optional logger; a default Smartlog instance is created when omitted.
   */
  constructor(config: IProtocolUpstreamConfig, logger?: plugins.smartlog.Smartlog) {
    this.config = config;
    this.cacheConfig = { ...DEFAULT_CACHE_CONFIG, ...config.cache };
    this.resilienceConfig = { ...DEFAULT_RESILIENCE_CONFIG, ...config.resilience };
    this.cache = new UpstreamCache(this.cacheConfig);
    this.logger = logger ?? new plugins.smartlog.Smartlog({
      logContext: {
        company: 'smartregistry',
        companyunit: 'upstream',
        environment: 'production',
        runtime: 'node',
      }
    });

    // One circuit breaker per upstream; per-upstream resilience settings
    // override the protocol-level ones.
    for (const upstream of config.upstreams) {
      const upstreamResilience = { ...this.resilienceConfig, ...upstream.resilience };
      this.circuitBreakers.set(upstream.id, new CircuitBreaker(upstream.id, upstreamResilience));
    }
  }

  /**
   * Check if upstream is enabled.
   */
  public isEnabled(): boolean {
    return this.config.enabled;
  }

  /**
   * Get all configured upstreams.
   */
  public getUpstreams(): IUpstreamRegistryConfig[] {
    return this.config.upstreams;
  }

  /**
   * Get circuit breaker state for an upstream.
   *
   * @returns The breaker state, or `null` when no breaker exists for `upstreamId`.
   */
  public getCircuitState(upstreamId: string): TCircuitState | null {
    const breaker = this.circuitBreakers.get(upstreamId);
    return breaker ? breaker.getState() : null;
  }

  /**
   * Get cache statistics.
   */
  public getCacheStats() {
    return this.cache.getStats();
  }

  /**
   * Fetch a resource from upstreams.
   * Tries upstreams in priority order, respecting circuit breakers and scope rules.
   *
   * @returns The cached or upstream result, or `null` when upstreams are
   *   disabled, no upstream applies to the resource, or every upstream failed.
   */
  public async fetch(context: IUpstreamFetchContext): Promise<IUpstreamResult | null> {
    if (!this.config.enabled) {
      return null;
    }

    // Fresh cache entry: serve it directly.
    const cached = this.cache.get(context);
    if (cached && !cached.stale) {
      return this.buildCacheHitResult(cached);
    }

    // Negative cache (recent 404).
    if (this.cache.hasNegative(context)) {
      return {
        success: false,
        status: 404,
        headers: {},
        upstreamId: 'cache',
        fromCache: true,
        latencyMs: 0,
      };
    }

    // Applicable upstreams, sorted by priority.
    const applicableUpstreams = this.getApplicableUpstreams(context.resource);
    if (applicableUpstreams.length === 0) {
      return null;
    }

    // Stale entry: return it immediately and refresh in the background.
    if (cached?.stale && this.cacheConfig.staleWhileRevalidate) {
      // Fire and forget; revalidateInBackground handles its own errors.
      void this.revalidateInBackground(context, applicableUpstreams);
      return this.buildCacheHitResult(cached);
    }

    // Try each upstream in order; the first one that answers wins.
    let lastError: unknown = null;
    for (const upstream of applicableUpstreams) {
      const breaker = this.circuitBreakers.get(upstream.id);
      if (!breaker) continue;

      try {
        const result = await withCircuitBreaker(
          breaker,
          () => this.fetchFromUpstream(upstream, context),
        );

        // Cache successful responses
        this.cacheSuccessfulResult(context, result, upstream.id);

        // Cache 404 responses
        if (result.status === 404) {
          this.cache.setNegative(context, upstream.id);
        }
        return result;
      } catch (error) {
        if (error instanceof CircuitOpenError) {
          this.logger.log('debug', `Circuit open for upstream ${upstream.id}, trying next`);
        } else {
          this.logger.log('warn', `Upstream ${upstream.id} failed: ${BaseUpstream.describeError(error)}`);
        }
        lastError = error;
        // Continue to next upstream
      }
    }

    // All upstreams failed
    if (lastError !== null) {
      this.logger.log(
        'error',
        `All upstreams failed for ${context.resource}: ${BaseUpstream.describeError(lastError)}`,
      );
    }
    return null;
  }

  /**
   * Invalidate cache for a resource pattern.
   *
   * @returns The value returned by the cache's `invalidatePattern`.
   */
  public invalidateCache(pattern: RegExp): number {
    return this.cache.invalidatePattern(pattern);
  }

  /**
   * Clear all cache entries.
   */
  public clearCache(): void {
    this.cache.clear();
  }

  /**
   * Stop the upstream (cleanup resources).
   */
  public stop(): void {
    this.cache.stop();
  }

  /**
   * Get upstreams that apply to a resource, sorted by ascending `priority`.
   * Disabled upstreams, upstreams whose breaker refuses requests, and
   * upstreams whose scope rules do not match are excluded.
   */
  protected getApplicableUpstreams(resource: string): IUpstreamRegistryConfig[] {
    return this.config.upstreams
      .filter(upstream => {
        if (!upstream.enabled) return false;

        // Check circuit breaker
        const breaker = this.circuitBreakers.get(upstream.id);
        if (breaker && !breaker.canRequest()) return false;

        // Check scope rules
        return this.matchesScopeRules(resource, upstream.scopeRules);
      })
      // filter() returned a fresh array, so sorting does not mutate the config.
      .sort((a, b) => a.priority - b.priority);
  }

  /**
   * Check if a resource matches scope rules.
   * Empty rules = match all. Otherwise rules are evaluated in order and the
   * last matching rule decides; if no rule matches, the resource is excluded.
   */
  protected matchesScopeRules(resource: string, rules?: IUpstreamScopeRule[]): boolean {
    if (!rules || rules.length === 0) {
      return true;
    }

    // Start with default exclude (nothing matches)
    let matched = false;
    for (const rule of rules) {
      if (plugins.minimatch(resource, rule.pattern)) {
        matched = rule.action === 'include';
      }
    }
    return matched;
  }

  /**
   * Fetch from a specific upstream with retry logic.
   *
   * At least one attempt is always made, even if `maxRetries` is negative or
   * not a finite number.
   *
   * @throws The last error raised by `executeRequest` once retries are
   *   exhausted or the error is classified as non-retryable.
   */
  protected async fetchFromUpstream(
    upstream: IUpstreamRegistryConfig,
    context: IUpstreamFetchContext,
  ): Promise<IUpstreamResult> {
    const upstreamResilience = { ...this.resilienceConfig, ...upstream.resilience };
    // Guard against misconfiguration: a negative/NaN retry count must not
    // skip the request altogether.
    const maxRetries = Number.isFinite(upstreamResilience.maxRetries)
      ? Math.max(0, Math.floor(upstreamResilience.maxRetries))
      : 0;
    const startTime = Date.now();
    let lastError: unknown = null;

    for (let attempt = 0; attempt <= maxRetries; attempt++) {
      try {
        const result = await this.executeRequest(upstream, context, upstreamResilience.timeoutMs);
        return {
          ...result,
          upstreamId: upstream.id,
          fromCache: false,
          latencyMs: Date.now() - startTime,
        };
      } catch (error) {
        lastError = error;

        // Don't retry on 4xx errors (except 429)
        if (this.isNonRetryableError(error)) {
          break;
        }

        // Wait before the next attempt (no wait after the final one).
        if (attempt < maxRetries) {
          const delay = this.calculateBackoffDelay(
            attempt,
            upstreamResilience.retryDelayMs,
            upstreamResilience.retryMaxDelayMs,
          );
          await this.sleep(delay);
        }
      }
    }

    throw lastError || new Error('Request failed');
  }

  /**
   * Execute a single HTTP request to an upstream.
   *
   * Non-OK responses are returned with `success: false` and no body; they do
   * not throw from this method.
   */
  protected async executeRequest(
    upstream: IUpstreamRegistryConfig,
    context: IUpstreamFetchContext,
    timeoutMs: number,
  ): Promise<Omit<IUpstreamResult, 'upstreamId' | 'fromCache' | 'latencyMs'>> {
    const url = this.buildUpstreamUrl(upstream, context);
    const headers = this.buildHeaders(upstream, context);

    // Make the request using SmartRequest
    const request = plugins.smartrequest.SmartRequest.create()
      .url(url)
      .method(context.method as any)
      .headers(headers)
      .timeout(timeoutMs)
      .handle429Backoff({ maxRetries: 3, fallbackDelay: 1000, maxWaitTime: 30000 });

    // Add query params if present
    if (context.query && Object.keys(context.query).length > 0) {
      request.query(context.query);
    }

    // HEAD sets the method explicitly before dispatching; every other method
    // is dispatched through get().
    // NOTE(review): confirm that get() does not reset the method to GET.
    const response: plugins.smartrequest.ICoreResponse =
      context.method.toUpperCase() === 'HEAD'
        ? await request.method('HEAD').get()
        : await request.get();

    // Normalize response headers: lowercase names, first value of multi-value
    // headers, and skip headers without a value.
    const responseHeaders: Record<string, string> = {};
    for (const [key, value] of Object.entries(response.headers ?? {})) {
      const headerValue = Array.isArray(value) ? value[0] : value;
      if (headerValue == null) continue;
      responseHeaders[key.toLowerCase()] = headerValue;
    }

    // Only OK responses get a body: parsed JSON for JSON content types,
    // a Buffer otherwise.
    let body: Buffer | any;
    const contentType = responseHeaders['content-type'] || '';
    if (response.ok) {
      if (contentType.includes('application/json')) {
        body = await response.json();
      } else {
        const arrayBuffer = await response.arrayBuffer();
        body = Buffer.from(arrayBuffer);
      }
    }

    return {
      success: response.ok,
      status: response.status,
      headers: responseHeaders,
      body,
    };
  }

  /**
   * Build the full URL for an upstream request.
   * Subclasses can override for protocol-specific URL building.
   *
   * Exactly one `/` separates the upstream URL and the request path: a
   * doubled slash is collapsed and a missing one is inserted. An empty path
   * yields the upstream URL unchanged.
   */
  protected buildUpstreamUrl(upstream: IUpstreamRegistryConfig, context: IUpstreamFetchContext): string {
    const base = upstream.url;
    const path = context.path;
    if (!path) {
      return base;
    }

    const baseEndsWithSlash = base.endsWith('/');
    const pathStartsWithSlash = path.startsWith('/');
    if (baseEndsWithSlash && pathStartsWithSlash) {
      return `${base}${path.slice(1)}`;
    }
    if (!baseEndsWithSlash && !pathStartsWithSlash) {
      return `${base}/${path}`;
    }
    return `${base}${path}`;
  }

  /**
   * Build headers including authentication.
   *
   * Copies the request headers, drops any `host` header regardless of casing
   * (the HTTP client sets it), then applies upstream authentication.
   */
  protected buildHeaders(
    upstream: IUpstreamRegistryConfig,
    context: IUpstreamFetchContext,
  ): Record<string, string> {
    const headers: Record<string, string> = {};
    for (const [name, value] of Object.entries(context.headers ?? {})) {
      // Header names are case-insensitive, so 'Host' must be dropped too.
      if (name.toLowerCase() === 'host') continue;
      headers[name] = value;
    }

    this.addAuthHeaders(headers, upstream.auth);
    return headers;
  }

  /**
   * Add authentication headers based on auth config.
   *
   * Mutates `headers` in place. When a credential header is written, any
   * existing header with the same name in a different casing is removed first,
   * so the incoming request's credentials are never sent alongside the
   * upstream's. Does nothing when `auth` is missing or incomplete.
   */
  protected addAuthHeaders(headers: Record<string, string>, auth: IUpstreamAuthConfig): void {
    if (!auth) {
      return;
    }

    switch (auth.type) {
      case 'basic':
        if (auth.username && auth.password) {
          const credentials = Buffer.from(`${auth.username}:${auth.password}`).toString('base64');
          BaseUpstream.setHeaderIgnoringCase(headers, 'authorization', `Basic ${credentials}`);
        }
        break;
      case 'bearer':
        if (auth.token) {
          BaseUpstream.setHeaderIgnoringCase(headers, 'authorization', `Bearer ${auth.token}`);
        }
        break;
      case 'api-key':
        if (auth.token) {
          const headerName = auth.headerName || 'authorization';
          BaseUpstream.setHeaderIgnoringCase(headers, headerName, auth.token);
        }
        break;
      case 'none':
      default:
        // No authentication
        break;
    }
  }

  /**
   * Check if an error should not be retried.
   *
   * @returns `true` for errors carrying a 4xx `status` other than 429.
   */
  protected isNonRetryableError(error: unknown): boolean {
    if (!error || typeof error !== 'object' || !('status' in error)) {
      return false;
    }
    const status = (error as { status: number }).status;
    // Don't retry 4xx errors except 429 (rate limited)
    return status >= 400 && status < 500 && status !== 429;
  }

  /**
   * Calculate backoff delay with exponential backoff and jitter.
   *
   * The delay is `baseDelayMs * 2^attempt`, capped at `maxDelayMs`, with ±25%
   * jitter. The jittered value is clamped to `[0, maxDelayMs]` so the
   * configured maximum is never exceeded.
   */
  protected calculateBackoffDelay(
    attempt: number,
    baseDelayMs: number,
    maxDelayMs: number,
  ): number {
    // Exponential backoff: delay = base * 2^attempt
    const exponentialDelay = baseDelayMs * Math.pow(2, attempt);

    // Cap at max delay
    const cappedDelay = Math.min(exponentialDelay, maxDelayMs);

    // Add jitter (±25%)
    const jitter = cappedDelay * 0.25 * (Math.random() * 2 - 1);
    const jitteredDelay = Math.floor(cappedDelay + jitter);

    // Positive jitter must not push the delay past the configured maximum.
    return Math.max(0, Math.min(jitteredDelay, maxDelayMs));
  }

  /**
   * Sleep for a specified duration.
   */
  protected sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }

  /**
   * Revalidate cache in background.
   *
   * Tries the given upstreams in order and stops at the first one whose
   * response was stored in the cache. Failures are logged at debug level and
   * never propagate to the caller.
   */
  protected async revalidateInBackground(
    context: IUpstreamFetchContext,
    upstreams: IUpstreamRegistryConfig[],
  ): Promise<void> {
    try {
      for (const upstream of upstreams) {
        const breaker = this.circuitBreakers.get(upstream.id);
        if (!breaker || !breaker.canRequest()) continue;

        try {
          const result = await withCircuitBreaker(
            breaker,
            () => this.fetchFromUpstream(upstream, context),
          );
          if (this.cacheSuccessfulResult(context, result, upstream.id)) {
            return; // Successfully revalidated
          }
        } catch (error) {
          // Best effort: record the failure and continue to the next upstream.
          this.logger.log(
            'debug',
            `Background revalidation via upstream ${upstream.id} failed: ${BaseUpstream.describeError(error)}`,
          );
        }
      }
    } catch (error) {
      this.logger.log('debug', `Background revalidation failed: ${BaseUpstream.describeError(error)}`);
    }
  }

  /**
   * Build the result returned for a positive cache hit.
   */
  private buildCacheHitResult(
    cached: NonNullable<ReturnType<UpstreamCache['get']>>,
  ): IUpstreamResult {
    return {
      success: true,
      status: 200,
      headers: cached.headers,
      body: cached.data,
      upstreamId: cached.upstreamId,
      fromCache: true,
      latencyMs: 0,
    };
  }

  /**
   * Store a successful upstream response with a body in the cache.
   * Non-Buffer bodies are serialized with `JSON.stringify`.
   *
   * @returns `true` when the result was stored, `false` when it was not cacheable.
   */
  private cacheSuccessfulResult(
    context: IUpstreamFetchContext,
    result: IUpstreamResult,
    upstreamId: string,
  ): boolean {
    if (!result.success || !result.body) {
      return false;
    }
    const payload = Buffer.isBuffer(result.body)
      ? result.body
      : Buffer.from(JSON.stringify(result.body));
    this.cache.set(
      context,
      payload,
      result.headers['content-type'] || 'application/octet-stream',
      result.headers,
      upstreamId,
    );
    return true;
  }

  /**
   * Set a header under its lowercased name, removing any existing entry whose
   * name differs only by casing.
   */
  private static setHeaderIgnoringCase(
    headers: Record<string, string>,
    name: string,
    value: string,
  ): void {
    const lowerName = name.toLowerCase();
    for (const existingName of Object.keys(headers)) {
      if (existingName.toLowerCase() === lowerName) {
        delete headers[existingName];
      }
    }
    headers[lowerName] = value;
  }

  /**
   * Produce a loggable message for any thrown value, not only Error instances.
   */
  private static describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }
}