Files
smartbucket/test/test.watcher.node.ts

411 lines
11 KiB
TypeScript
Raw Normal View History

// test.watcher.node.ts - Tests for BucketWatcher
import { tap, expect } from '@git.zone/tstest/tapbundle';
import * as smartbucket from '../ts/index.js';
import type { IS3ChangeEvent } from '../ts/interfaces.js';
// Get test configuration
import * as qenv from '@push.rocks/qenv';
// Environment reader constructed with './' and './.nogit/'; the setup test
// below reads the S3_* settings from it on demand.
const testQenv = new qenv.Qenv('./', './.nogit/');
// Shared fixtures assigned by the setup test: `testBucket` is the bucket the
// later tests operate on, `testSmartbucket` is the client it is resolved from.
let testBucket: smartbucket.Bucket;
let testSmartbucket: smartbucket.SmartBucket;
// Setup: Create test bucket
/**
 * Setup: builds the SmartBucket client from the S3_* environment settings and
 * resolves the bucket (named by S3_BUCKET) that the following tests use.
 */
tap.test('should create valid smartbucket and bucket', async () => {
  const accessKey = await testQenv.getEnvVarOnDemand('S3_ACCESSKEY');
  const accessSecret = await testQenv.getEnvVarOnDemand('S3_SECRETKEY');
  const endpoint = await testQenv.getEnvVarOnDemand('S3_ENDPOINT');
  // Explicit radix so the port is always interpreted as a decimal number.
  const port = parseInt(await testQenv.getEnvVarOnDemand('S3_PORT'), 10);

  testSmartbucket = new smartbucket.SmartBucket({
    accessKey,
    accessSecret,
    endpoint,
    port,
    useSsl: false,
  });

  const bucketName = await testQenv.getEnvVarOnDemand('S3_BUCKET');
  testBucket = await smartbucket.Bucket.getBucketByName(testSmartbucket, bucketName);

  expect(testBucket).toBeInstanceOf(smartbucket.Bucket);
});
// Empties the shared test bucket before any watcher test runs.
tap.test('should clean bucket for watcher tests', async (): Promise<void> => {
  await testBucket.cleanAllContents();
});
// ==========================
// Basic Watcher Tests
// ==========================
// A watcher created without arguments must be a BucketWatcher that exposes
// a change subject.
tap.test('should create watcher with default options', async () => {
  const defaultWatcher = testBucket.createWatcher();
  const { changeSubject } = defaultWatcher;

  expect(defaultWatcher).toBeInstanceOf(smartbucket.BucketWatcher);
  expect(changeSubject).toBeDefined();
});
// Passing prefix, poll interval and includeInitial must still yield a
// BucketWatcher instance.
tap.test('should create watcher with custom options', async () => {
  const customOptions = {
    prefix: 'test/',
    pollIntervalMs: 2000,
    includeInitial: true,
  };
  const customWatcher = testBucket.createWatcher(customOptions);

  expect(customWatcher).toBeInstanceOf(smartbucket.BucketWatcher);
});
// ==========================
// Add Event Detection Tests
// ==========================
/**
 * Creates a file under the watched prefix while the watcher is running and
 * expects a matching 'add' event on `changeSubject`.
 */
tap.test('should detect add events for new files', async () => {
  const events: IS3ChangeEvent[] = [];
  const watcher = testBucket.createWatcher({
    prefix: 'watcher-test/',
    pollIntervalMs: 500,
  });

  // Collect single events only; array emissions are skipped.
  watcher.changeSubject.subscribe((event) => {
    if (!Array.isArray(event)) {
      events.push(event);
    }
  });

  await watcher.start();
  try {
    await watcher.readyDeferred.promise;

    // Create a new file
    await testBucket.fastPut({
      path: 'watcher-test/new-file.txt',
      contents: 'test content',
    });

    // Wait for poll to detect the change (1200 ms spans two 500 ms intervals)
    await new Promise((resolve) => setTimeout(resolve, 1200));
  } finally {
    // Stop even when a step above throws, so a failed run does not leave
    // the watcher active for the remaining tests.
    await watcher.stop();
  }

  expect(events.length).toBeGreaterThanOrEqual(1);
  const addEvent = events.find((e) => e.type === 'add' && e.key === 'watcher-test/new-file.txt');
  expect(addEvent).toBeDefined();
  expect(addEvent!.bucket).toEqual(testBucket.name);
});
// ==========================
// Modify Event Detection Tests
// ==========================
/**
 * Overwrites 'watcher-test/new-file.txt' with different content while the
 * watcher is running and expects a matching 'modify' event.
 * NOTE(review): relies on the file already existing from the add-event test.
 */
