feat(stocks): Add provider fetch limits, intraday incremental fetch, cache deduplication, and provider safety/warning improvements
This commit is contained in:
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@fin.cx/opendata',
|
||||
version: '3.4.0',
|
||||
version: '3.5.0',
|
||||
description: 'A comprehensive TypeScript library for accessing business data and real-time financial information. Features include German company data management with MongoDB integration, JSONL bulk processing, automated Handelsregister interactions, and real-time stock market data from multiple providers.'
|
||||
}
|
||||
|
||||
@@ -156,11 +156,22 @@ export class StockPriceService implements IProviderRegistry {
|
||||
*/
|
||||
public async getData(request: IStockDataRequest): Promise<IStockPrice | IStockPrice[]> {
|
||||
const cacheKey = this.getDataCacheKey(request);
|
||||
const cached = this.getFromCache(cacheKey);
|
||||
|
||||
if (cached) {
|
||||
console.log(`Cache hit for ${this.getRequestDescription(request)}`);
|
||||
return cached;
|
||||
// For intraday requests without date filter, ALWAYS try incremental fetch
|
||||
// This ensures we check for new data even if cache hasn't expired
|
||||
if (request.type === 'intraday' && !request.date) {
|
||||
const incrementalResult = await this.tryIncrementalFetch(request, cacheKey);
|
||||
if (incrementalResult) {
|
||||
return incrementalResult;
|
||||
}
|
||||
// If incremental fetch returns null, continue to normal fetch below
|
||||
} else {
|
||||
// For other request types (historical, current, batch), use simple cache
|
||||
const cached = this.getFromCache(cacheKey);
|
||||
if (cached) {
|
||||
console.log(`Cache hit for ${this.getRequestDescription(request)}`);
|
||||
return cached;
|
||||
}
|
||||
}
|
||||
|
||||
const providers = this.getEnabledProviders();
|
||||
@@ -204,6 +215,137 @@ export class StockPriceService implements IProviderRegistry {
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Try incremental fetch: Only fetch NEW data since last cached timestamp
|
||||
* Returns merged result if successful, null if incremental fetch not applicable
|
||||
*/
|
||||
private async tryIncrementalFetch(
|
||||
request: IStockDataRequest,
|
||||
cacheKey: string
|
||||
): Promise<IStockPrice[] | null> {
|
||||
// Only applicable for intraday requests without date filter
|
||||
if (request.type !== 'intraday' || request.date) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// Check if we have similar cached data (same ticker, interval, but any limit/date)
|
||||
const baseKey = `intraday:${request.ticker}:${request.interval}:latest`;
|
||||
let cachedData: IStockPrice[] | null = null;
|
||||
let matchedKey: string | null = null;
|
||||
|
||||
// Find any cached intraday data for this ticker+interval
|
||||
for (const [key, entry] of this.cache.entries()) {
|
||||
if (key.startsWith(baseKey)) {
|
||||
const age = Date.now() - entry.timestamp.getTime();
|
||||
if (entry.ttl !== Infinity && age > entry.ttl) {
|
||||
continue; // Expired
|
||||
}
|
||||
cachedData = Array.isArray(entry.price) ? entry.price as IStockPrice[] : null;
|
||||
matchedKey = key;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!cachedData || cachedData.length === 0) {
|
||||
return null; // No cached data to build on
|
||||
}
|
||||
|
||||
// Find latest timestamp in cached data
|
||||
const latestCached = cachedData.reduce((latest, price) => {
|
||||
return price.timestamp > latest ? price.timestamp : latest;
|
||||
}, new Date(0));
|
||||
|
||||
// Freshness check: If latest data is less than 1 minute old, just return cache
|
||||
const dataAge = Date.now() - latestCached.getTime();
|
||||
const freshnessThreshold = 60 * 1000; // 1 minute
|
||||
|
||||
if (dataAge < freshnessThreshold) {
|
||||
console.log(`🔄 Incremental cache: Latest data is ${Math.round(dataAge / 1000)}s old (< 1min), returning cached data`);
|
||||
return cachedData;
|
||||
}
|
||||
|
||||
console.log(`🔄 Incremental cache: Found ${cachedData.length} cached records, latest: ${latestCached.toISOString()} (${Math.round(dataAge / 1000)}s old)`);
|
||||
|
||||
// Fetch only NEW data since latest cached timestamp
|
||||
// Create a modified request with date filter
|
||||
const modifiedRequest: IStockIntradayRequest = {
|
||||
...request,
|
||||
date: latestCached // Fetch from this date forward
|
||||
};
|
||||
|
||||
const providers = this.getEnabledProviders();
|
||||
for (const provider of providers) {
|
||||
const entry = this.providers.get(provider.name)!;
|
||||
|
||||
try {
|
||||
const newData = await this.fetchWithRetry(
|
||||
() => provider.fetchData(modifiedRequest),
|
||||
entry.config
|
||||
) as IStockPrice[];
|
||||
|
||||
entry.successCount++;
|
||||
|
||||
// Filter out data at or before latest cached timestamp (avoid duplicates)
|
||||
const filteredNew = newData.filter(p => p.timestamp > latestCached);
|
||||
|
||||
if (filteredNew.length === 0) {
|
||||
console.log(`🔄 Incremental cache: No new data since ${latestCached.toISOString()}, using cache`);
|
||||
return cachedData;
|
||||
}
|
||||
|
||||
console.log(`🔄 Incremental cache: Fetched ${filteredNew.length} new records since ${latestCached.toISOString()}`);
|
||||
|
||||
// Merge cached + new data
|
||||
const merged = [...cachedData, ...filteredNew];
|
||||
|
||||
// Sort by timestamp (ascending)
|
||||
merged.sort((a, b) => a.timestamp.getTime() - b.timestamp.getTime());
|
||||
|
||||
// Deduplicate by timestamp (keep latest)
|
||||
const deduped = this.deduplicateByTimestamp(merged);
|
||||
|
||||
// Apply limit if specified in original request
|
||||
const effectiveLimit = request.limit || deduped.length;
|
||||
const result = deduped.slice(-effectiveLimit); // Take most recent N
|
||||
|
||||
// Update cache with merged result
|
||||
const ttl = this.getRequestTTL(request, result);
|
||||
this.addToCache(cacheKey, result, ttl);
|
||||
|
||||
console.log(`🔄 Incremental cache: Returning ${result.length} total records (${cachedData.length} cached + ${filteredNew.length} new)`);
|
||||
return result;
|
||||
|
||||
} catch (error) {
|
||||
entry.errorCount++;
|
||||
entry.lastError = error as Error;
|
||||
entry.lastErrorTime = new Date();
|
||||
console.warn(`Incremental fetch failed for ${provider.name}, falling back to full fetch`);
|
||||
continue; // Try next provider or fall back to normal fetch
|
||||
}
|
||||
}
|
||||
|
||||
return null; // Incremental fetch failed, fall back to normal fetch
|
||||
}
|
||||
|
||||
/**
|
||||
* Deduplicate array of prices by timestamp, keeping the latest data for each timestamp
|
||||
*/
|
||||
private deduplicateByTimestamp(prices: IStockPrice[]): IStockPrice[] {
|
||||
const seen = new Map<number, IStockPrice>();
|
||||
|
||||
for (const price of prices) {
|
||||
const ts = price.timestamp.getTime();
|
||||
const existing = seen.get(ts);
|
||||
|
||||
// Keep the entry with the latest fetchedAt (most recent data)
|
||||
if (!existing || price.fetchedAt > existing.fetchedAt) {
|
||||
seen.set(ts, price);
|
||||
}
|
||||
}
|
||||
|
||||
return Array.from(seen.values());
|
||||
}
|
||||
|
||||
/**
|
||||
* Get TTL based on request type and result
|
||||
*/
|
||||
@@ -328,7 +470,8 @@ export class StockPriceService implements IProviderRegistry {
|
||||
return `historical:${request.ticker}:${fromStr}:${toStr}${request.exchange ? `:${request.exchange}` : ''}`;
|
||||
case 'intraday':
|
||||
const dateStr = request.date ? request.date.toISOString().split('T')[0] : 'latest';
|
||||
return `intraday:${request.ticker}:${request.interval}:${dateStr}${request.exchange ? `:${request.exchange}` : ''}`;
|
||||
const limitStr = request.limit ? `:limit${request.limit}` : '';
|
||||
return `intraday:${request.ticker}:${request.interval}:${dateStr}${limitStr}${request.exchange ? `:${request.exchange}` : ''}`;
|
||||
case 'batch':
|
||||
const tickers = request.tickers.sort().join(',');
|
||||
return `batch:${tickers}${request.exchange ? `:${request.exchange}` : ''}`;
|
||||
@@ -355,6 +498,15 @@ export class StockPriceService implements IProviderRegistry {
|
||||
}
|
||||
|
||||
private addToCache(key: string, price: IStockPrice | IStockPrice[], ttl?: number): void {
|
||||
// Deduplicate array entries by timestamp before caching
|
||||
if (Array.isArray(price)) {
|
||||
const beforeCount = price.length;
|
||||
price = this.deduplicateByTimestamp(price);
|
||||
if (price.length < beforeCount) {
|
||||
console.log(`Deduplicated ${beforeCount - price.length} duplicate timestamps in cache entry for ${key}`);
|
||||
}
|
||||
}
|
||||
|
||||
// Enforce max entries limit
|
||||
if (this.cache.size >= this.cacheConfig.maxEntries) {
|
||||
// Remove oldest entry
|
||||
|
||||
@@ -24,6 +24,8 @@ export interface IProviderConfig {
|
||||
timeout?: number;
|
||||
retryAttempts?: number;
|
||||
retryDelay?: number;
|
||||
maxRecords?: number; // Maximum records to fetch per request (default: 10000)
|
||||
defaultIntradayLimit?: number; // Default limit for intraday requests without explicit limit (default: 1000)
|
||||
}
|
||||
|
||||
export interface IProviderRegistry {
|
||||
|
||||
@@ -378,8 +378,18 @@ export class CoinGeckoProvider implements IStockProvider {
|
||||
const marketCapData = responseData.market_caps || [];
|
||||
const volumeData = responseData.total_volumes || [];
|
||||
|
||||
// Process each data point
|
||||
for (let i = 0; i < priceData.length; i++) {
|
||||
// Warn if processing large amount of historical data
|
||||
const maxRecords = this.config?.maxRecords || 10000;
|
||||
if (priceData.length > maxRecords) {
|
||||
this.logger.warn(
|
||||
`Historical request for ${request.ticker} returned ${priceData.length} records, ` +
|
||||
`which exceeds maxRecords limit of ${maxRecords}. Processing first ${maxRecords} only.`
|
||||
);
|
||||
}
|
||||
|
||||
// Process each data point (up to maxRecords)
|
||||
const recordsToProcess = Math.min(priceData.length, maxRecords);
|
||||
for (let i = 0; i < recordsToProcess; i++) {
|
||||
const [timestamp, price] = priceData[i];
|
||||
const date = new Date(timestamp);
|
||||
|
||||
@@ -480,8 +490,19 @@ export class CoinGeckoProvider implements IStockProvider {
|
||||
const marketCapData = responseData.market_caps || [];
|
||||
const volumeData = responseData.total_volumes || [];
|
||||
|
||||
// Apply limit if specified
|
||||
const limit = request.limit || priceData.length;
|
||||
// Apply default limit if user didn't specify one (performance optimization)
|
||||
const effectiveLimit = request.limit || this.config?.defaultIntradayLimit || 1000;
|
||||
|
||||
// Warn if fetching large amount of data without explicit limit
|
||||
if (!request.limit && priceData.length > effectiveLimit) {
|
||||
this.logger.warn(
|
||||
`Intraday request for ${request.ticker} returned ${priceData.length} records but no limit specified. ` +
|
||||
`Applying default limit of ${effectiveLimit}. Consider adding a limit to the request for better performance.`
|
||||
);
|
||||
}
|
||||
|
||||
// Apply limit (take most recent data)
|
||||
const limit = Math.min(effectiveLimit, priceData.length);
|
||||
const dataToProcess = priceData.slice(-limit);
|
||||
|
||||
for (let i = 0; i < dataToProcess.length; i++) {
|
||||
@@ -624,21 +645,34 @@ export class CoinGeckoProvider implements IStockProvider {
|
||||
|
||||
const coinList = await response.json() as ICoinListItem[];
|
||||
|
||||
// Clear cache before rebuilding to prevent memory leak
|
||||
// Keep only entries that are in priorityTickerMap
|
||||
const priorityEntries = new Map<string, string>();
|
||||
for (const [key, value] of this.priorityTickerMap) {
|
||||
priorityEntries.set(key, value);
|
||||
}
|
||||
this.coinMapCache.clear();
|
||||
|
||||
// Restore priority mappings
|
||||
for (const [key, value] of priorityEntries) {
|
||||
this.coinMapCache.set(key, value);
|
||||
}
|
||||
|
||||
// Build mapping: symbol -> id
|
||||
for (const coin of coinList) {
|
||||
const symbol = coin.symbol.toLowerCase();
|
||||
const id = coin.id.toLowerCase();
|
||||
|
||||
// Don't overwrite priority mappings or existing cache entries
|
||||
if (!this.priorityTickerMap.has(symbol) && !this.coinMapCache.has(symbol)) {
|
||||
// Don't overwrite priority mappings
|
||||
if (!this.priorityTickerMap.has(symbol)) {
|
||||
this.coinMapCache.set(symbol, id);
|
||||
}
|
||||
// Always cache the ID mapping
|
||||
// Always cache the ID mapping (id -> id for when users pass CoinGecko IDs directly)
|
||||
this.coinMapCache.set(id, id);
|
||||
}
|
||||
|
||||
this.coinListLoadedAt = new Date();
|
||||
this.logger.info(`Loaded ${coinList.length} coins from CoinGecko`);
|
||||
this.logger.info(`Loaded ${coinList.length} coins from CoinGecko (cache: ${this.coinMapCache.size} entries)`);
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to load coin list from CoinGecko:', error);
|
||||
// Don't throw - we can still work with direct IDs
|
||||
|
||||
@@ -187,7 +187,7 @@ export class MarketstackProvider implements IStockProvider {
|
||||
const allPrices: IStockPrice[] = [];
|
||||
let offset = request.offset || 0;
|
||||
const limit = request.limit || 1000; // Max per page
|
||||
const maxRecords = 10000; // Safety limit
|
||||
const maxRecords = this.config?.maxRecords || 10000; // Safety limit (configurable)
|
||||
|
||||
while (true) {
|
||||
let url = `${this.baseUrl}/eod?access_key=${this.apiKey}`;
|
||||
@@ -259,7 +259,18 @@ export class MarketstackProvider implements IStockProvider {
|
||||
const allPrices: IStockPrice[] = [];
|
||||
let offset = 0;
|
||||
const limit = 1000; // Max per page for intraday
|
||||
const maxRecords = 10000; // Safety limit
|
||||
const maxRecords = this.config?.maxRecords || 10000; // Safety limit (configurable)
|
||||
|
||||
// Apply default limit if user didn't specify one (performance optimization)
|
||||
const effectiveLimit = request.limit || this.config?.defaultIntradayLimit || 1000;
|
||||
|
||||
// Warn if fetching large amount of data without explicit limit
|
||||
if (!request.limit && effectiveLimit > 1000) {
|
||||
this.logger.warn(
|
||||
`Intraday request for ${request.ticker} without explicit limit will fetch up to ${effectiveLimit} records. ` +
|
||||
`Consider adding a limit to the request for better performance.`
|
||||
);
|
||||
}
|
||||
|
||||
// Format symbol for intraday endpoint (replace . with -)
|
||||
const formattedSymbol = this.formatSymbolForIntraday(request.ticker);
|
||||
@@ -310,17 +321,17 @@ export class MarketstackProvider implements IStockProvider {
|
||||
const pagination = responseData.pagination;
|
||||
const hasMore = pagination && offset + limit < pagination.total;
|
||||
|
||||
// Honor limit from request if specified, or safety limit
|
||||
if (!hasMore || (request.limit && allPrices.length >= request.limit) || allPrices.length >= maxRecords) {
|
||||
// Honor effective limit or safety maxRecords
|
||||
if (!hasMore || allPrices.length >= effectiveLimit || allPrices.length >= maxRecords) {
|
||||
break;
|
||||
}
|
||||
|
||||
offset += limit;
|
||||
}
|
||||
|
||||
// Apply limit if specified
|
||||
if (request.limit && allPrices.length > request.limit) {
|
||||
return allPrices.slice(0, request.limit);
|
||||
// Apply effective limit
|
||||
if (allPrices.length > effectiveLimit) {
|
||||
return allPrices.slice(0, effectiveLimit);
|
||||
}
|
||||
|
||||
return allPrices;
|
||||
|
||||
Reference in New Issue
Block a user