feat(upstream): Add upstream proxy/cache subsystem and integrate per-protocol upstreams
This commit is contained in:
521
ts/upstream/classes.baseupstream.ts
Normal file
521
ts/upstream/classes.baseupstream.ts
Normal file
@@ -0,0 +1,521 @@
|
||||
import * as plugins from '../plugins.js';
|
||||
import type {
|
||||
IUpstreamRegistryConfig,
|
||||
IUpstreamAuthConfig,
|
||||
IUpstreamCacheConfig,
|
||||
IUpstreamResilienceConfig,
|
||||
IUpstreamResult,
|
||||
IUpstreamFetchContext,
|
||||
IProtocolUpstreamConfig,
|
||||
IUpstreamScopeRule,
|
||||
TCircuitState,
|
||||
} from './interfaces.upstream.js';
|
||||
import {
|
||||
DEFAULT_CACHE_CONFIG,
|
||||
DEFAULT_RESILIENCE_CONFIG,
|
||||
} from './interfaces.upstream.js';
|
||||
import { CircuitBreaker, CircuitOpenError, withCircuitBreaker } from './classes.circuitbreaker.js';
|
||||
import { UpstreamCache } from './classes.upstreamcache.js';
|
||||
|
||||
/**
|
||||
* Base class for protocol-specific upstream implementations.
|
||||
*
|
||||
* Provides:
|
||||
* - Multi-upstream routing with priority
|
||||
* - Scope-based filtering (glob patterns)
|
||||
* - Authentication handling
|
||||
* - Circuit breaker per upstream
|
||||
* - Caching with TTL
|
||||
* - Retry with exponential backoff
|
||||
* - 429 rate limit handling
|
||||
*/
|
||||
export abstract class BaseUpstream {
|
||||
/** Protocol name for logging */
|
||||
protected abstract readonly protocolName: string;
|
||||
|
||||
/** Upstream configuration */
|
||||
protected readonly config: IProtocolUpstreamConfig;
|
||||
|
||||
/** Resolved cache configuration */
|
||||
protected readonly cacheConfig: IUpstreamCacheConfig;
|
||||
|
||||
/** Resolved resilience configuration */
|
||||
protected readonly resilienceConfig: IUpstreamResilienceConfig;
|
||||
|
||||
/** Circuit breakers per upstream */
|
||||
protected readonly circuitBreakers: Map<string, CircuitBreaker> = new Map();
|
||||
|
||||
/** Upstream cache */
|
||||
protected readonly cache: UpstreamCache;
|
||||
|
||||
/** Logger instance */
|
||||
protected readonly logger: plugins.smartlog.Smartlog;
|
||||
|
||||
constructor(config: IProtocolUpstreamConfig, logger?: plugins.smartlog.Smartlog) {
|
||||
this.config = config;
|
||||
this.cacheConfig = { ...DEFAULT_CACHE_CONFIG, ...config.cache };
|
||||
this.resilienceConfig = { ...DEFAULT_RESILIENCE_CONFIG, ...config.resilience };
|
||||
this.cache = new UpstreamCache(this.cacheConfig);
|
||||
this.logger = logger || new plugins.smartlog.Smartlog({
|
||||
logContext: {
|
||||
company: 'smartregistry',
|
||||
companyunit: 'upstream',
|
||||
environment: 'production',
|
||||
runtime: 'node',
|
||||
}
|
||||
});
|
||||
|
||||
// Initialize circuit breakers for each upstream
|
||||
for (const upstream of config.upstreams) {
|
||||
const upstreamResilience = { ...this.resilienceConfig, ...upstream.resilience };
|
||||
this.circuitBreakers.set(upstream.id, new CircuitBreaker(upstream.id, upstreamResilience));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if upstream is enabled.
|
||||
*/
|
||||
public isEnabled(): boolean {
|
||||
return this.config.enabled;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all configured upstreams.
|
||||
*/
|
||||
public getUpstreams(): IUpstreamRegistryConfig[] {
|
||||
return this.config.upstreams;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get circuit breaker state for an upstream.
|
||||
*/
|
||||
public getCircuitState(upstreamId: string): TCircuitState | null {
|
||||
const breaker = this.circuitBreakers.get(upstreamId);
|
||||
return breaker ? breaker.getState() : null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get cache statistics.
|
||||
*/
|
||||
public getCacheStats() {
|
||||
return this.cache.getStats();
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch a resource from upstreams.
|
||||
* Tries upstreams in priority order, respecting circuit breakers and scope rules.
|
||||
*/
|
||||
public async fetch(context: IUpstreamFetchContext): Promise<IUpstreamResult | null> {
|
||||
if (!this.config.enabled) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// Check cache first
|
||||
const cached = this.cache.get(context);
|
||||
if (cached && !cached.stale) {
|
||||
return {
|
||||
success: true,
|
||||
status: 200,
|
||||
headers: cached.headers,
|
||||
body: cached.data,
|
||||
upstreamId: cached.upstreamId,
|
||||
fromCache: true,
|
||||
latencyMs: 0,
|
||||
};
|
||||
}
|
||||
|
||||
// Check for negative cache (recent 404)
|
||||
if (this.cache.hasNegative(context)) {
|
||||
return {
|
||||
success: false,
|
||||
status: 404,
|
||||
headers: {},
|
||||
upstreamId: 'cache',
|
||||
fromCache: true,
|
||||
latencyMs: 0,
|
||||
};
|
||||
}
|
||||
|
||||
// Get applicable upstreams sorted by priority
|
||||
const applicableUpstreams = this.getApplicableUpstreams(context.resource);
|
||||
|
||||
if (applicableUpstreams.length === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// If we have stale cache, return it immediately and revalidate in background
|
||||
if (cached?.stale && this.cacheConfig.staleWhileRevalidate) {
|
||||
// Fire and forget revalidation
|
||||
this.revalidateInBackground(context, applicableUpstreams);
|
||||
return {
|
||||
success: true,
|
||||
status: 200,
|
||||
headers: cached.headers,
|
||||
body: cached.data,
|
||||
upstreamId: cached.upstreamId,
|
||||
fromCache: true,
|
||||
latencyMs: 0,
|
||||
};
|
||||
}
|
||||
|
||||
// Try each upstream in order
|
||||
let lastError: Error | null = null;
|
||||
|
||||
for (const upstream of applicableUpstreams) {
|
||||
const breaker = this.circuitBreakers.get(upstream.id);
|
||||
if (!breaker) continue;
|
||||
|
||||
try {
|
||||
const result = await withCircuitBreaker(
|
||||
breaker,
|
||||
() => this.fetchFromUpstream(upstream, context),
|
||||
);
|
||||
|
||||
// Cache successful responses
|
||||
if (result.success && result.body) {
|
||||
this.cache.set(
|
||||
context,
|
||||
Buffer.isBuffer(result.body) ? result.body : Buffer.from(JSON.stringify(result.body)),
|
||||
result.headers['content-type'] || 'application/octet-stream',
|
||||
result.headers,
|
||||
upstream.id,
|
||||
);
|
||||
}
|
||||
|
||||
// Cache 404 responses
|
||||
if (result.status === 404) {
|
||||
this.cache.setNegative(context, upstream.id);
|
||||
}
|
||||
|
||||
return result;
|
||||
} catch (error) {
|
||||
if (error instanceof CircuitOpenError) {
|
||||
this.logger.log('debug', `Circuit open for upstream ${upstream.id}, trying next`);
|
||||
} else {
|
||||
this.logger.log('warn', `Upstream ${upstream.id} failed: ${(error as Error).message}`);
|
||||
}
|
||||
lastError = error as Error;
|
||||
// Continue to next upstream
|
||||
}
|
||||
}
|
||||
|
||||
// All upstreams failed
|
||||
if (lastError) {
|
||||
this.logger.log('error', `All upstreams failed for ${context.resource}: ${lastError.message}`);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Invalidate cache for a resource pattern.
|
||||
*/
|
||||
public invalidateCache(pattern: RegExp): number {
|
||||
return this.cache.invalidatePattern(pattern);
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear all cache entries.
|
||||
*/
|
||||
public clearCache(): void {
|
||||
this.cache.clear();
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop the upstream (cleanup resources).
|
||||
*/
|
||||
public stop(): void {
|
||||
this.cache.stop();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get upstreams that apply to a resource, sorted by priority.
|
||||
*/
|
||||
protected getApplicableUpstreams(resource: string): IUpstreamRegistryConfig[] {
|
||||
return this.config.upstreams
|
||||
.filter(upstream => {
|
||||
if (!upstream.enabled) return false;
|
||||
|
||||
// Check circuit breaker
|
||||
const breaker = this.circuitBreakers.get(upstream.id);
|
||||
if (breaker && !breaker.canRequest()) return false;
|
||||
|
||||
// Check scope rules
|
||||
return this.matchesScopeRules(resource, upstream.scopeRules);
|
||||
})
|
||||
.sort((a, b) => a.priority - b.priority);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a resource matches scope rules.
|
||||
* Empty rules = match all.
|
||||
*/
|
||||
protected matchesScopeRules(resource: string, rules?: IUpstreamScopeRule[]): boolean {
|
||||
if (!rules || rules.length === 0) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// Process rules in order
|
||||
// Start with default exclude (nothing matches)
|
||||
let matched = false;
|
||||
|
||||
for (const rule of rules) {
|
||||
const isMatch = plugins.minimatch(resource, rule.pattern);
|
||||
if (isMatch) {
|
||||
matched = rule.action === 'include';
|
||||
}
|
||||
}
|
||||
|
||||
return matched;
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch from a specific upstream with retry logic.
|
||||
*/
|
||||
protected async fetchFromUpstream(
|
||||
upstream: IUpstreamRegistryConfig,
|
||||
context: IUpstreamFetchContext,
|
||||
): Promise<IUpstreamResult> {
|
||||
const upstreamResilience = { ...this.resilienceConfig, ...upstream.resilience };
|
||||
const startTime = Date.now();
|
||||
|
||||
let lastError: Error | null = null;
|
||||
|
||||
for (let attempt = 0; attempt <= upstreamResilience.maxRetries; attempt++) {
|
||||
try {
|
||||
const result = await this.executeRequest(upstream, context, upstreamResilience.timeoutMs);
|
||||
return {
|
||||
...result,
|
||||
upstreamId: upstream.id,
|
||||
fromCache: false,
|
||||
latencyMs: Date.now() - startTime,
|
||||
};
|
||||
} catch (error) {
|
||||
lastError = error as Error;
|
||||
|
||||
// Don't retry on 4xx errors (except 429)
|
||||
if (this.isNonRetryableError(error)) {
|
||||
break;
|
||||
}
|
||||
|
||||
// Calculate delay with exponential backoff and jitter
|
||||
if (attempt < upstreamResilience.maxRetries) {
|
||||
const delay = this.calculateBackoffDelay(
|
||||
attempt,
|
||||
upstreamResilience.retryDelayMs,
|
||||
upstreamResilience.retryMaxDelayMs,
|
||||
);
|
||||
await this.sleep(delay);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
throw lastError || new Error('Request failed');
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a single HTTP request to an upstream.
|
||||
*/
|
||||
protected async executeRequest(
|
||||
upstream: IUpstreamRegistryConfig,
|
||||
context: IUpstreamFetchContext,
|
||||
timeoutMs: number,
|
||||
): Promise<Omit<IUpstreamResult, 'upstreamId' | 'fromCache' | 'latencyMs'>> {
|
||||
// Build the full URL
|
||||
const url = this.buildUpstreamUrl(upstream, context);
|
||||
|
||||
// Build headers with auth
|
||||
const headers = this.buildHeaders(upstream, context);
|
||||
|
||||
// Make the request using SmartRequest
|
||||
const request = plugins.smartrequest.SmartRequest.create()
|
||||
.url(url)
|
||||
.method(context.method as any)
|
||||
.headers(headers)
|
||||
.timeout(timeoutMs)
|
||||
.handle429Backoff({ maxRetries: 3, fallbackDelay: 1000, maxWaitTime: 30000 });
|
||||
|
||||
// Add query params if present
|
||||
if (Object.keys(context.query).length > 0) {
|
||||
request.query(context.query);
|
||||
}
|
||||
|
||||
let response: plugins.smartrequest.ICoreResponse;
|
||||
|
||||
switch (context.method.toUpperCase()) {
|
||||
case 'GET':
|
||||
response = await request.get();
|
||||
break;
|
||||
case 'HEAD':
|
||||
// SmartRequest doesn't have head(), use options
|
||||
response = await request.method('HEAD').get();
|
||||
break;
|
||||
default:
|
||||
response = await request.get();
|
||||
}
|
||||
|
||||
// Parse response
|
||||
const responseHeaders: Record<string, string> = {};
|
||||
for (const [key, value] of Object.entries(response.headers)) {
|
||||
responseHeaders[key.toLowerCase()] = Array.isArray(value) ? value[0] : value;
|
||||
}
|
||||
|
||||
let body: Buffer | any;
|
||||
const contentType = responseHeaders['content-type'] || '';
|
||||
|
||||
if (response.ok) {
|
||||
if (contentType.includes('application/json')) {
|
||||
body = await response.json();
|
||||
} else {
|
||||
const arrayBuffer = await response.arrayBuffer();
|
||||
body = Buffer.from(arrayBuffer);
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
success: response.ok,
|
||||
status: response.status,
|
||||
headers: responseHeaders,
|
||||
body,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the full URL for an upstream request.
|
||||
* Subclasses can override for protocol-specific URL building.
|
||||
*/
|
||||
protected buildUpstreamUrl(upstream: IUpstreamRegistryConfig, context: IUpstreamFetchContext): string {
|
||||
// Remove leading slash if URL already has trailing slash
|
||||
let path = context.path;
|
||||
if (upstream.url.endsWith('/') && path.startsWith('/')) {
|
||||
path = path.slice(1);
|
||||
}
|
||||
return `${upstream.url}${path}`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Build headers including authentication.
|
||||
*/
|
||||
protected buildHeaders(
|
||||
upstream: IUpstreamRegistryConfig,
|
||||
context: IUpstreamFetchContext,
|
||||
): Record<string, string> {
|
||||
const headers: Record<string, string> = { ...context.headers };
|
||||
|
||||
// Remove host header (will be set by HTTP client)
|
||||
delete headers['host'];
|
||||
|
||||
// Add authentication
|
||||
this.addAuthHeaders(headers, upstream.auth);
|
||||
|
||||
return headers;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add authentication headers based on auth config.
|
||||
*/
|
||||
protected addAuthHeaders(headers: Record<string, string>, auth: IUpstreamAuthConfig): void {
|
||||
switch (auth.type) {
|
||||
case 'basic':
|
||||
if (auth.username && auth.password) {
|
||||
const credentials = Buffer.from(`${auth.username}:${auth.password}`).toString('base64');
|
||||
headers['authorization'] = `Basic ${credentials}`;
|
||||
}
|
||||
break;
|
||||
case 'bearer':
|
||||
if (auth.token) {
|
||||
headers['authorization'] = `Bearer ${auth.token}`;
|
||||
}
|
||||
break;
|
||||
case 'api-key':
|
||||
if (auth.token) {
|
||||
const headerName = auth.headerName || 'authorization';
|
||||
headers[headerName.toLowerCase()] = auth.token;
|
||||
}
|
||||
break;
|
||||
case 'none':
|
||||
default:
|
||||
// No authentication
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if an error should not be retried.
|
||||
*/
|
||||
protected isNonRetryableError(error: unknown): boolean {
|
||||
// Check for HTTP status errors
|
||||
if (error && typeof error === 'object' && 'status' in error) {
|
||||
const status = (error as { status: number }).status;
|
||||
// Don't retry 4xx errors except 429 (rate limited)
|
||||
if (status >= 400 && status < 500 && status !== 429) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate backoff delay with exponential backoff and jitter.
|
||||
*/
|
||||
protected calculateBackoffDelay(
|
||||
attempt: number,
|
||||
baseDelayMs: number,
|
||||
maxDelayMs: number,
|
||||
): number {
|
||||
// Exponential backoff: delay = base * 2^attempt
|
||||
const exponentialDelay = baseDelayMs * Math.pow(2, attempt);
|
||||
|
||||
// Cap at max delay
|
||||
const cappedDelay = Math.min(exponentialDelay, maxDelayMs);
|
||||
|
||||
// Add jitter (±25%)
|
||||
const jitter = cappedDelay * 0.25 * (Math.random() * 2 - 1);
|
||||
|
||||
return Math.floor(cappedDelay + jitter);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sleep for a specified duration.
|
||||
*/
|
||||
protected sleep(ms: number): Promise<void> {
|
||||
return new Promise(resolve => setTimeout(resolve, ms));
|
||||
}
|
||||
|
||||
/**
|
||||
* Revalidate cache in background.
|
||||
*/
|
||||
protected async revalidateInBackground(
|
||||
context: IUpstreamFetchContext,
|
||||
upstreams: IUpstreamRegistryConfig[],
|
||||
): Promise<void> {
|
||||
try {
|
||||
for (const upstream of upstreams) {
|
||||
const breaker = this.circuitBreakers.get(upstream.id);
|
||||
if (!breaker || !breaker.canRequest()) continue;
|
||||
|
||||
try {
|
||||
const result = await withCircuitBreaker(
|
||||
breaker,
|
||||
() => this.fetchFromUpstream(upstream, context),
|
||||
);
|
||||
|
||||
if (result.success && result.body) {
|
||||
this.cache.set(
|
||||
context,
|
||||
Buffer.isBuffer(result.body) ? result.body : Buffer.from(JSON.stringify(result.body)),
|
||||
result.headers['content-type'] || 'application/octet-stream',
|
||||
result.headers,
|
||||
upstream.id,
|
||||
);
|
||||
return; // Successfully revalidated
|
||||
}
|
||||
} catch {
|
||||
// Continue to next upstream
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.log('debug', `Background revalidation failed: ${(error as Error).message}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user