fix(storage): rename S3 configuration and change stream interfaces to storage-oriented types

This commit is contained in:
2026-03-14 23:27:25 +00:00
parent 2da2d57df1
commit a829f76d4b
21 changed files with 2133 additions and 2014 deletions

View File

@@ -1,7 +1,7 @@
import * as plugins from '../plugins.js';
import type { TsView } from '../tsview.classes.tsview.js';
import type * as interfaces from './interfaces.streaming.js';
import type { IS3ChangeEvent } from '@push.rocks/smartbucket';
import type { IStorageChangeEvent } from './interfaces.streaming.js';
/**
* Subscription entry tracking a client's subscription to a resource
@@ -21,19 +21,19 @@ interface IMongoWatcherEntry {
}
/**
* S3 watcher entry
* Storage watcher entry
*/
interface IS3WatcherEntry {
interface IStorageWatcherEntry {
watcher: plugins.smartbucket.BucketWatcher;
subscriptions: Map<string, ISubscriptionEntry>; // connectionId -> subscription
}
/**
* ChangeStreamManager manages real-time change streaming for both MongoDB and S3.
* ChangeStreamManager manages real-time change streaming for both MongoDB and storage.
*
* Features:
* - MongoDB Change Streams for real-time database updates
* - S3 BucketWatcher for polling-based S3 change detection
* - S3 BucketWatcher for polling-based storage change detection
* - Subscription management per WebSocket client
* - Activity stream with ring buffer for recent events
* - Automatic cleanup on client disconnect
@@ -45,8 +45,8 @@ export class ChangeStreamManager {
// MongoDB watchers: "db/collection" -> watcher entry
private mongoWatchers: Map<string, IMongoWatcherEntry> = new Map();
// S3 watchers: "bucket/prefix" -> watcher entry
private s3Watchers: Map<string, IS3WatcherEntry> = new Map();
// Storage watchers: "bucket/prefix" -> watcher entry
private storageWatchers: Map<string, IStorageWatcherEntry> = new Map();
// Activity subscribers: connectionId -> subscription entry
private activitySubscribers: Map<string, ISubscriptionEntry> = new Map();
@@ -57,7 +57,7 @@ export class ChangeStreamManager {
// Global watchers for the activity stream (started lazily on first subscriber)
private globalMongoWatcher: plugins.mongodb.ChangeStream | null = null;
private globalS3Watchers: Map<string, plugins.smartbucket.BucketWatcher> = new Map();
private globalStorageWatchers: Map<string, plugins.smartbucket.BucketWatcher> = new Map();
private globalWatchersActive: boolean = false;
// Counter for generating unique subscription IDs
@@ -89,9 +89,9 @@ export class ChangeStreamManager {
}
/**
* Get the S3 key for a bucket/prefix pair
* Get the storage key for a bucket/prefix pair
*/
private getS3Key(bucket: string, prefix?: string): string {
private getStorageKey(bucket: string, prefix?: string): string {
return prefix ? `${bucket}/${prefix}` : bucket;
}
@@ -280,24 +280,24 @@ export class ChangeStreamManager {
}
// ===========================================
// S3 Change Watching
// Storage Change Watching
// ===========================================
/**
* Subscribe a client to S3 bucket/prefix changes
* Subscribe a client to storage bucket/prefix changes
*/
public async subscribeToS3(
connectionId: string,
bucket: string,
prefix?: string
): Promise<{ success: boolean; subscriptionId: string }> {
const key = this.getS3Key(bucket, prefix);
const key = this.getStorageKey(bucket, prefix);
let entry = this.s3Watchers.get(key);
let entry = this.storageWatchers.get(key);
// Create watcher if it doesn't exist
if (!entry) {
const watcher = await this.createS3Watcher(bucket, prefix);
const watcher = await this.createStorageWatcher(bucket, prefix);
if (!watcher) {
return { success: false, subscriptionId: '' };
}
@@ -306,7 +306,7 @@ export class ChangeStreamManager {
watcher,
subscriptions: new Map(),
};
this.s3Watchers.set(key, entry);
this.storageWatchers.set(key, entry);
}
// Add subscription
@@ -317,47 +317,47 @@ export class ChangeStreamManager {
createdAt: new Date(),
});
console.log(`[ChangeStream] S3 subscription added: ${key} for connection ${connectionId}`);
console.log(`[ChangeStream] Storage subscription added: ${key} for connection ${connectionId}`);
return { success: true, subscriptionId };
}
/**
* Unsubscribe a client from S3 bucket/prefix changes
* Unsubscribe a client from storage bucket/prefix changes
*/
public async unsubscribeFromS3(
connectionId: string,
bucket: string,
prefix?: string
): Promise<boolean> {
const key = this.getS3Key(bucket, prefix);
const entry = this.s3Watchers.get(key);
const key = this.getStorageKey(bucket, prefix);
const entry = this.storageWatchers.get(key);
if (!entry) {
return false;
}
entry.subscriptions.delete(connectionId);
console.log(`[ChangeStream] S3 subscription removed: ${key} for connection ${connectionId}`);
console.log(`[ChangeStream] Storage subscription removed: ${key} for connection ${connectionId}`);
// Close watcher if no more subscribers
if (entry.subscriptions.size === 0) {
await this.closeS3Watcher(key);
await this.closeStorageWatcher(key);
}
return true;
}
/**
* Create an S3 bucket watcher
* Create a storage bucket watcher
*/
private async createS3Watcher(
private async createStorageWatcher(
bucket: string,
prefix?: string
): Promise<plugins.smartbucket.BucketWatcher | null> {
try {
const smartbucket = await this.tsview.getSmartBucket();
if (!smartbucket) {
console.error('[ChangeStream] S3 not configured');
console.error('[ChangeStream] Storage not configured');
return null;
}
@@ -371,10 +371,10 @@ export class ChangeStreamManager {
});
// Subscribe to change events
watcher.changeSubject.subscribe((eventOrEvents: IS3ChangeEvent | IS3ChangeEvent[]) => {
watcher.changeSubject.subscribe((eventOrEvents: IStorageChangeEvent | IStorageChangeEvent[]) => {
const events = Array.isArray(eventOrEvents) ? eventOrEvents : [eventOrEvents];
for (const event of events) {
this.handleS3Change(bucket, prefix, event);
this.handleStorageChange(bucket, prefix, event);
}
});
@@ -382,41 +382,41 @@ export class ChangeStreamManager {
await watcher.start();
await watcher.readyDeferred.promise;
console.log(`[ChangeStream] S3 watcher created for ${bucket}${prefix ? '/' + prefix : ''}`);
console.log(`[ChangeStream] Storage watcher created for ${bucket}${prefix ? '/' + prefix : ''}`);
return watcher;
} catch (error) {
console.error(`[ChangeStream] Failed to create S3 watcher for ${bucket}:`, error);
console.error(`[ChangeStream] Failed to create storage watcher for ${bucket}:`, error);
return null;
}
}
/**
* Handle an S3 change event
* Handle a storage change event
*/
private handleS3Change(bucket: string, prefix: string | undefined, event: IS3ChangeEvent): void {
const key = this.getS3Key(bucket, prefix);
const entry = this.s3Watchers.get(key);
private handleStorageChange(bucket: string, prefix: string | undefined, event: IStorageChangeEvent): void {
const key = this.getStorageKey(bucket, prefix);
const entry = this.storageWatchers.get(key);
if (!entry) return;
// Only add to activity buffer if global watchers are NOT active.
// When active, the global S3 watchers already feed the activity stream.
// When active, the global storage watchers already feed the activity stream.
if (!this.globalWatchersActive) {
this.addToActivityBuffer('s3', event);
this.addToActivityBuffer('storage', event);
}
// Push to all subscribed clients
this.pushS3ChangeToClients(key, event);
this.pushStorageChangeToClients(key, event);
}
/**
* Push S3 change to subscribed clients
* Push storage change to subscribed clients
*/
private async pushS3ChangeToClients(
private async pushStorageChangeToClients(
key: string,
event: IS3ChangeEvent
event: IStorageChangeEvent
): Promise<void> {
const entry = this.s3Watchers.get(key);
const entry = this.storageWatchers.get(key);
if (!entry || !this.typedSocket) return;
for (const [connectionId, _sub] of entry.subscriptions) {
@@ -426,31 +426,31 @@ export class ChangeStreamManager {
});
if (connection) {
const request = this.typedSocket.createTypedRequest<interfaces.IReq_PushS3Change>(
const request = this.typedSocket.createTypedRequest<interfaces.IReq_PushStorageChange>(
'pushS3Change',
connection
);
await request.fire({ event });
}
} catch (error) {
console.error(`[ChangeStream] Failed to push S3 change to ${connectionId}:`, error);
console.error(`[ChangeStream] Failed to push storage change to ${connectionId}:`, error);
}
}
}
/**
* Close an S3 bucket watcher
* Close a storage bucket watcher
*/
private async closeS3Watcher(key: string): Promise<void> {
const entry = this.s3Watchers.get(key);
private async closeStorageWatcher(key: string): Promise<void> {
const entry = this.storageWatchers.get(key);
if (!entry) return;
try {
await entry.watcher.stop();
this.s3Watchers.delete(key);
console.log(`[ChangeStream] S3 watcher closed for ${key}`);
this.storageWatchers.delete(key);
console.log(`[ChangeStream] Storage watcher closed for ${key}`);
} catch (error) {
console.error(`[ChangeStream] Error closing S3 watcher for ${key}:`, error);
console.error(`[ChangeStream] Error closing storage watcher for ${key}:`, error);
}
}
@@ -515,11 +515,11 @@ export class ChangeStreamManager {
* Add an event to the activity buffer
*/
private addToActivityBuffer(
source: 'mongodb' | 's3',
event: interfaces.IMongoChangeEvent | IS3ChangeEvent
source: 'mongodb' | 'storage',
event: interfaces.IMongoChangeEvent | IStorageChangeEvent
): void {
const activityEvent: interfaces.IActivityEvent = {
id: `evt_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`,
id: `evt_${Date.now()}_${Math.random().toString(36).substring(2, 11)}`,
source,
event,
timestamp: new Date().toISOString(),
@@ -567,7 +567,7 @@ export class ChangeStreamManager {
/**
* Start global watchers when the first activity subscriber connects.
* These watch all MongoDB and S3 activity and feed into the activity buffer.
* These watch all MongoDB and storage activity and feed into the activity buffer.
*/
private async startGlobalWatchers(): Promise<void> {
if (this.globalWatchersActive) return;
@@ -577,7 +577,7 @@ export class ChangeStreamManager {
await Promise.all([
this.startGlobalMongoWatcher(),
this.startGlobalS3Watchers(),
this.startGlobalStorageWatchers(),
]);
}
@@ -628,13 +628,13 @@ export class ChangeStreamManager {
}
/**
* Start S3 bucket watchers — one BucketWatcher per bucket.
* Start storage bucket watchers — one BucketWatcher per bucket.
*/
private async startGlobalS3Watchers(): Promise<void> {
private async startGlobalStorageWatchers(): Promise<void> {
try {
const smartbucket = await this.tsview.getSmartBucket();
if (!smartbucket) {
console.log('[ChangeStream] S3 not configured, skipping global S3 watchers');
console.log('[ChangeStream] Storage not configured, skipping global storage watchers');
return;
}
@@ -652,26 +652,26 @@ export class ChangeStreamManager {
bufferTimeMs: 500,
});
watcher.changeSubject.subscribe((eventOrEvents: IS3ChangeEvent | IS3ChangeEvent[]) => {
watcher.changeSubject.subscribe((eventOrEvents: IStorageChangeEvent | IStorageChangeEvent[]) => {
const events = Array.isArray(eventOrEvents) ? eventOrEvents : [eventOrEvents];
for (const event of events) {
this.addToActivityBuffer('s3', event);
this.addToActivityBuffer('storage', event);
}
});
await watcher.start();
await watcher.readyDeferred.promise;
this.globalS3Watchers.set(bucketName, watcher);
console.log(`[ChangeStream] Global S3 watcher started for bucket: ${bucketName}`);
this.globalStorageWatchers.set(bucketName, watcher);
console.log(`[ChangeStream] Global storage watcher started for bucket: ${bucketName}`);
} catch (bucketError) {
console.error(`[ChangeStream] Failed to start global S3 watcher for bucket ${bucketName}:`, bucketError);
console.error(`[ChangeStream] Failed to start global storage watcher for bucket ${bucketName}:`, bucketError);
}
}
console.log(`[ChangeStream] Global S3 watchers started (${this.globalS3Watchers.size}/${bucketNames.length} buckets)`);
console.log(`[ChangeStream] Global storage watchers started (${this.globalStorageWatchers.size}/${bucketNames.length} buckets)`);
} catch (error) {
console.error('[ChangeStream] Failed to start global S3 watchers:', error);
console.error('[ChangeStream] Failed to start global storage watchers:', error);
}
}
@@ -694,16 +694,16 @@ export class ChangeStreamManager {
this.globalMongoWatcher = null;
}
// Close all global S3 watchers
for (const [bucketName, watcher] of this.globalS3Watchers) {
// Close all global storage watchers
for (const [bucketName, watcher] of this.globalStorageWatchers) {
try {
await watcher.stop();
console.log(`[ChangeStream] Global S3 watcher stopped for bucket: ${bucketName}`);
console.log(`[ChangeStream] Global storage watcher stopped for bucket: ${bucketName}`);
} catch (error) {
console.error(`[ChangeStream] Error closing global S3 watcher for ${bucketName}:`, error);
console.error(`[ChangeStream] Error closing global storage watcher for ${bucketName}:`, error);
}
}
this.globalS3Watchers.clear();
this.globalStorageWatchers.clear();
this.globalWatchersActive = false;
console.log('[ChangeStream] Global watchers stopped');
@@ -729,12 +729,12 @@ export class ChangeStreamManager {
}
}
// Clean up S3 subscriptions
for (const [key, entry] of this.s3Watchers) {
// Clean up storage subscriptions
for (const [key, entry] of this.storageWatchers) {
if (entry.subscriptions.has(connectionId)) {
entry.subscriptions.delete(connectionId);
if (entry.subscriptions.size === 0) {
await this.closeS3Watcher(key);
await this.closeStorageWatcher(key);
}
}
}
@@ -762,9 +762,9 @@ export class ChangeStreamManager {
await this.closeMongoWatcher(key);
}
// Close all S3 watchers
for (const key of this.s3Watchers.keys()) {
await this.closeS3Watcher(key);
// Close all storage watchers
for (const key of this.storageWatchers.keys()) {
await this.closeStorageWatcher(key);
}
// Clear activity buffer and subscribers

View File

@@ -1,7 +1,8 @@
import type * as plugins from '../plugins.js';
// Re-export S3 change event from smartbucket
export type { IS3ChangeEvent } from '@push.rocks/smartbucket';
// Re-export storage change event from smartbucket
import type { IStorageChangeEvent } from '@push.rocks/smartbucket';
export type { IStorageChangeEvent };
/**
* MongoDB change event - wraps smartdata watcher output
@@ -24,8 +25,8 @@ export interface IMongoChangeEvent {
*/
export interface IActivityEvent {
id: string;
source: 'mongodb' | 's3';
event: IMongoChangeEvent | import('@push.rocks/smartbucket').IS3ChangeEvent;
source: 'mongodb' | 'storage';
event: IMongoChangeEvent | IStorageChangeEvent;
timestamp: string;
}
@@ -69,11 +70,11 @@ export interface IReq_UnsubscribeMongo extends plugins.typedrequestInterfaces.im
}
/**
* Subscribe to S3 bucket/prefix changes
* Subscribe to storage bucket/prefix changes
*/
export interface IReq_SubscribeS3 extends plugins.typedrequestInterfaces.implementsTR<
export interface IReq_SubscribeStorage extends plugins.typedrequestInterfaces.implementsTR<
plugins.typedrequestInterfaces.ITypedRequest,
IReq_SubscribeS3
IReq_SubscribeStorage
> {
method: 'subscribeS3';
request: {
@@ -87,11 +88,11 @@ export interface IReq_SubscribeS3 extends plugins.typedrequestInterfaces.impleme
}
/**
* Unsubscribe from S3 bucket/prefix changes
* Unsubscribe from storage bucket/prefix changes
*/
export interface IReq_UnsubscribeS3 extends plugins.typedrequestInterfaces.implementsTR<
export interface IReq_UnsubscribeStorage extends plugins.typedrequestInterfaces.implementsTR<
plugins.typedrequestInterfaces.ITypedRequest,
IReq_UnsubscribeS3
IReq_UnsubscribeStorage
> {
method: 'unsubscribeS3';
request: {
@@ -104,7 +105,7 @@ export interface IReq_UnsubscribeS3 extends plugins.typedrequestInterfaces.imple
}
/**
* Subscribe to activity stream (all changes from MongoDB and S3)
* Subscribe to activity stream (all changes from MongoDB and storage)
*/
export interface IReq_SubscribeActivity extends plugins.typedrequestInterfaces.implementsTR<
plugins.typedrequestInterfaces.ITypedRequest,
@@ -169,15 +170,15 @@ export interface IReq_PushMongoChange extends plugins.typedrequestInterfaces.imp
}
/**
* Server pushes S3 change to client
* Server pushes storage change to client
*/
export interface IReq_PushS3Change extends plugins.typedrequestInterfaces.implementsTR<
export interface IReq_PushStorageChange extends plugins.typedrequestInterfaces.implementsTR<
plugins.typedrequestInterfaces.ITypedRequest,
IReq_PushS3Change
IReq_PushStorageChange
> {
method: 'pushS3Change';
request: {
event: import('@push.rocks/smartbucket').IS3ChangeEvent;
event: IStorageChangeEvent;
};
response: {
received: boolean;
@@ -206,7 +207,7 @@ export interface IReq_PushActivityEvent extends plugins.typedrequestInterfaces.i
export interface ISubscriptionTag extends plugins.typedrequestInterfaces.ITag {
name: 'subscription';
payload: {
type: 'mongo' | 's3' | 'activity';
type: 'mongo' | 'storage' | 'activity';
key: string; // e.g., "db/collection" or "bucket/prefix" or "activity"
};
}