tap.test('should detect modify events for changed files', async () => {
  const events: IS3ChangeEvent[] = [];
  const watcher = testBucket.createWatcher({
    prefix: 'watcher-test/',
    pollIntervalMs: 500,
  });

  // Collect single events only; array emissions are skipped.
  watcher.changeSubject.subscribe((event) => {
    if (!Array.isArray(event)) {
      events.push(event);
    }
  });

  await watcher.start();
  try {
    await watcher.readyDeferred.promise;

    // Modify the file
    await testBucket.fastPut({
      path: 'watcher-test/new-file.txt',
      contents: 'modified content with different size',
      overwrite: true,
    });

    // Wait for poll to detect the change
    await new Promise((resolve) => setTimeout(resolve, 1200));
  } finally {
    // Stop even when a step above throws, so a failed run does not leave
    // the watcher active for the remaining tests.
    await watcher.stop();
  }

  expect(events.length).toBeGreaterThanOrEqual(1);
  const modifyEvent = events.find((e) => e.type === 'modify' && e.key === 'watcher-test/new-file.txt');
  expect(modifyEvent).toBeDefined();
});
// ==========================
// Delete Event Detection Tests
// ==========================
/**
 * Removes 'watcher-test/new-file.txt' while the watcher is running and
 * expects a matching 'delete' event.
 * NOTE(review): relies on the file still existing from the preceding tests.
 */
tap.test('should detect delete events for removed files', async () => {
  const events: IS3ChangeEvent[] = [];
  const watcher = testBucket.createWatcher({
    prefix: 'watcher-test/',
    pollIntervalMs: 500,
  });

  // Collect single events only; array emissions are skipped.
  watcher.changeSubject.subscribe((event) => {
    if (!Array.isArray(event)) {
      events.push(event);
    }
  });

  await watcher.start();
  try {
    await watcher.readyDeferred.promise;

    // Delete the file
    await testBucket.fastRemove({ path: 'watcher-test/new-file.txt' });

    // Wait for poll to detect the change
    await new Promise((resolve) => setTimeout(resolve, 1200));
  } finally {
    // Stop even when a step above throws, so a failed run does not leave
    // the watcher active for the remaining tests.
    await watcher.stop();
  }

  expect(events.length).toBeGreaterThanOrEqual(1);
  const deleteEvent = events.find((e) => e.type === 'delete' && e.key === 'watcher-test/new-file.txt');
  expect(deleteEvent).toBeDefined();
});
// ==========================
// Initial State Tests
// ==========================
// Seeds two files, then starts a watcher with includeInitial and expects
// exactly those two files to be reported as 'add' events.
tap.test('should emit initial state as add events when includeInitial is true', async () => {
  // Seed the prefix before the watcher exists.
  const seedFiles = [
    { path: 'watcher-initial/file1.txt', contents: 'content 1' },
    { path: 'watcher-initial/file2.txt', contents: 'content 2' },
  ];
  for (const seedFile of seedFiles) {
    await testBucket.fastPut(seedFile);
  }

  const received: IS3ChangeEvent[] = [];
  const initialWatcher = testBucket.createWatcher({
    prefix: 'watcher-initial/',
    // Long interval: only the initial emission matters here.
    pollIntervalMs: 10000,
    includeInitial: true,
  });
  initialWatcher.changeSubject.subscribe((emission) => {
    if (Array.isArray(emission)) {
      return;
    }
    received.push(emission);
  });

  await initialWatcher.start();
  await initialWatcher.readyDeferred.promise;
  // Short pause so the initial events can be delivered.
  await new Promise((done) => setTimeout(done, 100));
  await initialWatcher.stop();

  const receivedKeys = received.map((e) => e.key);
  const hasNonAddEvent = received.some((e) => e.type !== 'add');

  expect(received.length).toEqual(2);
  expect(!hasNonAddEvent).toBeTrue();
  expect(receivedKeys.includes('watcher-initial/file1.txt')).toBeTrue();
  expect(receivedKeys.includes('watcher-initial/file2.txt')).toBeTrue();
});
// ==========================
// EventEmitter Pattern Tests
// ==========================
/**
 * Subscribes through `watcher.on('change', ...)` instead of `changeSubject`
 * and expects the first delivered event for a newly created file to be 'add'.
 */
