This commit is contained in:
2025-07-29 15:44:04 +00:00
parent b4769e7feb
commit 1991308d4a
8 changed files with 114 additions and 36 deletions

View File

@@ -195,6 +195,15 @@ export class SmartRequest<T = any> {
return this;
}
/**
* Enable or disable auto-drain for unconsumed response bodies (Node.js only)
* Default is true to prevent socket hanging
*/
autoDrain(enabled: boolean): this {
this._options.autoDrain = enabled;
return this;
}
/**
* Set the Accept header to indicate what content type is expected
*/

View File

@@ -38,6 +38,7 @@ export interface ICoreRequestOptions {
queryParams?: { [key: string]: string };
timeout?: number;
hardDataCuttingTimeout?: number;
autoDrain?: boolean; // Auto-drain unconsumed responses (Node.js only, default: true)
// Node.js specific options (ignored in fetch implementation)
agent?: any;

View File

@@ -52,7 +52,7 @@ export class CoreRequest extends AbstractCoreRequest<types.ICoreRequestOptions,
*/
async fire(): Promise<CoreResponse> {
const incomingMessage = await this.fireCore();
return new CoreResponse(incomingMessage, this.url);
return new CoreResponse(incomingMessage, this.url, this.options);
}
/**

View File

@@ -8,6 +8,7 @@ import { CoreResponse as AbstractCoreResponse } from '../core_base/response.js';
export class CoreResponse<T = any> extends AbstractCoreResponse<T> implements types.INodeResponse<T> {
private incomingMessage: plugins.http.IncomingMessage;
private bodyBufferPromise: Promise<Buffer> | null = null;
private _autoDrainTimeout: NodeJS.Immediate | null = null;
// Public properties
public readonly ok: boolean;
@@ -16,7 +17,7 @@ export class CoreResponse<T = any> extends AbstractCoreResponse<T> implements ty
public readonly headers: plugins.http.IncomingHttpHeaders;
public readonly url: string;
constructor(incomingMessage: plugins.http.IncomingMessage, url: string) {
constructor(incomingMessage: plugins.http.IncomingMessage, url: string, options: types.ICoreRequestOptions = {}) {
super();
this.incomingMessage = incomingMessage;
this.url = url;
@@ -24,6 +25,31 @@ export class CoreResponse<T = any> extends AbstractCoreResponse<T> implements ty
this.statusText = incomingMessage.statusMessage || '';
this.ok = this.status >= 200 && this.status < 300;
this.headers = incomingMessage.headers;
// Auto-drain unconsumed streams to prevent socket hanging
// This prevents keep-alive sockets from timing out when response bodies aren't consumed
// Default to true if not specified
if (options.autoDrain !== false) {
this._autoDrainTimeout = setImmediate(() => {
if (!this.consumed && !this.incomingMessage.readableEnded) {
console.log(`Auto-draining unconsumed response body for ${this.url} (status: ${this.status})`);
this.incomingMessage.resume(); // Drain without processing
}
});
}
}
/**
* Override to also cancel auto-drain when body is consumed
*/
protected ensureNotConsumed(): void {
// Cancel auto-drain since we're consuming the body
if (this._autoDrainTimeout) {
clearImmediate(this._autoDrainTimeout);
this._autoDrainTimeout = null;
}
super.ensureNotConsumed();
}
/**