// Source listing metadata: 417 lines, 12 KiB, TypeScript
import * as plugins from '../tsmdb.plugins.js';
/**
 * MongoDB Wire Protocol Implementation
 * Handles parsing and encoding of MongoDB wire protocol messages (OP_MSG primarily)
 *
 * Wire Protocol Message Format:
 * - Header (16 bytes): messageLength (4), requestID (4), responseTo (4), opCode (4)
 * - OP_MSG: flagBits (4), sections[], optional checksum (4)
 *
 * References:
 * - https://www.mongodb.com/docs/manual/reference/mongodb-wire-protocol/
 */

// OpCodes (the `opCode` field of the 16-byte message header)
export const OP_REPLY = 1; // Legacy reply
export const OP_UPDATE = 2001; // Legacy update
export const OP_INSERT = 2002; // Legacy insert
export const OP_QUERY = 2004; // Legacy query (still used for initial handshake)
export const OP_GET_MORE = 2005; // Legacy getMore
export const OP_DELETE = 2006; // Legacy delete
export const OP_KILL_CURSORS = 2007; // Legacy kill cursors
export const OP_COMPRESSED = 2012; // Compressed message
export const OP_MSG = 2013; // Modern protocol (MongoDB 3.6+)

// OP_MSG Section Types (the single "kind" byte that starts every section)
export const SECTION_BODY = 0; // Single BSON document
export const SECTION_DOCUMENT_SEQUENCE = 1; // Document sequence for bulk operations

// OP_MSG Flag Bits (bit masks applied to the 32-bit `flagBits` field)
export const MSG_FLAG_CHECKSUM_PRESENT = 1 << 0;
export const MSG_FLAG_MORE_TO_COME = 1 << 1;
export const MSG_FLAG_EXHAUST_ALLOWED = 1 << 16;

/**
 * Parsed message header: the four 32-bit fields that open every wire
 * protocol message (16 bytes in total).
 */
export interface IMessageHeader {
  /** Total message size in bytes, including this header. */
  messageLength: number;
  /** Identifier chosen by the sender for this message. */
  requestID: number;
  /** `requestID` of the message this one answers. */
  responseTo: number;
  /** Message type, e.g. `OP_MSG` or `OP_QUERY`. */
  opCode: number;
}

/**
 * Parsed OP_MSG message
 */
export interface IOpMsgMessage {
  /** Standard 16-byte message header. */
  header: IMessageHeader;
  /** Raw 32-bit flag field; test it with the `MSG_FLAG_*` masks. */
  flagBits: number;
  /** Sections in the order they appeared in the message. */
  sections: IOpMsgSection[];
  /** Trailing checksum, only set when the message carried one. */
  checksum?: number;
}

/**
 * OP_MSG section (either body or document sequence)
 */
export interface IOpMsgSection {
  /** Section kind: `SECTION_BODY` or `SECTION_DOCUMENT_SEQUENCE`. */
  type: number;
  /** BSON document carried by a body section. */
  payload: plugins.bson.Document;
  /** Name of the sequence; used by document-sequence sections. */
  sequenceIdentifier?: string;
  /** Documents of the sequence; used by document-sequence sections. */
  documents?: plugins.bson.Document[];
}

/**
 * Parsed OP_QUERY message (legacy, but used for initial handshake)
 */
export interface IOpQueryMessage {
  /** Standard 16-byte message header. */
  header: IMessageHeader;
  /** Raw OP_QUERY flag field. */
  flags: number;
  /** Namespace in the form "dbname.$cmd" or "dbname.collection". */
  fullCollectionName: string;
  /** Number of documents to skip. */
  numberToSkip: number;
  /** Number of documents to return. */
  numberToReturn: number;
  /** Query document; for a "dbname.$cmd" namespace this is the command itself. */
  query: plugins.bson.Document;
  /** Optional field selector document. */
  returnFieldsSelector?: plugins.bson.Document;
}

/**
 * Parsed command from any message type
 */
export interface IParsedCommand {
  /** Name of the command: the first key of the command document. */
  commandName: string;
  /** The full command document. */
  command: plugins.bson.Document;
  /** Database the command is addressed to. */
  database: string;
  /** `requestID` from the message header; responses echo it as `responseTo`. */
  requestID: number;
  /** OpCode of the message the command arrived in. */
  opCode: number;
  /** OP_MSG document sequences keyed by their sequence identifier, if any. */
  documentSequences?: Map<string, plugins.bson.Document[]>;
}

/**
 * Wire Protocol parser and encoder.
 *
 * All members are static. Parsing treats the input as untrusted: every
 * length field read from the wire is checked against the bytes actually
 * available before it is used, and malformed input is reported by throwing
 * an `Error`.
 */
