250 lines
7.9 KiB
TypeScript
250 lines
7.9 KiB
TypeScript
import * as types from './types.js';
|
|
import { CoreResponse } from './response.js';
|
|
import { CoreRequest as AbstractCoreRequest } from '../core_base/request.js';
|
|
|
|
/**
 * Minimal structural view of the Node.js readable stream handed over through
 * the internal `__nodeStream` option. Only the members used by the stream
 * bridge are listed; everything except `on` is optional and probed at runtime.
 */
type NodeReadableLike = {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- listener args depend on the event
  on(event: string, listener: (...args: any[]) => void): unknown;
  pause?(): unknown;
  resume?(): unknown;
  destroy?(error?: Error): unknown;
  toWeb?(): ReadableStream<Uint8Array>;
};

/**
 * Bun implementation of the Core Request class using native `fetch`, with
 * unix socket support through Bun's `unix` fetch option.
 *
 * Behavior notes:
 * - Node.js-only options (`agent`, raw streaming) are rejected in the constructor.
 * - A request body that is not natively understood by `fetch` is serialized as
 *   JSON and a `Content-Type: application/json` header is added when missing.
 * - `timeout` / `hardDataCuttingTimeout` abort the request through an internal
 *   `AbortController`; a caller-supplied `signal` keeps working alongside it.
 */
export class CoreRequest extends AbstractCoreRequest<
  types.IBunRequestOptions,
  CoreResponse
