Files
elasticsearch/ts/domain/kv/kv-store.ts

1072 lines
27 KiB
TypeScript
Raw Normal View History

/**
* Enterprise Key-Value Store with TTL and Caching
*
* Features:
* - Distributed key-value storage backed by Elasticsearch
* - Time-to-live (TTL) support with automatic expiration
* - Optional in-memory caching layer with multiple eviction policies
* - Batch operations (mget, mset, mdelete)
* - Scan/cursor support for large keyspaces
* - Compression for large values
* - Optimistic concurrency control
*/
import { ElasticsearchConnectionManager } from '../../core/connection/connection-manager.js';
import { Logger, defaultLogger } from '../../core/observability/logger.js';
import { MetricsCollector, defaultMetricsCollector } from '../../core/observability/metrics.js';
import type {
KVStoreConfig,
KVSetOptions,
KVGetOptions,
KVDeleteOptions,
KVScanOptions,
KVScanResult,
KVOperationResult,
KVStoreStats,
CacheEntry,
KVDocument,
KVBatchGetResult,
KVBatchSetResult,
KVBatchDeleteResult,
} from './types.js';
import { promisify } from 'util';
import { gzip, gunzip } from 'zlib';
/** Promise-returning wrappers around the callback-based zlib gzip codecs. */
const gzipAsync = promisify(gzip);
const gunzipAsync = promisify(gunzip);

/**
 * Fallback values for every optional KVStoreConfig setting.
 *
 * `index` and `defaultRouting` have no default and are omitted from the type.
 * `defaultTTL` is intentionally `undefined` (no default expiry); the non-null
 * assertion exists only to satisfy the `Required<>` annotation.
 * Durations are in seconds, sizes in bytes.
 */
const DEFAULT_CONFIG: Required<Omit<KVStoreConfig, 'index' | 'defaultRouting'>> = {
  defaultTTL: undefined!,
  enableCache: true,
  cacheMaxSize: 10000,
  cacheEvictionPolicy: 'lru',
  cacheTTL: 5 * 60,
  enableExpirationCleanup: true,
  cleanupIntervalSeconds: 5 * 60,
  cleanupBatchSize: 1000,
  enableCompression: false,
  compressionThreshold: 1024,
  refresh: false,
  enableOptimisticConcurrency: false,
};
/**
 * Enterprise Key-Value Store
 *
 * Each key is stored as one document in `config.index` whose `_id` is the key.
 * Reads may be served from an optional in-process cache. Expired keys are hidden
 * on read, removed lazily by `get()`, and (when enabled) bulk-deleted by a
 * periodic background cleanup.
 */
export class KVStore<T = unknown> {
  private config: Required<KVStoreConfig>;
  private cache: Map<string, CacheEntry<T>> = new Map();
  private stats: KVStoreStats;
  private cleanupTimer?: NodeJS.Timeout;
  /** Set while a cleanup pass is running so interval ticks never overlap. */
  private cleanupInProgress = false;
  private logger: Logger;
  private metrics: MetricsCollector;

  constructor(config: KVStoreConfig) {
    this.config = {
      ...DEFAULT_CONFIG,
      ...config,
      defaultTTL: config.defaultTTL ?? DEFAULT_CONFIG.defaultTTL,
    } as Required<KVStoreConfig>;
    this.logger = defaultLogger;
    this.metrics = defaultMetricsCollector;
    this.stats = {
      totalKeys: 0,
      totalGets: 0,
      totalSets: 0,
      totalDeletes: 0,
      totalScans: 0,
      totalExpired: 0,
      cacheStats: this.config.enableCache
        ? {
            size: 0,
            maxSize: this.config.cacheMaxSize,
            hits: 0,
            misses: 0,
            hitRatio: 0,
            evictions: 0,
            memoryUsage: 0,
          }
        : undefined,
      avgGetDurationMs: 0,
      avgSetDurationMs: 0,
      avgDeleteDurationMs: 0,
      storageSize: 0,
    };
  }

  /**
   * Initialize the KV store.
   *
   * Creates the index if it is missing, starts the expiration cleanup timer when
   * enabled, and loads the current document count into `stats.totalKeys`.
   *
   * @throws Re-throws any error from index creation.
   */
  async initialize(): Promise<void> {
    const startTime = Date.now();
    try {
      await this.ensureIndex();
      if (this.config.enableExpirationCleanup) {
        this.startExpirationCleanup();
      }
      await this.updateKeyCount();
      this.logger.info('KVStore initialized', {
        index: this.config.index,
        cacheEnabled: this.config.enableCache,
        expirationCleanup: this.config.enableExpirationCleanup,
        duration: Date.now() - startTime,
      });
    } catch (error) {
      this.logger.error('Failed to initialize KVStore', { error });
      throw error;
    }
  }

