2026-01-31 11:33:11 +00:00
|
|
|
import * as net from 'net';
|
2026-02-03 16:42:49 +00:00
|
|
|
import * as fs from 'fs/promises';
|
2026-02-01 23:33:35 +00:00
|
|
|
import * as plugins from '../plugins.js';
|
2026-01-31 11:33:11 +00:00
|
|
|
import { WireProtocol, OP_QUERY } from './WireProtocol.js';
|
|
|
|
|
import { CommandRouter } from './CommandRouter.js';
|
|
|
|
|
import { MemoryStorageAdapter } from '../storage/MemoryStorageAdapter.js';
|
|
|
|
|
import { FileStorageAdapter } from '../storage/FileStorageAdapter.js';
|
|
|
|
|
import type { IStorageAdapter } from '../storage/IStorageAdapter.js';
|
|
|
|
|
|
|
|
|
|
/**
 * Server configuration options.
 *
 * Every field is optional; the TsmdbServer constructor fills in the defaults
 * noted on each field.
 */
export interface ITsmdbServerOptions {
  /** Port to listen on (default: 27017) - ignored if socketPath is set */
  port?: number;
  /** Host to bind to (default: 127.0.0.1) - ignored if socketPath is set */
  host?: string;
  /** Unix socket path - if set, server listens on socket instead of TCP */
  socketPath?: string;
  /** Storage type: 'memory' or 'file' (default: 'memory') */
  storage?: 'memory' | 'file';
  /** Directory/path handed to the file storage adapter when storage is 'file' (default: './data') */
  storagePath?: string;
  /**
   * Path used to persist the in-memory storage. Persistence is only enabled
   * when this is a non-empty string (default: disabled).
   */
  persistPath?: string;
  /**
   * Persistence interval in ms (default: 60000). Only forwarded to the memory
   * storage adapter when persistPath is set.
   */
  persistIntervalMs?: number;
}
|
|
|
|
|
|
|
|
|
|
/**
 * Connection state for each client
 */
interface IConnectionState {
  /** Server-assigned sequential id; key of this entry in the server's connection map */
  id: number;
  /** The underlying client socket */
  socket: net.Socket;
  /** Bytes received from the client that have not yet been consumed as complete wire messages */
  buffer: Buffer;
  /** Whether the client is authenticated (the server currently sets this to true for every connection) */
  authenticated: boolean;
  /** Database name associated with the connection (initialised to 'test' by the server) */
  database: string;
}
|
|
|
|
|
|
|
|
|
|
/**
 * TsmdbServer - MongoDB Wire Protocol compatible server
 *
 * This server implements the MongoDB wire protocol (OP_MSG, plus legacy
 * OP_QUERY answered with OP_REPLY) to allow official MongoDB drivers to
 * connect and perform operations.
 *
 * @example
 * ```typescript
 * import { TsmdbServer } from '@push.rocks/smartmongo/tsmdb';
 * import { MongoClient } from 'mongodb';
 *
 * const server = new TsmdbServer({ port: 27017 });
 * await server.start();
 *
 * const client = new MongoClient('mongodb://127.0.0.1:27017');
 * await client.connect();
 * ```
 */
export class TsmdbServer {
  private options: Required<Omit<ITsmdbServerOptions, 'socketPath'>> & { socketPath: string };
  private server: net.Server | null = null;
  private storage: IStorageAdapter;
  private commandRouter: CommandRouter;
  private connections: Map<number, IConnectionState> = new Map();
  private connectionIdCounter = 0;
  private isRunning = false;
  private startTime: Date = new Date();
  private useSocket: boolean;
  /** Connections whose receive buffer is currently being drained by processMessages. */
  private drainingConnections: WeakSet<IConnectionState> = new WeakSet();

  /**
   * Create a server instance. Nothing is bound or initialised until start().
   *
   * @param options - Server configuration; omitted fields fall back to defaults
   */
  constructor(options: ITsmdbServerOptions = {}) {
    this.useSocket = !!options.socketPath;
    this.options = {
      port: options.port ?? 27017,
      host: options.host ?? '127.0.0.1',
      socketPath: options.socketPath ?? '',
      storage: options.storage ?? 'memory',
      storagePath: options.storagePath ?? './data',
      persistPath: options.persistPath ?? '',
      persistIntervalMs: options.persistIntervalMs ?? 60000,
    };

    // Create storage adapter
    if (this.options.storage === 'file') {
      this.storage = new FileStorageAdapter(this.options.storagePath);
    } else {
      // An empty persistPath means "no persistence": pass undefined for both settings.
      this.storage = new MemoryStorageAdapter({
        persistPath: this.options.persistPath || undefined,
        persistIntervalMs: this.options.persistPath ? this.options.persistIntervalMs : undefined,
      });
    }

    // Create command router
    this.commandRouter = new CommandRouter(this.storage, this);
  }

