///
// NOTE(review): the bare `///` above looks like the remains of a triple-slash
// directive whose contents were lost; restore it if ambient Deno types are needed.
import * as types from './types.js';
import { CoreResponse } from './response.js';
import { CoreRequest as AbstractCoreRequest } from '../core_base/request.js';

/**
 * Cache of HttpClient instances keyed by unix socket path.
 * Prevents creating multiple clients for the same socket.
 */
const httpClientCache = new Map<string, Deno.HttpClient>();

/**
 * Returns true when `error` is an object whose `name` is `'AbortError'`,
 * which is what `fetch` rejects with when its signal is aborted without a
 * custom reason.
 */
function isAbortError(error: unknown): boolean {
  return (
    typeof error === 'object' &&
    error !== null &&
    (error as { name?: unknown }).name === 'AbortError'
  );
}

/**
 * Returns true when `body` can be handed to `fetch` unchanged (string, binary,
 * form data, URL-encoded params or a web stream). Anything else is serialized
 * as JSON by the caller.
 */
function isPassthroughBody(body: unknown): body is BodyInit {
  return (
    typeof body === 'string' ||
    body instanceof ArrayBuffer ||
    body instanceof Uint8Array ||
    body instanceof FormData ||
    body instanceof URLSearchParams ||
    body instanceof ReadableStream ||
    // Buffer is exposed by Deno's Node.js compatibility layer when available
    (typeof Buffer !== 'undefined' && body instanceof Buffer)
  );
}

/**
 * Converts one chunk emitted by a Node.js stream into bytes.
 * Strings are UTF-8 encoded explicitly: `new Uint8Array('text')` would
 * produce an empty array and silently drop the data.
 */
function toBytes(chunk: any): Uint8Array {
  return typeof chunk === 'string'
    ? new TextEncoder().encode(chunk)
    : new Uint8Array(chunk);
}

/**
 * Deno implementation of the Core Request class, using native `fetch` with
 * unix socket support via `Deno.HttpClient`.
 */
export class CoreRequest extends AbstractCoreRequest<
  types.IDenoRequestOptions,
  CoreResponse
