// test.watcher.node.ts - Tests for BucketWatcher
//
// Integration tests: they talk to the S3 endpoint configured through the
// S3_* environment variables resolved by qenv below. The add / modify /
// delete tests share the key 'watcher-test/new-file.txt' and therefore rely
// on running in the order they are declared.
import { tap, expect } from '@git.zone/tstest/tapbundle';
import * as smartbucket from '../ts/index.js';
import type { IS3ChangeEvent } from '../ts/interfaces.js';

// Get test configuration
import * as qenv from '@push.rocks/qenv';
const testQenv = new qenv.Qenv('./', './.nogit/');

// Test bucket reference
let testBucket: smartbucket.Bucket;
let testSmartbucket: smartbucket.SmartBucket;

// ==========================
// Test helpers
// ==========================

/** Watcher type exactly as returned by `Bucket.createWatcher()`. */
type TWatcher = ReturnType<smartbucket.Bucket['createWatcher']>;

/** Poll interval used by the change-detection tests. */
const WATCH_POLL_INTERVAL_MS = 500;

/** Wait time that spans at least two polls at WATCH_POLL_INTERVAL_MS. */
const WATCH_SETTLE_MS = 1200;

/**
 * Resolves after `ms` milliseconds.
 *
 * @param ms - time to wait in milliseconds
 */
const delay = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

/**
 * Subscribes to the watcher's `changeSubject` and records every emission
 * that is a single event. Array (buffered) emissions are ignored.
 *
 * @param watcher - watcher to observe
 * @returns the live array the subscription pushes into
 */
function collectSingleEvents(watcher: TWatcher): IS3ChangeEvent[] {
  const collected: IS3ChangeEvent[] = [];
  watcher.changeSubject.subscribe((event) => {
    if (!Array.isArray(event)) {
      collected.push(event);
    }
  });
  return collected;
}

/**
 * Starts the watcher, waits until it reports ready, runs `action`, waits
 * `settleMs` so the watcher can pick up the changes, and finally stops the
 * watcher.
 *
 * The stop happens in a `finally` block so a failing bucket operation does
 * not leave a started watcher behind.
 *
 * @param watcher - watcher under test (not yet started)
 * @param settleMs - time to wait after `action` before stopping
 * @param action - bucket mutations to perform while the watcher is running
 */
async function runWhileWatching(
  watcher: TWatcher,
  settleMs: number,
  action: () => Promise<void> = async () => {}
): Promise<void> {
  await watcher.start();
  await watcher.readyDeferred.promise;
  try {
    await action();
    await delay(settleMs);
  } finally {
    await watcher.stop();
  }
}

// Setup: Create test bucket
tap.test('should create valid smartbucket and bucket', async () => {
  testSmartbucket = new smartbucket.SmartBucket({
    accessKey: await testQenv.getEnvVarOnDemand('S3_ACCESSKEY'),
    accessSecret: await testQenv.getEnvVarOnDemand('S3_SECRETKEY'),
    endpoint: await testQenv.getEnvVarOnDemand('S3_ENDPOINT'),
    // Explicit radix: the port is always a decimal number.
    port: parseInt(await testQenv.getEnvVarOnDemand('S3_PORT'), 10),
    useSsl: false,
  });
  testBucket = await smartbucket.Bucket.getBucketByName(
    testSmartbucket,
    await testQenv.getEnvVarOnDemand('S3_BUCKET')
  );
  expect(testBucket).toBeInstanceOf(smartbucket.Bucket);
});

tap.test('should clean bucket for watcher tests', async () => {
  await testBucket.cleanAllContents();
});

// ==========================
// Basic Watcher Tests
// ==========================

tap.test('should create watcher with default options', async () => {
  const watcher = testBucket.createWatcher();
  expect(watcher).toBeInstanceOf(smartbucket.BucketWatcher);
  expect(watcher.changeSubject).toBeDefined();
});

tap.test('should create watcher with custom options', async () => {
  const watcher = testBucket.createWatcher({
    prefix: 'test/',
    pollIntervalMs: 2000,
    includeInitial: true,
  });
  expect(watcher).toBeInstanceOf(smartbucket.BucketWatcher);
});

// ==========================
// Add Event Detection Tests
// ==========================

