// classes.watcher.ts

import * as plugins from './plugins.js';
import * as interfaces from './interfaces.js';
import type { Bucket } from './classes.bucket.js';
import { EventEmitter } from 'node:events';

/**
 * BucketWatcher monitors an S3 bucket for changes (add/modify/delete)
 * using a polling-based approach. Designed to follow the SmartdataDbWatcher pattern.
 *
 * Each poll lists every object under the configured prefix, compares the
 * listing against the previous one (by ETag, size and last-modified time) and
 * publishes one event per difference through both `changeSubject` and the
 * EventEmitter `'change'` event. Failures while polling are published as
 * `'error'` events; if no `'error'` listener is registered they are logged
 * with `console.error` instead of being thrown.
 *
 * @example
 * ```typescript
 * const watcher = bucket.createWatcher({ prefix: 'uploads/', pollIntervalMs: 3000 });
 *
 * // RxJS Observable pattern
 * watcher.changeSubject.subscribe((change) => {
 *   console.log('Change:', change);
 * });
 *
 * // EventEmitter pattern
 * watcher.on('change', (change) => console.log(change));
 * watcher.on('error', (err) => console.error(err));
 *
 * await watcher.start();
 * await watcher.readyDeferred.promise; // Wait for initial state
 *
 * // Later...
 * await watcher.stop();
 * ```
 */
export class BucketWatcher extends EventEmitter {
  /**
   * Deferred that resolves when initial state is built and watcher is ready.
   * It stays pending if the initial listing fails or the watcher is stopped
   * before the listing finishes.
   */
  public readyDeferred = plugins.smartpromise.defer();

  /**
   * Observable for receiving change events (supports RxJS operators).
   * Emits single events, or non-empty arrays of events when `bufferTimeMs` is set.
   */
  public changeSubject: plugins.smartrx.rxjs.Observable<
    interfaces.IS3ChangeEvent | interfaces.IS3ChangeEvent[]
  >;

  // Internal subjects and state
  private rawSubject: plugins.smartrx.rxjs.Subject<interfaces.IS3ChangeEvent>;
  /** Listing observed by the most recent successful poll, keyed by object key */
  private previousState: Map<string, interfaces.IS3ObjectState>;
  private pollIntervalId: ReturnType<typeof setInterval> | null = null;
  /** True while a poll is listing the bucket; prevents overlapping polls */
  private isPolling = false;
  private isStopped = false;
  /** True while start() is awaiting the initial listing; prevents double starts */
  private isStarting = false;

  // Configuration
  private readonly bucketRef: Bucket;
  private readonly prefix: string;
  private readonly pollIntervalMs: number;
  private readonly bufferTimeMs?: number;
  private readonly includeInitial: boolean;
  private readonly pageSize: number;

  /**
   * @param bucketRef - bucket to watch; its name and S3 client are used for listing
   * @param options - optional settings; defaults are prefix `''`, pollIntervalMs `5000`,
   *   includeInitial `false`, pageSize `1000`, and no buffering
   */
  constructor(bucketRef: Bucket, options: interfaces.IBucketWatcherOptions = {}) {
    super();

    this.bucketRef = bucketRef;
    this.prefix = options.prefix ?? '';
    this.pollIntervalMs = options.pollIntervalMs ?? 5000;
    this.bufferTimeMs = options.bufferTimeMs;
    this.includeInitial = options.includeInitial ?? false;
    this.pageSize = options.pageSize ?? 1000;

    this.previousState = new Map();
    this.rawSubject = new plugins.smartrx.rxjs.Subject<interfaces.IS3ChangeEvent>();

    // Expose either the raw stream or a time-buffered one that drops empty batches
    if (this.bufferTimeMs && this.bufferTimeMs > 0) {
      this.changeSubject = this.rawSubject.pipe(
        plugins.smartrx.rxjs.ops.bufferTime(this.bufferTimeMs),
        plugins.smartrx.rxjs.ops.filter((events: interfaces.IS3ChangeEvent[]) => events.length > 0)
      );
    } else {
      this.changeSubject = this.rawSubject.asObservable();
    }
  }

  /**
   * Start watching the bucket for changes.
   *
   * Builds the initial state, optionally emits it as 'add' events, resolves
   * `readyDeferred` and then schedules the polling interval. Calling it while
   * the watcher is running or still starting is a logged no-op. If `stop()` is
   * called while the initial listing is in flight, no events are emitted and
   * no interval is scheduled.
   *
   * @throws whatever the initial bucket listing rejects with
   */
  public async start(): Promise<void> {
    if (this.pollIntervalId !== null || this.isStarting) {
      console.log('BucketWatcher is already running');
      return;
    }

    this.isStarting = true;
    this.isStopped = false;

    try {
      await this.buildInitialState();

      // stop() ran during the listing: do not emit and do not leave a timer behind
      if (this.isStopped) {
        return;
      }

      if (this.includeInitial) {
        for (const state of this.previousState.values()) {
          this.emitChange(this.toChangeEvent('add', state));
        }
      }

      this.readyDeferred.resolve();

      // A 'change' listener may have called stop() during the initial emission
      if (this.isStopped) {
        return;
      }

      this.pollIntervalId = setInterval(() => {
        // poll() reports its own failures; this catch is only a last line of defence
        this.poll().catch((err: unknown) => {
          this.emitError(err);
        });
      }, this.pollIntervalMs);
    } finally {
      this.isStarting = false;
    }
  }