  /**
   * Destroy the KV store: stop the cleanup timer and empty the local cache.
   * Stored data in Elasticsearch is not touched.
   */
  async destroy(): Promise<void> {
    if (this.cleanupTimer) {
      clearInterval(this.cleanupTimer);
      this.cleanupTimer = undefined;
    }
    this.cache.clear();
    this.logger.info('KVStore destroyed', {
      index: this.config.index,
    });
  }

  /**
   * Set a key-value pair.
   *
   * The value is JSON-serialized for size accounting. When compression is
   * enabled and the payload is at least `compressionThreshold` bytes, it is
   * stored as a base64-encoded gzip string. A TTL (seconds) of `undefined` or
   * `0` means the key does not expire.
   *
   * @param key - Key, also used as the Elasticsearch document id.
   * @param value - Value to store.
   * @param options - Per-call TTL, routing, ingest pipeline, optimistic
   *   concurrency guards (`ifSeqNo` + `ifPrimaryTerm`), `nx` (create-only) and
   *   `skipCache`.
   * @returns The operation result. Failures are reported as `success: false`
   *   rather than thrown.
   */
  async set(key: string, value: T, options: KVSetOptions = {}): Promise<KVOperationResult<T>> {
    const startTime = Date.now();
    this.stats.totalSets++;
    try {
      const client = ElasticsearchConnectionManager.getInstance().getClient();

      // Serialize once: the JSON form drives both size accounting and compression.
      const valueStr = JSON.stringify(value);
      const valueSize = Buffer.byteLength(valueStr, 'utf8');
      const shouldCompress =
        this.config.enableCompression && valueSize >= this.config.compressionThreshold;

      let storedValue: unknown = value;
      if (shouldCompress) {
        const compressed = await gzipAsync(Buffer.from(valueStr, 'utf8'));
        storedValue = compressed.toString('base64');
      }

      const ttl = options.ttl ?? this.config.defaultTTL;
      const expiresAt = ttl ? new Date(Date.now() + ttl * 1000) : null;

      const now = new Date();
      // NOTE(review): the document is replaced wholesale, so createdAt is reset on
      // every overwrite; preserving it would need a read or a scripted update.
      const doc: KVDocument<unknown> = {
        key,
        value: storedValue,
        createdAt: now,
        updatedAt: now,
        expiresAt,
        metadata: {
          size: valueSize,
          compressed: shouldCompress,
          contentType: 'application/json',
        },
      };

      const indexRequest: any = {
        index: this.config.index,
        id: key,
        document: doc,
        refresh: this.config.refresh,
      };
      const routing = options.routing ?? this.config.defaultRouting;
      if (routing) {
        indexRequest.routing = routing;
      }
      if (options.pipeline) {
        indexRequest.pipeline = options.pipeline;
      }
      if (options.ifSeqNo !== undefined && options.ifPrimaryTerm !== undefined) {
        indexRequest.if_seq_no = options.ifSeqNo;
        indexRequest.if_primary_term = options.ifPrimaryTerm;
      }
      // nx: create-only; Elasticsearch rejects the write if the id already exists.
      // NOTE(review): no update-only ("xx") mode is implemented here.
      if (options.nx) {
        indexRequest.op_type = 'create';
      }

      const result = await client.index(indexRequest);
      const version = {
        seqNo: result._seq_no ?? 0,
        primaryTerm: result._primary_term ?? 0,
      };
      if (result.result === 'created') {
        this.stats.totalKeys++;
      }

      const cached = this.config.enableCache && !options.skipCache;
      if (cached) {
        // Only pass a TTL when the key really expires; otherwise the cache uses cacheTTL.
        this.cacheSet(key, value, version, expiresAt ? ttl : undefined);
      } else {
        // The stored value changed; never leave an older cached copy behind.
        this.cacheDelete(key);
      }

      const duration = Date.now() - startTime;
      this.stats.avgSetDurationMs = KVStore.runningAverage(
        this.stats.avgSetDurationMs,
        this.stats.totalSets,
        duration
      );
      this.metrics.recordCounter('kv.set', 1, {
        index: this.config.index,
        cached: cached ? 'yes' : 'no',
      });
      this.metrics.recordHistogram('kv.set.duration', duration);

      return {
        success: true,
        exists: result.result === 'updated',
        version,
        expiresAt: expiresAt ?? undefined,
      };
    } catch (error: any) {
      // The write may or may not have reached the cluster; drop any cached copy
      // so the next read goes to Elasticsearch.
      this.cacheDelete(key);
      this.logger.error('KV set failed', { key, error });
      return {
        success: false,
        exists: false,
        error: {
          type: error.name,
          reason: error.message,
        },
      };
    }
  }

