fix(fs): Improve fs and stream handling, enhance SmartFile/StreamFile, update tests and CI configs
This commit is contained in:
@@ -13,17 +13,17 @@ export const createWriteStream = (pathArg: string) => {
|
||||
|
||||
export const processFile = async (
|
||||
filePath: string,
|
||||
asyncFunc: (fileStream: plugins.stream.Readable) => Promise<void>
|
||||
asyncFunc: (fileStream: plugins.stream.Readable) => Promise<void>,
|
||||
): Promise<void> => {
|
||||
return new Promise((resolve, reject) => {
|
||||
const fileStream = createReadStream(filePath);
|
||||
asyncFunc(fileStream).then(resolve).catch(reject);
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
export const processDirectory = async (
|
||||
directoryPath: string,
|
||||
asyncFunc: (fileStream: plugins.stream.Readable) => Promise<void>
|
||||
asyncFunc: (fileStream: plugins.stream.Readable) => Promise<void>,
|
||||
): Promise<void> => {
|
||||
const files = plugins.fs.readdirSync(directoryPath, { withFileTypes: true });
|
||||
|
||||
@@ -41,12 +41,15 @@ export const processDirectory = async (
|
||||
/**
|
||||
* Checks if a file is ready to be streamed (exists and is not empty).
|
||||
*/
|
||||
export const isFileReadyForStreaming = async (filePathArg: string): Promise<boolean> => {
|
||||
export const isFileReadyForStreaming = async (
|
||||
filePathArg: string,
|
||||
): Promise<boolean> => {
|
||||
try {
|
||||
const stats = await plugins.fs.promises.stat(filePathArg);
|
||||
return stats.size > 0;
|
||||
} catch (error) {
|
||||
if (error.code === 'ENOENT') { // File does not exist
|
||||
if (error.code === 'ENOENT') {
|
||||
// File does not exist
|
||||
return false;
|
||||
}
|
||||
throw error; // Rethrow other unexpected errors
|
||||
@@ -56,7 +59,9 @@ export const isFileReadyForStreaming = async (filePathArg: string): Promise<bool
|
||||
/**
|
||||
* Waits for a file to be ready for streaming (exists and is not empty).
|
||||
*/
|
||||
export const waitForFileToBeReadyForStreaming = (filePathArg: string): Promise<void> => {
|
||||
export const waitForFileToBeReadyForStreaming = (
|
||||
filePathArg: string,
|
||||
): Promise<void> => {
|
||||
return new Promise((resolve, reject) => {
|
||||
// Normalize and resolve the file path
|
||||
const filePath = plugins.path.resolve(filePathArg);
|
||||
@@ -80,11 +85,15 @@ export const waitForFileToBeReadyForStreaming = (filePathArg: string): Promise<v
|
||||
};
|
||||
|
||||
// Set up file watcher
|
||||
const watcher = plugins.fs.watch(filePath, { persistent: false }, (eventType) => {
|
||||
if (eventType === 'change' || eventType === 'rename') {
|
||||
checkFile(resolve, reject);
|
||||
}
|
||||
});
|
||||
const watcher = plugins.fs.watch(
|
||||
filePath,
|
||||
{ persistent: false },
|
||||
(eventType) => {
|
||||
if (eventType === 'change' || eventType === 'rename') {
|
||||
checkFile(resolve, reject);
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
// Check file immediately in case it's already ready
|
||||
checkFile(resolve, reject);
|
||||
@@ -105,7 +114,11 @@ export class SmartReadStream extends plugins.stream.Readable {
|
||||
private endDelay: number;
|
||||
private reading: boolean = false;
|
||||
|
||||
constructor(filePath: string, endDelay = 60000, opts?: plugins.stream.ReadableOptions) {
|
||||
constructor(
|
||||
filePath: string,
|
||||
endDelay = 60000,
|
||||
opts?: plugins.stream.ReadableOptions,
|
||||
) {
|
||||
super(opts);
|
||||
this.filePath = filePath;
|
||||
this.endDelay = endDelay;
|
||||
@@ -165,26 +178,33 @@ export class SmartReadStream extends plugins.stream.Readable {
|
||||
this.emit('error', err);
|
||||
return;
|
||||
}
|
||||
plugins.fs.read(fd, buffer, 0, chunkSize, this.lastReadSize, (err, bytesRead, buffer) => {
|
||||
if (err) {
|
||||
this.emit('error', err);
|
||||
return;
|
||||
}
|
||||
|
||||
if (bytesRead > 0) {
|
||||
this.lastReadSize += bytesRead;
|
||||
this.push(buffer.slice(0, bytesRead)); // Push the data onto the stream
|
||||
} else {
|
||||
this.reading = false; // No more data to read for now
|
||||
this.resetEndTimeout();
|
||||
}
|
||||
|
||||
plugins.fs.close(fd, (err) => {
|
||||
plugins.fs.read(
|
||||
fd,
|
||||
buffer,
|
||||
0,
|
||||
chunkSize,
|
||||
this.lastReadSize,
|
||||
(err, bytesRead, buffer) => {
|
||||
if (err) {
|
||||
this.emit('error', err);
|
||||
return;
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
if (bytesRead > 0) {
|
||||
this.lastReadSize += bytesRead;
|
||||
this.push(buffer.slice(0, bytesRead)); // Push the data onto the stream
|
||||
} else {
|
||||
this.reading = false; // No more data to read for now
|
||||
this.resetEndTimeout();
|
||||
}
|
||||
|
||||
plugins.fs.close(fd, (err) => {
|
||||
if (err) {
|
||||
this.emit('error', err);
|
||||
}
|
||||
});
|
||||
},
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -192,4 +212,4 @@ export class SmartReadStream extends plugins.stream.Readable {
|
||||
this.cleanup();
|
||||
callback(error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user