export class WireProtocol {
  /** Size in bytes of the standard message header. */
  private static readonly HEADER_SIZE = 16;

  /** Smallest possible BSON document: int32 length plus the terminating NUL. */
  private static readonly MIN_BSON_SIZE = 5;

  /** Smallest possible document-sequence section: int32 size plus an empty C string. */
  private static readonly MIN_SEQUENCE_SIZE = 5;

  /** Error code to code name lookup used for error responses. */
  private static readonly ERROR_CODE_NAMES: Readonly<Record<number, string>> = {
    0: 'OK',
    1: 'InternalError',
    2: 'BadValue',
    11000: 'DuplicateKey',
    11001: 'DuplicateKeyValue',
    13: 'Unauthorized',
    26: 'NamespaceNotFound',
    27: 'IndexNotFound',
    48: 'NamespaceExists',
    59: 'CommandNotFound',
    66: 'ImmutableField',
    73: 'InvalidNamespace',
    85: 'IndexOptionsConflict',
    112: 'WriteConflict',
    121: 'DocumentValidationFailure',
    211: 'KeyNotFound',
    251: 'NoSuchTransaction',
  };

  /**
   * Parse a complete message from the start of a buffer.
   *
   * @param buffer Bytes received so far; may hold a partial message or more
   *   than one message.
   * @returns The parsed command and the number of bytes consumed, or `null`
   *   when the buffer does not yet contain a complete message.
   * @throws Error when the header announces an impossible length, the opCode
   *   is not OP_MSG or OP_QUERY, or the message body is malformed.
   */
  static parseMessage(buffer: Buffer): { command: IParsedCommand; bytesConsumed: number } | null {
    if (buffer.length < WireProtocol.HEADER_SIZE) {
      return null; // Not enough data for header
    }

    const header = WireProtocol.parseHeader(buffer);

    // A length smaller than the header can never be valid. Accepting it would
    // report bytesConsumed <= 0, so a caller draining its receive buffer would
    // never make progress.
    if (header.messageLength < WireProtocol.HEADER_SIZE) {
      throw new Error(`Invalid messageLength: ${header.messageLength}`);
    }

    if (buffer.length < header.messageLength) {
      return null; // Not enough data for complete message
    }

    const messageBuffer = buffer.subarray(0, header.messageLength);

    switch (header.opCode) {
      case OP_MSG:
        return WireProtocol.parseOpMsg(messageBuffer, header);
      case OP_QUERY:
        return WireProtocol.parseOpQuery(messageBuffer, header);
      default:
        throw new Error(`Unsupported opCode: ${header.opCode}`);
    }
  }

  /**
   * Parse message header (16 bytes, four little-endian int32 fields).
   * The caller must guarantee that `buffer` holds at least 16 bytes.
   */
  private static parseHeader(buffer: Buffer): IMessageHeader {
    return {
      messageLength: buffer.readInt32LE(0),
      requestID: buffer.readInt32LE(4),
      responseTo: buffer.readInt32LE(8),
      opCode: buffer.readInt32LE(12),
    };
  }

  /**
   * Read one length-prefixed BSON document that must lie entirely inside
   * `[offset, limit)`.
   *
   * @returns The deserialized document and its encoded size in bytes.
   * @throws Error when the declared size is too small or overruns `limit`.
   */
  private static readDocument(
    buffer: Buffer,
    offset: number,
    limit: number,
    context: string
  ): { document: plugins.bson.Document; size: number } {
    if (offset + 4 > limit) {
      throw new Error(`${context}: truncated BSON document length`);
    }
    const size = buffer.readInt32LE(offset);
    if (size < WireProtocol.MIN_BSON_SIZE || offset + size > limit) {
      throw new Error(`${context}: invalid BSON document size ${size}`);
    }
    const document = plugins.bson.deserialize(buffer.subarray(offset, offset + size));
    return { document, size };
  }

  /**
   * Read a NUL-terminated UTF-8 string whose terminator must lie inside
   * `[offset, limit)`.
   *
   * @returns The string and the offset of the first byte after the terminator.
   * @throws Error when no terminator is found before `limit`.
   */
  private static readCString(
    buffer: Buffer,
    offset: number,
    limit: number,
    context: string
  ): { value: string; nextOffset: number } {
    const terminator = offset < limit ? buffer.indexOf(0, offset) : -1;
    if (terminator === -1 || terminator >= limit) {
      throw new Error(`${context}: unterminated C string`);
    }
    return { value: buffer.toString('utf8', offset, terminator), nextOffset: terminator + 1 };
  }