  /**
   * Stop watching the bucket.
   *
   * Clears the polling interval, waits for an in-progress poll to finish and
   * completes the internal subject, so `changeSubject` subscribers are completed.
   * NOTE(review): because the subject is completed here, a later `start()` on the
   * same instance presumably reaches only EventEmitter listeners - confirm whether
   * restart is meant to be supported.
   */
  public async stop(): Promise<void> {
    this.isStopped = true;

    if (this.pollIntervalId !== null) {
      clearInterval(this.pollIntervalId);
      this.pollIntervalId = null;
    }

    // Wait for any in-progress poll to complete
    while (this.isPolling) {
      await new Promise<void>((resolve) => setTimeout(resolve, 50));
    }

    this.rawSubject.complete();
  }

  /**
   * Alias for stop() - for consistency with other APIs
   */
  public async close(): Promise<void> {
    return this.stop();
  }

  /**
   * Build the initial state by listing all objects with metadata.
   * The new listing replaces the previous state only once it has been read completely.
   */
  private async buildInitialState(): Promise<void> {
    this.previousState = await this.snapshotState();
  }

  /**
   * Poll for changes by comparing current state against previous state.
   * Does nothing when another poll is running or the watcher is stopped.
   * Failures are reported through emitError() rather than rejecting.
   */
  private async poll(): Promise<void> {
    // Guard against overlapping polls
    if (this.isPolling || this.isStopped) {
      return;
    }

    this.isPolling = true;

    try {
      const currentState = await this.snapshotState();

      // A listing interrupted by stop() is incomplete and must not be diffed
      if (!this.isStopped) {
        this.detectChanges(currentState);
        this.previousState = currentState;
      }
    } catch (err) {
      this.emitError(err);
    } finally {
      this.isPolling = false;
    }
  }

  /**
   * List the bucket once and index the result by object key.
   * Objects without a key are skipped; missing metadata falls back to
   * an empty ETag, size 0 and the epoch date. Stops early once the watcher
   * is stopped, in which case the returned map is incomplete.
   */
  private async snapshotState(): Promise<Map<string, interfaces.IS3ObjectState>> {
    const snapshot = new Map<string, interfaces.IS3ObjectState>();

    for await (const obj of this.listObjectsWithMetadata()) {
      if (this.isStopped) {
        break;
      }

      if (obj.Key) {
        snapshot.set(obj.Key, {
          key: obj.Key,
          etag: obj.ETag ?? '',
          size: obj.Size ?? 0,
          lastModified: obj.LastModified ?? new Date(0),
        });
      }
    }

    return snapshot;
  }

  /**
   * Detect changes between current and previous state and emit one event per change:
   * 'add' for new keys, 'modify' when ETag, size or last-modified differ, and
   * 'delete' for keys that are no longer listed.
   */
  private detectChanges(currentState: Map<string, interfaces.IS3ObjectState>): void {
    // Added and modified objects
    for (const [key, current] of currentState) {
      const previous = this.previousState.get(key);

      if (!previous) {
        this.emitChange(this.toChangeEvent('add', current));
      } else if (
        previous.etag !== current.etag ||
        previous.size !== current.size ||
        previous.lastModified.getTime() !== current.lastModified.getTime()
      ) {
        this.emitChange(this.toChangeEvent('modify', current));
      }
    }

    // Deleted objects carry only the key and bucket name
    for (const [key, previous] of this.previousState) {
      if (!currentState.has(key)) {
        this.emitChange({
          type: 'delete',
          key: previous.key,
          bucket: this.bucketRef.name,
        });
      }
    }
  }

  /**
   * Build a change event that carries the full metadata of an object state.
   */
  private toChangeEvent(
    type: interfaces.IS3ChangeEvent['type'],
    state: interfaces.IS3ObjectState
  ): interfaces.IS3ChangeEvent {
    return {
      type,
      key: state.key,
      size: state.size,
      etag: state.etag,
      lastModified: state.lastModified,
      bucket: this.bucketRef.name,
    };
  }

  /**
   * Emit a change event via both RxJS Subject and EventEmitter
   */
  private emitChange(event: interfaces.IS3ChangeEvent): void {
    this.rawSubject.next(event);
    this.emit('change', event);
  }

  /**
   * Report a polling failure through the 'error' event.
   * EventEmitter throws when 'error' is emitted without a listener; inside the
   * timer-driven poll that would surface as an unhandled rejection, so the
   * error is logged instead when nobody listens or when a listener itself throws.
   */
  private emitError(err: unknown): void {
    if (this.listenerCount('error') === 0) {
      console.error('BucketWatcher error (no "error" listener registered):', err);
      return;
    }

    try {
      this.emit('error', err);
    } catch (listenerErr) {
      console.error('BucketWatcher "error" listener threw:', listenerErr);
    }
  }

  /**
   * List objects with full metadata (ETag, Size, LastModified).
   * This is a private method that yields full _Object data, not just keys.
   * Pages are requested with ListObjectsV2 until no continuation token is
   * returned; the listing ends early at a page boundary once the watcher is stopped.
   */
  private async *listObjectsWithMetadata(): AsyncIterableIterator<plugins.s3._Object> {
    let continuationToken: string | undefined;

    do {
      if (this.isStopped) {
        return;
      }

      const command = new plugins.s3.ListObjectsV2Command({
        Bucket: this.bucketRef.name,
        Prefix: this.prefix,
        MaxKeys: this.pageSize,
        ContinuationToken: continuationToken,
      });

      const response = await this.bucketRef.smartbucketRef.s3Client.send(command);

      for (const obj of response.Contents || []) {
        yield obj;
      }

      continuationToken = response.NextContinuationToken;
    } while (continuationToken);
  }
}
|