/*
This file contains logic for streaming things from and to the filesystem
*/
import * as plugins from './plugins.js';

/**
 * Returns true when `error` is an object whose `code` property equals `code`.
 * Used to narrow values caught in `catch` blocks without assuming their shape.
 */
const hasErrorCode = (error: unknown, code: string): boolean =>
  typeof error === 'object' && error !== null && (error as { code?: unknown }).code === code;

/**
 * Creates a read stream for the file at `pathArg`.
 */
export const createReadStream = (pathArg: string) => {
  return plugins.fs.createReadStream(pathArg);
};

/**
 * Creates a write stream for the file at `pathArg`.
 */
export const createWriteStream = (pathArg: string) => {
  return plugins.fs.createWriteStream(pathArg);
};

/**
 * Opens a read stream for `filePath` and hands it to `asyncFunc`.
 *
 * @param filePath path of the file to stream
 * @param asyncFunc callback that receives the read stream
 * @returns a promise that settles the same way as the promise returned by `asyncFunc`;
 *          a synchronous throw while creating the stream or calling `asyncFunc`
 *          surfaces as a rejection
 */
export const processFile = async (
  filePath: string,
  asyncFunc: (fileStream: plugins.stream.Readable) => Promise<void>
): Promise<void> => {
  const fileStream = createReadStream(filePath);
  return asyncFunc(fileStream);
};

/**
 * Walks `directoryPath` recursively and calls `asyncFunc` with a read stream for
 * every regular file, one file at a time (each call is awaited before the next).
 * Entries that are neither directories nor regular files are skipped.
 *
 * @param directoryPath directory to traverse
 * @param asyncFunc callback invoked with the read stream of each file
 */
export const processDirectory = async (
  directoryPath: string,
  asyncFunc: (fileStream: plugins.stream.Readable) => Promise<void>
): Promise<void> => {
  // Non-blocking directory listing so the event loop is not stalled on large trees
  const entries = await plugins.fs.promises.readdir(directoryPath, { withFileTypes: true });

  for (const entry of entries) {
    const fullPath = plugins.path.join(directoryPath, entry.name);

    if (entry.isDirectory()) {
      // Descend into sub directories
      await processDirectory(fullPath, asyncFunc);
    } else if (entry.isFile()) {
      // Hand the file stream to the callback and wait for it to finish
      await processFile(fullPath, asyncFunc);
    }
  }
};

/**
 * Checks if a file is ready to be streamed (exists and is not empty).
 *
 * @returns `true` when the file has a size greater than zero, `false` when it is
 *          empty or does not exist
 * @throws any `stat` error other than `ENOENT`
 */
export const isFileReadyForStreaming = async (filePathArg: string): Promise<boolean> => {
  try {
    const stats = await plugins.fs.promises.stat(filePathArg);
    return stats.size > 0;
  } catch (error: unknown) {
    if (hasErrorCode(error, 'ENOENT')) {
      // File does not exist
      return false;
    }
    throw error; // Rethrow other unexpected errors
  }
};

/**
 * Waits for a file to be ready for streaming (exists and is not empty).
 *
 * The file is checked once immediately and again on every `change` / `rename`
 * event reported by the watcher. The watcher is closed as soon as the promise
 * settles.
 *
 * NOTE(review): the watcher is attached to the file path itself; if `fs.watch`
 * throws for a path that does not exist yet, the promise rejects instead of
 * waiting — confirm whether watching the parent directory is wanted.
 *
 * @param filePathArg path of the file to wait for (resolved to an absolute path)
 * @returns a promise that resolves once the file has a size greater than zero and
 *          rejects on watcher errors or `stat` errors other than `ENOENT`
 */
export const waitForFileToBeReadyForStreaming = (filePathArg: string): Promise<void> => {
  return new Promise<void>((resolve, reject) => {
    // Normalize and resolve the file path
    const filePath = plugins.path.resolve(filePathArg);

    let settled = false;
    let watcher: plugins.fs.FSWatcher | null = null;

    // Releases the watcher; safe to call more than once
    const stopWatching = (): void => {
      if (watcher) {
        watcher.close();
        watcher = null;
      }
    };

    const succeed = (): void => {
      if (settled) return;
      settled = true;
      stopWatching();
      resolve();
    };

    const fail = (reason: unknown): void => {
      if (settled) return;
      settled = true;
      stopWatching();
      reject(reason);
    };

    // Checks the file stats and settles the promise when a decision can be made
    const checkFile = (): void => {
      plugins.fs.stat(filePath, (err, stats) => {
        if (settled) return;
        if (err) {
          if (err.code === 'ENOENT') {
            // File not found, keep waiting for the next watcher event
            return;
          }
          // Some other error occurred
          fail(err);
          return;
        }
        if (stats.size > 0) {
          // File exists and is not empty
          succeed();
        }
      });
    };

    // Set up file watcher (a synchronous throw here rejects the promise)
    watcher = plugins.fs.watch(filePath, { persistent: false }, (eventType) => {
      if (eventType === 'change' || eventType === 'rename') {
        checkFile();
      }
    });
    watcher.on('error', (error) => fail(error));

    // Check file immediately in case it's already ready
    checkFile();
  });
};