  /**
   * Get a value by key.
   *
   * Serves from the cache when possible. Otherwise it reads from Elasticsearch,
   * transparently decompresses, and repopulates the cache. A key that is found
   * but already expired is reported as missing and deleted lazily.
   *
   * @param key - Key to read.
   * @param options - Routing, `skipCache`, and a `default` value returned for
   *   missing or expired keys.
   * @returns The operation result. Failures are reported as `success: false`.
   */
  async get(key: string, options: KVGetOptions = {}): Promise<KVOperationResult<T>> {
    const startTime = Date.now();
    this.stats.totalGets++;
    try {
      if (this.config.enableCache && !options.skipCache) {
        const cached = this.cacheGet(key);
        if (cached) {
          this.stats.avgGetDurationMs = KVStore.runningAverage(
            this.stats.avgGetDurationMs,
            this.stats.totalGets,
            Date.now() - startTime
          );
          this.metrics.recordCounter('kv.get', 1, {
            index: this.config.index,
            cache_hit: 'true',
          });
          return {
            success: true,
            value: cached.value,
            exists: true,
            cacheHit: true,
            version: cached.version,
            // NOTE(review): this is the cache entry's expiry (capped at cacheTTL),
            // not necessarily the key's own expiry.
            expiresAt: cached.expiresAt,
          };
        }
      }

      const client = ElasticsearchConnectionManager.getInstance().getClient();
      const routing = options.routing ?? this.config.defaultRouting;
      const getRequest: any = {
        index: this.config.index,
        id: key,
      };
      if (routing) {
        getRequest.routing = routing;
      }

      const result = await client.get(getRequest);
      if (!result.found) {
        this.metrics.recordCounter('kv.get.not_found', 1, {
          index: this.config.index,
        });
        return {
          success: true,
          value: options.default as T,
          exists: false,
          cacheHit: false,
        };
      }

      const doc = result._source as KVDocument<unknown>;
      const expiresAtMs = KVStore.expiryTime(doc.expiresAt);
      if (expiresAtMs !== undefined && expiresAtMs < Date.now()) {
        // Lazily remove the expired document. Route the delete like the read, and
        // guard it with the version just read so that a concurrent set() which
        // refreshed the key is not deleted by mistake.
        await this.delete(key, {
          invalidateCache: true,
          routing: options.routing,
          ifSeqNo: result._seq_no,
          ifPrimaryTerm: result._primary_term,
        });
        return {
          success: true,
          value: options.default as T,
          exists: false,
          cacheHit: false,
        };
      }

      const value = await this.decodeValue(doc);
      const version = {
        seqNo: result._seq_no ?? 0,
        primaryTerm: result._primary_term ?? 0,
      };

      if (this.config.enableCache && !options.skipCache) {
        // Remaining lifetime in (fractional) seconds; undefined = never expires.
        const ttl = expiresAtMs !== undefined ? (expiresAtMs - Date.now()) / 1000 : undefined;
        this.cacheSet(key, value, version, ttl);
      }

      const duration = Date.now() - startTime;
      this.stats.avgGetDurationMs = KVStore.runningAverage(
        this.stats.avgGetDurationMs,
        this.stats.totalGets,
        duration
      );
      this.metrics.recordCounter('kv.get', 1, {
        index: this.config.index,
        cache_hit: 'false',
      });
      this.metrics.recordHistogram('kv.get.duration', duration);

      return {
        success: true,
        value,
        exists: true,
        cacheHit: false,
        version,
        expiresAt: expiresAtMs !== undefined ? new Date(expiresAtMs) : undefined,
      };
    } catch (error: any) {
      if (error.name === 'ResponseError' && error.meta?.statusCode === 404) {
        return {
          success: true,
          value: options.default as T,
          exists: false,
          cacheHit: false,
        };
      }
      this.logger.error('KV get failed', { key, error });
      return {
        success: false,
        exists: false,
        error: {
          type: error.name,
          reason: error.message,
        },
      };
    }
  }

