195 lines
5.6 KiB
TypeScript
195 lines
5.6 KiB
TypeScript
/*
|
|
This file contains logic for streaming things from and to the filesystem
|
|
*/
|
|
import * as plugins from './plugins.js';
|
|
|
|
export const createReadStream = (pathArg: string) => {
|
|
return plugins.fs.createReadStream(pathArg);
|
|
};
|
|
|
|
export const createWriteStream = (pathArg: string) => {
|
|
return plugins.fs.createWriteStream(pathArg);
|
|
};
|
|
|
|
export const processFile = async (
|
|
filePath: string,
|
|
asyncFunc: (fileStream: plugins.stream.Readable) => Promise<void>
|
|
): Promise<void> => {
|
|
return new Promise((resolve, reject) => {
|
|
const fileStream = createReadStream(filePath);
|
|
asyncFunc(fileStream).then(resolve).catch(reject);
|
|
});
|
|
}
|
|
|
|
export const processDirectory = async (
|
|
directoryPath: string,
|
|
asyncFunc: (fileStream: plugins.stream.Readable) => Promise<void>
|
|
): Promise<void> => {
|
|
const files = plugins.fs.readdirSync(directoryPath, { withFileTypes: true });
|
|
|
|
for (const file of files) {
|
|
const fullPath = plugins.path.join(directoryPath, file.name);
|
|
|
|
if (file.isDirectory()) {
|
|
await processDirectory(fullPath, asyncFunc); // Recursively call processDirectory for directories
|
|
} else if (file.isFile()) {
|
|
await processFile(fullPath, asyncFunc); // Call async function with the file stream and wait for it
|
|
}
|
|
}
|
|
};
|
|
|
|
/**
|
|
* Checks if a file is ready to be streamed (exists and is not empty).
|
|
*/
|
|
export const isFileReadyForStreaming = async (filePathArg: string): Promise<boolean> => {
|
|
try {
|
|
const stats = await plugins.fs.promises.stat(filePathArg);
|
|
return stats.size > 0;
|
|
} catch (error) {
|
|
if (error.code === 'ENOENT') { // File does not exist
|
|
return false;
|
|
}
|
|
throw error; // Rethrow other unexpected errors
|
|
}
|
|
};
|
|
|
|
/**
|
|
* Waits for a file to be ready for streaming (exists and is not empty).
|
|
*/
|
|
export const waitForFileToBeReadyForStreaming = (filePathArg: string): Promise<void> => {
|
|
return new Promise((resolve, reject) => {
|
|
// Normalize and resolve the file path
|
|
const filePath = plugins.path.resolve(filePathArg);
|
|
|
|
// Function to check file stats
|
|
const checkFile = (resolve: () => void, reject: (reason: any) => void) => {
|
|
plugins.fs.stat(filePath, (err, stats) => {
|
|
if (err) {
|
|
if (err.code === 'ENOENT') {
|
|
// File not found, wait and try again
|
|
return;
|
|
}
|
|
// Some other error occurred
|
|
return reject(err);
|
|
}
|
|
if (stats.size > 0) {
|
|
// File exists and is not empty, resolve the promise
|
|
resolve();
|
|
}
|
|
});
|
|
};
|
|
|
|
// Set up file watcher
|
|
const watcher = plugins.fs.watch(filePath, { persistent: false }, (eventType) => {
|
|
if (eventType === 'change' || eventType === 'rename') {
|
|
checkFile(resolve, reject);
|
|
}
|
|
});
|
|
|
|
// Check file immediately in case it's already ready
|
|
checkFile(resolve, reject);
|
|
|
|
// Error handling
|
|
watcher.on('error', (error) => {
|
|
watcher.close();
|
|
reject(error);
|
|
});
|
|
});
|
|
};
|
|
|
|
export class SmartReadStream extends plugins.stream.Readable {
|
|
private watcher: plugins.fs.FSWatcher | null = null;
|
|
private lastReadSize: number = 0;
|
|
private endTimeout: NodeJS.Timeout | null = null;
|
|
private filePath: string;
|
|
private endDelay: number;
|
|
private reading: boolean = false;
|
|
|
|
constructor(filePath: string, endDelay = 60000, opts?: plugins.stream.ReadableOptions) {
|
|
super(opts);
|
|
this.filePath = filePath;
|
|
this.endDelay = endDelay;
|
|
}
|
|
|
|
private startWatching(): void {
|
|
this.watcher = plugins.fs.watch(this.filePath, (eventType) => {
|
|
if (eventType === 'change') {
|
|
this.resetEndTimeout();
|
|
}
|
|
});
|
|
|
|
this.watcher.on('error', (error) => {
|
|
this.cleanup();
|
|
this.emit('error', error);
|
|
});
|
|
}
|
|
|
|
private resetEndTimeout(): void {
|
|
if (this.endTimeout) clearTimeout(this.endTimeout);
|
|
this.endTimeout = setTimeout(() => this.checkForEnd(), this.endDelay);
|
|
}
|
|
|
|
private checkForEnd(): void {
|
|
plugins.fs.stat(this.filePath, (err, stats) => {
|
|
if (err) {
|
|
this.emit('error', err);
|
|
return;
|
|
}
|
|
|
|
if (this.lastReadSize === stats.size) {
|
|
this.push(null); // Signal the end of the stream
|
|
this.cleanup();
|
|
} else {
|
|
this.lastReadSize = stats.size;
|
|
this.resetEndTimeout();
|
|
if (!this.reading) {
|
|
// We only want to continue reading if we were previously waiting for more data
|
|
this.reading = true;
|
|
this._read(10000); // Try to read more data
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
private cleanup(): void {
|
|
if (this.endTimeout) clearTimeout(this.endTimeout);
|
|
if (this.watcher) this.watcher.close();
|
|
}
|
|
|
|
_read(size: number): void {
|
|
this.reading = true;
|
|
const chunkSize = Math.min(size, 16384); // Read in chunks of 16KB
|
|
const buffer = Buffer.alloc(chunkSize);
|
|
plugins.fs.open(this.filePath, 'r', (err, fd) => {
|
|
if (err) {
|
|
this.emit('error', err);
|
|
return;
|
|
}
|
|
plugins.fs.read(fd, buffer, 0, chunkSize, this.lastReadSize, (err, bytesRead, buffer) => {
|
|
if (err) {
|
|
this.emit('error', err);
|
|
return;
|
|
}
|
|
|
|
if (bytesRead > 0) {
|
|
this.lastReadSize += bytesRead;
|
|
this.push(buffer.slice(0, bytesRead)); // Push the data onto the stream
|
|
} else {
|
|
this.reading = false; // No more data to read for now
|
|
this.resetEndTimeout();
|
|
}
|
|
|
|
plugins.fs.close(fd, (err) => {
|
|
if (err) {
|
|
this.emit('error', err);
|
|
}
|
|
});
|
|
});
|
|
});
|
|
}
|
|
|
|
_destroy(error: Error | null, callback: (error: Error | null) => void): void {
|
|
this.cleanup();
|
|
callback(error);
|
|
}
|
|
} |