> {
  /** Handle of the pending timeout timer, or null when none is armed. */
  private timeoutId: ReturnType<typeof setTimeout> | null = null;
  /** Controller whose signal is given to fetch while a timeout is armed. */
  private abortController: AbortController | null = null;
  /** Set by the timer callback so a timeout can be told apart from a caller abort. */
  private timedOut = false;
  /** Removes the listener installed on the caller's signal, if any. */
  private detachCallerSignal: (() => void) | null = null;
  private requestDataFunc: ((req: any) => void) | null;

  /**
   * @param url - Target URL (http/https, or a unix socket URL understood by
   *   `CoreRequest.isUnixSocket`).
   * @param options - Request options.
   * @param requestDataFunc - Node.js style request data callback; only used
   *   here to decide whether stream conversion / raw streaming checks apply.
   * @throws Error if `options.agent` is set, or if a raw streaming function is
   *   supplied together with `requestDataFunc`.
   */
  constructor(
    url: string,
    options: types.IDenoRequestOptions = {},
    requestDataFunc: ((req: any) => void) | null = null,
  ) {
    super(url, options);
    this.requestDataFunc = requestDataFunc;

    // Validate everything first so no listeners are attached to a stream
    // when construction is going to fail anyway.
    if (options.agent) {
      throw new Error(
        'Node.js specific option (agent) is not supported in Deno implementation',
      );
    }
    if (requestDataFunc && (options as any).__rawStreamFunc) {
      throw new Error(
        'Raw streaming with .raw() is not supported in Deno. Use .stream() with web ReadableStream instead.',
      );
    }

    // Convert a Node.js stream into a web ReadableStream usable as a fetch body
    if (requestDataFunc && (options as any).__nodeStream) {
      const nodeStream = (options as any).__nodeStream;
      this.options.requestBody = new ReadableStream({
        start(controller) {
          nodeStream.on('data', (chunk: any) => {
            controller.enqueue(toBytes(chunk));
          });
          nodeStream.on('end', () => {
            controller.close();
          });
          nodeStream.on('error', (err: any) => {
            controller.error(err);
          });
        },
      });
    }
  }

  /**
   * Get or create the HttpClient to use for this request.
   *
   * @returns The explicitly provided client, a cached/new client bound to the
   *   unix socket, or `undefined` to let fetch use its default client.
   */
  private getHttpClient(): Deno.HttpClient | undefined {
    // An explicitly provided client always wins
    if (this.options.client) {
      return this.options.client;
    }

    const socketPath =
      this.options.socketPath ||
      (CoreRequest.isUnixSocket(this.url)
        ? CoreRequest.parseUnixSocketUrl(this.url).socketPath
        : null);
    if (!socketPath) {
      return undefined; // Use default client
    }

    const cached = httpClientCache.get(socketPath);
    if (cached) {
      return cached;
    }

    // NOTE(review): confirm this proxy option shape against the
    // Deno.createHttpClient documentation for the targeted Deno version.
    const client = Deno.createHttpClient({
      proxy: {
        url: `unix://${socketPath}`,
      },
    });
    httpClientCache.set(socketPath, client);
    return client;
  }

  /**
   * Build the full request URL including query parameters.
   * For unix socket URLs only the HTTP path part is used; the hostname is a
   * placeholder because the connection goes through the socket.
   */
  private buildUrl(): string {
    const baseUrl = CoreRequest.isUnixSocket(this.url)
      ? `http://localhost${CoreRequest.parseUnixSocketUrl(this.url).path}`
      : this.url;

    const queryParams = this.options.queryParams;
    if (!queryParams || Object.keys(queryParams).length === 0) {
      return baseUrl;
    }

    const url = new URL(baseUrl);
    for (const [key, value] of Object.entries(queryParams)) {
      url.searchParams.append(key, value);
    }
    return url.toString();
  }

  /**
   * Convert the request options into a fetch `RequestInit`, attaching the
   * HttpClient, the body and (when configured) a timeout-driven abort signal.
   */
  private buildFetchOptions(): RequestInit & { client?: Deno.HttpClient } {
    const options = this.options;
    const fetchOptions: RequestInit & { client?: Deno.HttpClient } = {
      method: options.method,
      headers: options.headers,
      credentials: options.credentials,
      mode: options.mode,
      cache: options.cache,
      redirect: options.redirect,
      referrer: options.referrer,
      referrerPolicy: options.referrerPolicy,
      integrity: options.integrity,
      keepalive: options.keepAlive,
      signal: options.signal,
    };

    // Set the HttpClient (for unix sockets or custom configurations)
    const client = this.getHttpClient();
    if (client) {
      fetchOptions.client = client;
    }

    if (options.requestBody !== undefined) {
      this.applyRequestBody(fetchOptions, options.requestBody);
    }

    // Release anything left over from a previous fire() on this instance
    this.clearTimeout();
    this.timedOut = false;

    const timeout = options.hardDataCuttingTimeout || options.timeout;
    if (timeout) {
      fetchOptions.signal = this.armTimeout(timeout, options.signal);
    }

    return fetchOptions;
  }

  /**
   * Attach `body` to `fetchOptions`. Fetch-native body types are passed
   * through; everything else is JSON-encoded with a default Content-Type.
   */
  private applyRequestBody(fetchOptions: RequestInit, body: unknown): void {
    if (isPassthroughBody(body)) {
      fetchOptions.body = body;
      // Streaming request bodies require half-duplex mode
      if (body instanceof ReadableStream) {
        (fetchOptions as any).duplex = 'half';
      }
      return;
    }

    fetchOptions.body = JSON.stringify(body);

    // Work on a Headers copy: the lookup is case-insensitive, every
    // HeadersInit form is handled, and the caller's object is not mutated.
    const headers = new Headers(fetchOptions.headers);
    if (!headers.has('Content-Type')) {
      headers.set('Content-Type', 'application/json');
    }
    fetchOptions.headers = headers;
  }

  /**
   * Start the timeout timer and return the signal to give to fetch.
   * Aborts of `callerSignal` are forwarded, so a caller-supplied signal keeps
   * working when a timeout is configured as well.
   */
  private armTimeout(
    timeoutMs: number,
    callerSignal: AbortSignal | null | undefined,
  ): AbortSignal {
    const controller = new AbortController();
    this.abortController = controller;
    this.timeoutId = setTimeout(() => {
      this.timedOut = true;
      controller.abort();
    }, timeoutMs);

    if (callerSignal) {
      if (callerSignal.aborted) {
        controller.abort(callerSignal.reason);
      } else {
        const forwardAbort = () => controller.abort(callerSignal.reason);
        callerSignal.addEventListener('abort', forwardAbort, { once: true });
        this.detachCallerSignal = () =>
          callerSignal.removeEventListener('abort', forwardAbort);
      }
    }

    return controller.signal;
  }

  /**
   * Fire the request and return a CoreResponse
   */
  async fire(): Promise<CoreResponse> {
    const response = await this.fireCore();
    return new CoreResponse(response);
  }

  /**
   * Fire the request and return the raw Response.
   *
   * @throws Error with message `'Request timed out'` when the configured
   *   timeout elapsed; any other failure (including an abort triggered by the
   *   caller's own signal) is rethrown unchanged.
   */
  async fireCore(): Promise<Response> {
    const url = this.buildUrl();
    const options = this.buildFetchOptions();

    try {
      return await fetch(url, options);
    } catch (error: unknown) {
      if (this.timedOut && isAbortError(error)) {
        throw new Error('Request timed out');
      }
      throw error;
    } finally {
      // The timer only guards the wait for the response; always release it
      this.clearTimeout();
    }
  }

  /**
   * Clear the timeout, the abort controller and the caller-signal listener.
   * Note: the HttpClient is not closed here because it is cached for reuse.
   */
  private clearTimeout(): void {
    if (this.timeoutId !== null) {
      clearTimeout(this.timeoutId);
      this.timeoutId = null;
    }
    if (this.detachCallerSignal) {
      this.detachCallerSignal();
      this.detachCallerSignal = null;
    }
    this.abortController = null;
  }

  /**
   * Static factory method to create and fire a request
   */
  static async create(
    url: string,
    options: types.IDenoRequestOptions = {},
  ): Promise<CoreResponse> {
    const request = new CoreRequest(url, options);
    return request.fire();
  }

  /**
   * Close every cached HttpClient and empty the cache.
   * Call this when you want to force new clients to be created.
   */
  static clearClientCache(): void {
    for (const client of httpClientCache.values()) {
      client.close();
    }
    httpClientCache.clear();
  }
}

/**
 * Convenience exports for backward compatibility
 */
export const isUnixSocket = CoreRequest.isUnixSocket;
export const parseUnixSocketUrl = CoreRequest.parseUnixSocketUrl;