174 lines
		
	
	
		
			4.5 KiB
		
	
	
	
		
			TypeScript
		
	
	
	
	
	
			
		
		
	
	
			174 lines
		
	
	
		
			4.5 KiB
		
	
	
	
		
			TypeScript
		
	
	
	
	
	
import * as plugins from './plugins.js';
 | 
						|
import * as types from './types.js';
 | 
						|
import { CoreResponse as AbstractCoreResponse } from '../core_base/response.js';
 | 
						|
 | 
						|
/**
 * Node.js implementation of the Core Response class that provides a
 * fetch-like API on top of an `http.IncomingMessage`.
 *
 * The body can be read as JSON, text, an ArrayBuffer, a web ReadableStream
 * or the underlying Node.js stream. Every body accessor first goes through
 * `ensureNotConsumed()`, whose consumption bookkeeping lives in the base
 * class (not visible in this file).
 */
export class CoreResponse<T = any>
  extends AbstractCoreResponse<T>
  implements types.INodeResponse<T>
{
  /** The wrapped Node.js response stream. */
  private incomingMessage: plugins.http.IncomingMessage;
  /** Cached promise for the fully buffered body, created on first collection. */
  private bodyBufferPromise: Promise<Buffer> | null = null;
  /** Handle of the pending auto-drain `setImmediate`, or null when none is scheduled. */
  private _autoDrainTimeout: NodeJS.Immediate | null = null;

  // Public properties
  /** True when `status` is in the 200-299 range. */
  public readonly ok: boolean;
  /** HTTP status code, or 0 when the incoming message carries none. */
  public readonly status: number;
  /** HTTP status message, or an empty string when the incoming message carries none. */
  public readonly statusText: string;
  /** Response headers exactly as exposed by the incoming message. */
  public readonly headers: plugins.http.IncomingHttpHeaders;
  /** URL passed to the constructor. */
  public readonly url: string;

  /**
   * @param incomingMessage the Node.js response to wrap
   * @param url the URL this response belongs to
   * @param options request options; only `autoDrain` is read here. Auto-drain
   *   is enabled unless `autoDrain` is explicitly `false`.
   */
  constructor(
    incomingMessage: plugins.http.IncomingMessage,
    url: string,
    options: types.ICoreRequestOptions = {},
  ) {
    super();
    this.incomingMessage = incomingMessage;
    this.url = url;
    this.status = incomingMessage.statusCode || 0;
    this.statusText = incomingMessage.statusMessage || '';
    this.ok = this.status >= 200 && this.status < 300;
    this.headers = incomingMessage.headers;

    // Auto-drain unconsumed streams to prevent socket hanging.
    // This prevents keep-alive sockets from timing out when response bodies
    // aren't consumed. Defaults to true if not specified.
    if (options.autoDrain !== false) {
      // Scheduled with setImmediate so that a caller who starts reading the
      // body before the immediate runs can cancel it (see ensureNotConsumed).
      this._autoDrainTimeout = setImmediate(() => {
        if (!this.consumed && !this.incomingMessage.readableEnded) {
          console.log(
            `Auto-draining unconsumed response body for ${this.url} (status: ${this.status})`,
          );
          this.incomingMessage.resume(); // Drain without processing
        }
      });
    }
  }

  /**
   * Override to also cancel auto-drain when body is consumed.
   * Clears the pending immediate (if any) and then defers to the base
   * class check.
   */
  protected ensureNotConsumed(): void {
    // Cancel auto-drain since we're consuming the body
    if (this._autoDrainTimeout) {
      clearImmediate(this._autoDrainTimeout);
      this._autoDrainTimeout = null;
    }

    super.ensureNotConsumed();
  }

  /**
   * Collects the whole body into a single Buffer.
   *
   * @returns a promise resolving to the concatenated body chunks
   * @throws (rejects) when the stream emits `error`, when it closes before
   *   `end`, or when the stream had already ended / been destroyed before
   *   collection started (for example after an auto-drain), because in that
   *   case `end` would never be emitted again and the promise would hang.
   */
  private async collectBody(): Promise<Buffer> {
    this.ensureNotConsumed();

    if (this.bodyBufferPromise) {
      return this.bodyBufferPromise;
    }

    const message = this.incomingMessage;

    this.bodyBufferPromise = new Promise<Buffer>((resolve, reject) => {
      // Listeners attached after the stream finished would never fire, so
      // fail fast instead of returning a promise that never settles.
      if (message.readableEnded || message.destroyed) {
        reject(
          new Error(
            `Response body for ${this.url} is no longer available: the stream has already ended or was destroyed (it may have been auto-drained)`,
          ),
        );
        return;
      }

      const chunks: Buffer[] = [];

      message.on('data', (chunk: Buffer) => {
        chunks.push(chunk);
      });

      message.once('end', () => {
        resolve(Buffer.concat(chunks));
      });

      message.on('error', reject);

      // 'close' follows 'end' on a normal completion; if it arrives without
      // 'end' the body was cut short. Rejecting after a prior resolve/reject
      // is a no-op, so this only matters when nothing else settled first.
      message.once('close', () => {
        if (!message.readableEnded) {
          reject(
            new Error(
              `Response stream for ${this.url} closed before the body was fully received`,
            ),
          );
        }
      });
    });

    return this.bodyBufferPromise;
  }

  /**
   * Parse response as JSON.
   *
   * @returns the parsed body (decoded as UTF-8)
   * @throws Error with a `Failed to parse JSON: ...` message when the body
   *   is not valid JSON
   */
  async json(): Promise<T> {
    const buffer = await this.collectBody();
    const text = buffer.toString('utf-8');

    try {
      return JSON.parse(text);
    } catch (error: unknown) {
      // Catch variables are `unknown` under strict settings; narrow before use.
      const reason = error instanceof Error ? error.message : String(error);
      throw new Error(`Failed to parse JSON: ${reason}`);
    }
  }

  /**
   * Get response as text.
   *
   * @returns the body decoded as UTF-8
   */
  async text(): Promise<string> {
    const buffer = await this.collectBody();
    return buffer.toString('utf-8');
  }

  /**
   * Get response as ArrayBuffer.
   *
   * @returns a standalone ArrayBuffer holding exactly the body bytes
   */
  async arrayBuffer(): Promise<ArrayBuffer> {
    const buffer = await this.collectBody();
    // Copy into a dedicated ArrayBuffer: the Buffer's backing store may be
    // larger than the body and is typed as ArrayBufferLike.
    const result = new ArrayBuffer(buffer.byteLength);
    new Uint8Array(result).set(buffer);
    return result;
  }

  /**
   * Get response as a web-style ReadableStream.
   *
   * @returns a ReadableStream fed from the Node.js stream's `data` events,
   *   or null when the underlying stream has already ended or been destroyed
   */
  stream(): ReadableStream<Uint8Array> | null {
    this.ensureNotConsumed();

    // Nothing left to read once the Node.js stream is finished or torn down.
    if (this.incomingMessage.readableEnded || this.incomingMessage.destroyed) {
      return null;
    }

    // Create a web ReadableStream from the Node.js stream
    const nodeStream = this.incomingMessage;
    return new ReadableStream<Uint8Array>({
      start(controller) {
        nodeStream.on('data', (chunk) => {
          controller.enqueue(new Uint8Array(chunk));
        });

        nodeStream.on('end', () => {
          controller.close();
        });

        nodeStream.on('error', (err) => {
          controller.error(err);
        });
      },

      // Cancelling the web stream tears down the underlying Node.js stream.
      cancel() {
        nodeStream.destroy();
      },
    });
  }

  /**
   * Get response as a Node.js readable stream.
   *
   * @returns the wrapped incoming message
   */
  streamNode(): NodeJS.ReadableStream {
    this.ensureNotConsumed();
    return this.incomingMessage;
  }

  /**
   * Get the raw IncomingMessage (for legacy compatibility).
   * Unlike the other accessors this does not call `ensureNotConsumed()`.
   */
  raw(): plugins.http.IncomingMessage {
    return this.incomingMessage;
  }
}
 |