fix(core): update

This commit is contained in:
Philipp Kunz 2023-11-07 14:09:48 +01:00
parent b925e5e662
commit 27403a73b5
3 changed files with 204 additions and 23 deletions

View File

@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@push.rocks/smartfile', name: '@push.rocks/smartfile',
version: '11.0.0', version: '11.0.1',
description: 'offers smart ways to work with files in nodejs' description: 'offers smart ways to work with files in nodejs'
} }

View File

@ -389,27 +389,52 @@ export const listFileTree = async (
}; };
/** /**
* checks wether a file is ready for processing * Watches for file stability before resolving the promise.
*/ */
export const waitForFileToBeReady = async (filePathArg: string): Promise<void> => { export const waitForFileToBeReady = (filePathArg: string): Promise<void> => {
if (!plugins.path.isAbsolute(filePathArg)) { return new Promise((resolve, reject) => {
filePathArg = plugins.path.resolve(filePathArg); let lastSize = -1;
let stableCheckTimeout: NodeJS.Timeout | null = null;
const clearStableCheckTimeout = () => {
if (stableCheckTimeout) {
clearTimeout(stableCheckTimeout);
stableCheckTimeout = null;
} }
const limitedArray = new plugins.lik.LimitedArray<number>(3); };
let fileReady = false;
while (!fileReady) { const watcher = plugins.fs.watch(filePathArg, (eventType, filename) => {
const stats = await plugins.fsExtra.stat(filePathArg); if (eventType === 'change') {
limitedArray.addOne(stats.size); plugins.fs.stat(filePathArg, (err, stats) => {
if ( if (err) {
limitedArray.array.length < 3 || watcher.close();
!( clearStableCheckTimeout();
limitedArray.array[0] === limitedArray.array[1] && reject(err);
limitedArray.array[1] === limitedArray.array[2] return;
) }
) { if (stats.size === lastSize) {
await plugins.smartdelay.delayFor(5000); clearStableCheckTimeout();
stableCheckTimeout = setTimeout(() => {
watcher.close();
resolve();
}, 5000); // stability duration
} else { } else {
fileReady = true; lastSize = stats.size;
} }
});
} }
});
watcher.on('error', (error) => {
clearStableCheckTimeout();
watcher.close();
reject(error);
});
});
}; };

View File

@ -37,3 +37,159 @@ export const processDirectory = async (
} }
} }
}; };
/**
* Checks if a file is ready to be streamed (exists and is not empty).
*/
export const isFileReadyForStreaming = async (filePathArg: string): Promise<boolean> => {
try {
const stats = await plugins.fs.promises.stat(filePathArg);
return stats.size > 0;
} catch (error) {
if (error.code === 'ENOENT') { // File does not exist
return false;
}
throw error; // Rethrow other unexpected errors
}
};
/**
* Waits for a file to be ready for streaming (exists and is not empty).
*/
export const waitForFileToBeReadyForStreaming = (filePathArg: string): Promise<void> => {
return new Promise((resolve, reject) => {
// Normalize and resolve the file path
const filePath = plugins.path.resolve(filePathArg);
// Function to check file stats
const checkFile = (resolve: () => void, reject: (reason: any) => void) => {
plugins.fs.stat(filePath, (err, stats) => {
if (err) {
if (err.code === 'ENOENT') {
// File not found, wait and try again
return;
}
// Some other error occurred
return reject(err);
}
if (stats.size > 0) {
// File exists and is not empty, resolve the promise
resolve();
}
});
};
// Set up file watcher
const watcher = plugins.fs.watch(filePath, { persistent: false }, (eventType) => {
if (eventType === 'change' || eventType === 'rename') {
checkFile(resolve, reject);
}
});
// Check file immediately in case it's already ready
checkFile(resolve, reject);
// Error handling
watcher.on('error', (error) => {
watcher.close();
reject(error);
});
});
};
class SmartReadStream extends plugins.stream.Readable {
private watcher: plugins.fs.FSWatcher | null = null;
private lastReadSize: number = 0;
private endTimeout: NodeJS.Timeout | null = null;
private filePath: string;
private endDelay: number;
private reading: boolean = false;
constructor(filePath: string, endDelay = 60000, opts?: plugins.stream.ReadableOptions) {
super(opts);
this.filePath = filePath;
this.endDelay = endDelay;
}
private startWatching(): void {
this.watcher = plugins.fs.watch(this.filePath, (eventType) => {
if (eventType === 'change') {
this.resetEndTimeout();
}
});
this.watcher.on('error', (error) => {
this.cleanup();
this.emit('error', error);
});
}
private resetEndTimeout(): void {
if (this.endTimeout) clearTimeout(this.endTimeout);
this.endTimeout = setTimeout(() => this.checkForEnd(), this.endDelay);
}
private checkForEnd(): void {
plugins.fs.stat(this.filePath, (err, stats) => {
if (err) {
this.emit('error', err);
return;
}
if (this.lastReadSize === stats.size) {
this.push(null); // Signal the end of the stream
this.cleanup();
} else {
this.lastReadSize = stats.size;
this.resetEndTimeout();
if (!this.reading) {
// We only want to continue reading if we were previously waiting for more data
this.reading = true;
this._read(10000); // Try to read more data
}
}
});
}
private cleanup(): void {
if (this.endTimeout) clearTimeout(this.endTimeout);
if (this.watcher) this.watcher.close();
}
_read(size: number): void {
this.reading = true;
const chunkSize = Math.min(size, 16384); // Read in chunks of 16KB
const buffer = Buffer.alloc(chunkSize);
plugins.fs.open(this.filePath, 'r', (err, fd) => {
if (err) {
this.emit('error', err);
return;
}
plugins.fs.read(fd, buffer, 0, chunkSize, this.lastReadSize, (err, bytesRead, buffer) => {
if (err) {
this.emit('error', err);
return;
}
if (bytesRead > 0) {
this.lastReadSize += bytesRead;
this.push(buffer.slice(0, bytesRead)); // Push the data onto the stream
} else {
this.reading = false; // No more data to read for now
this.resetEndTimeout();
}
plugins.fs.close(fd, (err) => {
if (err) {
this.emit('error', err);
}
});
});
});
}
_destroy(error: Error | null, callback: (error: Error | null) => void): void {
this.cleanup();
callback(error);
}
}