Files
smartrust/ts/classes.streamingresponse.ts

111 lines
3.3 KiB
TypeScript

/**
 * Represents a streaming response from a Rust bridge command.
 * Implements AsyncIterable to allow `for await...of` consumption of chunks,
 * and exposes `.result` for the final response once the stream ends.
 *
 * Producer side: call `pushChunk()` zero or more times, then exactly one of
 * `finish()` or `fail()`. Calls made after the stream has ended are ignored.
 *
 * Consumer side: iterate the instance and/or await `.result`. Every iterator
 * obtained from the same instance drains the same shared buffer, so a chunk
 * is delivered to exactly one `next()` call.
 *
 * @typeParam TChunk - Type of each streamed chunk
 * @typeParam TResult - Type of the final result
 */
export class StreamingResponse<TChunk, TResult> implements AsyncIterable<TChunk> {
  /**
   * Resolves with the final result when the stream ends successfully and
   * rejects with the error passed to `fail()` otherwise.
   */
  public readonly result: Promise<TResult>;

  private resolveResult!: (value: TResult) => void;
  private rejectResult!: (error: Error) => void;

  /** Buffered chunks not yet consumed by the iterator. */
  private buffer: TChunk[] = [];

  /**
   * Pending `next()` calls in arrival order (consumer is ahead of producer).
   * A queue rather than a single slot, so that overlapping `next()` calls are
   * each settled instead of the later one overwriting the earlier one.
   * Non-empty only while `buffer` is empty and the stream has not ended.
   */
  private waiters: Array<{
    resolve: (value: IteratorResult<TChunk>) => void;
    reject: (error: Error) => void;
  }> = [];

  private done = false;
  private error: Error | null = null;

  constructor() {
    this.result = new Promise<TResult>((resolve, reject) => {
      this.resolveResult = resolve;
      this.rejectResult = reject;
    });
    // Consumers that only iterate never attach a handler to `result`; without
    // this no-op handler a `fail()` would surface as an unhandled rejection.
    // `result` itself still rejects for anyone who awaits it.
    this.result.catch(() => {});
  }

  /**
   * Push a chunk into the stream. Called internally by RustBridge.
   *
   * The chunk goes to the oldest pending `next()` call if there is one,
   * otherwise it is buffered. Ignored once the stream has ended.
   *
   * @param chunk - The chunk to deliver
   */
  public pushChunk(chunk: TChunk): void {
    if (this.done) return;

    const waiter = this.waiters.shift();
    if (waiter) {
      // A consumer is waiting — deliver immediately
      waiter.resolve({ value: chunk, done: false });
      return;
    }

    // No consumer waiting — buffer the chunk
    this.buffer.push(chunk);
  }

  /**
   * End the stream successfully with a final result. Called internally by RustBridge.
   *
   * Resolves `result` and signals end of iteration to every pending `next()`
   * call. Chunks still in the buffer remain available to later `next()` calls.
   * Ignored if the stream has already ended.
   *
   * @param result - The final result of the command
   */
  public finish(result: TResult): void {
    if (this.done) return;
    this.done = true;
    this.resolveResult(result);

    // Pending consumers imply an empty buffer, so they can only see the end
    for (const waiter of this.drainWaiters()) {
      waiter.resolve(this.endOfStream());
    }
  }

  /**
   * End the stream with an error. Called internally by RustBridge.
   *
   * Rejects `result` and every pending `next()` call with `error`. Chunks
   * still in the buffer are delivered first; the `next()` call after the
   * buffer is drained rejects. Ignored if the stream has already ended.
   *
   * @param error - The error that terminated the stream
   */
  public fail(error: Error): void {
    if (this.done) return;
    this.done = true;
    this.error = error;
    this.rejectResult(error);

    for (const waiter of this.drainWaiters()) {
      waiter.reject(error);
    }
  }

  /**
   * Create an iterator over the streamed chunks.
   *
   * `next()` yields buffered chunks first, then either signals completion
   * (after `finish()`), rejects with the stream error (after `fail()`), or
   * waits for the producer. The iterator only implements `next()`.
   */
  [Symbol.asyncIterator](): AsyncIterator<TChunk> {
    return {
      next: (): Promise<IteratorResult<TChunk>> => {
        // If there are buffered chunks, deliver one
        if (this.buffer.length > 0) {
          const chunk = this.buffer.shift() as TChunk;
          return Promise.resolve<IteratorResult<TChunk>>({ value: chunk, done: false });
        }

        // If the stream is done, signal end (or surface the failure)
        if (this.done) {
          return this.error
            ? Promise.reject(this.error)
            : Promise.resolve(this.endOfStream());
        }

        // No buffered chunks and not done — wait for the next push
        return new Promise<IteratorResult<TChunk>>((resolve, reject) => {
          this.waiters.push({ resolve, reject });
        });
      },
    };
  }

  /** Build the iterator result that marks the end of the stream. */
  private endOfStream(): IteratorResult<TChunk> {
    return { value: undefined, done: true };
  }

  /** Detach and return all pending consumers, leaving the queue empty. */
  private drainWaiters(): Array<{
    resolve: (value: IteratorResult<TChunk>) => void;
    reject: (error: Error) => void;
  }> {
    const pending = this.waiters;
    this.waiters = [];
    return pending;
  }
}