  /**
   * Parse OP_MSG message.
   *
   * The first body section is the command document. Document-sequence
   * sections are collected by identifier. A trailing checksum, when flagged,
   * is skipped and not verified.
   *
   * @throws Error on a truncated message, an unknown section kind, an
   *   inconsistent size field, or a message without a body section.
   */
  private static parseOpMsg(buffer: Buffer, header: IMessageHeader): { command: IParsedCommand; bytesConsumed: number } {
    let offset = WireProtocol.HEADER_SIZE;

    if (offset + 4 > header.messageLength) {
      throw new Error('OP_MSG truncated: missing flagBits');
    }
    const flagBits = buffer.readUInt32LE(offset);
    offset += 4;

    // Sections end where the optional 4-byte checksum begins.
    const checksumPresent = (flagBits & MSG_FLAG_CHECKSUM_PRESENT) !== 0;
    const messageEnd = checksumPresent ? header.messageLength - 4 : header.messageLength;
    if (messageEnd < offset) {
      throw new Error('OP_MSG truncated: missing checksum');
    }

    let command: plugins.bson.Document | undefined;
    const documentSequences = new Map<string, plugins.bson.Document[]>();

    while (offset < messageEnd) {
      const sectionType = buffer.readUInt8(offset);
      offset += 1;

      if (sectionType === SECTION_BODY) {
        const body = WireProtocol.readDocument(buffer, offset, messageEnd, 'OP_MSG body section');
        if (command === undefined) {
          command = body.document; // Only the first body section is the command
        }
        offset += body.size;
      } else if (sectionType === SECTION_DOCUMENT_SEQUENCE) {
        if (offset + 4 > messageEnd) {
          throw new Error('OP_MSG document sequence: truncated section size');
        }
        const sectionSize = buffer.readInt32LE(offset);
        const sectionEnd = offset + sectionSize;
        // The size is attacker-controlled: without this check a huge value
        // would let the scans below run far past the end of the message.
        if (sectionSize < WireProtocol.MIN_SEQUENCE_SIZE || sectionEnd > messageEnd) {
          throw new Error(`OP_MSG document sequence: invalid section size ${sectionSize}`);
        }
        offset += 4;

        const identifier = WireProtocol.readCString(buffer, offset, sectionEnd, 'OP_MSG document sequence identifier');
        offset = identifier.nextOffset;

        const documents: plugins.bson.Document[] = [];
        while (offset < sectionEnd) {
          const entry = WireProtocol.readDocument(buffer, offset, sectionEnd, 'OP_MSG document sequence');
          documents.push(entry.document);
          offset += entry.size;
        }
        documentSequences.set(identifier.value, documents);
      } else {
        throw new Error(`Unknown section type: ${sectionType}`);
      }
    }

    if (command === undefined) {
      throw new Error('OP_MSG missing command body section');
    }

    // An empty command document has no first key; report an empty name
    // rather than `undefined` so the declared `string` type holds.
    const commandName = Object.keys(command)[0] ?? '';
    const requestedDb: unknown = command.$db;
    const database = typeof requestedDb === 'string' && requestedDb !== '' ? requestedDb : 'admin';

    return {
      command: {
        commandName,
        command,
        database,
        requestID: header.requestID,
        opCode: header.opCode,
        documentSequences: documentSequences.size > 0 ? documentSequences : undefined,
      },
      bytesConsumed: header.messageLength,
    };
  }

  /**
   * Parse OP_QUERY message (legacy, used for initial handshake).
   *
   * Layout after the header: flags (4), fullCollectionName (C string),
   * numberToSkip (4), numberToReturn (4), query document. Only the namespace
   * and the query document are used.
   *
   * @throws Error when the namespace is unterminated or the query document
   *   size is inconsistent with the message length.
   */
  private static parseOpQuery(buffer: Buffer, header: IMessageHeader): { command: IParsedCommand; bytesConsumed: number } {
    const messageEnd = header.messageLength;
    let offset = WireProtocol.HEADER_SIZE;

    offset += 4; // flags are not needed

    const namespace = WireProtocol.readCString(buffer, offset, messageEnd, 'OP_QUERY fullCollectionName');
    const fullCollectionName = namespace.value;
    offset = namespace.nextOffset;

    offset += 8; // numberToSkip and numberToReturn are not needed

    const query = WireProtocol.readDocument(buffer, offset, messageEnd, 'OP_QUERY query').document;

    // Extract database from collection name (format: "dbname.$cmd" or "dbname.collection")
    const [database = '', collection] = fullCollectionName.split('.');

    // For OP_QUERY to .$cmd, the query IS the command
    let commandName = 'find';
    if (collection === '$cmd') {
      commandName = Object.keys(query)[0] ?? '';
      // Legacy handshake spellings are normalised to the modern name
      if (commandName === 'isMaster' || commandName === 'ismaster') {
        commandName = 'hello';
      }
    }

    return {
      command: {
        commandName,
        command: query,
        database,
        requestID: header.requestID,
        opCode: header.opCode,
      },
      bytesConsumed: header.messageLength,
    };
  }

