Compare commits

...

8 Commits

22 changed files with 5249 additions and 2805 deletions

View File

@@ -1,5 +1,92 @@
# Changelog
## 2026-01-25 - 4.4.0 - feat(watcher)
add polling-based BucketWatcher to detect add/modify/delete events and expose RxJS Observable and EventEmitter APIs
- Add new BucketWatcher implementation (ts/classes.watcher.ts) with polling, buffering and change detection by ETag/Size/LastModified
- Expose createWatcher(...) on Bucket (ts/classes.bucket.ts) and export watcher and interfaces from index (ts/index.ts)
- Add watcher-related interfaces (IS3ChangeEvent, IS3ObjectState, IBucketWatcherOptions) to ts/interfaces.ts
- Comprehensive README/docs additions and examples for BucketWatcher, buffering, options and use cases (readme.md, readme.hints.md)
- Add unit/integration tests for watcher behavior (test/test.watcher.node.ts)
- Non-breaking API additions only — ready for a minor version bump from 4.3.1 to 4.4.0
## 2026-01-24 - 4.3.1 - fix(bucket)
propagate S3 client errors instead of silently logging them; update build script, bump dev/dependencies, and refresh npmextra configuration
- Remove .catch(...) wrappers around s3Client.send so errors are no longer swallowed and will propagate to callers
- Update build script to use 'tsbuild tsfolders --allowimplicitany'
- Bump devDependencies: @git.zone/tsbuild to ^4.1.2, @git.zone/tsrun to ^2.0.1, @git.zone/tstest to ^3.1.6
- Bump dependency @aws-sdk/client-s3 to ^3.975.0
- Adjust npmextra.json structure (@git.zone/cli, @git.zone/tsdoc), add release registries and @ship.zone/szci entry
- Remove pnpm-workspace.yaml onlyBuiltDependencies configuration
## 2025-11-20 - 4.3.0 - feat(listing)
Add memory-efficient listing APIs: async generator, RxJS observable, and cursor pagination; export ListCursor and Minimatch; add minimatch dependency; bump to 4.2.0
- Added memory-efficient listing methods on Bucket: listAllObjects (async generator), listAllObjectsObservable (RxJS Observable), createCursor (returns ListCursor) and listAllObjectsArray (convenience array collector).
- New ListCursor class (ts/classes.listcursor.ts) providing page-based iteration: next(), hasMore(), reset(), getToken()/setToken().
- Added glob matching helper findByGlob(pattern) using minimatch (exported via plugins.Minimatch).
- Exported ListCursor from ts/index.ts and exported Minimatch via ts/plugins.ts.
- Added minimatch dependency in package.json and bumped package version to 4.2.0; increased test timeout to 120s.
- Updated tests to read S3_SECRETKEY, S3_PORT and to assert bucket name from env (test/test.node+deno.ts, test/test.trash.node+deno.ts).
- No breaking changes: new APIs are additive and existing behavior preserved.
## 2025-11-20 - 4.2.0 - feat(listing)
Add memory-efficient listing with async generators, RxJS observables, and cursor pagination for huge buckets
**New Memory-Efficient Listing Methods:**
**Async Generator (Recommended for most use cases):**
- `Bucket.listAllObjects(prefix?)` - Stream object keys one at a time using `for await...of`
- `Bucket.findByGlob(pattern)` - Find objects matching glob patterns (e.g., `**/*.json`, `npm/packages/*/index.json`)
- Memory efficient, supports early termination, composable
**RxJS Observable (For complex reactive pipelines):**
- `Bucket.listAllObjectsObservable(prefix?)` - Emit keys as Observable for use with RxJS operators (filter, map, take, etc.)
- Perfect for complex data transformations and reactive architectures
**Cursor Pattern (For manual pagination control):**
- `Bucket.createCursor(prefix?, options?)` - Create cursor for explicit page-by-page iteration
- `ListCursor.next()` - Fetch next page of results
- `ListCursor.hasMore()` - Check if more results available
- `ListCursor.reset()` - Reset to beginning
- `ListCursor.getToken()` / `ListCursor.setToken()` - Save/restore pagination state
- Ideal for UI pagination and resumable operations
**Convenience Methods:**
- `Bucket.listAllObjectsArray(prefix?)` - Collect all keys into array (WARNING: loads all into memory)
**Benefits:**
- ✅ Memory-efficient streaming for buckets with millions of objects
- ✅ Three patterns for different use cases (generators, observables, cursors)
- ✅ Support for early termination and incremental processing
- ✅ Glob pattern matching with minimatch
- ✅ Full TypeScript support with proper types
- ✅ Zero breaking changes - all new methods
**Dependencies:**
- Added `minimatch` for glob pattern support
**Files Changed:**
- `ts/classes.bucket.ts` - Added all listing methods
- `ts/classes.listcursor.ts` - NEW: Cursor implementation
- `ts/plugins.ts` - Export Minimatch
- `ts/index.ts` - Export ListCursor
- `test/test.listing.node+deno.ts` - NEW: Comprehensive listing tests
- `package.json` - Added minimatch dependency
## 2025-11-20 - 4.1.0 - feat(core)
Add S3 endpoint normalization, directory pagination, improved metadata checks, trash support, and related tests
- Add normalizeS3Descriptor helper to sanitize and normalize various S3 endpoint formats and emit warnings for mismatches (helpers.ts).
- Use normalized endpoint and credentials when constructing S3 client in SmartBucket (classes.smartbucket.ts).
- Implement paginated listing helper listObjectsV2AllPages in Directory and use it for listFiles and listDirectories to aggregate Contents and CommonPrefixes across pages (classes.directory.ts).
- Improve MetaData.hasMetaData to catch NotFound errors and return false instead of throwing (classes.metadata.ts).
- Export metadata and trash modules from index (ts/index.ts) and add a Trash class with utilities for trashed files and key encoding (classes.trash.ts).
- Enhance Bucket operations: fastCopy now preserves or replaces native metadata correctly, cleanAllContents supports paginated deletion, and improved fastExists error handling (classes.bucket.ts).
- Fix Directory.getSubDirectoryByName to construct new Directory instances with the correct parent directory reference.
- Add tests covering metadata absence and pagination behavior (test/test.local.node+deno.ts).
## 2025-11-20 - 4.0.1 - fix(plugins)
Use explicit node: imports for native path and stream modules in ts/plugins.ts