tap.test('should detect add events for new files', async () => {
  const watcher = testBucket.createWatcher({
    prefix: 'watcher-test/',
    pollIntervalMs: WATCH_POLL_INTERVAL_MS,
  });
  const events = collectSingleEvents(watcher);

  await runWhileWatching(watcher, WATCH_SETTLE_MS, async () => {
    // Create a new file
    await testBucket.fastPut({
      path: 'watcher-test/new-file.txt',
      contents: 'test content',
    });
  });

  expect(events.length).toBeGreaterThanOrEqual(1);
  const addEvent = events.find(
    (e) => e.type === 'add' && e.key === 'watcher-test/new-file.txt'
  );
  expect(addEvent).toBeDefined();
  expect(addEvent?.bucket).toEqual(testBucket.name);
});

// ==========================
// Modify Event Detection Tests
// ==========================

tap.test('should detect modify events for changed files', async () => {
  const watcher = testBucket.createWatcher({
    prefix: 'watcher-test/',
    pollIntervalMs: WATCH_POLL_INTERVAL_MS,
  });
  const events = collectSingleEvents(watcher);

  await runWhileWatching(watcher, WATCH_SETTLE_MS, async () => {
    // Modify the file created by the add test
    await testBucket.fastPut({
      path: 'watcher-test/new-file.txt',
      contents: 'modified content with different size',
      overwrite: true,
    });
  });

  expect(events.length).toBeGreaterThanOrEqual(1);
  const modifyEvent = events.find(
    (e) => e.type === 'modify' && e.key === 'watcher-test/new-file.txt'
  );
  expect(modifyEvent).toBeDefined();
});

// ==========================
// Delete Event Detection Tests
// ==========================

tap.test('should detect delete events for removed files', async () => {
  const watcher = testBucket.createWatcher({
    prefix: 'watcher-test/',
    pollIntervalMs: WATCH_POLL_INTERVAL_MS,
  });
  const events = collectSingleEvents(watcher);

  await runWhileWatching(watcher, WATCH_SETTLE_MS, async () => {
    // Delete the file
    await testBucket.fastRemove({ path: 'watcher-test/new-file.txt' });
  });

  expect(events.length).toBeGreaterThanOrEqual(1);
  const deleteEvent = events.find(
    (e) => e.type === 'delete' && e.key === 'watcher-test/new-file.txt'
  );
  expect(deleteEvent).toBeDefined();
});

// ==========================
// Initial State Tests
// ==========================

tap.test('should emit initial state as add events when includeInitial is true', async () => {
  // First, create some test files
  await testBucket.fastPut({
    path: 'watcher-initial/file1.txt',
    contents: 'content 1',
  });
  await testBucket.fastPut({
    path: 'watcher-initial/file2.txt',
    contents: 'content 2',
  });

  const watcher = testBucket.createWatcher({
    prefix: 'watcher-initial/',
    pollIntervalMs: 10000, // Long interval - we only care about initial events
    includeInitial: true,
  });
  const events = collectSingleEvents(watcher);

  // No bucket mutations here; just give a moment for events to propagate
  await runWhileWatching(watcher, 100);

  expect(events.length).toEqual(2);
  expect(events.every((e) => e.type === 'add')).toBeTrue();
  expect(events.some((e) => e.key === 'watcher-initial/file1.txt')).toBeTrue();
  expect(events.some((e) => e.key === 'watcher-initial/file2.txt')).toBeTrue();
});

// ==========================
// EventEmitter Pattern Tests
// ==========================

tap.test('should emit events via EventEmitter pattern', async () => {
  const events: IS3ChangeEvent[] = [];
  const watcher = testBucket.createWatcher({
    prefix: 'watcher-emitter/',
    pollIntervalMs: WATCH_POLL_INTERVAL_MS,
  });

  watcher.on('change', (event: IS3ChangeEvent) => {
    events.push(event);
  });

  await runWhileWatching(watcher, WATCH_SETTLE_MS, async () => {
    // Create a new file
    await testBucket.fastPut({
      path: 'watcher-emitter/emitter-test.txt',
      contents: 'test content',
    });
  });

  expect(events.length).toBeGreaterThanOrEqual(1);
  expect(events[0].type).toEqual('add');
});

// ==========================
// Buffered Events Tests
// ==========================

