/**
 * Represents a streaming response from a Rust bridge command.
 *
 * Implements `AsyncIterable` so chunks can be consumed with `for await...of`,
 * and exposes `.result` for the final response once the stream ends.
 *
 * Chunks pushed while no consumer is waiting are buffered and delivered in
 * order. Buffered chunks are still delivered after the stream has finished or
 * failed; only once the buffer is empty does iteration end (or reject).
 *
 * All iterators obtained from one instance share the same buffer, so every
 * chunk is delivered to exactly one `next()` call.
 *
 * @typeParam TChunk - Type of each streamed chunk
 * @typeParam TResult - Type of the final result
 */
export class StreamingResponse<TChunk, TResult> implements AsyncIterable<TChunk> {
  /**
   * Resolves with the final result when the stream ends successfully, or
   * rejects with the error passed to {@link StreamingResponse.fail}.
   */
  public readonly result: Promise<TResult>;

  // Assigned synchronously inside the Promise executor in the constructor.
  private resolveResult!: (value: TResult) => void;
  private rejectResult!: (error: Error) => void;

  /** Buffered chunks not yet consumed by an iterator. */
  private readonly buffer: TChunk[] = [];

  /**
   * Pending `next()` calls, oldest first, recorded while the consumer is ahead
   * of the producer. Invariant: this is non-empty only while `buffer` is empty
   * and the stream is not done.
   */
  private readonly waiters: Array<{
    resolve: (value: IteratorResult<TChunk>) => void;
    reject: (error: Error) => void;
  }> = [];

  private done = false;
  private error: Error | null = null;

  constructor() {
    this.result = new Promise<TResult>((resolve, reject) => {
      this.resolveResult = resolve;
      this.rejectResult = reject;
    });
    // Consumers that only iterate never attach a handler to `result`; without
    // this no-op handler a call to `fail()` would surface as an unhandled
    // rejection. `result` itself still rejects for anyone who awaits it.
    this.result.catch(() => undefined);
  }

  /**
   * Push a chunk into the stream. Called internally by RustBridge.
   *
   * Ignored once the stream has finished or failed.
   *
   * @param chunk - The chunk to deliver to the consumer
   */
  public pushChunk(chunk: TChunk): void {
    if (this.done) return;

    const waiter = this.waiters.shift();
    if (waiter) {
      // A consumer is waiting: deliver immediately to the oldest one.
      waiter.resolve({ value: chunk, done: false });
    } else {
      // No consumer waiting: buffer the chunk.
      this.buffer.push(chunk);
    }
  }

  /**
   * End the stream successfully with a final result. Called internally by
   * RustBridge.
   *
   * Ignored if the stream has already finished or failed.
   *
   * @param result - Value that `.result` resolves with
   */
  public finish(result: TResult): void {
    if (this.done) return;
    this.done = true;
    this.resolveResult(result);

    // Waiters only exist while the buffer is empty, so there is nothing left
    // to deliver to them: signal end of iteration to each.
    for (const waiter of this.waiters.splice(0)) {
      waiter.resolve({ value: undefined, done: true });
    }
  }

  /**
   * End the stream with an error. Called internally by RustBridge.
   *
   * Ignored if the stream has already finished or failed.
   *
   * @param error - Error that `.result` and pending/future `next()` calls reject with
   */
  public fail(error: Error): void {
    if (this.done) return;
    this.done = true;
    this.error = error;
    this.rejectResult(error);

    // Reject every consumer that is currently waiting for a chunk.
    for (const waiter of this.waiters.splice(0)) {
      waiter.reject(error);
    }
  }

  /**
   * Returns an iterator over the streamed chunks.
   *
   * `next()` may be called again before a previous call has settled; pending
   * calls are settled in the order they were made.
   */
  public [Symbol.asyncIterator](): AsyncIterator<TChunk> {
    return {
      next: (): Promise<IteratorResult<TChunk>> => {
        // Buffered chunks are delivered first, even after finish()/fail().
        if (this.buffer.length > 0) {
          // The length check guarantees shift() returns an element.
          const chunk = this.buffer.shift() as TChunk;
          return Promise.resolve<IteratorResult<TChunk>>({ value: chunk, done: false });
        }

        // Buffer drained and the stream has ended: reject or signal completion.
        if (this.done) {
          if (this.error) {
            return Promise.reject(this.error);
          }
          return Promise.resolve<IteratorResult<TChunk>>({ value: undefined, done: true });
        }

        // Nothing buffered and not done: wait for the next push/finish/fail.
        return new Promise<IteratorResult<TChunk>>((resolve, reject) => {
          this.waiters.push({ resolve, reject });
        });
      },
    };
  }
}