1761
deno.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,8 +1,5 @@
{
"npmci": {
"npmGlobalTools": []
},
"gitzone": {
"@git.zone/cli": {
"projectType": "npm",
"module": {
"githost": "code.foss.global",
@@ -33,9 +30,19 @@
"data management",
"streaming"
]
},
"release": {
"registries": [
"https://verdaccio.lossless.digital",
"https://registry.npmjs.org"
],
"accessLevel": "public"
}
},
"tsdoc": {
"@git.zone/tsdoc": {
"legal": "\n## License and Legal Information\n\nThis repository contains open-source code that is licensed under the MIT License. A copy of the MIT License can be found in the [license](license) file within this repository. \n\n**Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.\n\n### Trademarks\n\nThis project is owned and maintained by Task Venture Capital GmbH. The names and logos associated with Task Venture Capital GmbH and any related products or services are trademarks of Task Venture Capital GmbH and are not included within the scope of the MIT license granted herein. Use of these trademarks must comply with Task Venture Capital GmbH's Trademark Guidelines, and any usage must be approved in writing by Task Venture Capital GmbH.\n\n### Company Information\n\nTask Venture Capital GmbH \nRegistered at District court Bremen HRB 35230 HB, Germany\n\nFor any legal inquiries or if you require further information, please contact us via email at hello@task.vc.\n\nBy using this repository, you acknowledge that you have read this section, agree to comply with its terms, and understand that the licensing of the code does not imply endorsement by Task Venture Capital GmbH of any derivative works.\n"
},
"@ship.zone/szci": {
"npmGlobalTools": []
}
}

View File

@@ -1,6 +1,6 @@
{
"name": "@push.rocks/smartbucket",
"version": "4.0.1",
"version": "4.4.0",
"description": "A TypeScript library providing a cloud-agnostic interface for managing object storage with functionalities like bucket management, file and directory operations, and advanced features such as metadata handling and file locking.",
"main": "dist_ts/index.js",
"typings": "dist_ts/index.d.ts",
@@ -8,18 +8,18 @@
"author": "Task Venture Capital GmbH",
"license": "MIT",
"scripts": {
"test": "(tstest test/ --verbose --logfile --timeout 60)",
"build": "(tsbuild --web --allowimplicitany)"
"test": "(tstest test/ --verbose --logfile --timeout 120)",
"build": "(tsbuild tsfolders --allowimplicitany)"
},
"devDependencies": {
"@git.zone/tsbuild": "^3.1.0",
"@git.zone/tsrun": "^2.0.0",
"@git.zone/tstest": "^3.0.1",
"@git.zone/tsbuild": "^4.1.2",
"@git.zone/tsrun": "^2.0.1",
"@git.zone/tstest": "^3.1.6",
"@push.rocks/qenv": "^6.1.3",
"@push.rocks/tapbundle": "^6.0.3"
},
"dependencies": {
"@aws-sdk/client-s3": "^3.936.0",
"@aws-sdk/client-s3": "^3.975.0",
"@push.rocks/smartmime": "^2.0.4",
"@push.rocks/smartpath": "^6.0.0",
"@push.rocks/smartpromise": "^4.2.3",
@@ -27,7 +27,8 @@
"@push.rocks/smartstream": "^3.2.5",
"@push.rocks/smartstring": "^4.1.0",
"@push.rocks/smartunique": "^3.0.9",
"@tsclass/tsclass": "^9.3.0"
"@tsclass/tsclass": "^9.3.0",
"minimatch": "^10.1.1"
},
"private": false,
"files": [

3928
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,4 +0,0 @@
onlyBuiltDependencies:
- esbuild
- mongodb-memory-server
- puppeteer

View File

@@ -3,3 +3,4 @@
* **Use exists() methods to check before getting**: `bucketExists`, `fileExists`, `directoryExists`, `fastExists`
* **No *Strict methods**: All removed (fastPutStrict, getBucketByNameStrict, getFileStrict, getSubDirectoryByNameStrict)
* metadata is handled though the MetaData class. Important!
* **BucketWatcher** - Polling-based S3 change watcher (`bucket.createWatcher()`). Detects add/modify/delete via ETag/Size/LastModified comparison. Supports RxJS Observable pattern (`changeSubject`) and EventEmitter pattern (`on('change')`). Options: `prefix`, `pollIntervalMs`, `bufferTimeMs`, `includeInitial`, `pageSize`.

764
readme.md

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,298 @@
// test.listing.node+deno.ts - Tests for memory-efficient listing methods
import { tap, expect } from '@git.zone/tstest/tapbundle';
import * as smartbucket from '../ts/index.js';
// Get test configuration
import * as qenv from '@push.rocks/qenv';
const testQenv = new qenv.Qenv('./', './.nogit/');
// Test bucket reference
let testBucket: smartbucket.Bucket;
let testSmartbucket: smartbucket.SmartBucket;
// Setup: Create test bucket and populate with test data
tap.test('should create valid smartbucket and bucket', async () => {
testSmartbucket = new smartbucket.SmartBucket({
accessKey: await testQenv.getEnvVarOnDemand('S3_ACCESSKEY'),
accessSecret: await testQenv.getEnvVarOnDemand('S3_SECRETKEY'),
endpoint: await testQenv.getEnvVarOnDemand('S3_ENDPOINT'),
port: parseInt(await testQenv.getEnvVarOnDemand('S3_PORT')),
useSsl: false,
});
testBucket = await smartbucket.Bucket.getBucketByName(
testSmartbucket,
await testQenv.getEnvVarOnDemand('S3_BUCKET')
);
expect(testBucket).toBeInstanceOf(smartbucket.Bucket);
});
tap.test('should clean bucket and create test data for listing tests', async () => {
// Clean bucket first
await testBucket.cleanAllContents();
// Create test structure:
// npm/packages/foo/index.json
// npm/packages/foo/1.0.0.tgz
// npm/packages/bar/index.json
// npm/packages/bar/2.0.0.tgz
// oci/blobs/sha256-abc.tar
// oci/blobs/sha256-def.tar
// oci/manifests/latest.json
// docs/readme.md
// docs/api.md
const testFiles = [
'npm/packages/foo/index.json',
'npm/packages/foo/1.0.0.tgz',
'npm/packages/bar/index.json',
'npm/packages/bar/2.0.0.tgz',
'oci/blobs/sha256-abc.tar',
'oci/blobs/sha256-def.tar',
'oci/manifests/latest.json',
'docs/readme.md',
'docs/api.md',
];
for (const filePath of testFiles) {
await testBucket.fastPut({
path: filePath,
contents: `test content for ${filePath}`,
});
}
});
// ==========================
// Async Generator Tests
// ==========================
tap.test('listAllObjects should iterate all objects with prefix', async () => {
const keys: string[] = [];
for await (const key of testBucket.listAllObjects('npm/')) {
keys.push(key);
}
expect(keys.length).toEqual(4);
expect(keys).toContain('npm/packages/foo/index.json');
expect(keys).toContain('npm/packages/bar/2.0.0.tgz');
});
tap.test('listAllObjects should support early termination', async () => {
let count = 0;
for await (const key of testBucket.listAllObjects('')) {
count++;
if (count >= 3) break; // Early exit
}
expect(count).toEqual(3);
});
tap.test('listAllObjects without prefix should list all objects', async () => {
const keys: string[] = [];
for await (const key of testBucket.listAllObjects()) {
keys.push(key);
}
expect(keys.length).toBeGreaterThanOrEqual(9);
});
// ==========================
// Observable Tests
// ==========================
tap.test('listAllObjectsObservable should emit all objects', async () => {
const keys: string[] = [];
await new Promise<void>((resolve, reject) => {
testBucket.listAllObjectsObservable('oci/')
.subscribe({
next: (key) => keys.push(key),
error: (err) => reject(err),
complete: () => resolve(),
});
});
expect(keys.length).toEqual(3);
expect(keys).toContain('oci/blobs/sha256-abc.tar');
expect(keys).toContain('oci/manifests/latest.json');
});
tap.test('listAllObjectsObservable should support RxJS operators', async () => {
const jsonFiles: string[] = [];
await new Promise<void>((resolve, reject) => {
testBucket.listAllObjectsObservable('npm/')
.subscribe({
next: (key: string) => {
if (key.endsWith('.json')) {
jsonFiles.push(key);
}
},
error: (err: any) => reject(err),
complete: () => resolve(),
});
});
expect(jsonFiles.length).toEqual(2);
expect(jsonFiles.every((k) => k.endsWith('.json'))).toBeTrue();
});
// ==========================
// Cursor Tests
// ==========================
tap.test('createCursor should allow manual pagination', async () => {
const cursor = testBucket.createCursor('npm/', { pageSize: 2 });
// First page
const page1 = await cursor.next();
expect(page1.keys.length).toEqual(2);
expect(page1.done).toBeFalse();
// Second page
const page2 = await cursor.next();
expect(page2.keys.length).toEqual(2);
expect(page2.done).toBeTrue();
});
tap.test('cursor.hasMore() should accurately track state', async () => {
const cursor = testBucket.createCursor('docs/', { pageSize: 10 });
expect(cursor.hasMore()).toBeTrue();
await cursor.next(); // Should get all docs files
expect(cursor.hasMore()).toBeFalse();
});
tap.test('cursor.reset() should allow re-iteration', async () => {
const cursor = testBucket.createCursor('docs/');
const firstRun = await cursor.next();
expect(firstRun.keys.length).toBeGreaterThan(0);
cursor.reset();
expect(cursor.hasMore()).toBeTrue();
const secondRun = await cursor.next();
expect(secondRun.keys).toEqual(firstRun.keys);
});
tap.test('cursor should support save/restore with token', async () => {
const cursor1 = testBucket.createCursor('npm/', { pageSize: 2 });
await cursor1.next(); // Advance cursor
const token = cursor1.getToken();
expect(token).toBeDefined();
// Create new cursor and restore state
const cursor2 = testBucket.createCursor('npm/', { pageSize: 2 });
cursor2.setToken(token);
const page = await cursor2.next();
expect(page.keys.length).toBeGreaterThan(0);
});
// ==========================
// findByGlob Tests
// ==========================
tap.test('findByGlob should match simple patterns', async () => {
const matches: string[] = [];
for await (const key of testBucket.findByGlob('**/*.json')) {
matches.push(key);
}
expect(matches.length).toEqual(3); // foo/index.json, bar/index.json, latest.json
expect(matches.every((k) => k.endsWith('.json'))).toBeTrue();
});
tap.test('findByGlob should match specific path patterns', async () => {
const matches: string[] = [];
for await (const key of testBucket.findByGlob('npm/packages/*/index.json')) {
matches.push(key);
}
expect(matches.length).toEqual(2);
expect(matches).toContain('npm/packages/foo/index.json');
expect(matches).toContain('npm/packages/bar/index.json');
});
tap.test('findByGlob should match wildcard patterns', async () => {
const matches: string[] = [];
for await (const key of testBucket.findByGlob('oci/blobs/*')) {
matches.push(key);
}
expect(matches.length).toEqual(2);
expect(matches.every((k) => k.startsWith('oci/blobs/'))).toBeTrue();
});
// ==========================
// listAllObjectsArray Tests
// ==========================
tap.test('listAllObjectsArray should collect all keys into array', async () => {
const keys = await testBucket.listAllObjectsArray('docs/');
expect(Array.isArray(keys)).toBeTrue();
expect(keys.length).toEqual(2);
expect(keys).toContain('docs/readme.md');
expect(keys).toContain('docs/api.md');
});
tap.test('listAllObjectsArray without prefix should return all objects', async () => {
const keys = await testBucket.listAllObjectsArray();
expect(keys.length).toBeGreaterThanOrEqual(9);
});
// ==========================
// Performance/Edge Case Tests
// ==========================
tap.test('should handle empty prefix results gracefully', async () => {
const keys: string[] = [];
for await (const key of testBucket.listAllObjects('nonexistent/')) {
keys.push(key);
}
expect(keys.length).toEqual(0);
});
tap.test('cursor should handle empty results', async () => {
const cursor = testBucket.createCursor('nonexistent/');
const result = await cursor.next();
expect(result.keys.length).toEqual(0);
expect(result.done).toBeTrue();
expect(cursor.hasMore()).toBeFalse();
});
tap.test('observable should complete immediately on empty results', async () => {
let completed = false;
let count = 0;
await new Promise<void>((resolve, reject) => {
testBucket.listAllObjectsObservable('nonexistent/')
.subscribe({
next: () => count++,
error: (err) => reject(err),
complete: () => {
completed = true;
resolve();
},
});
});
expect(count).toEqual(0);
expect(completed).toBeTrue();
});
// Cleanup
tap.test('should clean up test data', async () => {
await testBucket.cleanAllContents();
});
export default tap.start();

View File

@@ -0,0 +1,76 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as plugins from '../ts/plugins.js';
import * as smartbucket from '../ts/index.js';
class FakeS3Client {
private callIndex = 0;
constructor(private readonly pages: Array<Partial<plugins.s3.ListObjectsV2Output>>) {}
public async send(_command: any) {
const page = this.pages[this.callIndex] || { Contents: [], CommonPrefixes: [], IsTruncated: false };
this.callIndex += 1;
return page;
}
}
tap.test('MetaData.hasMetaData should return false when metadata file does not exist', async () => {
const fakeFile = {
name: 'file.txt',
parentDirectoryRef: {
async getFile() {
throw new Error(`File not found at path 'file.txt.metadata'`);
},
},
} as unknown as smartbucket.File;
const hasMetaData = await smartbucket.MetaData.hasMetaData({ file: fakeFile });
expect(hasMetaData).toBeFalse();
});
tap.test('getSubDirectoryByName should create correct parent chain for new nested directories', async () => {
const fakeSmartbucket = { s3Client: new FakeS3Client([{ Contents: [], CommonPrefixes: [] }]) } as unknown as smartbucket.SmartBucket;
const bucket = new smartbucket.Bucket(fakeSmartbucket, 'test-bucket');
const baseDirectory = new smartbucket.Directory(bucket, null as any, '');
const nestedDirectory = await baseDirectory.getSubDirectoryByName('level1/level2', { getEmptyDirectory: true });
expect(nestedDirectory.name).toEqual('level2');
expect(nestedDirectory.parentDirectoryRef.name).toEqual('level1');
expect(nestedDirectory.getBasePath()).toEqual('level1/level2/');
});
tap.test('listFiles should aggregate results across paginated ListObjectsV2 responses', async () => {
const firstPage = {
Contents: Array.from({ length: 1000 }, (_, index) => ({ Key: `file-${index}` })),
IsTruncated: true,
NextContinuationToken: 'token-1',
};
const secondPage = {
Contents: Array.from({ length: 200 }, (_, index) => ({ Key: `file-${1000 + index}` })),
IsTruncated: false,
};
const fakeSmartbucket = { s3Client: new FakeS3Client([firstPage, secondPage]) } as unknown as smartbucket.SmartBucket;
const bucket = new smartbucket.Bucket(fakeSmartbucket, 'test-bucket');
const baseDirectory = new smartbucket.Directory(bucket, null as any, '');
const files = await baseDirectory.listFiles();
expect(files.length).toEqual(1200);
});
tap.test('listDirectories should aggregate CommonPrefixes across pagination', async () => {
const fakeSmartbucket = {
s3Client: new FakeS3Client([
{ CommonPrefixes: [{ Prefix: 'dirA/' }], IsTruncated: true, NextContinuationToken: 'token-1' },
{ CommonPrefixes: [{ Prefix: 'dirB/' }], IsTruncated: false },
]),
} as unknown as smartbucket.SmartBucket;
const bucket = new smartbucket.Bucket(fakeSmartbucket, 'test-bucket');
const baseDirectory = new smartbucket.Directory(bucket, null as any, '');
const directories = await baseDirectory.listDirectories();
expect(directories.map((d) => d.name)).toEqual(['dirA', 'dirB']);
});
export default tap.start();

View File

@@ -12,13 +12,16 @@ let baseDirectory: smartbucket.Directory;
tap.test('should create a valid smartbucket', async () => {
testSmartbucket = new smartbucket.SmartBucket({
accessKey: await testQenv.getEnvVarOnDemandStrict('S3_ACCESSKEY'),
accessSecret: await testQenv.getEnvVarOnDemandStrict('S3_ACCESSSECRET'),
accessSecret: await testQenv.getEnvVarOnDemandStrict('S3_SECRETKEY'),
endpoint: await testQenv.getEnvVarOnDemandStrict('S3_ENDPOINT'),
port: parseInt(await testQenv.getEnvVarOnDemandStrict('S3_PORT')),
useSsl: false,
});
expect(testSmartbucket).toBeInstanceOf(smartbucket.SmartBucket);
myBucket = await testSmartbucket.getBucketByName(await testQenv.getEnvVarOnDemandStrict('S3_BUCKET'),);
const bucketName = await testQenv.getEnvVarOnDemandStrict('S3_BUCKET');
myBucket = await testSmartbucket.getBucketByName(bucketName);
expect(myBucket).toBeInstanceOf(smartbucket.Bucket);
expect(myBucket.name).toEqual('test-pushrocks-smartbucket');
expect(myBucket.name).toEqual(bucketName);
});
tap.test('should clean all contents', async () => {

View File

@@ -13,13 +13,15 @@ let baseDirectory: smartbucket.Directory;
tap.test('should create a valid smartbucket', async () => {
testSmartbucket = new smartbucket.SmartBucket({
accessKey: await testQenv.getEnvVarOnDemandStrict('S3_ACCESSKEY'),
accessSecret: await testQenv.getEnvVarOnDemandStrict('S3_ACCESSSECRET'),
accessSecret: await testQenv.getEnvVarOnDemandStrict('S3_SECRETKEY'),
endpoint: await testQenv.getEnvVarOnDemandStrict('S3_ENDPOINT'),
port: parseInt(await testQenv.getEnvVarOnDemandStrict('S3_PORT')),
useSsl: false,
});
expect(testSmartbucket).toBeInstanceOf(smartbucket.SmartBucket);
myBucket = await testSmartbucket.getBucketByName(await testQenv.getEnvVarOnDemandStrict('S3_BUCKET'),);
const bucketName = await testQenv.getEnvVarOnDemandStrict('S3_BUCKET');
myBucket = await testSmartbucket.getBucketByName(bucketName);
expect(myBucket).toBeInstanceOf(smartbucket.Bucket);
expect(myBucket.name).toEqual('test-pushrocks-smartbucket');
});
tap.test('should clean all contents', async () => {

410
test/test.watcher.node.ts Normal file
View File

@@ -0,0 +1,410 @@
// test.watcher.node.ts - Tests for BucketWatcher
import { tap, expect } from '@git.zone/tstest/tapbundle';
import * as smartbucket from '../ts/index.js';
import type { IS3ChangeEvent } from '../ts/interfaces.js';
// Get test configuration
import * as qenv from '@push.rocks/qenv';
const testQenv = new qenv.Qenv('./', './.nogit/');
// Test bucket reference
let testBucket: smartbucket.Bucket;
let testSmartbucket: smartbucket.SmartBucket;
// Setup: Create test bucket
tap.test('should create valid smartbucket and bucket', async () => {
testSmartbucket = new smartbucket.SmartBucket({
accessKey: await testQenv.getEnvVarOnDemand('S3_ACCESSKEY'),
accessSecret: await testQenv.getEnvVarOnDemand('S3_SECRETKEY'),
endpoint: await testQenv.getEnvVarOnDemand('S3_ENDPOINT'),
port: parseInt(await testQenv.getEnvVarOnDemand('S3_PORT')),
useSsl: false,
});
testBucket = await smartbucket.Bucket.getBucketByName(
testSmartbucket,
await testQenv.getEnvVarOnDemand('S3_BUCKET')
);
expect(testBucket).toBeInstanceOf(smartbucket.Bucket);
});
tap.test('should clean bucket for watcher tests', async () => {
await testBucket.cleanAllContents();
});
// ==========================
// Basic Watcher Tests
// ==========================
tap.test('should create watcher with default options', async () => {
const watcher = testBucket.createWatcher();
expect(watcher).toBeInstanceOf(smartbucket.BucketWatcher);
expect(watcher.changeSubject).toBeDefined();
});
tap.test('should create watcher with custom options', async () => {
const watcher = testBucket.createWatcher({
prefix: 'test/',
pollIntervalMs: 2000,
includeInitial: true,
});
expect(watcher).toBeInstanceOf(smartbucket.BucketWatcher);
});
// ==========================
// Add Event Detection Tests
// ==========================
tap.test('should detect add events for new files', async () => {
const events: IS3ChangeEvent[] = [];
const watcher = testBucket.createWatcher({
prefix: 'watcher-test/',
pollIntervalMs: 500,
});
watcher.changeSubject.subscribe((event) => {
if (!Array.isArray(event)) {
events.push(event);
}
});
await watcher.start();
await watcher.readyDeferred.promise;
// Create a new file
await testBucket.fastPut({
path: 'watcher-test/new-file.txt',
contents: 'test content',
});
// Wait for poll to detect the change
await new Promise((resolve) => setTimeout(resolve, 1200));
await watcher.stop();
expect(events.length).toBeGreaterThanOrEqual(1);
const addEvent = events.find((e) => e.type === 'add' && e.key === 'watcher-test/new-file.txt');
expect(addEvent).toBeDefined();
expect(addEvent!.bucket).toEqual(testBucket.name);
});
// ==========================
// Modify Event Detection Tests
// ==========================
tap.test('should detect modify events for changed files', async () => {
const events: IS3ChangeEvent[] = [];
const watcher = testBucket.createWatcher({
prefix: 'watcher-test/',
pollIntervalMs: 500,
});
watcher.changeSubject.subscribe((event) => {
if (!Array.isArray(event)) {
events.push(event);
}
});
await watcher.start();
await watcher.readyDeferred.promise;
// Modify the file
await testBucket.fastPut({
path: 'watcher-test/new-file.txt',
contents: 'modified content with different size',
overwrite: true,
});
// Wait for poll to detect the change
await new Promise((resolve) => setTimeout(resolve, 1200));
await watcher.stop();
expect(events.length).toBeGreaterThanOrEqual(1);
const modifyEvent = events.find((e) => e.type === 'modify' && e.key === 'watcher-test/new-file.txt');
expect(modifyEvent).toBeDefined();
});
// ==========================
// Delete Event Detection Tests
// ==========================
tap.test('should detect delete events for removed files', async () => {
const events: IS3ChangeEvent[] = [];
const watcher = testBucket.createWatcher({
prefix: 'watcher-test/',
pollIntervalMs: 500,
});
watcher.changeSubject.subscribe((event) => {
if (!Array.isArray(event)) {
events.push(event);
}
});
await watcher.start();
await watcher.readyDeferred.promise;
// Delete the file
await testBucket.fastRemove({ path: 'watcher-test/new-file.txt' });
// Wait for poll to detect the change
await new Promise((resolve) => setTimeout(resolve, 1200));
await watcher.stop();
expect(events.length).toBeGreaterThanOrEqual(1);
const deleteEvent = events.find((e) => e.type === 'delete' && e.key === 'watcher-test/new-file.txt');
expect(deleteEvent).toBeDefined();
});
// ==========================
// Initial State Tests
// ==========================
tap.test('should emit initial state as add events when includeInitial is true', async () => {
// First, create some test files
await testBucket.fastPut({
path: 'watcher-initial/file1.txt',
contents: 'content 1',
});
await testBucket.fastPut({
path: 'watcher-initial/file2.txt',
contents: 'content 2',
});
const events: IS3ChangeEvent[] = [];
const watcher = testBucket.createWatcher({
prefix: 'watcher-initial/',
pollIntervalMs: 10000, // Long interval - we only care about initial events
includeInitial: true,
});
watcher.changeSubject.subscribe((event) => {
if (!Array.isArray(event)) {
events.push(event);
}
});
await watcher.start();
await watcher.readyDeferred.promise;
// Give a moment for events to propagate
await new Promise((resolve) => setTimeout(resolve, 100));
await watcher.stop();
expect(events.length).toEqual(2);
expect(events.every((e) => e.type === 'add')).toBeTrue();
expect(events.some((e) => e.key === 'watcher-initial/file1.txt')).toBeTrue();
expect(events.some((e) => e.key === 'watcher-initial/file2.txt')).toBeTrue();
});
// ==========================
// EventEmitter Pattern Tests
// ==========================
tap.test('should emit events via EventEmitter pattern', async () => {
const events: IS3ChangeEvent[] = [];
const watcher = testBucket.createWatcher({
prefix: 'watcher-emitter/',
pollIntervalMs: 500,
});
watcher.on('change', (event: IS3ChangeEvent) => {
events.push(event);
});
await watcher.start();
await watcher.readyDeferred.promise;
// Create a new file
await testBucket.fastPut({
path: 'watcher-emitter/emitter-test.txt',
contents: 'test content',
});
// Wait for poll to detect the change
await new Promise((resolve) => setTimeout(resolve, 1200));
await watcher.stop();
expect(events.length).toBeGreaterThanOrEqual(1);
expect(events[0].type).toEqual('add');
});
// ==========================
// Buffered Events Tests
// ==========================
tap.test('should buffer events when bufferTimeMs is set', async () => {
const bufferedEvents: (IS3ChangeEvent | IS3ChangeEvent[])[] = [];
const watcher = testBucket.createWatcher({
prefix: 'watcher-buffer/',
pollIntervalMs: 200,
bufferTimeMs: 1000,
});
watcher.changeSubject.subscribe((event) => {
bufferedEvents.push(event);
});
await watcher.start();
await watcher.readyDeferred.promise;
// Create multiple files quickly
await testBucket.fastPut({
path: 'watcher-buffer/file1.txt',
contents: 'content 1',
});
await testBucket.fastPut({
path: 'watcher-buffer/file2.txt',
contents: 'content 2',
});
await testBucket.fastPut({
path: 'watcher-buffer/file3.txt',
contents: 'content 3',
});
// Wait for buffer to emit
await new Promise((resolve) => setTimeout(resolve, 1500));
await watcher.stop();
// With buffering, events should come as arrays
expect(bufferedEvents.length).toBeGreaterThanOrEqual(1);
// At least one buffered emission should contain multiple events
const hasBufferedArray = bufferedEvents.some(
(e) => Array.isArray(e) && e.length >= 1
);
expect(hasBufferedArray).toBeTrue();
});
// ==========================
// Error Handling Tests
// ==========================
tap.test('should emit error events on error', async () => {
const errors: Error[] = [];
const watcher = testBucket.createWatcher({
prefix: 'watcher-error/',
pollIntervalMs: 500,
});
watcher.on('error', (err: Error) => {
errors.push(err);
});
await watcher.start();
await watcher.readyDeferred.promise;
// Note: Triggering actual S3 errors is tricky in tests.
// We just verify the error handler is properly attached.
await watcher.stop();
// No errors expected in normal operation
expect(errors.length).toEqual(0);
});
// ==========================
// Graceful Stop Tests
// ==========================
tap.test('should stop gracefully with stop()', async () => {
const watcher = testBucket.createWatcher({
prefix: 'watcher-stop/',
pollIntervalMs: 100,
});
await watcher.start();
await watcher.readyDeferred.promise;
// Let it poll a few times
await new Promise((resolve) => setTimeout(resolve, 300));
// Stop should complete without errors
await watcher.stop();
// Watcher should not poll after stop
const eventsCaptured: IS3ChangeEvent[] = [];
watcher.on('change', (event: IS3ChangeEvent) => {
eventsCaptured.push(event);
});
// Create a file after stop
await testBucket.fastPut({
path: 'watcher-stop/after-stop.txt',
contents: 'should not be detected',
});
await new Promise((resolve) => setTimeout(resolve, 200));
// No events should be captured after stop
expect(eventsCaptured.length).toEqual(0);
});
tap.test('should stop gracefully with close() alias', async () => {
const watcher = testBucket.createWatcher({
prefix: 'watcher-close/',
pollIntervalMs: 100,
});
await watcher.start();
await watcher.readyDeferred.promise;
// close() should work as alias for stop()
await watcher.close();
});
// ==========================
// Prefix Filtering Tests
// ==========================
tap.test('should only detect changes within specified prefix', async () => {
const events: IS3ChangeEvent[] = [];
const watcher = testBucket.createWatcher({
prefix: 'watcher-prefix-a/',
pollIntervalMs: 500,
});
watcher.changeSubject.subscribe((event) => {
if (!Array.isArray(event)) {
events.push(event);
}
});
await watcher.start();
await watcher.readyDeferred.promise;
// Create file in watched prefix
await testBucket.fastPut({
path: 'watcher-prefix-a/watched.txt',
contents: 'watched content',
});
// Create file outside watched prefix
await testBucket.fastPut({
path: 'watcher-prefix-b/unwatched.txt',
contents: 'unwatched content',
});
// Wait for poll
await new Promise((resolve) => setTimeout(resolve, 1200));
await watcher.stop();
// Should only see the file in watcher-prefix-a/
expect(events.length).toEqual(1);
expect(events[0].key).toEqual('watcher-prefix-a/watched.txt');
});
// ==========================
// Cleanup
// ==========================
tap.test('should clean up test data', async () => {
await testBucket.cleanAllContents();
});
export default tap.start();

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/smartbucket',
version: '4.0.1',
version: '4.4.0',
description: 'A TypeScript library providing a cloud-agnostic interface for managing object storage with functionalities like bucket management, file and directory operations, and advanced features such as metadata handling and file locking.'
}

View File

@@ -7,6 +7,8 @@ import { SmartBucket } from './classes.smartbucket.js';
import { Directory } from './classes.directory.js';
import { File } from './classes.file.js';
import { Trash } from './classes.trash.js';
import { ListCursor, type IListCursorOptions } from './classes.listcursor.js';
import { BucketWatcher } from './classes.watcher.js';
/**
* The bucket class exposes the basic functionality of a bucket.
@@ -30,13 +32,13 @@ export class Bucket {
public static async createBucketByName(smartbucketRef: SmartBucket, bucketName: string) {
const command = new plugins.s3.CreateBucketCommand({ Bucket: bucketName });
await smartbucketRef.s3Client.send(command).catch((e) => console.log(e));
await smartbucketRef.s3Client.send(command);
return new Bucket(smartbucketRef, bucketName);
}
public static async removeBucketByName(smartbucketRef: SmartBucket, bucketName: string) {
const command = new plugins.s3.DeleteBucketCommand({ Bucket: bucketName });
await smartbucketRef.s3Client.send(command).catch((e) => console.log(e));
await smartbucketRef.s3Client.send(command);
}
public smartbucketRef: SmartBucket;
@@ -469,6 +471,162 @@ export class Bucket {
}
}
// ==========================================
// Memory-Efficient Listing Methods (Phase 1)
// ==========================================
/**
* List all objects with a given prefix using async generator (memory-efficient streaming)
* @param prefix - Optional prefix to filter objects (default: '' for all objects)
* @yields Object keys one at a time
* @example
* ```ts
* for await (const key of bucket.listAllObjects('npm/')) {
* console.log(key);
* if (shouldStop) break; // Early exit supported
* }
* ```
*/
public async *listAllObjects(prefix: string = ''): AsyncIterableIterator<string> {
let continuationToken: string | undefined;
do {
const command = new plugins.s3.ListObjectsV2Command({
Bucket: this.name,
Prefix: prefix,
ContinuationToken: continuationToken,
});
const response = await this.smartbucketRef.s3Client.send(command);
for (const obj of response.Contents || []) {
if (obj.Key) yield obj.Key;
}
continuationToken = response.NextContinuationToken;
} while (continuationToken);
}
/**
* List all objects as an RxJS Observable (for complex reactive pipelines)
* @param prefix - Optional prefix to filter objects (default: '' for all objects)
* @returns Observable that emits object keys
* @example
* ```ts
* bucket.listAllObjectsObservable('npm/')
* .pipe(
* filter(key => key.endsWith('.json')),
* take(100)
* )
* .subscribe(key => console.log(key));
* ```
*/
public listAllObjectsObservable(prefix: string = ''): plugins.smartrx.rxjs.Observable<string> {
return new plugins.smartrx.rxjs.Observable<string>((subscriber) => {
const fetchPage = async (token?: string) => {
try {
const command = new plugins.s3.ListObjectsV2Command({
Bucket: this.name,
Prefix: prefix,
ContinuationToken: token,
});
const response = await this.smartbucketRef.s3Client.send(command);
for (const obj of response.Contents || []) {
if (obj.Key) subscriber.next(obj.Key);
}
if (response.NextContinuationToken) {
await fetchPage(response.NextContinuationToken);
} else {
subscriber.complete();
}
} catch (error) {
subscriber.error(error);
}
};
fetchPage();
});
}
/**
* Create a cursor for manual pagination control
* @param prefix - Optional prefix to filter objects (default: '' for all objects)
* @param options - Cursor options (pageSize, etc.)
* @returns ListCursor instance
* @example
* ```ts
* const cursor = bucket.createCursor('npm/', { pageSize: 500 });
* while (cursor.hasMore()) {
* const { keys, done } = await cursor.next();
* console.log(`Processing ${keys.length} keys...`);
* }
* ```
*/
public createCursor(prefix: string = '', options?: IListCursorOptions): ListCursor {
return new ListCursor(this, prefix, options);
}
/**
* Create a watcher for monitoring bucket changes (add/modify/delete)
* @param options - Watcher options (prefix, pollIntervalMs, etc.)
* @returns BucketWatcher instance
* @example
* ```ts
* const watcher = bucket.createWatcher({ prefix: 'uploads/', pollIntervalMs: 3000 });
* watcher.changeSubject.subscribe((change) => console.log('Change:', change));
* await watcher.start();
* // ... later
* await watcher.stop();
* ```
*/
public createWatcher(options?: interfaces.IBucketWatcherOptions): BucketWatcher {
return new BucketWatcher(this, options);
}
// ==========================================
// High-Level Listing Helpers (Phase 2)
// ==========================================
/**
* Find objects matching a glob pattern (memory-efficient)
* @param pattern - Glob pattern (e.g., "**\/*.json", "npm/packages/*\/index.json")
* @yields Matching object keys
* @example
* ```ts
* for await (const key of bucket.findByGlob('npm/packages/*\/index.json')) {
* console.log('Found package index:', key);
* }
* ```
*/
public async *findByGlob(pattern: string): AsyncIterableIterator<string> {
const matcher = new plugins.Minimatch(pattern);
for await (const key of this.listAllObjects('')) {
if (matcher.match(key)) yield key;
}
}
/**
* List all objects and collect into an array (convenience method)
* WARNING: Loads entire result set into memory. Use listAllObjects() generator for large buckets.
* @param prefix - Optional prefix to filter objects (default: '' for all objects)
* @returns Array of all object keys
* @example
* ```ts
* const allKeys = await bucket.listAllObjectsArray('npm/');
* console.log(`Found ${allKeys.length} objects`);
* ```
*/
public async listAllObjectsArray(prefix: string = ''): Promise<string[]> {
const keys: string[] = [];
for await (const key of this.listAllObjects(prefix)) {
keys.push(key);
}
return keys;
}
public async cleanAllContents(): Promise<void> {
try {
// Define the command type explicitly

View File

@@ -120,19 +120,44 @@ export class Directory {
return directories.some(dir => dir.name === dirNameArg);
}
/**
* Collects all ListObjectsV2 pages for a prefix.
*/
private async listObjectsV2AllPages(prefix: string, delimiter?: string) {
const allContents: plugins.s3._Object[] = [];
const allCommonPrefixes: plugins.s3.CommonPrefix[] = [];
let continuationToken: string | undefined;
do {
const command = new plugins.s3.ListObjectsV2Command({
Bucket: this.bucketRef.name,
Prefix: prefix,
Delimiter: delimiter,
ContinuationToken: continuationToken,
});
const response = await this.bucketRef.smartbucketRef.s3Client.send(command);
if (response.Contents) {
allContents.push(...response.Contents);
}
if (response.CommonPrefixes) {
allCommonPrefixes.push(...response.CommonPrefixes);
}
continuationToken = response.IsTruncated ? response.NextContinuationToken : undefined;
} while (continuationToken);
return { contents: allContents, commonPrefixes: allCommonPrefixes };
}
/**
* lists all files
*/
public async listFiles(): Promise<File[]> {
const command = new plugins.s3.ListObjectsV2Command({
Bucket: this.bucketRef.name,
Prefix: this.getBasePath(),
Delimiter: '/',
});
const response = await this.bucketRef.smartbucketRef.s3Client.send(command);
const { contents } = await this.listObjectsV2AllPages(this.getBasePath(), '/');
const fileArray: File[] = [];
response.Contents?.forEach((item) => {
contents.forEach((item) => {
if (item.Key && !item.Key.endsWith('/')) {
const subtractedPath = item.Key.replace(this.getBasePath(), '');
if (!subtractedPath.includes('/')) {
@@ -154,16 +179,11 @@ export class Directory {
*/
public async listDirectories(): Promise<Directory[]> {
try {
const command = new plugins.s3.ListObjectsV2Command({
Bucket: this.bucketRef.name,
Prefix: this.getBasePath(),
Delimiter: '/',
});
const response = await this.bucketRef.smartbucketRef.s3Client.send(command);
const { commonPrefixes } = await this.listObjectsV2AllPages(this.getBasePath(), '/');
const directoryArray: Directory[] = [];
if (response.CommonPrefixes) {
response.CommonPrefixes.forEach((item) => {
if (commonPrefixes) {
commonPrefixes.forEach((item) => {
if (item.Prefix) {
const subtractedPath = item.Prefix.replace(this.getBasePath(), '');
if (subtractedPath.endsWith('/')) {
@@ -235,7 +255,7 @@ export class Directory {
return returnDirectory;
}
if (optionsArg.getEmptyDirectory || optionsArg.createWithInitializerFile) {
returnDirectory = new Directory(this.bucketRef, this, dirNameToSearch);
returnDirectory = new Directory(this.bucketRef, directoryArg, dirNameToSearch);
}
if (isFinalDirectory && optionsArg.createWithInitializerFile) {
returnDirectory?.createEmptyFile('00init.txt');

89
ts/classes.listcursor.ts Normal file
View File

@@ -0,0 +1,89 @@
// classes.listcursor.ts
import * as plugins from './plugins.js';
import type { Bucket } from './classes.bucket.js';
export interface IListCursorOptions {
pageSize?: number;
}
export interface IListCursorResult {
keys: string[];
done: boolean;
}
/**
* ListCursor provides explicit pagination control for listing objects in a bucket.
* Useful for UI pagination, resumable operations, and manual batch processing.
*/
export class ListCursor {
private continuationToken?: string;
private exhausted = false;
private pageSize: number;
constructor(
private bucket: Bucket,
private prefix: string,
options: IListCursorOptions = {}
) {
this.pageSize = options.pageSize || 1000;
}
/**
* Fetch the next page of object keys
* @returns Object with keys array and done flag
*/
public async next(): Promise<IListCursorResult> {
if (this.exhausted) {
return { keys: [], done: true };
}
const command = new plugins.s3.ListObjectsV2Command({
Bucket: this.bucket.name,
Prefix: this.prefix,
MaxKeys: this.pageSize,
ContinuationToken: this.continuationToken,
});
const response = await this.bucket.smartbucketRef.s3Client.send(command);
const keys = (response.Contents || [])
.map((obj) => obj.Key)
.filter((key): key is string => !!key);
this.continuationToken = response.NextContinuationToken;
this.exhausted = !this.continuationToken;
return { keys, done: this.exhausted };
}
/**
* Check if there are more pages to fetch
*/
public hasMore(): boolean {
return !this.exhausted;
}
/**
* Reset the cursor to start from the beginning
*/
public reset(): void {
this.continuationToken = undefined;
this.exhausted = false;
}
/**
* Get the current continuation token (for saving/restoring state)
*/
public getToken(): string | undefined {
return this.continuationToken;
}
/**
* Set the continuation token (for resuming from a saved state)
*/
public setToken(token: string | undefined): void {
this.continuationToken = token;
this.exhausted = !token;
}
}

View File

@@ -4,11 +4,23 @@ import { File } from './classes.file.js';
export class MetaData {
public static async hasMetaData(optionsArg: { file: File }) {
// lets find the existing metadata file
const existingFile = await optionsArg.file.parentDirectoryRef.getFile({
path: optionsArg.file.name + '.metadata',
});
return !!existingFile;
// try finding the existing metadata file; return false if it doesn't exist
try {
const existingFile = await optionsArg.file.parentDirectoryRef.getFile({
path: optionsArg.file.name + '.metadata',
});
return !!existingFile;
} catch (error: any) {
const message = error?.message || '';
const isNotFound =
message.includes('File not found') ||
error?.name === 'NotFound' ||
error?.$metadata?.httpStatusCode === 404;
if (isNotFound) {
return false;
}
throw error;
}
}
// static

289
ts/classes.watcher.ts Normal file
View File

@@ -0,0 +1,289 @@
// classes.watcher.ts
import * as plugins from './plugins.js';
import * as interfaces from './interfaces.js';
import type { Bucket } from './classes.bucket.js';
import { EventEmitter } from 'node:events';
/**
* BucketWatcher monitors an S3 bucket for changes (add/modify/delete)
* using a polling-based approach. Designed to follow the SmartdataDbWatcher pattern.
*
* @example
* ```typescript
* const watcher = bucket.createWatcher({ prefix: 'uploads/', pollIntervalMs: 3000 });
*
* // RxJS Observable pattern
* watcher.changeSubject.subscribe((change) => {
* console.log('Change:', change);
* });
*
* // EventEmitter pattern
* watcher.on('change', (change) => console.log(change));
* watcher.on('error', (err) => console.error(err));
*
* await watcher.start();
* await watcher.readyDeferred.promise; // Wait for initial state
*
* // Later...
* await watcher.stop();
* ```
*/
export class BucketWatcher extends EventEmitter {
/** Deferred that resolves when initial state is built and watcher is ready */
public readyDeferred = plugins.smartpromise.defer();
/** Observable for receiving change events (supports RxJS operators) */
public changeSubject: plugins.smartrx.rxjs.Observable<interfaces.IS3ChangeEvent | interfaces.IS3ChangeEvent[]>;
// Internal subjects and state
private rawSubject: plugins.smartrx.rxjs.Subject<interfaces.IS3ChangeEvent>;
private previousState: Map<string, interfaces.IS3ObjectState>;
private pollIntervalId: ReturnType<typeof setInterval> | null = null;
private isPolling = false;
private isStopped = false;
// Configuration
private readonly bucketRef: Bucket;
private readonly prefix: string;
private readonly pollIntervalMs: number;
private readonly bufferTimeMs?: number;
private readonly includeInitial: boolean;
private readonly pageSize: number;
constructor(bucketRef: Bucket, options: interfaces.IBucketWatcherOptions = {}) {
super();
this.bucketRef = bucketRef;
this.prefix = options.prefix ?? '';
this.pollIntervalMs = options.pollIntervalMs ?? 5000;
this.bufferTimeMs = options.bufferTimeMs;
this.includeInitial = options.includeInitial ?? false;
this.pageSize = options.pageSize ?? 1000;
// Initialize state tracking
this.previousState = new Map();
// Initialize raw subject for emitting changes
this.rawSubject = new plugins.smartrx.rxjs.Subject<interfaces.IS3ChangeEvent>();
// Configure the public observable with optional buffering
if (this.bufferTimeMs && this.bufferTimeMs > 0) {
this.changeSubject = this.rawSubject.pipe(
plugins.smartrx.rxjs.ops.bufferTime(this.bufferTimeMs),
plugins.smartrx.rxjs.ops.filter((events: interfaces.IS3ChangeEvent[]) => events.length > 0)
);
} else {
this.changeSubject = this.rawSubject.asObservable();
}
}
/**
* Start watching the bucket for changes
*/
public async start(): Promise<void> {
if (this.pollIntervalId !== null) {
console.log('BucketWatcher is already running');
return;
}
this.isStopped = false;
// Build initial state
await this.buildInitialState();
// Emit initial state as 'add' events if configured
if (this.includeInitial) {
for (const state of this.previousState.values()) {
this.emitChange({
type: 'add',
key: state.key,
size: state.size,
etag: state.etag,
lastModified: state.lastModified,
bucket: this.bucketRef.name,
});
}
}
// Mark as ready
this.readyDeferred.resolve();
// Start polling loop
this.pollIntervalId = setInterval(() => {
this.poll().catch((err) => {
this.emit('error', err);
});
}, this.pollIntervalMs);
}
/**
* Stop watching the bucket
*/
public async stop(): Promise<void> {
this.isStopped = true;
if (this.pollIntervalId !== null) {
clearInterval(this.pollIntervalId);
this.pollIntervalId = null;
}
// Wait for any in-progress poll to complete
while (this.isPolling) {
await new Promise<void>((resolve) => setTimeout(resolve, 50));
}
this.rawSubject.complete();
}
/**
* Alias for stop() - for consistency with other APIs
*/
public async close(): Promise<void> {
return this.stop();
}
/**
* Build the initial state by listing all objects with metadata
*/
private async buildInitialState(): Promise<void> {
this.previousState.clear();
for await (const obj of this.listObjectsWithMetadata()) {
if (obj.Key) {
this.previousState.set(obj.Key, {
key: obj.Key,
etag: obj.ETag ?? '',
size: obj.Size ?? 0,
lastModified: obj.LastModified ?? new Date(0),
});
}
}
}
/**
* Poll for changes by comparing current state against previous state
*/
private async poll(): Promise<void> {
// Guard against overlapping polls
if (this.isPolling || this.isStopped) {
return;
}
this.isPolling = true;
try {
// Build current state
const currentState = new Map<string, interfaces.IS3ObjectState>();
for await (const obj of this.listObjectsWithMetadata()) {
if (this.isStopped) {
break;
}
if (obj.Key) {
currentState.set(obj.Key, {
key: obj.Key,
etag: obj.ETag ?? '',
size: obj.Size ?? 0,
lastModified: obj.LastModified ?? new Date(0),
});
}
}
if (!this.isStopped) {
this.detectChanges(currentState);
this.previousState = currentState;
}
} catch (err) {
this.emit('error', err);
} finally {
this.isPolling = false;
}
}
/**
* Detect changes between current and previous state
*/
private detectChanges(currentState: Map<string, interfaces.IS3ObjectState>): void {
// Detect added and modified objects
for (const [key, current] of currentState) {
const previous = this.previousState.get(key);
if (!previous) {
// New object - emit 'add' event
this.emitChange({
type: 'add',
key: current.key,
size: current.size,
etag: current.etag,
lastModified: current.lastModified,
bucket: this.bucketRef.name,
});
} else if (
previous.etag !== current.etag ||
previous.size !== current.size ||
previous.lastModified.getTime() !== current.lastModified.getTime()
) {
// Object modified - emit 'modify' event
this.emitChange({
type: 'modify',
key: current.key,
size: current.size,
etag: current.etag,
lastModified: current.lastModified,
bucket: this.bucketRef.name,
});
}
}
// Detect deleted objects
for (const [key, previous] of this.previousState) {
if (!currentState.has(key)) {
// Object deleted - emit 'delete' event
this.emitChange({
type: 'delete',
key: previous.key,
bucket: this.bucketRef.name,
});
}
}
}
/**
* Emit a change event via both RxJS Subject and EventEmitter
*/
private emitChange(event: interfaces.IS3ChangeEvent): void {
this.rawSubject.next(event);
this.emit('change', event);
}
/**
* List objects with full metadata (ETag, Size, LastModified)
* This is a private method that yields full _Object data, not just keys
*/
private async *listObjectsWithMetadata(): AsyncIterableIterator<plugins.s3._Object> {
let continuationToken: string | undefined;
do {
if (this.isStopped) {
return;
}
const command = new plugins.s3.ListObjectsV2Command({
Bucket: this.bucketRef.name,
Prefix: this.prefix,
MaxKeys: this.pageSize,
ContinuationToken: continuationToken,
});
const response = await this.bucketRef.smartbucketRef.s3Client.send(command);
for (const obj of response.Contents || []) {
yield obj;
}
continuationToken = response.NextContinuationToken;
} while (continuationToken);
}
}

View File

@@ -2,3 +2,8 @@ export * from './classes.smartbucket.js';
export * from './classes.bucket.js';
export * from './classes.directory.js';
export * from './classes.file.js';
export * from './classes.listcursor.js';
export * from './classes.metadata.js';
export * from './classes.trash.js';
export * from './classes.watcher.js';
export * from './interfaces.js';

View File

@@ -3,4 +3,55 @@ import type { Directory } from "./classes.directory.js";
export interface IPathDecriptor {
path?: string;
directory?: Directory;
}
// ================================
// Bucket Watcher Interfaces
// ================================
/**
* Internal state tracking for an S3 object
*/
export interface IS3ObjectState {
key: string;
etag: string;
size: number;
lastModified: Date;
}
/**
* Change event emitted by BucketWatcher
*/
export interface IS3ChangeEvent {
type: 'add' | 'modify' | 'delete';
key: string;
size?: number;
etag?: string;
lastModified?: Date;
bucket: string;
}
/**
* Watcher mode - 'poll' is the default, 'websocket' reserved for future implementation
*/
export type TBucketWatcherMode = 'poll' | 'websocket';
/**
* Options for creating a BucketWatcher
*/
export interface IBucketWatcherOptions {
/** Watcher mode: 'poll' (default) or 'websocket' (future) */
mode?: TBucketWatcherMode;
/** Prefix to filter objects (default: '' for all objects) */
prefix?: string;
/** Polling interval in milliseconds (default: 5000, poll mode only) */
pollIntervalMs?: number;
/** Optional RxJS buffering time in milliseconds */
bufferTimeMs?: number;
/** Emit initial state as 'add' events (default: false) */
includeInitial?: boolean;
/** Page size for listing operations (default: 1000) */
pageSize?: number;
// Future websocket options will be added here
// websocketUrl?: string;
}

View File

@@ -26,7 +26,9 @@ export {
// third party scope
import * as s3 from '@aws-sdk/client-s3';
import { Minimatch } from 'minimatch';
export {
s3,
Minimatch,
}