  /**
   * Delete a key.
   *
   * A missing document (404) counts as success with `exists: false`. Unless
   * `invalidateCache === false`, the cached copy is dropped on every outcome.
   *
   * @param key - Key to delete.
   * @param options - Routing, optimistic-concurrency guards, and `invalidateCache`.
   * @returns The operation result. Failures are reported as `success: false`.
   */
  async delete(key: string, options: KVDeleteOptions = {}): Promise<KVOperationResult> {
    const startTime = Date.now();
    this.stats.totalDeletes++;
    try {
      const client = ElasticsearchConnectionManager.getInstance().getClient();
      const deleteRequest: any = {
        index: this.config.index,
        id: key,
        refresh: this.config.refresh,
      };
      const routing = options.routing ?? this.config.defaultRouting;
      if (routing) {
        deleteRequest.routing = routing;
      }
      if (options.ifSeqNo !== undefined && options.ifPrimaryTerm !== undefined) {
        deleteRequest.if_seq_no = options.ifSeqNo;
        deleteRequest.if_primary_term = options.ifPrimaryTerm;
      }

      const result = await client.delete(deleteRequest);
      if (result.result === 'deleted' && this.stats.totalKeys > 0) {
        this.stats.totalKeys--;
      }

      const duration = Date.now() - startTime;
      this.stats.avgDeleteDurationMs = KVStore.runningAverage(
        this.stats.avgDeleteDurationMs,
        this.stats.totalDeletes,
        duration
      );
      this.metrics.recordCounter('kv.delete', 1, {
        index: this.config.index,
      });
      this.metrics.recordHistogram('kv.delete.duration', duration);

      return {
        success: true,
        exists: result.result === 'deleted',
      };
    } catch (error: any) {
      if (error.name === 'ResponseError' && error.meta?.statusCode === 404) {
        return {
          success: true,
          exists: false,
        };
      }
      this.logger.error('KV delete failed', { key, error });
      return {
        success: false,
        exists: false,
        error: {
          type: error.name,
          reason: error.message,
        },
      };
    } finally {
      // Invalidate on every outcome, including 404 and errors. A cached copy of a
      // key we just tried to delete can no longer be trusted; dropping it only
      // costs a cache miss.
      if (this.config.enableCache && options.invalidateCache !== false) {
        this.cacheDelete(key);
      }
    }
  }

  /**
   * Check whether a key exists and has not expired.
   *
   * A live cache entry short-circuits the check. Otherwise only `expiresAt` is
   * fetched from Elasticsearch (using `defaultRouting` when configured), so
   * expired-but-not-yet-cleaned keys report `false`, consistent with `get()`.
   *
   * @returns `true` when the key is present and unexpired; `false` otherwise,
   *   including on errors (which are logged).
   */
  async exists(key: string): Promise<boolean> {
    if (this.config.enableCache) {
      const entry = this.cache.get(key);
      if (entry && (!entry.expiresAt || entry.expiresAt > new Date())) {
        return true;
      }
    }
    try {
      const client = ElasticsearchConnectionManager.getInstance().getClient();
      const request: any = {
        index: this.config.index,
        id: key,
        _source_includes: ['expiresAt'],
      };
      if (this.config.defaultRouting) {
        request.routing = this.config.defaultRouting;
      }
      const result = await client.get(request);
      if (!result.found) {
        return false;
      }
      const doc = result._source as Partial<KVDocument<unknown>> | undefined;
      const expiresAtMs = KVStore.expiryTime(doc?.expiresAt);
      return expiresAtMs === undefined || expiresAtMs >= Date.now();
    } catch (error: any) {
      if (error?.name === 'ResponseError' && error.meta?.statusCode === 404) {
        return false;
      }
      this.logger.error('KV exists check failed', { key, error });
      return false;
    }
  }

