174 lines
		
	
	
		
			4.5 KiB
		
	
	
	
		
			TypeScript
		
	
	
	
	
	
			
		
		
	
	
			174 lines
		
	
	
		
			4.5 KiB
		
	
	
	
		
			TypeScript
		
	
	
	
	
	
| import * as plugins from './plugins.js';
 | |
| import * as types from './types.js';
 | |
| import { CoreResponse as AbstractCoreResponse } from '../core_base/response.js';
 | |
| 
 | |
/**
 * Node.js implementation of the Core Response class that provides a fetch-like API
 * (`ok`, `status`, `json()`, `text()`, `arrayBuffer()`, `stream()`) on top of an
 * `http.IncomingMessage`.
 *
 * Unless `options.autoDrain` is `false`, the constructor schedules a `setImmediate`
 * callback that resumes (and thereby discards) the body if nothing has started
 * consuming it by then. Callers that want the body should therefore call one of the
 * body accessors synchronously after receiving the response, or disable auto-drain.
 */
export class CoreResponse<T = any>
  extends AbstractCoreResponse<T>
  implements types.INodeResponse<T>
{
  /** The wrapped Node.js response stream. */
  private incomingMessage: plugins.http.IncomingMessage;
  /** Cached promise for the fully collected body, set by the first `collectBody()` call. */
  private bodyBufferPromise: Promise<Buffer> | null = null;
  /** Handle of the pending auto-drain immediate, or `null` when none is scheduled. */
  private _autoDrainTimeout: NodeJS.Immediate | null = null;

  // Public properties
  /** `true` when `status` is in the range 200-299. */
  public readonly ok: boolean;
  /** HTTP status code, or `0` when the underlying message carries none. */
  public readonly status: number;
  /** HTTP status message, or an empty string when the underlying message carries none. */
  public readonly statusText: string;
  /** Response headers exactly as exposed by the underlying message. */
  public readonly headers: plugins.http.IncomingHttpHeaders;
  /** The URL passed to the constructor. */
  public readonly url: string;

  /**
   * @param incomingMessage - The Node.js response to wrap.
   * @param url - URL associated with this response; stored verbatim.
   * @param options - Request options; only `autoDrain` is read here. Auto-drain is
   *   enabled unless `autoDrain` is explicitly `false`.
   */
  constructor(
    incomingMessage: plugins.http.IncomingMessage,
    url: string,
    options: types.ICoreRequestOptions = {},
  ) {
    super();
    this.incomingMessage = incomingMessage;
    this.url = url;
    this.status = incomingMessage.statusCode ?? 0;
    this.statusText = incomingMessage.statusMessage ?? '';
    this.ok = this.status >= 200 && this.status < 300;
    this.headers = incomingMessage.headers;

    // Auto-drain unconsumed streams to prevent socket hanging.
    // This prevents keep-alive sockets from timing out when response bodies aren't consumed.
    // Defaults to enabled if not specified.
    if (options.autoDrain !== false) {
      this._autoDrainTimeout = setImmediate(() => {
        // The immediate has fired, so there is nothing left to cancel.
        this._autoDrainTimeout = null;

        if (!this.consumed && !this.incomingMessage.readableEnded) {
          console.log(
            `Auto-draining unconsumed response body for ${this.url} (status: ${this.status})`,
          );
          this.incomingMessage.resume(); // Drain without processing
        }
      });
    }
  }

  /**
   * Override to also cancel auto-drain when body is consumed.
   * Clears the pending immediate (if any) before delegating to the base class check.
   */
  protected ensureNotConsumed(): void {
    // Cancel auto-drain since we're consuming the body
    if (this._autoDrainTimeout) {
      clearImmediate(this._autoDrainTimeout);
      this._autoDrainTimeout = null;
    }

    super.ensureNotConsumed();
  }

  /**
   * Collects the whole body into a single buffer.
   *
   * @returns A promise resolving to the concatenation of all received chunks.
   * @throws If the underlying stream has already ended or been destroyed (for example
   *   because auto-drain already discarded the body, or it was read through `raw()`),
   *   or if the stream errors or closes before emitting `'end'`.
   */
  private async collectBody(): Promise<Buffer> {
    this.ensureNotConsumed();

    if (this.bodyBufferPromise) {
      return this.bodyBufferPromise;
    }

    const message = this.incomingMessage;

    // Once the stream has ended or been destroyed, `'end'` will never fire for listeners
    // attached now, so waiting on it would hang forever. Fail loudly instead.
    if (message.readableEnded || message.destroyed) {
      throw new Error(
        `Response body for ${this.url} is no longer available: the stream has already ` +
          `ended or was destroyed (it may have been auto-drained). Read the body ` +
          `immediately after receiving the response or disable autoDrain.`,
      );
    }

    this.bodyBufferPromise = new Promise<Buffer>((resolve, reject) => {
      const chunks: Buffer[] = [];

      message.on('data', (chunk: Buffer) => {
        chunks.push(chunk);
      });

      message.on('end', () => {
        resolve(Buffer.concat(chunks));
      });

      message.on('error', reject);

      // A connection dropped mid-body may close without `'end'` or `'error'`.
      // If `'end'` already resolved the promise, this rejection is a no-op.
      message.on('close', () => {
        reject(new Error(`Response stream for ${this.url} closed before the body was fully read`));
      });
    });

    return this.bodyBufferPromise;
  }

  /**
   * Parse response as JSON.
   *
   * @returns The parsed body. The value is not validated against `T`.
   * @throws Error with a `Failed to parse JSON: ...` message when the body is not valid JSON.
   */
  async json(): Promise<T> {
    const buffer = await this.collectBody();
    const text = buffer.toString('utf-8');

    try {
      return JSON.parse(text);
    } catch (error: unknown) {
      // Catch variables are `unknown` under strict settings; narrow before reading `.message`.
      const reason = error instanceof Error ? error.message : String(error);
      throw new Error(`Failed to parse JSON: ${reason}`);
    }
  }

  /**
   * Get response as text, decoding the body as UTF-8.
   */
  async text(): Promise<string> {
    const buffer = await this.collectBody();
    return buffer.toString('utf-8');
  }

  /**
   * Get response as ArrayBuffer.
   *
   * The result is a copy covering exactly the body bytes; `slice` is used because a
   * Buffer may be a view into a larger backing ArrayBuffer.
   */
  async arrayBuffer(): Promise<ArrayBuffer> {
    const buffer = await this.collectBody();
    return buffer.buffer.slice(
      buffer.byteOffset,
      buffer.byteOffset + buffer.byteLength,
    );
  }

  /**
   * Get response as a web-style ReadableStream.
   *
   * @returns A web stream fed from the Node.js stream's events, or `null` when the
   *   Node.js stream has already ended or been destroyed.
   *
   * Note: chunks are enqueued as soon as `'data'` fires, without consulting the
   * controller's desired size, so no backpressure is applied to the socket.
   */
  stream(): ReadableStream<Uint8Array> | null {
    this.ensureNotConsumed();

    if (this.incomingMessage.readableEnded || this.incomingMessage.destroyed) {
      return null;
    }

    // Create a web ReadableStream from the Node.js stream
    const nodeStream = this.incomingMessage;
    return new ReadableStream<Uint8Array>({
      start(controller) {
        nodeStream.on('data', (chunk) => {
          controller.enqueue(new Uint8Array(chunk));
        });

        nodeStream.on('end', () => {
          controller.close();
        });

        nodeStream.on('error', (err) => {
          controller.error(err);
        });
      },

      cancel() {
        // Cancelling the web stream tears down the underlying Node.js stream.
        nodeStream.destroy();
      },
    });
  }

  /**
   * Get response as a Node.js readable stream.
   * Runs the consumption check (which also cancels auto-drain) before handing it out.
   */
  streamNode(): NodeJS.ReadableStream {
    this.ensureNotConsumed();
    return this.incomingMessage;
  }

  /**
   * Get the raw IncomingMessage (for legacy compatibility).
   *
   * Unlike `streamNode()`, this performs no consumption check and does not cancel a
   * pending auto-drain.
   */
  raw(): plugins.http.IncomingMessage {
    return this.incomingMessage;
  }
}
 |