tap.test('should emit events via EventEmitter pattern', async () => {
  const events: IS3ChangeEvent[] = [];
  const watcher = testBucket.createWatcher({
    prefix: 'watcher-emitter/',
    pollIntervalMs: 500,
  });

  watcher.on('change', (event: IS3ChangeEvent) => {
    events.push(event);
  });

  await watcher.start();
  try {
    await watcher.readyDeferred.promise;

    // Create a new file
    await testBucket.fastPut({
      path: 'watcher-emitter/emitter-test.txt',
      contents: 'test content',
    });

    // Wait for poll to detect the change
    await new Promise((resolve) => setTimeout(resolve, 1200));
  } finally {
    // Stop even when a step above throws, so a failed run does not leave
    // the watcher active for the remaining tests.
    await watcher.stop();
  }

  expect(events.length).toBeGreaterThanOrEqual(1);
  expect(events[0].type).toEqual('add');
});
// ==========================
// Buffered Events Tests
// ==========================
/**
 * With `bufferTimeMs` set, creates three files in quick succession and
 * expects at least one emission on `changeSubject` to be a non-empty array.
 */
tap.test('should buffer events when bufferTimeMs is set', async () => {
  const bufferedEvents: (IS3ChangeEvent | IS3ChangeEvent[])[] = [];
  const watcher = testBucket.createWatcher({
    prefix: 'watcher-buffer/',
    pollIntervalMs: 200,
    bufferTimeMs: 1000,
  });

  // Record every emission as-is (single event or array).
  watcher.changeSubject.subscribe((event) => {
    bufferedEvents.push(event);
  });

  await watcher.start();
  try {
    await watcher.readyDeferred.promise;

    // Create multiple files quickly
    await testBucket.fastPut({
      path: 'watcher-buffer/file1.txt',
      contents: 'content 1',
    });
    await testBucket.fastPut({
      path: 'watcher-buffer/file2.txt',
      contents: 'content 2',
    });
    await testBucket.fastPut({
      path: 'watcher-buffer/file3.txt',
      contents: 'content 3',
    });

    // Wait for buffer to emit (1500 ms exceeds the 1000 ms buffer window)
    await new Promise((resolve) => setTimeout(resolve, 1500));
  } finally {
    // Stop even when a step above throws, so a failed run does not leave
    // the watcher active for the remaining tests.
    await watcher.stop();
  }

  // With buffering, events should come as arrays
  expect(bufferedEvents.length).toBeGreaterThanOrEqual(1);
  // At least one emission must be an array holding at least one event.
  // The check deliberately does not require several events per array.
  const hasBufferedArray = bufferedEvents.some(
    (e) => Array.isArray(e) && e.length >= 1
  );
  expect(hasBufferedArray).toBeTrue();
});
// ==========================
// Error Handling Tests
// ==========================
// Attaches an 'error' listener and runs a start/stop cycle. No real S3
// failure is provoked, so the test only checks that none were reported.
tap.test('should emit error events on error', async () => {
  const reportedErrors: Error[] = [];
  const errorWatcher = testBucket.createWatcher({
    prefix: 'watcher-error/',
    pollIntervalMs: 500,
  });
  errorWatcher.on('error', (failure: Error) => {
    reportedErrors.push(failure);
  });

  await errorWatcher.start();
  await errorWatcher.readyDeferred.promise;
  // Triggering genuine S3 errors is hard in a test, so this only verifies
  // that attaching the handler works and normal operation stays error-free.
  await errorWatcher.stop();

  expect(reportedErrors.length).toEqual(0);
});
// ==========================
// Graceful Stop Tests
// ==========================
// After stop(), a file written under the watched prefix must not produce
// any further 'change' events.
tap.test('should stop gracefully with stop()', async () => {
  const pause = (ms: number) => new Promise((done) => setTimeout(done, ms));

  const stoppableWatcher = testBucket.createWatcher({
    prefix: 'watcher-stop/',
    pollIntervalMs: 100,
  });
  await stoppableWatcher.start();
  await stoppableWatcher.readyDeferred.promise;

  // Allow a few 100 ms poll cycles before stopping.
  await pause(300);

  // stop() itself must resolve without throwing.
  await stoppableWatcher.stop();

  // Listener attached only after stop: anything it receives is a late event.
  const lateEvents: IS3ChangeEvent[] = [];
  stoppableWatcher.on('change', (lateEvent: IS3ChangeEvent) => {
    lateEvents.push(lateEvent);
  });

  // Write under the watched prefix once the watcher is stopped.
  await testBucket.fastPut({
    path: 'watcher-stop/after-stop.txt',
    contents: 'should not be detected',
  });
  await pause(200);

  expect(lateEvents.length).toEqual(0);
});
// close() must resolve on a started watcher, just like stop().
tap.test('should stop gracefully with close() alias', async () => {
  const closableOptions = {
    prefix: 'watcher-close/',
    pollIntervalMs: 100,
  };
  const closableWatcher = testBucket.createWatcher(closableOptions);

  await closableWatcher.start();
  await closableWatcher.readyDeferred.promise;

  // Exercised as the alias of stop(); the test passes if this does not throw.
  await closableWatcher.close();
});
// ==========================
// Prefix Filtering Tests
// ==========================
/**
 * Writes one file inside and one outside the watched prefix and expects
 * exactly one event, for the file inside 'watcher-prefix-a/'.
 */