  /**
   * Batch get multiple keys.
   *
   * Duplicate keys are fetched once. Cached keys are served locally; the rest
   * are fetched in a single `mget` using the same routing rules as `get()`.
   * Missing or expired keys get `options.default`. If the Elasticsearch call
   * fails, every uncached key is reported with `success: false`.
   */
  async mget(keys: string[], options: KVGetOptions = {}): Promise<KVBatchGetResult<T>> {
    const results = new Map<string, KVOperationResult<T>>();
    let found = 0;
    let notFound = 0;
    let cacheHits = 0;

    const useCache = this.config.enableCache && !options.skipCache;
    const uniqueKeys = Array.from(new Set(keys));
    const uncachedKeys: string[] = [];
    const cachedEntries = new Map<string, CacheEntry<T>>();
    for (const key of uniqueKeys) {
      const cached = useCache ? this.cacheGet(key) : null;
      if (cached) {
        cachedEntries.set(key, cached);
        cacheHits++;
      } else {
        uncachedKeys.push(key);
      }
    }

    const missing = (): KVOperationResult<T> => ({
      success: true,
      value: options.default as T,
      exists: false,
      cacheHit: false,
    });

    if (uncachedKeys.length > 0) {
      try {
        const client = ElasticsearchConnectionManager.getInstance().getClient();
        const mgetRequest: any = {
          index: this.config.index,
          ids: uncachedKeys,
        };
        const routing = options.routing ?? this.config.defaultRouting;
        if (routing) {
          mgetRequest.routing = routing;
        }
        const mgetResult = await client.mget(mgetRequest);

        for (const item of mgetResult.docs) {
          if ('found' in item && item.found) {
            const doc = item._source as KVDocument<unknown>;
            const expiresAtMs = KVStore.expiryTime(doc.expiresAt);
            if (expiresAtMs !== undefined && expiresAtMs < Date.now()) {
              results.set(item._id, missing());
              notFound++;
              continue;
            }

            const value = await this.decodeValue(doc);
            const version = {
              seqNo: item._seq_no ?? 0,
              primaryTerm: item._primary_term ?? 0,
            };
            results.set(item._id, {
              success: true,
              value,
              exists: true,
              cacheHit: false,
              version,
              expiresAt: expiresAtMs !== undefined ? new Date(expiresAtMs) : undefined,
            });
            found++;

            if (useCache) {
              const ttl =
                expiresAtMs !== undefined ? (expiresAtMs - Date.now()) / 1000 : undefined;
              this.cacheSet(item._id, value, version, ttl);
            }
          } else {
            results.set(item._id, missing());
            notFound++;
          }
        }
      } catch (error: any) {
        this.logger.error('Batch get failed', { keys: uncachedKeys, error });
        // Report the affected keys explicitly instead of silently omitting them.
        for (const key of uncachedKeys) {
          if (!results.has(key)) {
            results.set(key, {
              success: false,
              exists: false,
              error: {
                type: error?.name,
                reason: error?.message,
              },
            });
          }
        }
      }
    }

    for (const [key, entry] of cachedEntries) {
      results.set(key, {
        success: true,
        value: entry.value,
        exists: true,
        cacheHit: true,
        version: entry.version,
      });
      found++;
    }

    this.metrics.recordCounter('kv.mget', keys.length, {
      index: this.config.index,
      cache_hits: cacheHits,
    });

    return {
      results,
      found,
      notFound,
      cacheHits,
    };
  }

  /**
   * Batch set multiple key-value pairs, sequentially via `set()`.
   * On duplicate keys the later entry's result wins in the returned map.
   */
  async mset(
    entries: Array<{ key: string; value: T; options?: KVSetOptions }>
  ): Promise<KVBatchSetResult> {
    const results = new Map<string, KVOperationResult>();
    let successful = 0;
    let failed = 0;
    for (const { key, value, options } of entries) {
      const result = await this.set(key, value, options);
      results.set(key, result);
      if (result.success) {
        successful++;
      } else {
        failed++;
      }
    }
    return {
      successful,
      failed,
      results,
    };
  }

  /**
   * Batch delete multiple keys, sequentially via `delete()`, using the same options for each key.
   */
  async mdelete(keys: string[], options: KVDeleteOptions = {}): Promise<KVBatchDeleteResult> {
    const results = new Map<string, KVOperationResult>();
    let successful = 0;
    let failed = 0;
    for (const key of keys) {
      const result = await this.delete(key, options);
      results.set(key, result);
      if (result.success) {
        successful++;
      } else {
        failed++;
      }
    }
    return {
      successful,
      failed,
      results,
    };
  }