/**
 * Readable stream over a file that may still be growing.
 *
 * Data is read in chunks of at most 16 KiB starting at the current read offset.
 * When a read returns no bytes, the stream waits `endDelay` milliseconds and then
 * compares the file size with the read offset: if they are equal the stream ends,
 * otherwise reading resumes.
 */
export class SmartReadStream extends plugins.stream.Readable {
  private watcher: plugins.fs.FSWatcher | null = null;
  /** Byte offset in the file up to which data has been read. */
  private lastReadSize = 0;
  private endTimeout: ReturnType<typeof setTimeout> | null = null;
  private filePath: string;
  private endDelay: number;
  /** True while the stream is actively reading; false while waiting for more data. */
  private reading = false;

  /**
   * @param filePath path of the file to read
   * @param endDelay milliseconds to wait without new data before ending the stream
   * @param opts options forwarded to the underlying `Readable`
   */
  constructor(filePath: string, endDelay = 60000, opts?: plugins.stream.ReadableOptions) {
    super(opts);
    this.filePath = filePath;
    this.endDelay = endDelay;
  }

  /**
   * Watches the file and restarts the end timeout on every `change` event.
   * Nothing in this class calls this method.
   */
  private startWatching(): void {
    this.watcher = plugins.fs.watch(this.filePath, (eventType) => {
      if (eventType === 'change') {
        this.resetEndTimeout();
      }
    });

    this.watcher.on('error', (error) => {
      // destroy() runs _destroy() -> cleanup() and then emits 'error'
      this.destroy(error);
    });
  }

  /** (Re)starts the timer that triggers the end-of-file check after `endDelay` ms. */
  private resetEndTimeout(): void {
    if (this.endTimeout) clearTimeout(this.endTimeout);
    this.endTimeout = null;
    // Never schedule new work for a stream that is already torn down
    if (this.destroyed) return;
    this.endTimeout = setTimeout(() => this.checkForEnd(), this.endDelay);
  }

  /**
   * Ends the stream when the file size equals the read offset, otherwise keeps
   * the stream alive and resumes reading if it was waiting for data.
   */
  private checkForEnd(): void {
    plugins.fs.stat(this.filePath, (err, stats) => {
      if (this.destroyed) return;
      if (err) {
        this.destroy(err);
        return;
      }

      if (stats.size === this.lastReadSize) {
        this.push(null); // Signal the end of the stream
        this.cleanup();
        return;
      }

      if (stats.size < this.lastReadSize) {
        // File shrank below the read offset: clamp the offset to the new end.
        // When the file grew, the offset must stay untouched so the appended
        // bytes are actually read instead of being skipped.
        this.lastReadSize = stats.size;
      }

      this.resetEndTimeout();
      if (!this.reading) {
        // We only want to continue reading if we were previously waiting for more data
        this.reading = true;
        this._read(10000); // Try to read more data
      }
    });
  }

  /** Clears the pending end timeout and closes the watcher, if any. */
  private cleanup(): void {
    if (this.endTimeout) {
      clearTimeout(this.endTimeout);
      this.endTimeout = null;
    }
    if (this.watcher) {
      this.watcher.close();
      this.watcher = null;
    }
  }

  /**
   * Reads up to `min(size, 16384)` bytes from the file at the current read offset
   * and pushes them onto the stream. When no bytes are available the stream
   * switches to waiting mode and the end timeout is started.
   */
  _read(size: number): void {
    this.reading = true;

    const chunkSize = Math.min(size, 16384); // Read in chunks of 16KB
    const buffer = Buffer.alloc(chunkSize);

    plugins.fs.open(this.filePath, 'r', (openError, fd) => {
      if (openError) {
        this.destroy(openError);
        return;
      }

      plugins.fs.read(fd, buffer, 0, chunkSize, this.lastReadSize, (readError, bytesRead) => {
        // Always release the descriptor, including when the read itself failed
        plugins.fs.close(fd, (closeError) => {
          if (closeError) {
            this.destroy(closeError);
          }
        });

        if (this.destroyed) return;
        if (readError) {
          this.destroy(readError);
          return;
        }

        if (bytesRead > 0) {
          this.lastReadSize += bytesRead;
          this.push(buffer.subarray(0, bytesRead)); // Push the data onto the stream
        } else {
          this.reading = false; // No more data to read for now
          this.resetEndTimeout();
        }
      });
    });
  }

  /** Releases the timer and watcher when the stream is destroyed. */
  _destroy(error: Error | null, callback: (error: Error | null) => void): void {
    this.cleanup();
    callback(error);
  }
}