  /**
   * Write the 16-byte message header at the start of `buffer`.
   *
   * @returns The offset of the first byte after the header.
   */
  private static writeHeader(
    buffer: Buffer,
    messageLength: number,
    requestID: number,
    responseTo: number,
    opCode: number
  ): number {
    buffer.writeInt32LE(messageLength, 0);
    buffer.writeInt32LE(requestID, 4);
    buffer.writeInt32LE(responseTo, 8);
    buffer.writeInt32LE(opCode, 12);
    return WireProtocol.HEADER_SIZE;
  }

  /**
   * Encode a response as OP_MSG with a single body section and no flags.
   *
   * @param responseTo `requestID` of the request being answered.
   * @param response Document to send as the body section.
   * @param requestID Identifier for this message; random when omitted.
   * @returns The complete wire message.
   */
  static encodeOpMsgResponse(
    responseTo: number,
    response: plugins.bson.Document,
    requestID: number = Math.floor(Math.random() * 0x7FFFFFFF)
  ): Buffer {
    const bodyBson = plugins.bson.serialize(response);

    // Header (16) + flagBits (4) + section type (1) + body BSON
    const messageLength = WireProtocol.HEADER_SIZE + 4 + 1 + bodyBson.length;
    const buffer = Buffer.alloc(messageLength);

    let offset = WireProtocol.writeHeader(buffer, messageLength, requestID, responseTo, OP_MSG);

    buffer.writeUInt32LE(0, offset); // flagBits (0 = no flags)
    offset += 4;

    buffer.writeUInt8(SECTION_BODY, offset);
    offset += 1;

    buffer.set(bodyBson, offset);

    return buffer;
  }

  /**
   * Encode a response as OP_REPLY (legacy, for OP_QUERY responses).
   *
   * @param responseTo `requestID` of the request being answered.
   * @param documents Documents to return, in order.
   * @param requestID Identifier for this message; random when omitted.
   * @param cursorId Cursor id to report; 0 when omitted.
   * @returns The complete wire message.
   */
  static encodeOpReplyResponse(
    responseTo: number,
    documents: plugins.bson.Document[],
    requestID: number = Math.floor(Math.random() * 0x7FFFFFFF),
    cursorId: bigint = BigInt(0)
  ): Buffer {
    const docBuffers = documents.map(doc => plugins.bson.serialize(doc));
    const totalDocsSize = docBuffers.reduce((sum, buf) => sum + buf.length, 0);

    // Header (16) + responseFlags (4) + cursorID (8) + startingFrom (4) + numberReturned (4) + documents
    const messageLength = WireProtocol.HEADER_SIZE + 4 + 8 + 4 + 4 + totalDocsSize;
    const buffer = Buffer.alloc(messageLength);

    let offset = WireProtocol.writeHeader(buffer, messageLength, requestID, responseTo, OP_REPLY);

    buffer.writeInt32LE(0, offset); // responseFlags (0 = no errors)
    offset += 4;
    buffer.writeBigInt64LE(cursorId, offset); // cursorID
    offset += 8;
    buffer.writeInt32LE(0, offset); // startingFrom
    offset += 4;
    buffer.writeInt32LE(documents.length, offset); // numberReturned
    offset += 4;

    for (const docBuffer of docBuffers) {
      buffer.set(docBuffer, offset);
      offset += docBuffer.length;
    }

    return buffer;
  }

  /**
   * Encode an error response as OP_MSG with `ok: 0`, `errmsg`, `code` and
   * `codeName` fields.
   *
   * @param responseTo `requestID` of the request being answered.
   * @param errorCode Numeric error code.
   * @param errorMessage Human-readable message, sent as `errmsg`.
   * @param commandName Accepted for caller convenience; currently not
   *   included in the response.
   * @returns The complete wire message.
   */
  static encodeErrorResponse(
    responseTo: number,
    errorCode: number,
    errorMessage: string,
    commandName?: string
  ): Buffer {
    const response: plugins.bson.Document = {
      ok: 0,
      errmsg: errorMessage,
      code: errorCode,
      codeName: WireProtocol.getErrorCodeName(errorCode),
    };

    return WireProtocol.encodeOpMsgResponse(responseTo, response);
  }

  /**
   * Get error code name from error code; `'UnknownError'` for codes that
   * are not in the lookup table.
   */
  private static getErrorCodeName(code: number): string {
    return WireProtocol.ERROR_CODE_NAMES[code] ?? 'UnknownError';
  }
}