  /**
   * Scan keys, optionally filtered by a wildcard `pattern`, in pages of
   * `limit` (default 100).
   *
   * Results are sorted by `key` (unique, since it equals the document id), so
   * `nextCursor` (the last key returned) can be passed back as `cursor` for
   * `search_after` pagination. Expired documents are excluded server-side, and
   * again client-side to cover clock skew.
   */
  async scan(options: KVScanOptions = {}): Promise<KVScanResult<T>> {
    this.stats.totalScans++;
    const limit = options.limit ?? 100;
    try {
      const client = ElasticsearchConnectionManager.getInstance().getClient();
      const filters: any[] = [];
      if (options.pattern) {
        filters.push({ wildcard: { key: options.pattern } });
      }
      const searchRequest: any = {
        index: this.config.index,
        size: limit,
        // key + expiresAt are always needed; `_source: false` would leave
        // hit._source undefined and break key-only scans.
        _source: options.includeValues ? true : ['key', 'expiresAt'],
        sort: [{ key: 'asc' }],
        query: {
          bool: {
            filter: filters,
            must_not: [{ range: { expiresAt: { lt: 'now' } } }],
          },
        },
      };
      if (options.cursor) {
        searchRequest.search_after = [options.cursor];
      }

      const result = await client.search(searchRequest);
      const hits = result.hits.hits;
      const keys: string[] = [];
      const values: T[] = [];
      for (const hit of hits) {
        const doc = hit._source as KVDocument<unknown>;
        const expiresAtMs = KVStore.expiryTime(doc.expiresAt);
        if (expiresAtMs !== undefined && expiresAtMs < Date.now()) {
          continue;
        }
        keys.push(doc.key);
        if (options.includeValues) {
          values.push(await this.decodeValue(doc));
        }
      }

      const lastHit = hits[hits.length - 1];
      const nextCursor = lastHit?.sort?.[0] as string | undefined;

      this.metrics.recordCounter('kv.scan', 1, {
        index: this.config.index,
        pattern: options.pattern ?? '*',
      });

      return {
        keys,
        values: options.includeValues ? values : undefined,
        nextCursor,
        total: typeof result.hits.total === 'number' ? result.hits.total : result.hits.total?.value ?? 0,
        // Judge "more pages" by raw hits, not by keys surviving the expiry filter.
        hasMore: hits.length > 0 && hits.length === limit,
      };
    } catch (error) {
      this.logger.error('KV scan failed', { options, error });
      return {
        keys: [],
        values: options.includeValues ? [] : undefined,
        total: 0,
        hasMore: false,
      };
    }
  }

  /**
   * Get store statistics.
   *
   * Refreshes the derived cache figures (size, hit ratio, summed entry sizes)
   * and returns a copy, including a copy of `cacheStats`, so callers cannot
   * mutate internal counters.
   */
  getStats(): KVStoreStats {
    const cacheStats = this.stats.cacheStats;
    if (this.config.enableCache && cacheStats) {
      cacheStats.size = this.cache.size;
      const lookups = cacheStats.hits + cacheStats.misses;
      cacheStats.hitRatio = lookups > 0 ? cacheStats.hits / lookups : 0;
      // Estimate memory usage from the serialized size recorded per entry.
      let memoryUsage = 0;
      for (const entry of this.cache.values()) {
        memoryUsage += entry.size;
      }
      cacheStats.memoryUsage = memoryUsage;
    }
    return {
      ...this.stats,
      cacheStats: cacheStats ? { ...cacheStats } : undefined,
    };
  }

  /**
   * Clear the local cache (no-op when caching is disabled).
   */
  clearCache(): void {
    if (this.config.enableCache) {
      this.cache.clear();
      this.logger.info('Cache cleared', { index: this.config.index });
    }
  }

  // ============================================================================
  // Private Methods
  // ============================================================================

  /** Fold `sample` into a running mean where `count` already includes this sample. */
  private static runningAverage(previous: number, count: number, sample: number): number {
    return (previous * (count - 1) + sample) / count;
  }

  /**
   * Convert a stored `expiresAt` (Date, ISO string, or epoch ms) to epoch
   * milliseconds. Returns `undefined` when the key has no expiry.
   */
  private static expiryTime(
    expiresAt: Date | string | number | null | undefined
  ): number | undefined {
    if (!expiresAt) {
      return undefined;
    }
    return new Date(expiresAt).getTime();
  }

  /**
   * Restore a stored value. When the document is marked as compressed, the
   * base64 payload is gunzipped and JSON-parsed; otherwise it is returned as-is.
   */
  private async decodeValue(doc: KVDocument<unknown>): Promise<T> {
    if (doc.metadata?.compressed) {
      const decompressed = await gunzipAsync(Buffer.from(doc.value as string, 'base64'));
      return JSON.parse(decompressed.toString('utf8')) as T;
    }
    return doc.value as T;
  }

