Files
smartrequest/ts/core_node/response.ts

174 lines
4.5 KiB
TypeScript

import * as plugins from './plugins.js';
import * as types from './types.js';
import { CoreResponse as AbstractCoreResponse } from '../core_base/response.js';
/**
* Node.js implementation of Core Response class that provides a fetch-like API
*/
export class CoreResponse<T = any>
extends AbstractCoreResponse<T>
implements types.INodeResponse<T>
{
private incomingMessage: plugins.http.IncomingMessage;
private bodyBufferPromise: Promise<Buffer> | null = null;
private _autoDrainTimeout: NodeJS.Immediate | null = null;
// Public properties
public readonly ok: boolean;
public readonly status: number;
public readonly statusText: string;
public readonly headers: plugins.http.IncomingHttpHeaders;
public readonly url: string;
constructor(
incomingMessage: plugins.http.IncomingMessage,
url: string,
options: types.ICoreRequestOptions = {},
) {
super();
this.incomingMessage = incomingMessage;
this.url = url;
this.status = incomingMessage.statusCode || 0;
this.statusText = incomingMessage.statusMessage || '';
this.ok = this.status >= 200 && this.status < 300;
this.headers = incomingMessage.headers;
// Auto-drain unconsumed streams to prevent socket hanging
// This prevents keep-alive sockets from timing out when response bodies aren't consumed
// Default to true if not specified
if (options.autoDrain !== false) {
this._autoDrainTimeout = setImmediate(() => {
if (!this.consumed && !this.incomingMessage.readableEnded) {
console.log(
`Auto-draining unconsumed response body for ${this.url} (status: ${this.status})`,
);
this.incomingMessage.resume(); // Drain without processing
}
});
}
}
/**
* Override to also cancel auto-drain when body is consumed
*/
protected ensureNotConsumed(): void {
// Cancel auto-drain since we're consuming the body
if (this._autoDrainTimeout) {
clearImmediate(this._autoDrainTimeout);
this._autoDrainTimeout = null;
}
super.ensureNotConsumed();
}
/**
* Collects the body as a buffer
*/
private async collectBody(): Promise<Buffer> {
this.ensureNotConsumed();
if (this.bodyBufferPromise) {
return this.bodyBufferPromise;
}
this.bodyBufferPromise = new Promise<Buffer>((resolve, reject) => {
const chunks: Buffer[] = [];
this.incomingMessage.on('data', (chunk: Buffer) => {
chunks.push(chunk);
});
this.incomingMessage.on('end', () => {
resolve(Buffer.concat(chunks));
});
this.incomingMessage.on('error', reject);
});
return this.bodyBufferPromise;
}
/**
* Parse response as JSON
*/
async json(): Promise<T> {
const buffer = await this.collectBody();
const text = buffer.toString('utf-8');
try {
return JSON.parse(text);
} catch (error) {
throw new Error(`Failed to parse JSON: ${error.message}`);
}
}
/**
* Get response as text
*/
async text(): Promise<string> {
const buffer = await this.collectBody();
return buffer.toString('utf-8');
}
/**
* Get response as ArrayBuffer
*/
async arrayBuffer(): Promise<ArrayBuffer> {
const buffer = await this.collectBody();
return buffer.buffer.slice(
buffer.byteOffset,
buffer.byteOffset + buffer.byteLength,
);
}
/**
* Get response as a web-style ReadableStream
*/
stream(): ReadableStream<Uint8Array> | null {
this.ensureNotConsumed();
// Convert Node.js stream to web stream
// In Node.js 16.5+ we can use Readable.toWeb()
if (this.incomingMessage.readableEnded || this.incomingMessage.destroyed) {
return null;
}
// Create a web ReadableStream from the Node.js stream
const nodeStream = this.incomingMessage;
return new ReadableStream<Uint8Array>({
start(controller) {
nodeStream.on('data', (chunk) => {
controller.enqueue(new Uint8Array(chunk));
});
nodeStream.on('end', () => {
controller.close();
});
nodeStream.on('error', (err) => {
controller.error(err);
});
},
cancel() {
nodeStream.destroy();
},
});
}
/**
* Get response as a Node.js readable stream
*/
streamNode(): NodeJS.ReadableStream {
this.ensureNotConsumed();
return this.incomingMessage;
}
/**
* Get the raw IncomingMessage (for legacy compatibility)
*/
raw(): plugins.http.IncomingMessage {
return this.incomingMessage;
}
}