import * as plugins from '../congodb.plugins.js'; /** * MongoDB Wire Protocol Implementation * Handles parsing and encoding of MongoDB wire protocol messages (OP_MSG primarily) * * Wire Protocol Message Format: * - Header (16 bytes): messageLength (4), requestID (4), responseTo (4), opCode (4) * - OP_MSG: flagBits (4), sections[], optional checksum (4) * * References: * - https://www.mongodb.com/docs/manual/reference/mongodb-wire-protocol/ */ // OpCodes export const OP_REPLY = 1; // Legacy reply export const OP_UPDATE = 2001; // Legacy update export const OP_INSERT = 2002; // Legacy insert export const OP_QUERY = 2004; // Legacy query (still used for initial handshake) export const OP_GET_MORE = 2005; // Legacy getMore export const OP_DELETE = 2006; // Legacy delete export const OP_KILL_CURSORS = 2007; // Legacy kill cursors export const OP_COMPRESSED = 2012; // Compressed message export const OP_MSG = 2013; // Modern protocol (MongoDB 3.6+) // OP_MSG Section Types export const SECTION_BODY = 0; // Single BSON document export const SECTION_DOCUMENT_SEQUENCE = 1; // Document sequence for bulk operations // OP_MSG Flag Bits export const MSG_FLAG_CHECKSUM_PRESENT = 1 << 0; export const MSG_FLAG_MORE_TO_COME = 1 << 1; export const MSG_FLAG_EXHAUST_ALLOWED = 1 << 16; /** * Parsed message header */ export interface IMessageHeader { messageLength: number; requestID: number; responseTo: number; opCode: number; } /** * Parsed OP_MSG message */ export interface IOpMsgMessage { header: IMessageHeader; flagBits: number; sections: IOpMsgSection[]; checksum?: number; } /** * OP_MSG section (either body or document sequence) */ export interface IOpMsgSection { type: number; payload: plugins.bson.Document; sequenceIdentifier?: string; documents?: plugins.bson.Document[]; } /** * Parsed OP_QUERY message (legacy, but used for initial handshake) */ export interface IOpQueryMessage { header: IMessageHeader; flags: number; fullCollectionName: string; numberToSkip: number; numberToReturn: number; query: plugins.bson.Document; returnFieldsSelector?: plugins.bson.Document; } /** * Parsed command from any message type */ export interface IParsedCommand { commandName: string; command: plugins.bson.Document; database: string; requestID: number; opCode: number; documentSequences?: Map; } /** * Wire Protocol parser and encoder */ export class WireProtocol { /** * Parse a complete message from a buffer * Returns the parsed command and the number of bytes consumed */ static parseMessage(buffer: Buffer): { command: IParsedCommand; bytesConsumed: number } | null { if (buffer.length < 16) { return null; // Not enough data for header } const header = this.parseHeader(buffer); if (buffer.length < header.messageLength) { return null; // Not enough data for complete message } const messageBuffer = buffer.subarray(0, header.messageLength); switch (header.opCode) { case OP_MSG: return this.parseOpMsg(messageBuffer, header); case OP_QUERY: return this.parseOpQuery(messageBuffer, header); default: throw new Error(`Unsupported opCode: ${header.opCode}`); } } /** * Parse message header (16 bytes) */ private static parseHeader(buffer: Buffer): IMessageHeader { return { messageLength: buffer.readInt32LE(0), requestID: buffer.readInt32LE(4), responseTo: buffer.readInt32LE(8), opCode: buffer.readInt32LE(12), }; } /** * Parse OP_MSG message */ private static parseOpMsg(buffer: Buffer, header: IMessageHeader): { command: IParsedCommand; bytesConsumed: number } { let offset = 16; // Skip header const flagBits = buffer.readUInt32LE(offset); offset += 4; const sections: IOpMsgSection[] = []; const documentSequences = new Map(); // Parse sections until we reach the end (or checksum) const messageEnd = (flagBits & MSG_FLAG_CHECKSUM_PRESENT) ? header.messageLength - 4 : header.messageLength; while (offset < messageEnd) { const sectionType = buffer.readUInt8(offset); offset += 1; if (sectionType === SECTION_BODY) { // Single BSON document const docSize = buffer.readInt32LE(offset); const docBuffer = buffer.subarray(offset, offset + docSize); const doc = plugins.bson.deserialize(docBuffer); sections.push({ type: SECTION_BODY, payload: doc }); offset += docSize; } else if (sectionType === SECTION_DOCUMENT_SEQUENCE) { // Document sequence const sectionSize = buffer.readInt32LE(offset); const sectionEnd = offset + sectionSize; offset += 4; // Read sequence identifier (C string) let identifierEnd = offset; while (buffer[identifierEnd] !== 0 && identifierEnd < sectionEnd) { identifierEnd++; } const identifier = buffer.subarray(offset, identifierEnd).toString('utf8'); offset = identifierEnd + 1; // Skip null terminator // Read documents const documents: plugins.bson.Document[] = []; while (offset < sectionEnd) { const docSize = buffer.readInt32LE(offset); const docBuffer = buffer.subarray(offset, offset + docSize); documents.push(plugins.bson.deserialize(docBuffer)); offset += docSize; } sections.push({ type: SECTION_DOCUMENT_SEQUENCE, payload: {}, sequenceIdentifier: identifier, documents }); documentSequences.set(identifier, documents); } else { throw new Error(`Unknown section type: ${sectionType}`); } } // The first section body contains the command const commandSection = sections.find(s => s.type === SECTION_BODY); if (!commandSection) { throw new Error('OP_MSG missing command body section'); } const command = commandSection.payload; const commandName = Object.keys(command)[0]; const database = command.$db || 'admin'; return { command: { commandName, command, database, requestID: header.requestID, opCode: header.opCode, documentSequences: documentSequences.size > 0 ? documentSequences : undefined, }, bytesConsumed: header.messageLength, }; } /** * Parse OP_QUERY message (legacy, used for initial handshake) */ private static parseOpQuery(buffer: Buffer, header: IMessageHeader): { command: IParsedCommand; bytesConsumed: number } { let offset = 16; // Skip header const flags = buffer.readInt32LE(offset); offset += 4; // Read full collection name (C string) let nameEnd = offset; while (buffer[nameEnd] !== 0 && nameEnd < buffer.length) { nameEnd++; } const fullCollectionName = buffer.subarray(offset, nameEnd).toString('utf8'); offset = nameEnd + 1; const numberToSkip = buffer.readInt32LE(offset); offset += 4; const numberToReturn = buffer.readInt32LE(offset); offset += 4; // Read query document const querySize = buffer.readInt32LE(offset); const queryBuffer = buffer.subarray(offset, offset + querySize); const query = plugins.bson.deserialize(queryBuffer); offset += querySize; // Extract database from collection name (format: "dbname.$cmd" or "dbname.collection") const parts = fullCollectionName.split('.'); const database = parts[0]; // For OP_QUERY to .$cmd, the query IS the command let commandName = 'find'; let command = query; if (parts[1] === '$cmd') { // This is a command commandName = Object.keys(query)[0]; // Handle special commands like isMaster, hello if (commandName === 'isMaster' || commandName === 'ismaster') { commandName = 'hello'; } } return { command: { commandName, command, database, requestID: header.requestID, opCode: header.opCode, }, bytesConsumed: header.messageLength, }; } /** * Encode a response as OP_MSG */ static encodeOpMsgResponse( responseTo: number, response: plugins.bson.Document, requestID: number = Math.floor(Math.random() * 0x7FFFFFFF) ): Buffer { // Add $db if not present (optional in response) const responseDoc = { ...response }; // Serialize the response document const bodyBson = plugins.bson.serialize(responseDoc); // Calculate message length // Header (16) + flagBits (4) + section type (1) + body BSON const messageLength = 16 + 4 + 1 + bodyBson.length; const buffer = Buffer.alloc(messageLength); let offset = 0; // Write header buffer.writeInt32LE(messageLength, offset); // messageLength offset += 4; buffer.writeInt32LE(requestID, offset); // requestID offset += 4; buffer.writeInt32LE(responseTo, offset); // responseTo offset += 4; buffer.writeInt32LE(OP_MSG, offset); // opCode offset += 4; // Write flagBits (0 = no flags) buffer.writeUInt32LE(0, offset); offset += 4; // Write section type 0 (body) buffer.writeUInt8(SECTION_BODY, offset); offset += 1; // Write body BSON Buffer.from(bodyBson).copy(buffer, offset); return buffer; } /** * Encode a response as OP_REPLY (legacy, for OP_QUERY responses) */ static encodeOpReplyResponse( responseTo: number, documents: plugins.bson.Document[], requestID: number = Math.floor(Math.random() * 0x7FFFFFFF), cursorId: bigint = BigInt(0) ): Buffer { // Serialize all documents const docBuffers = documents.map(doc => plugins.bson.serialize(doc)); const totalDocsSize = docBuffers.reduce((sum, buf) => sum + buf.length, 0); // Message format: // Header (16) + responseFlags (4) + cursorID (8) + startingFrom (4) + numberReturned (4) + documents const messageLength = 16 + 4 + 8 + 4 + 4 + totalDocsSize; const buffer = Buffer.alloc(messageLength); let offset = 0; // Write header buffer.writeInt32LE(messageLength, offset); // messageLength offset += 4; buffer.writeInt32LE(requestID, offset); // requestID offset += 4; buffer.writeInt32LE(responseTo, offset); // responseTo offset += 4; buffer.writeInt32LE(OP_REPLY, offset); // opCode offset += 4; // Write OP_REPLY fields buffer.writeInt32LE(0, offset); // responseFlags (0 = no errors) offset += 4; buffer.writeBigInt64LE(cursorId, offset); // cursorID offset += 8; buffer.writeInt32LE(0, offset); // startingFrom offset += 4; buffer.writeInt32LE(documents.length, offset); // numberReturned offset += 4; // Write documents for (const docBuffer of docBuffers) { Buffer.from(docBuffer).copy(buffer, offset); offset += docBuffer.length; } return buffer; } /** * Encode an error response */ static encodeErrorResponse( responseTo: number, errorCode: number, errorMessage: string, commandName?: string ): Buffer { const response: plugins.bson.Document = { ok: 0, errmsg: errorMessage, code: errorCode, codeName: this.getErrorCodeName(errorCode), }; return this.encodeOpMsgResponse(responseTo, response); } /** * Get error code name from error code */ private static getErrorCodeName(code: number): string { const errorNames: Record = { 0: 'OK', 1: 'InternalError', 2: 'BadValue', 11000: 'DuplicateKey', 11001: 'DuplicateKeyValue', 13: 'Unauthorized', 26: 'NamespaceNotFound', 27: 'IndexNotFound', 48: 'NamespaceExists', 59: 'CommandNotFound', 66: 'ImmutableField', 73: 'InvalidNamespace', 85: 'IndexOptionsConflict', 112: 'WriteConflict', 121: 'DocumentValidationFailure', 211: 'KeyNotFound', 251: 'NoSuchTransaction', }; return errorNames[code] || 'UnknownError'; } }