  /**
   * Ensure the index exists with the KV mappings. `value` is an object field
   * with `enabled: false`, so it is kept in `_source` but not indexed.
   */
  private async ensureIndex(): Promise<void> {
    try {
      const client = ElasticsearchConnectionManager.getInstance().getClient();
      const exists = await client.indices.exists({
        index: this.config.index,
      });
      if (!exists) {
        await client.indices.create({
          index: this.config.index,
          mappings: {
            properties: {
              key: { type: 'keyword' },
              value: { type: 'object', enabled: false },
              createdAt: { type: 'date' },
              updatedAt: { type: 'date' },
              expiresAt: { type: 'date' },
              metadata: {
                properties: {
                  size: { type: 'long' },
                  compressed: { type: 'boolean' },
                  contentType: { type: 'keyword' },
                  tags: { type: 'keyword' },
                },
              },
            },
          },
        });
        this.logger.info('KV index created', { index: this.config.index });
      }
    } catch (error: any) {
      // Another process may have created the index between our exists check and
      // the create call; the index now exists, which is what we wanted.
      if (error?.meta?.body?.error?.type === 'resource_already_exists_exception') {
        return;
      }
      this.logger.error('Failed to ensure KV index', { error });
      throw error;
    }
  }

  /**
   * Load the document count into `stats.totalKeys` (0 if the count fails).
   */
  private async updateKeyCount(): Promise<void> {
    try {
      const client = ElasticsearchConnectionManager.getInstance().getClient();
      const result = await client.count({
        index: this.config.index,
      });
      this.stats.totalKeys = result.count;
    } catch (error) {
      // Index might not exist yet
      this.stats.totalKeys = 0;
    }
  }

  /**
   * Start the expiration cleanup timer.
   *
   * Any previous timer is cleared first, so re-initialising does not leak an
   * interval. A tick is skipped while the previous pass is still running, and
   * the timer is unref'd so it alone does not keep the Node.js process alive.
   */
  private startExpirationCleanup(): void {
    if (this.cleanupTimer) {
      clearInterval(this.cleanupTimer);
    }
    this.cleanupTimer = setInterval(() => {
      if (this.cleanupInProgress) {
        return;
      }
      this.cleanupInProgress = true;
      // cleanupExpiredKeys handles its own errors, so this promise never rejects.
      void this.cleanupExpiredKeys().finally(() => {
        this.cleanupInProgress = false;
      });
    }, this.config.cleanupIntervalSeconds * 1000);
    this.cleanupTimer.unref?.();
  }

  /**
   * Delete up to `cleanupBatchSize` expired documents in one bulk request.
   *
   * Each delete carries the hit's own routing so it reaches the right shard.
   * Matching cache entries are dropped. Only deletions Elasticsearch confirms
   * are counted toward `totalExpired`. Errors are logged, never thrown.
   */
  private async cleanupExpiredKeys(): Promise<void> {
    try {
      const client = ElasticsearchConnectionManager.getInstance().getClient();
      const result = await client.search({
        index: this.config.index,
        size: this.config.cleanupBatchSize,
        _source: false,
        query: {
          range: {
            expiresAt: {
              lt: 'now',
            },
          },
        },
      });
      const hits = result.hits.hits;
      if (hits.length === 0) {
        return;
      }

      // `routing` is undefined for unrouted docs and is dropped during JSON serialization.
      const operations = hits.map((hit) => ({
        delete: { _index: this.config.index, _id: hit._id, routing: hit._routing },
      }));
      const response = await client.bulk({
        operations,
        refresh: false,
      });

      for (const hit of hits) {
        if (hit._id) {
          this.cacheDelete(hit._id);
        }
      }

      let deleted = 0;
      for (const item of response.items) {
        if (item.delete?.result === 'deleted') {
          deleted++;
        }
      }
      if (response.errors) {
        this.logger.error('Some expired keys could not be deleted', {
          index: this.config.index,
          attempted: hits.length,
          deleted,
        });
      }

      this.stats.totalExpired += deleted;
      this.stats.totalKeys = Math.max(0, this.stats.totalKeys - deleted);
      this.logger.info('Expired keys cleaned up', {
        index: this.config.index,
        count: deleted,
      });
      this.metrics.recordCounter('kv.expired', deleted, {
        index: this.config.index,
      });
    } catch (error) {
      this.logger.error('Failed to cleanup expired keys', { error });
    }
  }

