Compare commits
8 Commits
Author | SHA1 | Date | |
---|---|---|---|
c92a0dddbd | |||
27403a73b5 | |||
b925e5e662 | |||
98a5d2c94d | |||
0e735cba20 | |||
f815457801 | |||
f7e47ae354 | |||
684e893801 |
@ -1,7 +1,7 @@
|
|||||||
{
|
{
|
||||||
"name": "@push.rocks/smartfile",
|
"name": "@push.rocks/smartfile",
|
||||||
"private": false,
|
"private": false,
|
||||||
"version": "10.0.38",
|
"version": "11.0.1",
|
||||||
"description": "offers smart ways to work with files in nodejs",
|
"description": "offers smart ways to work with files in nodejs",
|
||||||
"main": "dist_ts/index.js",
|
"main": "dist_ts/index.js",
|
||||||
"typings": "dist_ts/index.d.ts",
|
"typings": "dist_ts/index.d.ts",
|
||||||
|
@ -3,6 +3,6 @@
|
|||||||
*/
|
*/
|
||||||
export const commitinfo = {
|
export const commitinfo = {
|
||||||
name: '@push.rocks/smartfile',
|
name: '@push.rocks/smartfile',
|
||||||
version: '10.0.38',
|
version: '11.0.1',
|
||||||
description: 'offers smart ways to work with files in nodejs'
|
description: 'offers smart ways to work with files in nodejs'
|
||||||
}
|
}
|
||||||
|
@ -3,7 +3,7 @@ import * as smartfileFs from './fs.js';
|
|||||||
import * as smartfileFsStream from './fsstream.js';
|
import * as smartfileFsStream from './fsstream.js';
|
||||||
import { Readable } from 'stream';
|
import { Readable } from 'stream';
|
||||||
|
|
||||||
type StreamSource = () => Promise<Readable>;
|
type TStreamSource = (streamFile: StreamFile) => Promise<Readable>;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The StreamFile class represents a file as a stream.
|
* The StreamFile class represents a file as a stream.
|
||||||
@ -12,9 +12,14 @@ type StreamSource = () => Promise<Readable>;
|
|||||||
export class StreamFile {
|
export class StreamFile {
|
||||||
// INSTANCE
|
// INSTANCE
|
||||||
relativeFilePath?: string;
|
relativeFilePath?: string;
|
||||||
private streamSource: StreamSource;
|
private streamSource: TStreamSource;
|
||||||
|
|
||||||
private constructor(streamSource: StreamSource, relativeFilePath?: string) {
|
// enable stream based multi use
|
||||||
|
private cachedStreamBuffer?: Buffer;
|
||||||
|
public multiUse: boolean;
|
||||||
|
public used: boolean = false;
|
||||||
|
|
||||||
|
private constructor(streamSource: TStreamSource, relativeFilePath?: string) {
|
||||||
this.streamSource = streamSource;
|
this.streamSource = streamSource;
|
||||||
this.relativeFilePath = relativeFilePath;
|
this.relativeFilePath = relativeFilePath;
|
||||||
}
|
}
|
||||||
@ -22,32 +27,84 @@ export class StreamFile {
|
|||||||
// STATIC
|
// STATIC
|
||||||
|
|
||||||
public static async fromPath(filePath: string): Promise<StreamFile> {
|
public static async fromPath(filePath: string): Promise<StreamFile> {
|
||||||
const streamSource = () => Promise.resolve(smartfileFsStream.createReadStream(filePath));
|
const streamSource: TStreamSource = async (stremFileArg) => smartfileFsStream.createReadStream(filePath);
|
||||||
return new StreamFile(streamSource, filePath);
|
const streamFile = new StreamFile(streamSource, filePath);
|
||||||
|
streamFile.multiUse = true;
|
||||||
|
return streamFile;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static async fromUrl(url: string): Promise<StreamFile> {
|
public static async fromUrl(url: string): Promise<StreamFile> {
|
||||||
const streamSource = async () => plugins.smartrequest.getStream(url); // Replace with actual plugin method
|
const streamSource: TStreamSource = async (streamFileArg) => plugins.smartrequest.getStream(url); // Replace with actual plugin method
|
||||||
return new StreamFile(streamSource);
|
const streamFile = new StreamFile(streamSource);
|
||||||
|
streamFile.multiUse = true;
|
||||||
|
return streamFile;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static fromBuffer(buffer: Buffer, relativeFilePath?: string): StreamFile {
|
public static fromBuffer(buffer: Buffer, relativeFilePath?: string): StreamFile {
|
||||||
const streamSource = () => {
|
const streamSource: TStreamSource = async (streamFileArg) => {
|
||||||
const stream = new Readable();
|
const stream = new Readable();
|
||||||
stream.push(buffer);
|
stream.push(buffer);
|
||||||
stream.push(null); // End of stream
|
stream.push(null); // End of stream
|
||||||
return Promise.resolve(stream);
|
return stream;
|
||||||
};
|
};
|
||||||
return new StreamFile(streamSource, relativeFilePath);
|
const streamFile = new StreamFile(streamSource, relativeFilePath);
|
||||||
|
streamFile.multiUse = true;
|
||||||
|
return streamFile;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a StreamFile from an existing Readable stream with an option for multiple uses.
|
||||||
|
* @param stream A Node.js Readable stream.
|
||||||
|
* @param relativeFilePath Optional file path for the stream.
|
||||||
|
* @param multiUse If true, the stream can be read multiple times, caching its content.
|
||||||
|
* @returns A StreamFile instance.
|
||||||
|
*/
|
||||||
|
public static fromStream(stream: Readable, relativeFilePath?: string, multiUse: boolean = false): StreamFile {
|
||||||
|
const streamSource: TStreamSource = (streamFileArg) => {
|
||||||
|
if (streamFileArg.multiUse) {
|
||||||
|
// If multi-use is enabled and we have cached content, create a new readable stream from the buffer
|
||||||
|
const bufferedStream = new Readable();
|
||||||
|
bufferedStream.push(streamFileArg.cachedStreamBuffer);
|
||||||
|
bufferedStream.push(null); // No more data to push
|
||||||
|
return Promise.resolve(bufferedStream);
|
||||||
|
} else {
|
||||||
|
return Promise.resolve(stream);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
const streamFile = new StreamFile(streamSource, relativeFilePath);
|
||||||
|
streamFile.multiUse = multiUse;
|
||||||
|
|
||||||
|
// If multi-use is enabled, cache the stream when it's first read
|
||||||
|
if (multiUse) {
|
||||||
|
const chunks: Buffer[] = [];
|
||||||
|
stream.on('data', (chunk) => chunks.push(Buffer.from(chunk)));
|
||||||
|
stream.on('end', () => {
|
||||||
|
streamFile.cachedStreamBuffer = Buffer.concat(chunks);
|
||||||
|
});
|
||||||
|
// It's important to handle errors that may occur during streaming
|
||||||
|
stream.on('error', (err) => {
|
||||||
|
console.error('Error while caching stream:', err);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
return streamFile;
|
||||||
}
|
}
|
||||||
|
|
||||||
// METHODS
|
// METHODS
|
||||||
|
|
||||||
|
private checkMultiUse() {
|
||||||
|
if (!this.multiUse && this.used) {
|
||||||
|
throw new Error('This stream can only be used once.');
|
||||||
|
}
|
||||||
|
this.used = true;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new readable stream from the source.
|
* Creates a new readable stream from the source.
|
||||||
*/
|
*/
|
||||||
public async createReadStream(): Promise<Readable> {
|
public async createReadStream(): Promise<Readable> {
|
||||||
return this.streamSource();
|
return this.streamSource(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -55,6 +112,7 @@ export class StreamFile {
|
|||||||
* @param filePathArg The file path where the stream should be written.
|
* @param filePathArg The file path where the stream should be written.
|
||||||
*/
|
*/
|
||||||
public async writeToDisk(filePathArg: string): Promise<void> {
|
public async writeToDisk(filePathArg: string): Promise<void> {
|
||||||
|
this.checkMultiUse();
|
||||||
const readStream = await this.createReadStream();
|
const readStream = await this.createReadStream();
|
||||||
const writeStream = smartfileFsStream.createWriteStream(filePathArg);
|
const writeStream = smartfileFsStream.createWriteStream(filePathArg);
|
||||||
|
|
||||||
@ -67,12 +125,14 @@ export class StreamFile {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public async writeToDir(dirPathArg: string) {
|
public async writeToDir(dirPathArg: string) {
|
||||||
|
this.checkMultiUse();
|
||||||
const filePath = plugins.path.join(dirPathArg, this.relativeFilePath);
|
const filePath = plugins.path.join(dirPathArg, this.relativeFilePath);
|
||||||
await smartfileFs.ensureDir(plugins.path.parse(filePath).dir);
|
await smartfileFs.ensureDir(plugins.path.parse(filePath).dir);
|
||||||
return this.writeToDisk(filePath);
|
return this.writeToDisk(filePath);
|
||||||
}
|
}
|
||||||
|
|
||||||
public async getContentAsBuffer() {
|
public async getContentAsBuffer() {
|
||||||
|
this.checkMultiUse();
|
||||||
const done = plugins.smartpromise.defer<Buffer>();
|
const done = plugins.smartpromise.defer<Buffer>();
|
||||||
const readStream = await this.createReadStream();
|
const readStream = await this.createReadStream();
|
||||||
const chunks: Buffer[] = [];
|
const chunks: Buffer[] = [];
|
||||||
|
69
ts/fs.ts
69
ts/fs.ts
@ -389,27 +389,52 @@ export const listFileTree = async (
|
|||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* checks wether a file is ready for processing
|
* Watches for file stability before resolving the promise.
|
||||||
*/
|
*/
|
||||||
export const waitForFileToBeReady = async (filePathArg: string): Promise<void> => {
|
export const waitForFileToBeReady = (filePathArg: string): Promise<void> => {
|
||||||
if (!plugins.path.isAbsolute(filePathArg)) {
|
return new Promise((resolve, reject) => {
|
||||||
filePathArg = plugins.path.resolve(filePathArg);
|
let lastSize = -1;
|
||||||
}
|
let stableCheckTimeout: NodeJS.Timeout | null = null;
|
||||||
const limitedArray = new plugins.lik.LimitedArray<number>(3);
|
|
||||||
let fileReady = false;
|
const clearStableCheckTimeout = () => {
|
||||||
while (!fileReady) {
|
if (stableCheckTimeout) {
|
||||||
const stats = await plugins.fsExtra.stat(filePathArg);
|
clearTimeout(stableCheckTimeout);
|
||||||
limitedArray.addOne(stats.size);
|
stableCheckTimeout = null;
|
||||||
if (
|
}
|
||||||
limitedArray.array.length < 3 ||
|
};
|
||||||
!(
|
|
||||||
limitedArray.array[0] === limitedArray.array[1] &&
|
const watcher = plugins.fs.watch(filePathArg, (eventType, filename) => {
|
||||||
limitedArray.array[1] === limitedArray.array[2]
|
if (eventType === 'change') {
|
||||||
)
|
plugins.fs.stat(filePathArg, (err, stats) => {
|
||||||
) {
|
if (err) {
|
||||||
await plugins.smartdelay.delayFor(5000);
|
watcher.close();
|
||||||
} else {
|
clearStableCheckTimeout();
|
||||||
fileReady = true;
|
reject(err);
|
||||||
}
|
return;
|
||||||
}
|
}
|
||||||
|
if (stats.size === lastSize) {
|
||||||
|
clearStableCheckTimeout();
|
||||||
|
stableCheckTimeout = setTimeout(() => {
|
||||||
|
watcher.close();
|
||||||
|
resolve();
|
||||||
|
}, 5000); // stability duration
|
||||||
|
} else {
|
||||||
|
lastSize = stats.size;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
watcher.on('error', (error) => {
|
||||||
|
clearStableCheckTimeout();
|
||||||
|
watcher.close();
|
||||||
|
reject(error);
|
||||||
|
});
|
||||||
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
156
ts/fsstream.ts
156
ts/fsstream.ts
@ -37,3 +37,159 @@ export const processDirectory = async (
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks if a file is ready to be streamed (exists and is not empty).
|
||||||
|
*/
|
||||||
|
export const isFileReadyForStreaming = async (filePathArg: string): Promise<boolean> => {
|
||||||
|
try {
|
||||||
|
const stats = await plugins.fs.promises.stat(filePathArg);
|
||||||
|
return stats.size > 0;
|
||||||
|
} catch (error) {
|
||||||
|
if (error.code === 'ENOENT') { // File does not exist
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
throw error; // Rethrow other unexpected errors
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Waits for a file to be ready for streaming (exists and is not empty).
|
||||||
|
*/
|
||||||
|
export const waitForFileToBeReadyForStreaming = (filePathArg: string): Promise<void> => {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
// Normalize and resolve the file path
|
||||||
|
const filePath = plugins.path.resolve(filePathArg);
|
||||||
|
|
||||||
|
// Function to check file stats
|
||||||
|
const checkFile = (resolve: () => void, reject: (reason: any) => void) => {
|
||||||
|
plugins.fs.stat(filePath, (err, stats) => {
|
||||||
|
if (err) {
|
||||||
|
if (err.code === 'ENOENT') {
|
||||||
|
// File not found, wait and try again
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// Some other error occurred
|
||||||
|
return reject(err);
|
||||||
|
}
|
||||||
|
if (stats.size > 0) {
|
||||||
|
// File exists and is not empty, resolve the promise
|
||||||
|
resolve();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
// Set up file watcher
|
||||||
|
const watcher = plugins.fs.watch(filePath, { persistent: false }, (eventType) => {
|
||||||
|
if (eventType === 'change' || eventType === 'rename') {
|
||||||
|
checkFile(resolve, reject);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Check file immediately in case it's already ready
|
||||||
|
checkFile(resolve, reject);
|
||||||
|
|
||||||
|
// Error handling
|
||||||
|
watcher.on('error', (error) => {
|
||||||
|
watcher.close();
|
||||||
|
reject(error);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
class SmartReadStream extends plugins.stream.Readable {
|
||||||
|
private watcher: plugins.fs.FSWatcher | null = null;
|
||||||
|
private lastReadSize: number = 0;
|
||||||
|
private endTimeout: NodeJS.Timeout | null = null;
|
||||||
|
private filePath: string;
|
||||||
|
private endDelay: number;
|
||||||
|
private reading: boolean = false;
|
||||||
|
|
||||||
|
constructor(filePath: string, endDelay = 60000, opts?: plugins.stream.ReadableOptions) {
|
||||||
|
super(opts);
|
||||||
|
this.filePath = filePath;
|
||||||
|
this.endDelay = endDelay;
|
||||||
|
}
|
||||||
|
|
||||||
|
private startWatching(): void {
|
||||||
|
this.watcher = plugins.fs.watch(this.filePath, (eventType) => {
|
||||||
|
if (eventType === 'change') {
|
||||||
|
this.resetEndTimeout();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
this.watcher.on('error', (error) => {
|
||||||
|
this.cleanup();
|
||||||
|
this.emit('error', error);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private resetEndTimeout(): void {
|
||||||
|
if (this.endTimeout) clearTimeout(this.endTimeout);
|
||||||
|
this.endTimeout = setTimeout(() => this.checkForEnd(), this.endDelay);
|
||||||
|
}
|
||||||
|
|
||||||
|
private checkForEnd(): void {
|
||||||
|
plugins.fs.stat(this.filePath, (err, stats) => {
|
||||||
|
if (err) {
|
||||||
|
this.emit('error', err);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.lastReadSize === stats.size) {
|
||||||
|
this.push(null); // Signal the end of the stream
|
||||||
|
this.cleanup();
|
||||||
|
} else {
|
||||||
|
this.lastReadSize = stats.size;
|
||||||
|
this.resetEndTimeout();
|
||||||
|
if (!this.reading) {
|
||||||
|
// We only want to continue reading if we were previously waiting for more data
|
||||||
|
this.reading = true;
|
||||||
|
this._read(10000); // Try to read more data
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private cleanup(): void {
|
||||||
|
if (this.endTimeout) clearTimeout(this.endTimeout);
|
||||||
|
if (this.watcher) this.watcher.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
_read(size: number): void {
|
||||||
|
this.reading = true;
|
||||||
|
const chunkSize = Math.min(size, 16384); // Read in chunks of 16KB
|
||||||
|
const buffer = Buffer.alloc(chunkSize);
|
||||||
|
plugins.fs.open(this.filePath, 'r', (err, fd) => {
|
||||||
|
if (err) {
|
||||||
|
this.emit('error', err);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
plugins.fs.read(fd, buffer, 0, chunkSize, this.lastReadSize, (err, bytesRead, buffer) => {
|
||||||
|
if (err) {
|
||||||
|
this.emit('error', err);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (bytesRead > 0) {
|
||||||
|
this.lastReadSize += bytesRead;
|
||||||
|
this.push(buffer.slice(0, bytesRead)); // Push the data onto the stream
|
||||||
|
} else {
|
||||||
|
this.reading = false; // No more data to read for now
|
||||||
|
this.resetEndTimeout();
|
||||||
|
}
|
||||||
|
|
||||||
|
plugins.fs.close(fd, (err) => {
|
||||||
|
if (err) {
|
||||||
|
this.emit('error', err);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
_destroy(error: Error | null, callback: (error: Error | null) => void): void {
|
||||||
|
this.cleanup();
|
||||||
|
callback(error);
|
||||||
|
}
|
||||||
|
}
|
Reference in New Issue
Block a user