From 27403a73b57252849b6189fc862610f4626149e6 Mon Sep 17 00:00:00 2001 From: Philipp Kunz Date: Tue, 7 Nov 2023 14:09:48 +0100 Subject: [PATCH] fix(core): update --- ts/00_commitinfo_data.ts | 2 +- ts/fs.ts | 69 +++++++++++------ ts/fsstream.ts | 156 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 204 insertions(+), 23 deletions(-) diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 15e1560..76e982c 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartfile', - version: '11.0.0', + version: '11.0.1', description: 'offers smart ways to work with files in nodejs' } diff --git a/ts/fs.ts b/ts/fs.ts index d296c19..f7eed6c 100644 --- a/ts/fs.ts +++ b/ts/fs.ts @@ -389,27 +389,52 @@ export const listFileTree = async ( }; /** - * checks wether a file is ready for processing + * Watches for file stability before resolving the promise. */ -export const waitForFileToBeReady = async (filePathArg: string): Promise => { - if (!plugins.path.isAbsolute(filePathArg)) { - filePathArg = plugins.path.resolve(filePathArg); - } - const limitedArray = new plugins.lik.LimitedArray(3); - let fileReady = false; - while (!fileReady) { - const stats = await plugins.fsExtra.stat(filePathArg); - limitedArray.addOne(stats.size); - if ( - limitedArray.array.length < 3 || - !( - limitedArray.array[0] === limitedArray.array[1] && - limitedArray.array[1] === limitedArray.array[2] - ) - ) { - await plugins.smartdelay.delayFor(5000); - } else { - fileReady = true; - } - } +export const waitForFileToBeReady = (filePathArg: string): Promise => { + return new Promise((resolve, reject) => { + let lastSize = -1; + let stableCheckTimeout: NodeJS.Timeout | null = null; + + const clearStableCheckTimeout = () => { + if (stableCheckTimeout) { + clearTimeout(stableCheckTimeout); + stableCheckTimeout = null; + } + }; + + const watcher = plugins.fs.watch(filePathArg, (eventType, filename) => { + if (eventType === 'change') { + plugins.fs.stat(filePathArg, (err, stats) => { + if (err) { + watcher.close(); + clearStableCheckTimeout(); + reject(err); + return; + } + if (stats.size === lastSize) { + clearStableCheckTimeout(); + stableCheckTimeout = setTimeout(() => { + watcher.close(); + resolve(); + }, 5000); // stability duration + } else { + lastSize = stats.size; + } + }); + } + }); + + watcher.on('error', (error) => { + clearStableCheckTimeout(); + watcher.close(); + reject(error); + }); + }); }; + + + + + + diff --git a/ts/fsstream.ts b/ts/fsstream.ts index 5abaa56..16baa05 100644 --- a/ts/fsstream.ts +++ b/ts/fsstream.ts @@ -37,3 +37,159 @@ export const processDirectory = async ( } } }; + +/** + * Checks if a file is ready to be streamed (exists and is not empty). + */ +export const isFileReadyForStreaming = async (filePathArg: string): Promise => { + try { + const stats = await plugins.fs.promises.stat(filePathArg); + return stats.size > 0; + } catch (error) { + if (error.code === 'ENOENT') { // File does not exist + return false; + } + throw error; // Rethrow other unexpected errors + } +}; + +/** + * Waits for a file to be ready for streaming (exists and is not empty). + */ +export const waitForFileToBeReadyForStreaming = (filePathArg: string): Promise => { + return new Promise((resolve, reject) => { + // Normalize and resolve the file path + const filePath = plugins.path.resolve(filePathArg); + + // Function to check file stats + const checkFile = (resolve: () => void, reject: (reason: any) => void) => { + plugins.fs.stat(filePath, (err, stats) => { + if (err) { + if (err.code === 'ENOENT') { + // File not found, wait and try again + return; + } + // Some other error occurred + return reject(err); + } + if (stats.size > 0) { + // File exists and is not empty, resolve the promise + resolve(); + } + }); + }; + + // Set up file watcher + const watcher = plugins.fs.watch(filePath, { persistent: false }, (eventType) => { + if (eventType === 'change' || eventType === 'rename') { + checkFile(resolve, reject); + } + }); + + // Check file immediately in case it's already ready + checkFile(resolve, reject); + + // Error handling + watcher.on('error', (error) => { + watcher.close(); + reject(error); + }); + }); +}; + +class SmartReadStream extends plugins.stream.Readable { + private watcher: plugins.fs.FSWatcher | null = null; + private lastReadSize: number = 0; + private endTimeout: NodeJS.Timeout | null = null; + private filePath: string; + private endDelay: number; + private reading: boolean = false; + + constructor(filePath: string, endDelay = 60000, opts?: plugins.stream.ReadableOptions) { + super(opts); + this.filePath = filePath; + this.endDelay = endDelay; + } + + private startWatching(): void { + this.watcher = plugins.fs.watch(this.filePath, (eventType) => { + if (eventType === 'change') { + this.resetEndTimeout(); + } + }); + + this.watcher.on('error', (error) => { + this.cleanup(); + this.emit('error', error); + }); + } + + private resetEndTimeout(): void { + if (this.endTimeout) clearTimeout(this.endTimeout); + this.endTimeout = setTimeout(() => this.checkForEnd(), this.endDelay); + } + + private checkForEnd(): void { + plugins.fs.stat(this.filePath, (err, stats) => { + if (err) { + this.emit('error', err); + return; + } + + if (this.lastReadSize === stats.size) { + this.push(null); // Signal the end of the stream + this.cleanup(); + } else { + this.lastReadSize = stats.size; + this.resetEndTimeout(); + if (!this.reading) { + // We only want to continue reading if we were previously waiting for more data + this.reading = true; + this._read(10000); // Try to read more data + } + } + }); + } + + private cleanup(): void { + if (this.endTimeout) clearTimeout(this.endTimeout); + if (this.watcher) this.watcher.close(); + } + + _read(size: number): void { + this.reading = true; + const chunkSize = Math.min(size, 16384); // Read in chunks of 16KB + const buffer = Buffer.alloc(chunkSize); + plugins.fs.open(this.filePath, 'r', (err, fd) => { + if (err) { + this.emit('error', err); + return; + } + plugins.fs.read(fd, buffer, 0, chunkSize, this.lastReadSize, (err, bytesRead, buffer) => { + if (err) { + this.emit('error', err); + return; + } + + if (bytesRead > 0) { + this.lastReadSize += bytesRead; + this.push(buffer.slice(0, bytesRead)); // Push the data onto the stream + } else { + this.reading = false; // No more data to read for now + this.resetEndTimeout(); + } + + plugins.fs.close(fd, (err) => { + if (err) { + this.emit('error', err); + } + }); + }); + }); + } + + _destroy(error: Error | null, callback: (error: Error | null) => void): void { + this.cleanup(); + callback(error); + } +} \ No newline at end of file