> {
  /** Handle of the pending timeout timer, or null when no timer is armed. */
  private timeoutId: ReturnType<typeof setTimeout> | null = null;
  /** Controller used to abort the in-flight fetch when the timeout fires. */
  private abortController: AbortController | null = null;
  /** Stored as passed to the constructor; only consulted there. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- signature is part of the public constructor contract
  private requestDataFunc: ((req: any) => void) | null;
  /** True once the timeout timer (not the caller) aborted the request. */
  private timedOut = false;
  /** Removes the abort listener registered on the caller's signal, if any. */
  private detachUserSignal: (() => void) | null = null;

  /**
   * @param url Target URL; may be a unix-socket URL as recognized by `CoreRequest.isUnixSocket`.
   * @param options Request options.
   * @param requestDataFunc Optional request-data callback; when present together with
   *   the internal `__nodeStream` option, that stream becomes the request body.
   * @throws Error when the Node.js-only `agent` option is set.
   * @throws Error when raw streaming (`__rawStreamFunc`) is requested.
   */
  constructor(
    url: string,
    options: types.IBunRequestOptions = {},
    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- kept for caller compatibility
    requestDataFunc: ((req: any) => void) | null = null,
  ) {
    super(url, options);
    this.requestDataFunc = requestDataFunc;

    // Reject unsupported Node.js-specific options
    if (options.agent) {
      throw new Error(
        'Node.js specific option (agent) is not supported in Bun implementation',
      );
    }

    // Raw streaming is not supported in Bun. Validate this BEFORE touching the
    // node stream so a rejected request never attaches listeners to it.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- internal, untyped hand-off field
    if (requestDataFunc && (options as any).__rawStreamFunc) {
      throw new Error(
        'Raw streaming with .raw() is not supported in Bun. Use .stream() with web ReadableStream instead.',
      );
    }

    // Convert a handed-over Node.js stream into a web ReadableStream body
    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- internal, untyped hand-off field
    const nodeStream: NodeReadableLike | undefined = (options as any).__nodeStream;
    if (requestDataFunc && nodeStream) {
      // NOTE: Node's built-in conversion is the static `Readable.toWeb(stream)`,
      // so this duck-typed branch only applies to stream objects that expose
      // their own instance-level `toWeb()`; everything else uses the bridge.
      this.options.requestBody =
        typeof nodeStream.toWeb === 'function'
          ? nodeStream.toWeb()
          : CoreRequest.nodeStreamToWeb(nodeStream);
    }
  }

  /**
   * Bridge a Node.js readable stream into a web ReadableStream of bytes.
   * String chunks are UTF-8 encoded, the source is paused while the web
   * stream's queue is full, and cancelling the web stream destroys the source.
   */
  private static nodeStreamToWeb(
    nodeStream: NodeReadableLike,
  ): ReadableStream<Uint8Array> {
    const encoder = new TextEncoder();
    // Guards against enqueue/close/error after the web stream has settled.
    let finished = false;

    return new ReadableStream<Uint8Array>({
      start(controller) {
        nodeStream.on('data', (chunk: string | Uint8Array) => {
          if (finished) {
            return;
          }
          controller.enqueue(
            typeof chunk === 'string'
              ? encoder.encode(chunk)
              : new Uint8Array(chunk),
          );
          // Backpressure: stop the source until the consumer asks for more.
          if ((controller.desiredSize ?? 0) <= 0) {
            nodeStream.pause?.();
          }
        });
        nodeStream.on('end', () => {
          if (!finished) {
            finished = true;
            controller.close();
          }
        });
        nodeStream.on('error', (err: unknown) => {
          if (!finished) {
            finished = true;
            controller.error(err);
          }
        });
      },
      pull() {
        nodeStream.resume?.();
      },
      cancel(reason) {
        finished = true;
        nodeStream.destroy?.(reason instanceof Error ? reason : undefined);
      },
    });
  }

  /**
   * True when `body` can be handed to fetch as-is. `ArrayBuffer.isView`
   * covers every typed array (including Node.js `Buffer`) and `DataView`.
   */
  private static isNativeBody(body: unknown): body is BodyInit {
    return (
      typeof body === 'string' ||
      body instanceof ArrayBuffer ||
      ArrayBuffer.isView(body) ||
      body instanceof FormData ||
      body instanceof URLSearchParams ||
      body instanceof ReadableStream ||
      (typeof Blob !== 'undefined' && body instanceof Blob)
    );
  }

  /**
   * Return headers that carry a Content-Type, adding `application/json` when
   * the caller supplied none. Header names are compared case-insensitively and
   * the caller's headers are never mutated (a copy is returned when a header
   * has to be added).
   */
  private static withJsonContentType(
    headers: RequestInit['headers'],
  ): HeadersInit {
    if (!headers) {
      return { 'Content-Type': 'application/json' };
    }

    if (headers instanceof Headers) {
      const copy = new Headers(headers);
      if (!copy.has('Content-Type')) {
        copy.set('Content-Type', 'application/json');
      }
      return copy;
    }

    if (Array.isArray(headers)) {
      const present = headers.some(
        (entry) => entry[0]?.toLowerCase() === 'content-type',
      );
      if (present) {
        return headers;
      }
      const jsonEntry: [string, string] = ['Content-Type', 'application/json'];
      return [...headers, jsonEntry];
    }

    const present = Object.keys(headers).some(
      (name) => name.toLowerCase() === 'content-type',
    );
    return present
      ? headers
      : { ...headers, 'Content-Type': 'application/json' };
  }

  /** True when `error` looks like the rejection produced by an aborted fetch. */
  private static isAbortError(error: unknown): boolean {
    return (
      typeof error === 'object' &&
      error !== null &&
      (error as { name?: unknown }).name === 'AbortError'
    );
  }

  /**
   * Build the full URL with query parameters.
   * For unix-socket URLs only the HTTP path part is kept and placed on a
   * placeholder host (the hostname does not matter for unix sockets).
   */
  private buildUrl(): string {
    const baseUrl = CoreRequest.isUnixSocket(this.url)
      ? `http://localhost${CoreRequest.parseUnixSocketUrl(this.url).path}`
      : this.url;

    const queryParams = this.options.queryParams;
    if (!queryParams || Object.keys(queryParams).length === 0) {
      return baseUrl;
    }

    const url = new URL(baseUrl);
    for (const [key, value] of Object.entries(queryParams)) {
      url.searchParams.append(key, value);
    }
    return url.toString();
  }

  /**
   * Arm the timeout timer and return the signal fetch should observe.
   * The caller's own `signal` (if any) is forwarded into the internal
   * controller so that both the timeout and a manual abort cancel the request.
   */
  private armTimeout(timeoutMs: number): AbortSignal {
    const controller = new AbortController();
    this.abortController = controller;
    this.timedOut = false;

    const userSignal = this.options.signal;
    if (userSignal) {
      if (userSignal.aborted) {
        controller.abort(userSignal.reason);
      } else {
        const forwardAbort = (): void => controller.abort(userSignal.reason);
        userSignal.addEventListener('abort', forwardAbort, { once: true });
        this.detachUserSignal = () =>
          userSignal.removeEventListener('abort', forwardAbort);
      }
    }

    this.timeoutId = setTimeout(() => {
      // Remember that WE aborted, so the failure is reported as a timeout.
      this.timedOut = true;
      controller.abort();
    }, timeoutMs);

    return controller.signal;
  }

  /**
   * Convert our options to fetch RequestInit with Bun-specific extensions.
   * Side effect: arms the timeout timer when a timeout is configured.
   */
  private buildFetchOptions(): RequestInit & { unix?: string } {
    const fetchOptions: RequestInit & { unix?: string; duplex?: 'half' } = {
      method: this.options.method,
      headers: this.options.headers,
      credentials: this.options.credentials,
      mode: this.options.mode,
      cache: this.options.cache,
      redirect: this.options.redirect,
      referrer: this.options.referrer,
      referrerPolicy: this.options.referrerPolicy,
      integrity: this.options.integrity,
      keepalive: this.options.keepAlive,
      signal: this.options.signal,
    };

    // Unix socket: a socket URL wins, then the direct `unix` option, then the
    // legacy Node.js `socketPath` option (mapped onto Bun's `unix`).
    if (CoreRequest.isUnixSocket(this.url)) {
      fetchOptions.unix = CoreRequest.parseUnixSocketUrl(this.url).socketPath;
    } else if (this.options.unix) {
      fetchOptions.unix = this.options.unix;
    } else if (this.options.socketPath) {
      fetchOptions.unix = this.options.socketPath;
    }

    // Request body
    const body = this.options.requestBody;
    if (body !== undefined) {
      if (CoreRequest.isNativeBody(body)) {
        fetchOptions.body = body;
        // Streaming request bodies require half-duplex mode
        if (body instanceof ReadableStream) {
          fetchOptions.duplex = 'half';
        }
      } else {
        // Anything else is sent as JSON
        fetchOptions.body = JSON.stringify(body);
        fetchOptions.headers = CoreRequest.withJsonContentType(
          fetchOptions.headers,
        );
      }
    }

    // Timeout: `hardDataCuttingTimeout` takes precedence over `timeout`;
    // a value of 0 / undefined for both means "no timeout".
    const timeoutMs =
      this.options.hardDataCuttingTimeout || this.options.timeout;
    if (timeoutMs) {
      fetchOptions.signal = this.armTimeout(timeoutMs);
    }

    return fetchOptions;
  }

  /**
   * Fire the request and return a CoreResponse
   */
  async fire(): Promise<CoreResponse> {
    const response = await this.fireCore();
    return new CoreResponse(response);
  }

  /**
   * Fire the request and return the raw Response.
   *
   * @throws Error with message `Request timed out` when the configured timeout
   *   elapsed before a response arrived. Any other failure, including an abort
   *   triggered through the caller's own signal, is rethrown unchanged.
   */
  async fireCore(): Promise<Response> {
    const url = this.buildUrl();
    const options = this.buildFetchOptions();

    try {
      const response = await fetch(url, options);
      // Response headers arrived: the timeout no longer applies. The caller's
      // signal stays linked so it can still cancel the body download.
      this.clearTimeout();
      return response;
    } catch (error: unknown) {
      // Capture before clearTimeout() resets the flag.
      const timedOut = this.timedOut;
      this.clearTimeout();
      // The request is over; stop listening on the caller's signal.
      this.detachUserSignal?.();
      this.detachUserSignal = null;

      if (timedOut && CoreRequest.isAbortError(error)) {
        throw new Error('Request timed out');
      }
      throw error;
    }
  }

  /**
   * Clear the timeout and abort controller
   */
  private clearTimeout(): void {
    if (this.timeoutId !== null) {
      clearTimeout(this.timeoutId);
      this.timeoutId = null;
    }
    this.abortController = null;
    this.timedOut = false;
  }

  /**
   * Static factory method to create and fire a request
   */
  static async create(
    url: string,
    options: types.IBunRequestOptions = {},
  ): Promise<CoreResponse> {
    const request = new CoreRequest(url, options);
    return request.fire();
  }
}
|
|
|
|
/**
 * Backward-compatibility aliases: the unix-socket helpers exposed as static
 * members of CoreRequest, re-exported as plain module-level functions.
 * They are exported detached from the class, exactly as before.
 */
export const { isUnixSocket, parseUnixSocketUrl } = CoreRequest;
|