tap.test('should only detect changes within specified prefix', async () => {
  const events: IS3ChangeEvent[] = [];
  const watcher = testBucket.createWatcher({
    prefix: 'watcher-prefix-a/',
    pollIntervalMs: 500,
  });

  // Collect single events only; array emissions are skipped.
  watcher.changeSubject.subscribe((event) => {
    if (!Array.isArray(event)) {
      events.push(event);
    }
  });

  await watcher.start();
  try {
    await watcher.readyDeferred.promise;

    // Create file in watched prefix
    await testBucket.fastPut({
      path: 'watcher-prefix-a/watched.txt',
      contents: 'watched content',
    });

    // Create file outside watched prefix
    await testBucket.fastPut({
      path: 'watcher-prefix-b/unwatched.txt',
      contents: 'unwatched content',
    });

    // Wait for poll
    await new Promise((resolve) => setTimeout(resolve, 1200));
  } finally {
    // Stop even when a step above throws, so a failed run does not leave
    // the watcher active for the remaining tests.
    await watcher.stop();
  }

  // Should only see the file in watcher-prefix-a/
  expect(events.length).toEqual(1);
  expect(events[0].key).toEqual('watcher-prefix-a/watched.txt');
});
// ==========================
// Cleanup
// ==========================
// Final step: empty the shared bucket again once all watcher tests have run.
tap.test('should clean up test data', async (): Promise<void> => {
  await testBucket.cleanAllContents();
});

// Start the registered tests and export the result of tap.start().
export default tap.start();