feat(storage): generalize S3 client and watcher interfaces to storage-oriented naming with backward compatibility
This commit is contained in:
@@ -6,7 +6,7 @@ import type { Bucket } from './classes.bucket.js';
|
||||
import { EventEmitter } from 'node:events';
|
||||
|
||||
/**
|
||||
* BucketWatcher monitors an S3 bucket for changes (add/modify/delete)
|
||||
* BucketWatcher monitors a storage bucket for changes (add/modify/delete)
|
||||
* using a polling-based approach. Designed to follow the SmartdataDbWatcher pattern.
|
||||
*
|
||||
* @example
|
||||
@@ -34,11 +34,11 @@ export class BucketWatcher extends EventEmitter {
|
||||
public readyDeferred = plugins.smartpromise.defer();
|
||||
|
||||
/** Observable for receiving change events (supports RxJS operators) */
|
||||
public changeSubject: plugins.smartrx.rxjs.Observable<interfaces.IS3ChangeEvent | interfaces.IS3ChangeEvent[]>;
|
||||
public changeSubject: plugins.smartrx.rxjs.Observable<interfaces.IStorageChangeEvent | interfaces.IStorageChangeEvent[]>;
|
||||
|
||||
// Internal subjects and state
|
||||
private rawSubject: plugins.smartrx.rxjs.Subject<interfaces.IS3ChangeEvent>;
|
||||
private previousState: Map<string, interfaces.IS3ObjectState>;
|
||||
private rawSubject: plugins.smartrx.rxjs.Subject<interfaces.IStorageChangeEvent>;
|
||||
private previousState: Map<string, interfaces.IStorageObjectState>;
|
||||
private pollIntervalId: ReturnType<typeof setInterval> | null = null;
|
||||
private isPolling = false;
|
||||
private isStopped = false;
|
||||
@@ -65,13 +65,13 @@ export class BucketWatcher extends EventEmitter {
|
||||
this.previousState = new Map();
|
||||
|
||||
// Initialize raw subject for emitting changes
|
||||
this.rawSubject = new plugins.smartrx.rxjs.Subject<interfaces.IS3ChangeEvent>();
|
||||
this.rawSubject = new plugins.smartrx.rxjs.Subject<interfaces.IStorageChangeEvent>();
|
||||
|
||||
// Configure the public observable with optional buffering
|
||||
if (this.bufferTimeMs && this.bufferTimeMs > 0) {
|
||||
this.changeSubject = this.rawSubject.pipe(
|
||||
plugins.smartrx.rxjs.ops.bufferTime(this.bufferTimeMs),
|
||||
plugins.smartrx.rxjs.ops.filter((events: interfaces.IS3ChangeEvent[]) => events.length > 0)
|
||||
plugins.smartrx.rxjs.ops.filter((events: interfaces.IStorageChangeEvent[]) => events.length > 0)
|
||||
);
|
||||
} else {
|
||||
this.changeSubject = this.rawSubject.asObservable();
|
||||
@@ -174,7 +174,7 @@ export class BucketWatcher extends EventEmitter {
|
||||
|
||||
try {
|
||||
// Build current state
|
||||
const currentState = new Map<string, interfaces.IS3ObjectState>();
|
||||
const currentState = new Map<string, interfaces.IStorageObjectState>();
|
||||
|
||||
for await (const obj of this.listObjectsWithMetadata()) {
|
||||
if (this.isStopped) {
|
||||
@@ -205,7 +205,7 @@ export class BucketWatcher extends EventEmitter {
|
||||
/**
|
||||
* Detect changes between current and previous state
|
||||
*/
|
||||
private detectChanges(currentState: Map<string, interfaces.IS3ObjectState>): void {
|
||||
private detectChanges(currentState: Map<string, interfaces.IStorageObjectState>): void {
|
||||
// Detect added and modified objects
|
||||
for (const [key, current] of currentState) {
|
||||
const previous = this.previousState.get(key);
|
||||
@@ -253,7 +253,7 @@ export class BucketWatcher extends EventEmitter {
|
||||
/**
|
||||
* Emit a change event via both RxJS Subject and EventEmitter
|
||||
*/
|
||||
private emitChange(event: interfaces.IS3ChangeEvent): void {
|
||||
private emitChange(event: interfaces.IStorageChangeEvent): void {
|
||||
this.rawSubject.next(event);
|
||||
this.emit('change', event);
|
||||
}
|
||||
@@ -277,7 +277,7 @@ export class BucketWatcher extends EventEmitter {
|
||||
ContinuationToken: continuationToken,
|
||||
});
|
||||
|
||||
const response = await this.bucketRef.smartbucketRef.s3Client.send(command);
|
||||
const response = await this.bucketRef.smartbucketRef.storageClient.send(command);
|
||||
|
||||
for (const obj of response.Contents || []) {
|
||||
yield obj;
|
||||
|
||||
Reference in New Issue
Block a user