feat(watcher): add polling-based BucketWatcher to detect add/modify/delete events and expose RxJS Observable and EventEmitter APIs
This commit is contained in:
289
ts/classes.watcher.ts
Normal file
289
ts/classes.watcher.ts
Normal file
@@ -0,0 +1,289 @@
|
||||
// classes.watcher.ts
|
||||
|
||||
import * as plugins from './plugins.js';
|
||||
import * as interfaces from './interfaces.js';
|
||||
import type { Bucket } from './classes.bucket.js';
|
||||
import { EventEmitter } from 'node:events';
|
||||
|
||||
/**
|
||||
* BucketWatcher monitors an S3 bucket for changes (add/modify/delete)
|
||||
* using a polling-based approach. Designed to follow the SmartdataDbWatcher pattern.
|
||||
*
|
||||
* @example
|
||||
* ```typescript
|
||||
* const watcher = bucket.createWatcher({ prefix: 'uploads/', pollIntervalMs: 3000 });
|
||||
*
|
||||
* // RxJS Observable pattern
|
||||
* watcher.changeSubject.subscribe((change) => {
|
||||
* console.log('Change:', change);
|
||||
* });
|
||||
*
|
||||
* // EventEmitter pattern
|
||||
* watcher.on('change', (change) => console.log(change));
|
||||
* watcher.on('error', (err) => console.error(err));
|
||||
*
|
||||
* await watcher.start();
|
||||
* await watcher.readyDeferred.promise; // Wait for initial state
|
||||
*
|
||||
* // Later...
|
||||
* await watcher.stop();
|
||||
* ```
|
||||
*/
|
||||
export class BucketWatcher extends EventEmitter {
|
||||
/** Deferred that resolves when initial state is built and watcher is ready */
|
||||
public readyDeferred = plugins.smartpromise.defer();
|
||||
|
||||
/** Observable for receiving change events (supports RxJS operators) */
|
||||
public changeSubject: plugins.smartrx.rxjs.Observable<interfaces.IS3ChangeEvent | interfaces.IS3ChangeEvent[]>;
|
||||
|
||||
// Internal subjects and state
|
||||
private rawSubject: plugins.smartrx.rxjs.Subject<interfaces.IS3ChangeEvent>;
|
||||
private previousState: Map<string, interfaces.IS3ObjectState>;
|
||||
private pollIntervalId: ReturnType<typeof setInterval> | null = null;
|
||||
private isPolling = false;
|
||||
private isStopped = false;
|
||||
|
||||
// Configuration
|
||||
private readonly bucketRef: Bucket;
|
||||
private readonly prefix: string;
|
||||
private readonly pollIntervalMs: number;
|
||||
private readonly bufferTimeMs?: number;
|
||||
private readonly includeInitial: boolean;
|
||||
private readonly pageSize: number;
|
||||
|
||||
constructor(bucketRef: Bucket, options: interfaces.IBucketWatcherOptions = {}) {
|
||||
super();
|
||||
|
||||
this.bucketRef = bucketRef;
|
||||
this.prefix = options.prefix ?? '';
|
||||
this.pollIntervalMs = options.pollIntervalMs ?? 5000;
|
||||
this.bufferTimeMs = options.bufferTimeMs;
|
||||
this.includeInitial = options.includeInitial ?? false;
|
||||
this.pageSize = options.pageSize ?? 1000;
|
||||
|
||||
// Initialize state tracking
|
||||
this.previousState = new Map();
|
||||
|
||||
// Initialize raw subject for emitting changes
|
||||
this.rawSubject = new plugins.smartrx.rxjs.Subject<interfaces.IS3ChangeEvent>();
|
||||
|
||||
// Configure the public observable with optional buffering
|
||||
if (this.bufferTimeMs && this.bufferTimeMs > 0) {
|
||||
this.changeSubject = this.rawSubject.pipe(
|
||||
plugins.smartrx.rxjs.ops.bufferTime(this.bufferTimeMs),
|
||||
plugins.smartrx.rxjs.ops.filter((events: interfaces.IS3ChangeEvent[]) => events.length > 0)
|
||||
);
|
||||
} else {
|
||||
this.changeSubject = this.rawSubject.asObservable();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Start watching the bucket for changes
|
||||
*/
|
||||
public async start(): Promise<void> {
|
||||
if (this.pollIntervalId !== null) {
|
||||
console.log('BucketWatcher is already running');
|
||||
return;
|
||||
}
|
||||
|
||||
this.isStopped = false;
|
||||
|
||||
// Build initial state
|
||||
await this.buildInitialState();
|
||||
|
||||
// Emit initial state as 'add' events if configured
|
||||
if (this.includeInitial) {
|
||||
for (const state of this.previousState.values()) {
|
||||
this.emitChange({
|
||||
type: 'add',
|
||||
key: state.key,
|
||||
size: state.size,
|
||||
etag: state.etag,
|
||||
lastModified: state.lastModified,
|
||||
bucket: this.bucketRef.name,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Mark as ready
|
||||
this.readyDeferred.resolve();
|
||||
|
||||
// Start polling loop
|
||||
this.pollIntervalId = setInterval(() => {
|
||||
this.poll().catch((err) => {
|
||||
this.emit('error', err);
|
||||
});
|
||||
}, this.pollIntervalMs);
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop watching the bucket
|
||||
*/
|
||||
public async stop(): Promise<void> {
|
||||
this.isStopped = true;
|
||||
|
||||
if (this.pollIntervalId !== null) {
|
||||
clearInterval(this.pollIntervalId);
|
||||
this.pollIntervalId = null;
|
||||
}
|
||||
|
||||
// Wait for any in-progress poll to complete
|
||||
while (this.isPolling) {
|
||||
await new Promise<void>((resolve) => setTimeout(resolve, 50));
|
||||
}
|
||||
|
||||
this.rawSubject.complete();
|
||||
}
|
||||
|
||||
/**
|
||||
* Alias for stop() - for consistency with other APIs
|
||||
*/
|
||||
public async close(): Promise<void> {
|
||||
return this.stop();
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the initial state by listing all objects with metadata
|
||||
*/
|
||||
private async buildInitialState(): Promise<void> {
|
||||
this.previousState.clear();
|
||||
|
||||
for await (const obj of this.listObjectsWithMetadata()) {
|
||||
if (obj.Key) {
|
||||
this.previousState.set(obj.Key, {
|
||||
key: obj.Key,
|
||||
etag: obj.ETag ?? '',
|
||||
size: obj.Size ?? 0,
|
||||
lastModified: obj.LastModified ?? new Date(0),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Poll for changes by comparing current state against previous state
|
||||
*/
|
||||
private async poll(): Promise<void> {
|
||||
// Guard against overlapping polls
|
||||
if (this.isPolling || this.isStopped) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.isPolling = true;
|
||||
|
||||
try {
|
||||
// Build current state
|
||||
const currentState = new Map<string, interfaces.IS3ObjectState>();
|
||||
|
||||
for await (const obj of this.listObjectsWithMetadata()) {
|
||||
if (this.isStopped) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (obj.Key) {
|
||||
currentState.set(obj.Key, {
|
||||
key: obj.Key,
|
||||
etag: obj.ETag ?? '',
|
||||
size: obj.Size ?? 0,
|
||||
lastModified: obj.LastModified ?? new Date(0),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if (!this.isStopped) {
|
||||
this.detectChanges(currentState);
|
||||
this.previousState = currentState;
|
||||
}
|
||||
} catch (err) {
|
||||
this.emit('error', err);
|
||||
} finally {
|
||||
this.isPolling = false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Detect changes between current and previous state
|
||||
*/
|
||||
private detectChanges(currentState: Map<string, interfaces.IS3ObjectState>): void {
|
||||
// Detect added and modified objects
|
||||
for (const [key, current] of currentState) {
|
||||
const previous = this.previousState.get(key);
|
||||
|
||||
if (!previous) {
|
||||
// New object - emit 'add' event
|
||||
this.emitChange({
|
||||
type: 'add',
|
||||
key: current.key,
|
||||
size: current.size,
|
||||
etag: current.etag,
|
||||
lastModified: current.lastModified,
|
||||
bucket: this.bucketRef.name,
|
||||
});
|
||||
} else if (
|
||||
previous.etag !== current.etag ||
|
||||
previous.size !== current.size ||
|
||||
previous.lastModified.getTime() !== current.lastModified.getTime()
|
||||
) {
|
||||
// Object modified - emit 'modify' event
|
||||
this.emitChange({
|
||||
type: 'modify',
|
||||
key: current.key,
|
||||
size: current.size,
|
||||
etag: current.etag,
|
||||
lastModified: current.lastModified,
|
||||
bucket: this.bucketRef.name,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Detect deleted objects
|
||||
for (const [key, previous] of this.previousState) {
|
||||
if (!currentState.has(key)) {
|
||||
// Object deleted - emit 'delete' event
|
||||
this.emitChange({
|
||||
type: 'delete',
|
||||
key: previous.key,
|
||||
bucket: this.bucketRef.name,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Emit a change event via both RxJS Subject and EventEmitter
|
||||
*/
|
||||
private emitChange(event: interfaces.IS3ChangeEvent): void {
|
||||
this.rawSubject.next(event);
|
||||
this.emit('change', event);
|
||||
}
|
||||
|
||||
/**
|
||||
* List objects with full metadata (ETag, Size, LastModified)
|
||||
* This is a private method that yields full _Object data, not just keys
|
||||
*/
|
||||
private async *listObjectsWithMetadata(): AsyncIterableIterator<plugins.s3._Object> {
|
||||
let continuationToken: string | undefined;
|
||||
|
||||
do {
|
||||
if (this.isStopped) {
|
||||
return;
|
||||
}
|
||||
|
||||
const command = new plugins.s3.ListObjectsV2Command({
|
||||
Bucket: this.bucketRef.name,
|
||||
Prefix: this.prefix,
|
||||
MaxKeys: this.pageSize,
|
||||
ContinuationToken: continuationToken,
|
||||
});
|
||||
|
||||
const response = await this.bucketRef.smartbucketRef.s3Client.send(command);
|
||||
|
||||
for (const obj of response.Contents || []) {
|
||||
yield obj;
|
||||
}
|
||||
|
||||
continuationToken = response.NextContinuationToken;
|
||||
} while (continuationToken);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user