fix(socket-handler): Fix socket handler race condition by differentiating between async and sync handlers. Now, async socket handlers complete their setup before initial data is emitted, ensuring that no data is lost. Documentation and tests have been updated to reflect this change.

This commit is contained in:
2025-05-29 00:24:57 +00:00
parent d42fa8b1e9
commit e4aade4a9a
11 changed files with 1338 additions and 6 deletions

View File

@ -6,7 +6,12 @@ import type { PortRange } from '../../../proxies/nftables-proxy/models/interface
/**
* Supported action types for route configurations
*/
export type TRouteActionType = 'forward' | 'redirect' | 'block' | 'static';
export type TRouteActionType = 'forward' | 'redirect' | 'block' | 'static' | 'socket-handler';
/**
* Socket handler function type
*/
export type TSocketHandler = (socket: plugins.net.Socket) => void | Promise<void>;
/**
* TLS handling modes for route configurations
@ -297,6 +302,9 @@ export interface IRouteAction {
// Handler function for static routes
handler?: (context: IRouteContext) => Promise<IStaticResponse>;
// Socket handler function (when type is 'socket-handler')
socketHandler?: TSocketHandler;
}
/**

View File

@ -399,6 +399,15 @@ export class RouteConnectionHandler {
this.handleStaticAction(socket, record, route, initialChunk);
return;
case 'socket-handler':
logger.log('info', `Handling socket-handler action for route ${route.name}`, {
connectionId,
routeName: route.name,
component: 'route-handler'
});
this.handleSocketHandlerAction(socket, record, route, initialChunk);
return;
default:
logger.log('error', `Unknown action type '${(route.action as any).type}' for connection ${connectionId}`, {
connectionId,
@ -776,6 +785,75 @@ export class RouteConnectionHandler {
}, record, initialChunk);
}
/**
* Handle a socket-handler action for a route
*/
private async handleSocketHandlerAction(
socket: plugins.net.Socket,
record: IConnectionRecord,
route: IRouteConfig,
initialChunk?: Buffer
): Promise<void> {
const connectionId = record.id;
if (!route.action.socketHandler) {
logger.log('error', 'socket-handler action missing socketHandler function', {
connectionId,
routeName: route.name,
component: 'route-handler'
});
socket.destroy();
this.connectionManager.cleanupConnection(record, 'missing_handler');
return;
}
try {
// Call the handler
const result = route.action.socketHandler(socket);
// Handle async handlers properly
if (result instanceof Promise) {
result
.then(() => {
// Emit initial chunk after async handler completes
if (initialChunk && initialChunk.length > 0) {
socket.emit('data', initialChunk);
}
})
.catch(error => {
logger.log('error', 'Socket handler error', {
connectionId,
routeName: route.name,
error: error.message,
component: 'route-handler'
});
if (!socket.destroyed) {
socket.destroy();
}
this.connectionManager.cleanupConnection(record, 'handler_error');
});
} else {
// For sync handlers, emit on next tick
if (initialChunk && initialChunk.length > 0) {
process.nextTick(() => {
socket.emit('data', initialChunk);
});
}
}
} catch (error) {
logger.log('error', 'Socket handler error', {
connectionId,
routeName: route.name,
error: error.message,
component: 'route-handler'
});
if (!socket.destroyed) {
socket.destroy();
}
this.connectionManager.cleanupConnection(record, 'handler_error');
}
}
/**
* Setup improved error handling for the outgoing connection
*/

View File

@ -19,6 +19,7 @@
* - NFTables routes (createNfTablesRoute, createNfTablesTerminateRoute)
*/
import * as plugins from '../../../plugins.js';
import type { IRouteConfig, IRouteMatch, IRouteAction, IRouteTarget, TPortRange, IRouteContext } from '../models/route-types.js';
/**
@ -810,4 +811,99 @@ export function createCompleteNfTablesHttpsServer(
);
return [httpsRoute, httpRedirectRoute];
}
}
/**
* Create a socket handler route configuration
* @param domains Domain(s) to match
* @param ports Port(s) to listen on
* @param handler Socket handler function
* @param options Additional route options
* @returns Route configuration object
*/
export function createSocketHandlerRoute(
domains: string | string[],
ports: TPortRange,
handler: (socket: plugins.net.Socket) => void | Promise<void>,
options: {
name?: string;
priority?: number;
path?: string;
} = {}
): IRouteConfig {
return {
name: options.name || 'socket-handler-route',
priority: options.priority !== undefined ? options.priority : 50,
match: {
domains,
ports,
...(options.path && { path: options.path })
},
action: {
type: 'socket-handler',
socketHandler: handler
}
};
}
/**
* Pre-built socket handlers for common use cases
*/
export const SocketHandlers = {
/**
* Simple echo server handler
*/
echo: (socket: plugins.net.Socket) => {
socket.write('ECHO SERVER READY\n');
socket.on('data', data => socket.write(data));
},
/**
* TCP proxy handler
*/
proxy: (targetHost: string, targetPort: number) => (socket: plugins.net.Socket) => {
const target = plugins.net.connect(targetPort, targetHost);
socket.pipe(target);
target.pipe(socket);
socket.on('close', () => target.destroy());
target.on('close', () => socket.destroy());
target.on('error', (err) => {
console.error('Proxy target error:', err);
socket.destroy();
});
},
/**
* Line-based protocol handler
*/
lineProtocol: (handler: (line: string, socket: plugins.net.Socket) => void) => (socket: plugins.net.Socket) => {
let buffer = '';
socket.on('data', (data) => {
buffer += data.toString();
const lines = buffer.split('\n');
buffer = lines.pop() || '';
lines.forEach(line => {
if (line.trim()) {
handler(line.trim(), socket);
}
});
});
},
/**
* Simple HTTP response handler (for testing)
*/
httpResponse: (statusCode: number, body: string) => (socket: plugins.net.Socket) => {
const response = [
`HTTP/1.1 ${statusCode} ${statusCode === 200 ? 'OK' : 'Error'}`,
'Content-Type: text/plain',
`Content-Length: ${body.length}`,
'Connection: close',
'',
body
].join('\r\n');
socket.write(response);
socket.end();
}
};