  /**
   * Get the storage adapter (for testing/debugging)
   */
  getStorage(): IStorageAdapter {
    return this.storage;
  }

  /**
   * Get server uptime in seconds (measured from the last successful start,
   * or from construction if the server was never started)
   */
  getUptime(): number {
    return Math.floor((Date.now() - this.startTime.getTime()) / 1000);
  }

  /**
   * Get current connection count
   */
  getConnectionCount(): number {
    return this.connections.size;
  }

  /**
   * Start the server: initialise storage, then listen on the Unix socket
   * (if socketPath was given) or on host/port.
   *
   * @throws Error if the server is already running
   * @throws if storage initialisation fails, if a stale socket file cannot be
   *   removed, or if listening fails (e.g. address in use)
   */
  async start(): Promise<void> {
    if (this.isRunning) {
      throw new Error('Server is already running');
    }

    // Initialize storage
    await this.storage.initialize();

    const listenOnSocket = this.useSocket && this.options.socketPath !== '';

    // Clean up stale socket file if using Unix socket
    if (listenOnSocket) {
      await this.removeSocketFile();
    }

    return new Promise<void>((resolve, reject) => {
      const server = net.createServer((socket) => {
        this.handleConnection(socket);
      });
      this.server = server;

      server.on('error', (err) => {
        if (this.isRunning) {
          console.error('Server error:', err);
          return;
        }
        // Startup failed: do not keep a reference to a server that never listened.
        if (this.server === server) {
          this.server = null;
        }
        reject(err);
      });

      const onListening = (): void => {
        this.isRunning = true;
        this.startTime = new Date();
        resolve();
      };

      if (listenOnSocket) {
        // Listen on Unix socket
        server.listen(this.options.socketPath, onListening);
      } else {
        // Listen on TCP
        server.listen(this.options.port, this.options.host, onListening);
      }
    });
  }

  /**
   * Stop the server. Resolves immediately if the server is not running.
   *
   * The listener is closed first so no new client can reach a storage adapter
   * that is being shut down; the listener and socket file are cleaned up even
   * if closing the command router or storage throws (the error is rethrown
   * after cleanup).
   */
  async stop(): Promise<void> {
    const server = this.server;
    if (!this.isRunning || !server) {
      return;
    }

    // Stop accepting new connections; the callback fires once all sockets have ended.
    const serverClosed = new Promise<void>((resolve) => {
      server.close(() => resolve());
    });

    // Close all connections
    for (const conn of this.connections.values()) {
      conn.socket.destroy();
    }
    this.connections.clear();

    try {
      // Close command router (cleans up session engine, cursors, etc.)
      this.commandRouter.close();

      // Close storage
      await this.storage.close();
    } finally {
      await serverClosed;
      this.isRunning = false;
      this.server = null;

      // Clean up socket file if using Unix socket
      if (this.useSocket && this.options.socketPath) {
        try {
          await this.removeSocketFile();
        } catch (err: unknown) {
          // Best effort during shutdown: report but do not fail stop().
          console.error('Failed to remove socket file:', err);
        }
      }
    }
  }

  /**
   * Remove the configured Unix socket file. A missing file (ENOENT) is not an
   * error; any other failure is rethrown.
   */
  private async removeSocketFile(): Promise<void> {
    try {
      await fs.unlink(this.options.socketPath);
    } catch (err: unknown) {
      if (TsmdbServer.errnoCode(err) !== 'ENOENT') {
        throw err;
      }
    }
  }

  /** Extract the `code` property of a thrown value, if it has one. */
  private static errnoCode(err: unknown): unknown {
    return typeof err === 'object' && err !== null ? (err as { code?: unknown }).code : undefined;
  }

  /** Extract a string `message` from a thrown value; '' when there is none. */
  private static errorMessage(err: unknown): string {
    const message =
      typeof err === 'object' && err !== null ? (err as { message?: unknown }).message : undefined;
    return typeof message === 'string' ? message : '';
  }

