111 lines
3.3 KiB
TypeScript
111 lines
3.3 KiB
TypeScript
|
|
/**
 * Represents a streaming response from a Rust bridge command.
 * Implements AsyncIterable to allow `for await...of` consumption of chunks,
 * and exposes `.result` for the final response once the stream ends.
 *
 * Chunks pushed while no consumer is waiting are buffered; `next()` calls made
 * while no chunk is available are queued and served in FIFO order, so several
 * `next()` calls may be outstanding at once without any of them being lost.
 *
 * All iterators returned by `[Symbol.asyncIterator]()` share the same buffer
 * and terminal state, so each chunk is delivered to exactly one `next()` call.
 *
 * @typeParam TChunk - Type of each streamed chunk
 * @typeParam TResult - Type of the final result
 */
export class StreamingResponse<TChunk, TResult> implements AsyncIterable<TChunk> {
  /**
   * Resolves with the final result when the stream ends successfully, and
   * rejects with the error passed to `fail()` when it does not.
   */
  public readonly result: Promise<TResult>;

  private resolveResult!: (value: TResult) => void;
  private rejectResult!: (error: Error) => void;

  /** Buffered chunks not yet consumed by the iterator. */
  private readonly buffer: TChunk[] = [];

  /**
   * Consumers whose `next()` call is still pending (iterator is ahead of the
   * producer), oldest first. Non-empty only while `buffer` is empty.
   */
  private readonly waiters: Array<{
    resolve: (value: IteratorResult<TChunk>) => void;
    reject: (error: Error) => void;
  }> = [];

  /** True once `finish()` or `fail()` has been called. */
  private done = false;

  /** The error passed to `fail()`, or null if the stream has not failed. */
  private error: Error | null = null;

  constructor() {
    this.result = new Promise<TResult>((resolve, reject) => {
      this.resolveResult = resolve;
      this.rejectResult = reject;
    });

    // A consumer that only iterates the stream never touches `result`. Without
    // a handler attached, `fail()` would surface as an unhandled rejection.
    // Callers that do await `result` still observe the rejection.
    this.result.catch(() => undefined);
  }

  /**
   * Push a chunk into the stream. Called internally by RustBridge.
   *
   * The chunk goes to the oldest pending `next()` call if there is one,
   * otherwise it is buffered. Chunks pushed after the stream ended are ignored.
   *
   * @param chunk - The chunk to deliver
   */
  public pushChunk(chunk: TChunk): void {
    if (this.done) return;

    const waiter = this.waiters.shift();
    if (waiter) {
      // A consumer is waiting — deliver immediately
      waiter.resolve({ value: chunk, done: false });
    } else {
      // No consumer waiting — buffer the chunk
      this.buffer.push(chunk);
    }
  }

  /**
   * End the stream successfully with a final result. Called internally by RustBridge.
   *
   * Resolves `result` and signals end of iteration to every pending `next()`
   * call. Has no effect if the stream has already ended.
   *
   * @param result - The final result to resolve `result` with
   */
  public finish(result: TResult): void {
    if (this.done) return;
    this.done = true;
    this.resolveResult(result);

    // Pending consumers imply an empty buffer, so there is nothing left to deliver.
    for (const waiter of this.waiters.splice(0)) {
      waiter.resolve({ value: undefined, done: true });
    }
  }

  /**
   * End the stream with an error. Called internally by RustBridge.
   *
   * Rejects `result` and every pending `next()` call with `error`. Chunks that
   * are already buffered are still delivered before later `next()` calls
   * reject. Has no effect if the stream has already ended.
   *
   * @param error - The error to reject with
   */
  public fail(error: Error): void {
    if (this.done) return;
    this.done = true;
    this.error = error;
    this.rejectResult(error);

    for (const waiter of this.waiters.splice(0)) {
      waiter.reject(error);
    }
  }

  /**
   * Returns an iterator over the streamed chunks.
   *
   * Each `next()` call yields the oldest buffered chunk if there is one;
   * otherwise it reports the terminal state (end of iteration, or a rejection
   * with the stream's error) if the stream has ended; otherwise it waits for
   * the next `pushChunk()`, `finish()` or `fail()`.
   */
  [Symbol.asyncIterator](): AsyncIterator<TChunk> {
    return {
      next: (): Promise<IteratorResult<TChunk>> => {
        // If there are buffered chunks, deliver one. The length check (rather
        // than testing the shifted value) keeps `undefined` usable as a chunk.
        if (this.buffer.length > 0) {
          const chunk = this.buffer.shift() as TChunk;
          return Promise.resolve({ value: chunk, done: false });
        }

        // If the stream is done, signal end (or the failure)
        if (this.done) {
          if (this.error) {
            return Promise.reject(this.error);
          }
          const end: IteratorResult<TChunk> = { value: undefined, done: true };
          return Promise.resolve(end);
        }

        // No buffered chunks and not done — queue up for the next push
        return new Promise<IteratorResult<TChunk>>((resolve, reject) => {
          this.waiters.push({ resolve, reject });
        });
      },
    };
  }
}
|