tap.test('should buffer events when bufferTimeMs is set', async () => {
  const bufferedEvents: (IS3ChangeEvent | IS3ChangeEvent[])[] = [];
  const watcher = testBucket.createWatcher({
    prefix: 'watcher-buffer/',
    pollIntervalMs: 200,
    bufferTimeMs: 1000,
  });

  // Record every emission as-is so array emissions can be told apart below
  watcher.changeSubject.subscribe((event) => {
    bufferedEvents.push(event);
  });

  // Wait long enough (1500ms) for the 1000ms buffer to emit
  await runWhileWatching(watcher, 1500, async () => {
    // Create multiple files quickly
    await testBucket.fastPut({
      path: 'watcher-buffer/file1.txt',
      contents: 'content 1',
    });
    await testBucket.fastPut({
      path: 'watcher-buffer/file2.txt',
      contents: 'content 2',
    });
    await testBucket.fastPut({
      path: 'watcher-buffer/file3.txt',
      contents: 'content 3',
    });
  });

  // With buffering, events should come as arrays
  expect(bufferedEvents.length).toBeGreaterThanOrEqual(1);

  // At least one emission must be a non-empty array. How the three files are
  // split across buffer windows is timing dependent, so no minimum batch
  // size beyond one is asserted.
  const hasBufferedArray = bufferedEvents.some(
    (e) => Array.isArray(e) && e.length >= 1
  );
  expect(hasBufferedArray).toBeTrue();
});

// ==========================
// Error Handling Tests
// ==========================

tap.test('should emit error events on error', async () => {
  const errors: Error[] = [];
  const watcher = testBucket.createWatcher({
    prefix: 'watcher-error/',
    pollIntervalMs: 500,
  });

  watcher.on('error', (err: Error) => {
    errors.push(err);
  });

  await watcher.start();
  await watcher.readyDeferred.promise;

  // Note: Triggering actual S3 errors is tricky in tests.
  // We just verify the error handler is properly attached.

  await watcher.stop();

  // No errors expected in normal operation
  expect(errors.length).toEqual(0);
});

// ==========================
// Graceful Stop Tests
// ==========================

tap.test('should stop gracefully with stop()', async () => {
  const watcher = testBucket.createWatcher({
    prefix: 'watcher-stop/',
    pollIntervalMs: 100,
  });

  await watcher.start();
  await watcher.readyDeferred.promise;

  // Let it poll a few times
  await delay(300);

  // Stop should complete without errors
  await watcher.stop();

  // Watcher should not poll after stop
  const eventsCaptured: IS3ChangeEvent[] = [];
  watcher.on('change', (event: IS3ChangeEvent) => {
    eventsCaptured.push(event);
  });

  // Create a file after stop
  await testBucket.fastPut({
    path: 'watcher-stop/after-stop.txt',
    contents: 'should not be detected',
  });

  // Two poll intervals' worth of time: a still-running watcher would emit
  await delay(200);

  // No events should be captured after stop
  expect(eventsCaptured.length).toEqual(0);
});

tap.test('should stop gracefully with close() alias', async () => {
  const watcher = testBucket.createWatcher({
    prefix: 'watcher-close/',
    pollIntervalMs: 100,
  });

  await watcher.start();
  await watcher.readyDeferred.promise;

  // close() should work as alias for stop()
  await watcher.close();
});

// ==========================
// Prefix Filtering Tests
// ==========================

tap.test('should only detect changes within specified prefix', async () => {
  const watcher = testBucket.createWatcher({
    prefix: 'watcher-prefix-a/',
    pollIntervalMs: WATCH_POLL_INTERVAL_MS,
  });
  const events = collectSingleEvents(watcher);

  await runWhileWatching(watcher, WATCH_SETTLE_MS, async () => {
    // Create file in watched prefix
    await testBucket.fastPut({
      path: 'watcher-prefix-a/watched.txt',
      contents: 'watched content',
    });

    // Create file outside watched prefix
    await testBucket.fastPut({
      path: 'watcher-prefix-b/unwatched.txt',
      contents: 'unwatched content',
    });
  });

  // Should only see the file in watcher-prefix-a/
  expect(events.length).toEqual(1);
  expect(events[0].key).toEqual('watcher-prefix-a/watched.txt');
});

// ==========================
// Cleanup
// ==========================

tap.test('should clean up test data', async () => {
  await testBucket.cleanAllContents();
});

export default tap.start();