  /**
   * Handle a new client connection: register its state and wire up the
   * socket's data/close/error handlers.
   */
  private handleConnection(socket: net.Socket): void {
    const connectionId = ++this.connectionIdCounter;

    const state: IConnectionState = {
      id: connectionId,
      socket,
      buffer: Buffer.alloc(0),
      authenticated: true, // No auth required for now
      database: 'test',
    };

    this.connections.set(connectionId, state);

    socket.on('data', (data) => {
      this.handleData(state, Buffer.isBuffer(data) ? data : Buffer.from(data));
    });

    socket.on('close', () => {
      this.connections.delete(connectionId);
    });

    socket.on('error', () => {
      // Connection errors are expected when clients disconnect
      this.connections.delete(connectionId);
    });
  }

  /**
   * Handle incoming data from a client: append it to the connection buffer
   * and kick off message processing.
   */
  private handleData(state: IConnectionState, data: Buffer): void {
    // Append new data to buffer
    state.buffer = Buffer.concat([state.buffer, data]);

    // Process messages from buffer. This is fire-and-forget from the socket's
    // point of view, so make sure a failure can never become an unhandled rejection.
    this.processMessages(state).catch((err: unknown) => {
      console.error('Failed to process messages:', err);
    });
  }

  /**
   * Process complete messages from the buffer, one at a time and in arrival order.
   *
   * Only one drain loop runs per connection: if data arrives while a command
   * is still being routed, it is appended to the buffer and picked up by the
   * loop that is already running, so responses are written in request order.
   */
  private async processMessages(state: IConnectionState): Promise<void> {
    if (this.drainingConnections.has(state)) {
      return;
    }
    this.drainingConnections.add(state);

    try {
      // 16 bytes is the minimum the loop requires before attempting a parse.
      while (state.buffer.length >= 16) {
        let requestID = 0; // stays 0 only if the message could not be parsed
        let consumed = false;

        try {
          const result = WireProtocol.parseMessage(state.buffer);

          if (!result) {
            // Not enough data for a complete message
            break;
          }

          const { command, bytesConsumed } = result;
          requestID = command.requestID;

          // Remove processed bytes from buffer
          state.buffer = state.buffer.subarray(bytesConsumed);
          consumed = true;

          // Process the command
          const response = await this.commandRouter.route(command);

          // Encode and send response
          const responseBuffer: Buffer =
            command.opCode === OP_QUERY
              ? // Legacy OP_QUERY gets OP_REPLY response
                WireProtocol.encodeOpReplyResponse(command.requestID, [response])
              : // OP_MSG gets OP_MSG response
                WireProtocol.encodeOpMsgResponse(command.requestID, response);

          if (!state.socket.destroyed) {
            state.socket.write(responseBuffer);
          }
        } catch (error: unknown) {
          const message = TsmdbServer.errorMessage(error);

          // Send error response, tied to the failing request when it is known
          const errorResponse = WireProtocol.encodeErrorResponse(
            requestID,
            1,
            message || 'Internal error'
          );

          if (!state.socket.destroyed) {
            state.socket.write(errorResponse);
          }

          if (consumed) {
            // The failing message was already removed from the buffer, so it is
            // safe to continue with any further messages that are queued.
            continue;
          }

          // Clear buffer on parse errors to avoid infinite loops
          if (message.includes('opCode') || message.includes('section')) {
            state.buffer = Buffer.alloc(0);
          }
          break;
        }
      }
    } finally {
      this.drainingConnections.delete(state);
    }
  }

  /**
   * Get the connection URI for this server.
   *
   * In Unix socket mode the socket path is percent-encoded; in TCP mode an
   * IPv6 literal host is wrapped in brackets so the port stays unambiguous.
   */
  getConnectionUri(): string {
    if (this.useSocket && this.options.socketPath) {
      // URL-encode the socket path (replace / with %2F)
      const encodedPath = encodeURIComponent(this.options.socketPath);
      return `mongodb://${encodedPath}`;
    }

    const { host, port } = this.options;
    const needsBrackets = host.includes(':') && !host.startsWith('[');
    const uriHost = needsBrackets ? `[${host}]` : host;
    return `mongodb://${uriHost}:${port}`;
  }

  /**
   * Get the socket path (if using Unix socket mode)
   */
  get socketPath(): string | undefined {
    return this.useSocket ? this.options.socketPath : undefined;
  }

  /**
   * Check if the server is running
   */
  get running(): boolean {
    return this.isRunning;
  }

  /**
   * Get the configured TCP port
   */
  get port(): number {
    return this.options.port;
  }

  /**
   * Get the configured host
   */
  get host(): string {
    return this.options.host;
  }
}
|