  /**
   * Look up a live cache entry, updating hit/miss and access statistics.
   * An expired entry is removed and counted as a miss.
   */
  private cacheGet(key: string): CacheEntry<T> | null {
    if (!this.config.enableCache || !this.stats.cacheStats) {
      return null;
    }
    const entry = this.cache.get(key);
    if (!entry) {
      this.stats.cacheStats.misses++;
      return null;
    }
    if (entry.expiresAt && entry.expiresAt < new Date()) {
      this.cache.delete(key);
      this.stats.cacheStats.misses++;
      return null;
    }
    // Access stats drive the LRU/LFU eviction policies.
    entry.lastAccessedAt = new Date();
    entry.accessCount++;
    this.stats.cacheStats.hits++;
    return entry;
  }

  /**
   * Store a value in the cache.
   *
   * @param ttl - Remaining key lifetime in seconds (may be fractional), or
   *   `undefined` if the key never expires. The entry lives for
   *   `min(ttl, cacheTTL)`. A non-positive `ttl` means the key is already
   *   expired, so any existing entry is dropped instead of being cached.
   */
  private cacheSet(
    key: string,
    value: T,
    version?: { seqNo: number; primaryTerm: number },
    ttl?: number
  ): void {
    if (!this.config.enableCache || !this.stats.cacheStats) {
      return;
    }
    if (ttl !== undefined && ttl <= 0) {
      this.cache.delete(key);
      return;
    }
    if (this.cache.size >= this.config.cacheMaxSize && !this.cache.has(key)) {
      this.evictCacheEntry();
    }
    const now = new Date();
    const lifetimeSeconds =
      ttl !== undefined && Number.isFinite(ttl)
        ? Math.min(ttl, this.config.cacheTTL)
        : this.config.cacheTTL;
    const expiresAt = new Date(now.getTime() + lifetimeSeconds * 1000);
    // JSON.stringify(undefined) yields undefined; count such values as 0 bytes.
    const size = Buffer.byteLength(JSON.stringify(value) ?? '', 'utf8');
    const entry: CacheEntry<T> = {
      value,
      cachedAt: now,
      expiresAt,
      lastAccessedAt: now,
      accessCount: 0,
      size,
      version,
    };
    this.cache.set(key, entry);
  }

  /**
   * Delete a value from the cache (no-op when caching is disabled).
   */
  private cacheDelete(key: string): void {
    if (!this.config.enableCache) {
      return;
    }
    this.cache.delete(key);
  }

  /**
   * Evict one cache entry chosen by `cacheEvictionPolicy` (linear scan).
   */
  private evictCacheEntry(): void {
    if (!this.stats.cacheStats) {
      return;
    }
    let keyToEvict: string | null = null;
    switch (this.config.cacheEvictionPolicy) {
      case 'lru': {
        // Evict least recently used
        let oldestTime = Infinity;
        for (const [key, entry] of this.cache) {
          const time = entry.lastAccessedAt.getTime();
          if (time < oldestTime) {
            oldestTime = time;
            keyToEvict = key;
          }
        }
        break;
      }
      case 'lfu': {
        // Evict least frequently used
        let lowestCount = Infinity;
        for (const [key, entry] of this.cache) {
          if (entry.accessCount < lowestCount) {
            lowestCount = entry.accessCount;
            keyToEvict = key;
          }
        }
        break;
      }
      case 'fifo': {
        // Evict first in (oldest cached)
        let oldestTime = Infinity;
        for (const [key, entry] of this.cache) {
          const time = entry.cachedAt.getTime();
          if (time < oldestTime) {
            oldestTime = time;
            keyToEvict = key;
          }
        }
        break;
      }
      case 'ttl': {
        // Evict soonest to expire
        let soonestExpiry = Infinity;
        for (const [key, entry] of this.cache) {
          if (entry.expiresAt) {
            const time = entry.expiresAt.getTime();
            if (time < soonestExpiry) {
              soonestExpiry = time;
              keyToEvict = key;
            }
          }
        }
        break;
      }
    }
    if (keyToEvict) {
      this.cache.delete(keyToEvict);
      this.stats.cacheStats.evictions++;
    }
  }
}
/**
 * Convenience factory for `KVStore`.
 *
 * Equivalent to `new KVStore<T>(config)`. It only constructs the store and
 * does not call `initialize()`.
 *
 * @param config - Store configuration; unspecified options use the defaults.
 * @returns A new, uninitialized store instance.
 */
export function createKVStore<T = unknown>(config: KVStoreConfig): KVStore<T> {
  const store = new KVStore<T>(config);
  return store;
}