From 3480fbdb298d765dd71a47ff92dc8901c6f705c9 Mon Sep 17 00:00:00 2001 From: bailey Date: Wed, 23 Jul 2025 15:13:47 -0600 Subject: [PATCH 01/10] POC --- src/db.ts | 2 +- src/operations/execute_operation.ts | 18 ++++- src/operations/operation.ts | 38 ++++++++++- src/operations/search_indexes/drop.ts | 33 ++++----- src/sdam/server.ts | 97 ++++++++++++++++++++++++++- 5 files changed, 167 insertions(+), 21 deletions(-) diff --git a/src/db.ts b/src/db.ts index 49b094bc100..dacc9cd2cd9 100644 --- a/src/db.ts +++ b/src/db.ts @@ -6,7 +6,7 @@ import * as CONSTANTS from './constants'; import { AggregationCursor } from './cursor/aggregation_cursor'; import { ListCollectionsCursor } from './cursor/list_collections_cursor'; import { RunCommandCursor, type RunCursorCommandOptions } from './cursor/run_command_cursor'; -import { MongoInvalidArgumentError } from './error'; +import { MONGODB_ERROR_CODES, MongoInvalidArgumentError, MongoServerError } from './error'; import type { MongoClient, PkFactory } from './mongo_client'; import type { Abortable, TODO_NODE_3286 } from './mongo_types'; import type { AggregateOptions } from './operations/aggregate'; diff --git a/src/operations/execute_operation.ts b/src/operations/execute_operation.ts index 454f56daaa9..149232fd419 100644 --- a/src/operations/execute_operation.ts +++ b/src/operations/execute_operation.ts @@ -26,7 +26,7 @@ import type { Topology } from '../sdam/topology'; import type { ClientSession } from '../sessions'; import { TimeoutContext } from '../timeout'; import { abortable, supportsRetryableWrites } from '../utils'; -import { AbstractOperation, Aspect } from './operation'; +import { AbstractOperation, Aspect, ModernOperation } from './operation'; const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation; const MMAPv1_RETRY_WRITES_ERROR_MESSAGE = @@ -85,6 +85,8 @@ export async function executeOperation< throw new MongoInvalidArgumentError('ClientSession must be from the same MongoClient'); } + operation.session ??= session; + const readPreference = operation.readPreference ?? ReadPreference.primary; const inTransaction = !!session?.inTransaction(); @@ -231,6 +233,8 @@ async function tryOperation< let previousOperationError: MongoError | undefined; let previousServer: ServerDescription | undefined; + const isModernOperation = operation instanceof ModernOperation; + for (let tries = 0; tries < maxTries; tries++) { if (previousOperationError) { if (hasWriteAspect && previousOperationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) { @@ -280,7 +284,17 @@ async function tryOperation< if (tries > 0 && operation.hasAspect(Aspect.COMMAND_BATCHING)) { operation.resetBatch(); } - return await operation.execute(server, session, timeoutContext); + + if (!isModernOperation) { + return await operation.execute(server, session, timeoutContext); + } + + try { + const result = await server.modernCommand(operation, timeoutContext); + return operation.handleOk(result) as TResult; + } catch (error) { + operation.handleError(error); + } } catch (operationError) { if (!(operationError instanceof MongoError)) throw operationError; if ( diff --git a/src/operations/operation.ts b/src/operations/operation.ts index 190f2a522bd..d09568ca494 100644 --- a/src/operations/operation.ts +++ b/src/operations/operation.ts @@ -1,7 +1,8 @@ +import { type Connection, type MongoError } from '..'; import { type BSONSerializeOptions, type Document, resolveBSONOptions } from '../bson'; import { type Abortable } from '../mongo_types'; import { ReadPreference, type ReadPreferenceLike } from '../read_preference'; -import type { Server } from '../sdam/server'; +import type { Server, ServerCommandOptions } from '../sdam/server'; import type { ClientSession } from '../sessions'; import { type TimeoutContext } from '../timeout'; import type { MongoDBNamespace } from '../utils'; @@ -108,6 +109,10 @@ export abstract class AbstractOperation { return this._session; } + set session(session: ClientSession) { + this._session = session; + } + clearSession() { this._session = undefined; } @@ -125,6 +130,37 @@ export abstract class AbstractOperation { } } +export abstract class ModernOperation extends AbstractOperation { + /** this will never be used - but we must implement it to satisfy AbstractOperation's interface */ + override execute( + _server: Server, + _session: ClientSession | undefined, + _timeoutContext: TimeoutContext + ): Promise { + throw new Error('cannot execute!!'); + } + + abstract buildCommand(connection: Connection, session?: ClientSession): Document; + + abstract buildOptions(timeoutContext: TimeoutContext): ServerCommandOptions; + + /** + * Optional - if the operation performs error handling, such as wrapping or renaming the error, + * this method can be overridden. + */ + handleOk(response: Document) { + return response; + } + + /** + * Optional - if the operation performs post-processing + * on the result document, this method can be overridden. + */ + handleError(error: MongoError): void { + throw error; + } +} + export function defineAspects( operation: { aspects?: Set }, aspects: symbol | symbol[] | Set diff --git a/src/operations/search_indexes/drop.ts b/src/operations/search_indexes/drop.ts index 3b87bfad442..a0ef1314fde 100644 --- a/src/operations/search_indexes/drop.ts +++ b/src/operations/search_indexes/drop.ts @@ -1,13 +1,14 @@ +import { type Connection, type MongoError } from '../..'; import type { Document } from '../../bson'; import type { Collection } from '../../collection'; import { MONGODB_ERROR_CODES, MongoServerError } from '../../error'; -import type { Server } from '../../sdam/server'; +import type { Server, ServerCommandOptions } from '../../sdam/server'; import type { ClientSession } from '../../sessions'; import { type TimeoutContext } from '../../timeout'; -import { AbstractOperation } from '../operation'; +import { AbstractOperation, ModernOperation } from '../operation'; /** @internal */ -export class DropSearchIndexOperation extends AbstractOperation { +export class DropSearchIndexOperation extends ModernOperation { private readonly collection: Collection; private readonly name: string; @@ -15,17 +16,14 @@ export class DropSearchIndexOperation extends AbstractOperation { super(); this.collection = collection; this.name = name; + this.ns = collection.fullNamespace; } override get commandName() { return 'dropSearchIndex' as const; } - override async execute( - server: Server, - session: ClientSession | undefined, - timeoutContext: TimeoutContext - ): Promise { + override buildCommand(_connection: Connection, _session?: ClientSession): Document { const namespace = this.collection.fullNamespace; const command: Document = { @@ -35,15 +33,18 @@ export class DropSearchIndexOperation extends AbstractOperation { if (typeof this.name === 'string') { command.name = this.name; } + return command; + } + + override buildOptions(timeoutContext: TimeoutContext): ServerCommandOptions { + return { session: this.session, timeoutContext }; + } - try { - await server.command(namespace, command, { session, timeoutContext }); - } catch (error) { - const isNamespaceNotFoundError = - error instanceof MongoServerError && error.code === MONGODB_ERROR_CODES.NamespaceNotFound; - if (!isNamespaceNotFoundError) { - throw error; - } + override handleError(error: MongoError): void { + const isNamespaceNotFoundError = + error instanceof MongoServerError && error.code === MONGODB_ERROR_CODES.NamespaceNotFound; + if (!isNamespaceNotFoundError) { + throw error; } } } diff --git a/src/sdam/server.ts b/src/sdam/server.ts index 4d7052e3270..121bbef4f95 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -38,8 +38,9 @@ import { import type { ServerApi } from '../mongo_client'; import { type Abortable, TypedEventEmitter } from '../mongo_types'; import type { GetMoreOptions } from '../operations/get_more'; +import { type ModernOperation } from '../operations/operation'; import type { ClientSession } from '../sessions'; -import { type TimeoutContext } from '../timeout'; +import { Timeout, type TimeoutContext } from '../timeout'; import { isTransactionCommand } from '../transactions'; import { abortable, @@ -277,6 +278,100 @@ export class Server extends TypedEventEmitter { } } + public async modernCommand( + operation: ModernOperation, + timeoutContext: TimeoutContext + ): Promise { + if (this.s.state === STATE_CLOSING || this.s.state === STATE_CLOSED) { + throw new MongoServerClosedError(); + } + const session = operation.session; + + let conn = session?.pinnedConnection; + + this.incrementOperationCount(); + if (conn == null) { + try { + conn = await this.pool.checkOut({ timeoutContext }); + } catch (checkoutError) { + this.decrementOperationCount(); + if (!(checkoutError instanceof PoolClearedError)) this.handleError(checkoutError); + throw checkoutError; + } + } + + const cmd = operation.buildCommand(conn, session); + const options = operation.buildOptions(timeoutContext); + const ns = operation.ns; + + if (this.loadBalanced && isPinnableCommand(cmd, session)) { + session?.pin(conn); + } + + options.directConnection = this.topology.s.options.directConnection; + + // There are cases where we need to flag the read preference not to get sent in + // the command, such as pre-5.0 servers attempting to perform an aggregate write + // with a non-primary read preference. In this case the effective read preference + // (primary) is not the same as the provided and must be removed completely. + if (options.omitReadPreference) { + delete options.readPreference; + } + + if (this.description.iscryptd) { + options.omitMaxTimeMS = true; + } + + let reauthPromise: Promise | null = null; + + try { + try { + const res = await conn.command(ns, cmd, options); + throwIfWriteConcernError(res); + return res; + } catch (commandError) { + throw this.decorateCommandError(conn, cmd, options, commandError); + } + } catch (operationError) { + if ( + operationError instanceof MongoError && + operationError.code === MONGODB_ERROR_CODES.Reauthenticate + ) { + reauthPromise = this.pool.reauthenticate(conn); + reauthPromise.then(undefined, error => { + reauthPromise = null; + squashError(error); + }); + + await abortable(reauthPromise, options); + reauthPromise = null; // only reachable if reauth succeeds + + try { + const res = await conn.command(ns, cmd, options); + throwIfWriteConcernError(res); + return res; + } catch (commandError) { + throw this.decorateCommandError(conn, cmd, options, commandError); + } + } else { + throw operationError; + } + } finally { + this.decrementOperationCount(); + if (session?.pinnedConnection !== conn) { + if (reauthPromise != null) { + // The reauth promise only exists if it hasn't thrown. + const checkBackIn = () => { + this.pool.checkIn(conn); + }; + void reauthPromise.then(checkBackIn, checkBackIn); + } else { + this.pool.checkIn(conn); + } + } + } + } + public async command( ns: MongoDBNamespace, command: Document, From da4c1d20bcf831bc329ad7c5e3b3c74830db2e53 Mon Sep 17 00:00:00 2001 From: bailey Date: Fri, 25 Jul 2025 10:23:25 -0600 Subject: [PATCH 02/10] POC with response type --- src/operations/drop.ts | 1 - src/operations/execute_operation.ts | 2 +- src/operations/operation.ts | 17 ++++++++++------ src/operations/search_indexes/drop.ts | 12 ++++++++--- src/sdam/server.ts | 29 +++++++++++++++++++-------- 5 files changed, 42 insertions(+), 19 deletions(-) diff --git a/src/operations/drop.ts b/src/operations/drop.ts index 3fd4ac6dace..7b1c6ee0482 100644 --- a/src/operations/drop.ts +++ b/src/operations/drop.ts @@ -1,7 +1,6 @@ import type { Document } from '../bson'; import { CursorTimeoutContext } from '../cursor/abstract_cursor'; import type { Db } from '../db'; -import { MONGODB_ERROR_CODES, MongoServerError } from '../error'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; import { TimeoutContext } from '../timeout'; diff --git a/src/operations/execute_operation.ts b/src/operations/execute_operation.ts index 149232fd419..d5c13b8364c 100644 --- a/src/operations/execute_operation.ts +++ b/src/operations/execute_operation.ts @@ -291,7 +291,7 @@ async function tryOperation< try { const result = await server.modernCommand(operation, timeoutContext); - return operation.handleOk(result) as TResult; + return operation.handleOk(result); } catch (error) { operation.handleError(error); } diff --git a/src/operations/operation.ts b/src/operations/operation.ts index d09568ca494..acc65a97344 100644 --- a/src/operations/operation.ts +++ b/src/operations/operation.ts @@ -1,4 +1,4 @@ -import { type Connection, type MongoError } from '..'; +import { type Connection, type MongoDBResponse, type MongoError } from '..'; import { type BSONSerializeOptions, type Document, resolveBSONOptions } from '../bson'; import { type Abortable } from '../mongo_types'; import { ReadPreference, type ReadPreferenceLike } from '../read_preference'; @@ -130,13 +130,18 @@ export abstract class AbstractOperation { } } -export abstract class ModernOperation extends AbstractOperation { +export abstract class ModernOperation< + TResponse extends typeof MongoDBResponse | undefined, + TResult +> extends AbstractOperation { + abstract RESPONSE_TYPE: TResponse; + /** this will never be used - but we must implement it to satisfy AbstractOperation's interface */ override execute( _server: Server, _session: ClientSession | undefined, _timeoutContext: TimeoutContext - ): Promise { + ): Promise { throw new Error('cannot execute!!'); } @@ -148,9 +153,9 @@ export abstract class ModernOperation extends AbstractOperation { * Optional - if the operation performs error handling, such as wrapping or renaming the error, * this method can be overridden. */ - handleOk(response: Document) { - return response; - } + abstract handleOk( + response: TResponse extends typeof MongoDBResponse ? InstanceType : Document + ): TResult; /** * Optional - if the operation performs post-processing diff --git a/src/operations/search_indexes/drop.ts b/src/operations/search_indexes/drop.ts index a0ef1314fde..588d5de34c0 100644 --- a/src/operations/search_indexes/drop.ts +++ b/src/operations/search_indexes/drop.ts @@ -2,13 +2,15 @@ import { type Connection, type MongoError } from '../..'; import type { Document } from '../../bson'; import type { Collection } from '../../collection'; import { MONGODB_ERROR_CODES, MongoServerError } from '../../error'; -import type { Server, ServerCommandOptions } from '../../sdam/server'; +import type { ServerCommandOptions } from '../../sdam/server'; import type { ClientSession } from '../../sessions'; import { type TimeoutContext } from '../../timeout'; -import { AbstractOperation, ModernOperation } from '../operation'; +import { ModernOperation } from '../operation'; /** @internal */ -export class DropSearchIndexOperation extends ModernOperation { +export class DropSearchIndexOperation extends ModernOperation { + override RESPONSE_TYPE: undefined; + private readonly collection: Collection; private readonly name: string; @@ -36,6 +38,10 @@ export class DropSearchIndexOperation extends ModernOperation { return command; } + override handleOk(_response: Document): void { + // do nothing + } + override buildOptions(timeoutContext: TimeoutContext): ServerCommandOptions { return { session: this.session, timeoutContext }; } diff --git a/src/sdam/server.ts b/src/sdam/server.ts index 121bbef4f95..b82a3786a07 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -7,7 +7,10 @@ import { type ConnectionPoolOptions } from '../cmap/connection_pool'; import { PoolClearedError } from '../cmap/errors'; -import { type MongoDBResponseConstructor } from '../cmap/wire_protocol/responses'; +import { + type MongoDBResponse, + type MongoDBResponseConstructor +} from '../cmap/wire_protocol/responses'; import { APM_EVENTS, CLOSED, @@ -40,7 +43,7 @@ import { type Abortable, TypedEventEmitter } from '../mongo_types'; import type { GetMoreOptions } from '../operations/get_more'; import { type ModernOperation } from '../operations/operation'; import type { ClientSession } from '../sessions'; -import { Timeout, type TimeoutContext } from '../timeout'; +import { type TimeoutContext } from '../timeout'; import { isTransactionCommand } from '../transactions'; import { abortable, @@ -278,10 +281,14 @@ export class Server extends TypedEventEmitter { } } - public async modernCommand( - operation: ModernOperation, + public async modernCommand( + operation: ModernOperation, timeoutContext: TimeoutContext - ): Promise { + ): Promise< + typeof operation.RESPONSE_TYPE extends typeof MongoDBResponse + ? InstanceType + : Document + > { if (this.s.state === STATE_CLOSING || this.s.state === STATE_CLOSED) { throw new MongoServerClosedError(); } @@ -326,9 +333,12 @@ export class Server extends TypedEventEmitter { try { try { - const res = await conn.command(ns, cmd, options); + const res = await conn.command(ns, cmd, options, operation.RESPONSE_TYPE); throwIfWriteConcernError(res); - return res; + // TODO: figure out why casting is necessary + return res as typeof operation.RESPONSE_TYPE extends typeof MongoDBResponse + ? InstanceType + : Document; } catch (commandError) { throw this.decorateCommandError(conn, cmd, options, commandError); } @@ -349,7 +359,10 @@ export class Server extends TypedEventEmitter { try { const res = await conn.command(ns, cmd, options); throwIfWriteConcernError(res); - return res; + // TODO: figure out why casting is necessary + return res as typeof operation.RESPONSE_TYPE extends typeof MongoDBResponse + ? InstanceType + : Document; } catch (commandError) { throw this.decorateCommandError(conn, cmd, options, commandError); } From 6be96f4a595ca4ccb101acf2dd16d9e7f7b5a1ae Mon Sep 17 00:00:00 2001 From: bailey Date: Fri, 25 Jul 2025 11:02:52 -0600 Subject: [PATCH 03/10] fix ci --- test/integration/crud/abstract_operation.test.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/test/integration/crud/abstract_operation.test.ts b/test/integration/crud/abstract_operation.test.ts index 052286a3ea7..d9793e73db5 100644 --- a/test/integration/crud/abstract_operation.test.ts +++ b/test/integration/crud/abstract_operation.test.ts @@ -290,6 +290,9 @@ describe('abstract operation', function () { if (!WrapperSubclasses.includes(subclassType.name.toString())) { it(`operation.commandName equals key in command document`, async function () { const subclassInstance = subclassCreator(); + if (subclassInstance instanceof mongodb.ModernOperation) { + return; + } const yieldDoc = subclassType.name === 'ProfilingLevelOperation' ? { ok: 1, was: 1 } : { ok: 1 }; const cmdCallerStub = sinon.stub(Server.prototype, 'command').resolves(yieldDoc); From bef8cd84f17cf36c10c326ad13448718195a83a6 Mon Sep 17 00:00:00 2001 From: bailey Date: Fri, 25 Jul 2025 13:24:03 -0600 Subject: [PATCH 04/10] passing --- src/operations/execute_operation.ts | 4 +-- src/operations/operation.ts | 35 ++++++++++++------- src/operations/search_indexes/drop.ts | 9 ++--- src/sdam/server.ts | 31 +++++----------- .../crud/abstract_operation.test.ts | 2 +- 5 files changed, 39 insertions(+), 42 deletions(-) diff --git a/src/operations/execute_operation.ts b/src/operations/execute_operation.ts index d5c13b8364c..399129a62ed 100644 --- a/src/operations/execute_operation.ts +++ b/src/operations/execute_operation.ts @@ -26,7 +26,7 @@ import type { Topology } from '../sdam/topology'; import type { ClientSession } from '../sessions'; import { TimeoutContext } from '../timeout'; import { abortable, supportsRetryableWrites } from '../utils'; -import { AbstractOperation, Aspect, ModernOperation } from './operation'; +import { AbstractOperation, Aspect, ModernizedOperation } from './operation'; const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation; const MMAPv1_RETRY_WRITES_ERROR_MESSAGE = @@ -233,7 +233,7 @@ async function tryOperation< let previousOperationError: MongoError | undefined; let previousServer: ServerDescription | undefined; - const isModernOperation = operation instanceof ModernOperation; + const isModernOperation = operation instanceof ModernizedOperation; for (let tries = 0; tries < maxTries; tries++) { if (previousOperationError) { diff --git a/src/operations/operation.ts b/src/operations/operation.ts index acc65a97344..dcca97654c9 100644 --- a/src/operations/operation.ts +++ b/src/operations/operation.ts @@ -1,5 +1,6 @@ -import { type Connection, type MongoDBResponse, type MongoError } from '..'; +import { type Connection, type MongoError } from '..'; import { type BSONSerializeOptions, type Document, resolveBSONOptions } from '../bson'; +import { type MongoDBResponse } from '../cmap/wire_protocol/responses'; import { type Abortable } from '../mongo_types'; import { ReadPreference, type ReadPreferenceLike } from '../read_preference'; import type { Server, ServerCommandOptions } from '../sdam/server'; @@ -130,11 +131,8 @@ export abstract class AbstractOperation { } } -export abstract class ModernOperation< - TResponse extends typeof MongoDBResponse | undefined, - TResult -> extends AbstractOperation { - abstract RESPONSE_TYPE: TResponse; +export abstract class ModernizedOperation extends AbstractOperation { + abstract SERVER_COMMAND_RESPONSE_TYPE: typeof MongoDBResponse; /** this will never be used - but we must implement it to satisfy AbstractOperation's interface */ override execute( @@ -150,16 +148,27 @@ export abstract class ModernOperation< abstract buildOptions(timeoutContext: TimeoutContext): ServerCommandOptions; /** - * Optional - if the operation performs error handling, such as wrapping or renaming the error, - * this method can be overridden. + * Given an instance of a MongoDBResponse, map the response to the correct result type. For + * example, a `CountOperation` might map the response as follows: + * + * ```typescript + * override handleOk(response: InstanceType): TResult { + * return response.toObject(this.bsonOptions).n ?? 0; + * } + * + * // or, with type safety: + * override handleOk(response: InstanceType): TResult { + * return response.getNumber('n') ?? 0; + * } + * ``` */ - abstract handleOk( - response: TResponse extends typeof MongoDBResponse ? InstanceType : Document - ): TResult; + handleOk(response: InstanceType): TResult { + return response.toObject(this.bsonOptions) as TResult; + } /** - * Optional - if the operation performs post-processing - * on the result document, this method can be overridden. + * Optional - if the operation performs error handling, such as wrapping or renaming the error, + * this method can be overridden. */ handleError(error: MongoError): void { throw error; diff --git a/src/operations/search_indexes/drop.ts b/src/operations/search_indexes/drop.ts index 588d5de34c0..d6fa8ea670c 100644 --- a/src/operations/search_indexes/drop.ts +++ b/src/operations/search_indexes/drop.ts @@ -5,11 +5,12 @@ import { MONGODB_ERROR_CODES, MongoServerError } from '../../error'; import type { ServerCommandOptions } from '../../sdam/server'; import type { ClientSession } from '../../sessions'; import { type TimeoutContext } from '../../timeout'; -import { ModernOperation } from '../operation'; +import { ModernizedOperation } from '../operation'; +import { MongoDBResponse } from '../../cmap/wire_protocol/responses' /** @internal */ -export class DropSearchIndexOperation extends ModernOperation { - override RESPONSE_TYPE: undefined; +export class DropSearchIndexOperation extends ModernizedOperation { + override SERVER_COMMAND_RESPONSE_TYPE = MongoDBResponse; private readonly collection: Collection; private readonly name: string; @@ -38,7 +39,7 @@ export class DropSearchIndexOperation extends ModernOperation { return command; } - override handleOk(_response: Document): void { + override handleOk(_response: MongoDBResponse): void { // do nothing } diff --git a/src/sdam/server.ts b/src/sdam/server.ts index b82a3786a07..4e4f70cbadb 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -7,10 +7,7 @@ import { type ConnectionPoolOptions } from '../cmap/connection_pool'; import { PoolClearedError } from '../cmap/errors'; -import { - type MongoDBResponse, - type MongoDBResponseConstructor -} from '../cmap/wire_protocol/responses'; +import { type MongoDBResponseConstructor } from '../cmap/wire_protocol/responses'; import { APM_EVENTS, CLOSED, @@ -41,7 +38,7 @@ import { import type { ServerApi } from '../mongo_client'; import { type Abortable, TypedEventEmitter } from '../mongo_types'; import type { GetMoreOptions } from '../operations/get_more'; -import { type ModernOperation } from '../operations/operation'; +import { type ModernizedOperation } from '../operations/operation'; import type { ClientSession } from '../sessions'; import { type TimeoutContext } from '../timeout'; import { isTransactionCommand } from '../transactions'; @@ -281,14 +278,10 @@ export class Server extends TypedEventEmitter { } } - public async modernCommand( - operation: ModernOperation, + public async modernCommand( + operation: ModernizedOperation, timeoutContext: TimeoutContext - ): Promise< - typeof operation.RESPONSE_TYPE extends typeof MongoDBResponse - ? InstanceType - : Document - > { + ): Promise> { if (this.s.state === STATE_CLOSING || this.s.state === STATE_CLOSED) { throw new MongoServerClosedError(); } @@ -333,12 +326,9 @@ export class Server extends TypedEventEmitter { try { try { - const res = await conn.command(ns, cmd, options, operation.RESPONSE_TYPE); + const res = await conn.command(ns, cmd, options, operation.SERVER_COMMAND_RESPONSE_TYPE); throwIfWriteConcernError(res); - // TODO: figure out why casting is necessary - return res as typeof operation.RESPONSE_TYPE extends typeof MongoDBResponse - ? InstanceType - : Document; + return res; } catch (commandError) { throw this.decorateCommandError(conn, cmd, options, commandError); } @@ -357,12 +347,9 @@ export class Server extends TypedEventEmitter { reauthPromise = null; // only reachable if reauth succeeds try { - const res = await conn.command(ns, cmd, options); + const res = await conn.command(ns, cmd, options, operation.SERVER_COMMAND_RESPONSE_TYPE); throwIfWriteConcernError(res); - // TODO: figure out why casting is necessary - return res as typeof operation.RESPONSE_TYPE extends typeof MongoDBResponse - ? InstanceType - : Document; + return res; } catch (commandError) { throw this.decorateCommandError(conn, cmd, options, commandError); } diff --git a/test/integration/crud/abstract_operation.test.ts b/test/integration/crud/abstract_operation.test.ts index d9793e73db5..fe4d1d47586 100644 --- a/test/integration/crud/abstract_operation.test.ts +++ b/test/integration/crud/abstract_operation.test.ts @@ -290,7 +290,7 @@ describe('abstract operation', function () { if (!WrapperSubclasses.includes(subclassType.name.toString())) { it(`operation.commandName equals key in command document`, async function () { const subclassInstance = subclassCreator(); - if (subclassInstance instanceof mongodb.ModernOperation) { + if (subclassInstance instanceof mongodb.ModernizedOperation) { return; } const yieldDoc = From 8ed641100f2e92f81f95ff5a4a89ebf69816afeb Mon Sep 17 00:00:00 2001 From: bailey Date: Thu, 31 Jul 2025 08:26:56 -0600 Subject: [PATCH 05/10] command op too --- src/operations/command.ts | 95 ++++++++++++++++++- src/operations/execute_operation.ts | 2 + src/operations/insert.ts | 31 ++++-- src/operations/search_indexes/drop.ts | 2 +- .../non-server-retryable_writes.test.ts | 2 +- 5 files changed, 119 insertions(+), 13 deletions(-) diff --git a/src/operations/command.ts b/src/operations/command.ts index 14d3762997c..f025113763a 100644 --- a/src/operations/command.ts +++ b/src/operations/command.ts @@ -1,3 +1,4 @@ +import { type Connection } from '..'; import type { BSONSerializeOptions, Document } from '../bson'; import { type MongoDBResponseConstructor } from '../cmap/wire_protocol/responses'; import { MongoInvalidArgumentError } from '../error'; @@ -9,14 +10,14 @@ import { } from '../explain'; import { ReadConcern } from '../read_concern'; import type { ReadPreference } from '../read_preference'; -import type { Server } from '../sdam/server'; +import type { Server, ServerCommandOptions } from '../sdam/server'; import { MIN_SECONDARY_WRITE_WIRE_VERSION } from '../sdam/server_selection'; import type { ClientSession } from '../sessions'; import { type TimeoutContext } from '../timeout'; import { commandSupportsReadConcern, maxWireVersion, MongoDBNamespace } from '../utils'; import { WriteConcern, type WriteConcernOptions } from '../write_concern'; import type { ReadConcernLike } from './../read_concern'; -import { AbstractOperation, Aspect, type OperationOptions } from './operation'; +import { AbstractOperation, Aspect, ModernizedOperation, type OperationOptions } from './operation'; /** @public */ export interface CollationOptions { @@ -183,3 +184,93 @@ export abstract class CommandOperation extends AbstractOperation { return await server.command(this.ns, cmd, options, responseType); } } + +export abstract class ModernizedCommandOperation extends ModernizedOperation { + override options: CommandOperationOptions; + readConcern?: ReadConcern; + writeConcern?: WriteConcern; + explain?: Explain; + + constructor(parent?: OperationParent, options?: CommandOperationOptions) { + super(options); + this.options = options ?? {}; + + // NOTE: this was explicitly added for the add/remove user operations, it's likely + // something we'd want to reconsider. Perhaps those commands can use `Admin` + // as a parent? + const dbNameOverride = options?.dbName || options?.authdb; + if (dbNameOverride) { + this.ns = new MongoDBNamespace(dbNameOverride, '$cmd'); + } else { + this.ns = parent + ? parent.s.namespace.withCollection('$cmd') + : new MongoDBNamespace('admin', '$cmd'); + } + + this.readConcern = ReadConcern.fromOptions(options); + this.writeConcern = WriteConcern.fromOptions(options); + + if (this.hasAspect(Aspect.EXPLAINABLE)) { + this.explain = Explain.fromOptions(options); + if (this.explain) validateExplainTimeoutOptions(this.options, this.explain); + } else if (options?.explain != null) { + throw new MongoInvalidArgumentError(`Option "explain" is not supported on this command`); + } + } + + override get canRetryWrite(): boolean { + if (this.hasAspect(Aspect.EXPLAINABLE)) { + return this.explain == null; + } + return super.canRetryWrite; + } + + abstract buildCommandDocument(connection: Connection, session?: ClientSession): Document; + + override buildOptions(timeoutContext: TimeoutContext): ServerCommandOptions { + return { + ...this.options, + ...this.bsonOptions, + timeoutContext, + readPreference: this.readPreference, + session: this.session + }; + } + + override buildCommand(connection: Connection, session?: ClientSession): Document { + const command = this.buildCommandDocument(connection, session); + + const serverWireVersion = maxWireVersion(connection); + const inTransaction = this.session && this.session.inTransaction(); + + if (this.readConcern && commandSupportsReadConcern(command) && !inTransaction) { + Object.assign(command, { readConcern: this.readConcern }); + } + + if (this.trySecondaryWrite && serverWireVersion < MIN_SECONDARY_WRITE_WIRE_VERSION) { + command.omitReadPreference = true; + } + + if (this.writeConcern && this.hasAspect(Aspect.WRITE_OPERATION) && !inTransaction) { + WriteConcern.apply(command, this.writeConcern); + } + + if ( + this.options.collation && + typeof this.options.collation === 'object' && + !this.hasAspect(Aspect.SKIP_COLLATION) + ) { + Object.assign(command, { collation: this.options.collation }); + } + + if (typeof this.options.maxTimeMS === 'number') { + command.maxTimeMS = this.options.maxTimeMS; + } + + if (this.hasAspect(Aspect.EXPLAINABLE) && this.explain) { + return decorateWithExplain(command, this.explain); + } + + return command; + } +} diff --git a/src/operations/execute_operation.ts b/src/operations/execute_operation.ts index 399129a62ed..67b7d62dd2b 100644 --- a/src/operations/execute_operation.ts +++ b/src/operations/execute_operation.ts @@ -279,6 +279,8 @@ async function tryOperation< } } + operation.server = server; + try { // If tries > 0 and we are command batching we need to reset the batch. if (tries > 0 && operation.hasAspect(Aspect.COMMAND_BATCHING)) { diff --git a/src/operations/insert.ts b/src/operations/insert.ts index 588468f3134..bbc324e65de 100644 --- a/src/operations/insert.ts +++ b/src/operations/insert.ts @@ -1,5 +1,7 @@ +import { type Connection } from '..'; import type { Document } from '../bson'; import type { BulkWriteOptions } from '../bulk/common'; +import { MongoDBResponse } from '../cmap/wire_protocol/responses'; import type { Collection } from '../collection'; import { MongoServerError } from '../error'; import type { InferIdType } from '../mongo_types'; @@ -7,12 +9,13 @@ import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; import { type TimeoutContext } from '../timeout'; import { maybeAddIdToDocuments, type MongoDBNamespace } from '../utils'; -import { CommandOperation, type CommandOperationOptions } from './command'; +import { type CommandOperationOptions, ModernizedCommandOperation } from './command'; import { Aspect, defineAspects } from './operation'; - /** @internal */ -export class InsertOperation extends CommandOperation { +export class InsertOperation extends ModernizedCommandOperation { + override SERVER_COMMAND_RESPONSE_TYPE = MongoDBResponse; override options: BulkWriteOptions; + documents: Document[]; constructor(ns: MongoDBNamespace, documents: Document[], options: BulkWriteOptions) { @@ -26,11 +29,7 @@ export class InsertOperation extends CommandOperation { return 'insert' as const; } - override async execute( - server: Server, - session: ClientSession | undefined, - timeoutContext: TimeoutContext - ): Promise { + override buildCommandDocument(_connection: Connection, _session?: ClientSession): Document { const options = this.options ?? {}; const ordered = typeof options.ordered === 'boolean' ? options.ordered : true; const command: Document = { @@ -49,7 +48,7 @@ export class InsertOperation extends CommandOperation { command.comment = options.comment; } - return await super.executeCommand(server, session, command, timeoutContext); + return command; } } @@ -91,6 +90,20 @@ export class InsertOneOperation extends InsertOperation { insertedId: this.documents[0]._id }; } + + override handleOk(response: InstanceType): Document { + const res = super.handleOk(response); + if (res.code) throw new MongoServerError(res); + if (res.writeErrors) { + // This should be a WriteError but we can't change it now because of error hierarchy + throw new MongoServerError(res.writeErrors[0]); + } + + return { + acknowledged: this.writeConcern?.w !== 0, + insertedId: this.documents[0]._id + }; + } } /** @public */ diff --git a/src/operations/search_indexes/drop.ts b/src/operations/search_indexes/drop.ts index d6fa8ea670c..243918de390 100644 --- a/src/operations/search_indexes/drop.ts +++ b/src/operations/search_indexes/drop.ts @@ -1,12 +1,12 @@ import { type Connection, type MongoError } from '../..'; import type { Document } from '../../bson'; +import { MongoDBResponse } from '../../cmap/wire_protocol/responses'; import type { Collection } from '../../collection'; import { MONGODB_ERROR_CODES, MongoServerError } from '../../error'; import type { ServerCommandOptions } from '../../sdam/server'; import type { ClientSession } from '../../sessions'; import { type TimeoutContext } from '../../timeout'; import { ModernizedOperation } from '../operation'; -import { MongoDBResponse } from '../../cmap/wire_protocol/responses' /** @internal */ export class DropSearchIndexOperation extends ModernizedOperation { diff --git a/test/integration/retryable-writes/non-server-retryable_writes.test.ts b/test/integration/retryable-writes/non-server-retryable_writes.test.ts index cc04931e7ab..453928e6221 100644 --- a/test/integration/retryable-writes/non-server-retryable_writes.test.ts +++ b/test/integration/retryable-writes/non-server-retryable_writes.test.ts @@ -32,7 +32,7 @@ describe('Non Server Retryable Writes', function () { 'returns the original error with a PoolRequstedRetry label after encountering a WriteConcernError', { requires: { topology: 'replicaset' } }, async () => { - const serverCommandStub = sinon.stub(Server.prototype, 'command'); + const serverCommandStub = sinon.stub(Server.prototype, 'modernCommand'); serverCommandStub.onCall(0).rejects(new PoolClearedError('error')); serverCommandStub.onCall(1).returns( Promise.reject( From 84416e8621b5c084ccad47daa96867dfd5a5c642 Mon Sep 17 00:00:00 2001 From: bailey Date: Thu, 31 Jul 2025 13:25:22 -0600 Subject: [PATCH 06/10] broken test + lint --- drivers-evergreen-tools | 2 +- src/db.ts | 2 +- src/index.ts | 7 +++- src/operations/command.ts | 1 + src/operations/drop.ts | 2 ++ src/operations/operation.ts | 1 + .../mongodb-handshake.prose.test.ts | 36 ++++++++++--------- 7 files changed, 31 insertions(+), 20 deletions(-) diff --git a/drivers-evergreen-tools b/drivers-evergreen-tools index 3290c5a6873..3052fce9dfc 160000 --- a/drivers-evergreen-tools +++ b/drivers-evergreen-tools @@ -1 +1 @@ -Subproject commit 3290c5a68739ed1ba35de2c51a3eb0daa36d1f5b +Subproject commit 3052fce9dfc582cc9ab7a16f5b6b04647ce7ebb0 diff --git a/src/db.ts b/src/db.ts index dacc9cd2cd9..49b094bc100 100644 --- a/src/db.ts +++ b/src/db.ts @@ -6,7 +6,7 @@ import * as CONSTANTS from './constants'; import { AggregationCursor } from './cursor/aggregation_cursor'; import { ListCollectionsCursor } from './cursor/list_collections_cursor'; import { RunCommandCursor, type RunCursorCommandOptions } from './cursor/run_command_cursor'; -import { MONGODB_ERROR_CODES, MongoInvalidArgumentError, MongoServerError } from './error'; +import { MongoInvalidArgumentError } from './error'; import type { MongoClient, PkFactory } from './mongo_client'; import type { Abortable, TODO_NODE_3286 } from './mongo_types'; import type { AggregateOptions } from './operations/aggregate'; diff --git a/src/index.ts b/src/index.ts index b87a86042a2..58c14e81a9d 100644 --- a/src/index.ts +++ b/src/index.ts @@ -543,7 +543,12 @@ export type { export type { InsertManyResult, InsertOneOptions, InsertOneResult } from './operations/insert'; export type { CollectionInfo, ListCollectionsOptions } from './operations/list_collections'; export type { ListDatabasesOptions, ListDatabasesResult } from './operations/list_databases'; -export type { AbstractOperation, Hint, OperationOptions } from './operations/operation'; +export type { + AbstractOperation, + Hint, + ModernizedOperation, + OperationOptions +} from './operations/operation'; export type { ProfilingLevelOptions } from './operations/profiling_level'; export type { RemoveUserOptions } from './operations/remove_user'; export type { RenameOptions } from './operations/rename'; diff --git a/src/operations/command.ts b/src/operations/command.ts index f025113763a..36216fbd980 100644 --- a/src/operations/command.ts +++ b/src/operations/command.ts @@ -185,6 +185,7 @@ export abstract class CommandOperation extends AbstractOperation { } } +/** @internal */ export abstract class ModernizedCommandOperation extends ModernizedOperation { override options: CommandOperationOptions; readConcern?: ReadConcern; diff --git a/src/operations/drop.ts b/src/operations/drop.ts index 7b1c6ee0482..833321fe014 100644 --- a/src/operations/drop.ts +++ b/src/operations/drop.ts @@ -1,6 +1,8 @@ +import { MongoServerError } from '..'; import type { Document } from '../bson'; import { CursorTimeoutContext } from '../cursor/abstract_cursor'; import type { Db } from '../db'; +import { MONGODB_ERROR_CODES } from '../error'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; import { TimeoutContext } from '../timeout'; diff --git a/src/operations/operation.ts b/src/operations/operation.ts index dcca97654c9..586a29695fa 100644 --- a/src/operations/operation.ts +++ b/src/operations/operation.ts @@ -131,6 +131,7 @@ export abstract class AbstractOperation { } } +/** @internal */ export abstract class ModernizedOperation extends AbstractOperation { abstract SERVER_COMMAND_RESPONSE_TYPE: typeof MongoDBResponse; diff --git a/test/integration/mongodb-handshake/mongodb-handshake.prose.test.ts b/test/integration/mongodb-handshake/mongodb-handshake.prose.test.ts index 1cdd9e6d8fb..b2d127fc9ca 100644 --- a/test/integration/mongodb-handshake/mongodb-handshake.prose.test.ts +++ b/test/integration/mongodb-handshake/mongodb-handshake.prose.test.ts @@ -162,23 +162,25 @@ describe('Handshake Prose Tests', function () { let stubCalled = false; beforeEach(() => { // Mock the server response in a way that saslSupportedMechs array in the hello command response contains an arbitrary string. - sinon.stub(Connection.prototype, 'command').callsFake(async function (ns, cmd, options) { - // @ts-expect-error: sinon will place wrappedMethod there - const command = Connection.prototype.command.wrappedMethod.bind(this); - if (cmd.hello || cmd[LEGACY_HELLO_COMMAND]) { - return stub(); - } - return command(ns, cmd, options); - - async function stub() { - stubCalled = true; - const response = await command(ns, cmd, options); - return { - ...response, - saslSupportedMechs: [...(response.saslSupportedMechs ?? []), 'random string'] - }; - } - }); + sinon + .stub(Connection.prototype, 'command') + .callsFake(async function (ns, cmd, options, responseType) { + // @ts-expect-error: sinon will place wrappedMethod there + const command = Connection.prototype.command.wrappedMethod.bind(this); + if (cmd.hello || cmd[LEGACY_HELLO_COMMAND]) { + return stub(); + } + return command(ns, cmd, options, responseType); + + async function stub() { + stubCalled = true; + const response = await command(ns, cmd, options, responseType); + return { + ...response, + saslSupportedMechs: [...(response.saslSupportedMechs ?? []), 'random string'] + }; + } + }); }); afterEach(() => sinon.restore()); From 5e3db080ce3296b2e9ef09242a9cfc849ca8ee87 Mon Sep 17 00:00:00 2001 From: bailey Date: Thu, 31 Jul 2025 14:21:09 -0600 Subject: [PATCH 07/10] fix build --- src/operations/execute_operation.ts | 8 ++++---- src/sdam/server.ts | 2 +- .../retryable_writes.spec.prose.test.ts | 15 +++++++-------- 3 files changed, 12 insertions(+), 13 deletions(-) diff --git a/src/operations/execute_operation.ts b/src/operations/execute_operation.ts index 67b7d62dd2b..9d48b5dfc43 100644 --- a/src/operations/execute_operation.ts +++ b/src/operations/execute_operation.ts @@ -313,8 +313,8 @@ async function tryOperation< } } - throw ( - previousOperationError ?? - new MongoRuntimeError('Tried to propagate retryability error, but no error was found.') - ); + if (previousOperationError) throw previousOperationError; + + // @ts-expect-error asdf + return; } diff --git a/src/sdam/server.ts b/src/sdam/server.ts index 4e4f70cbadb..bfaa9ac93d3 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -304,7 +304,7 @@ export class Server extends TypedEventEmitter { const options = operation.buildOptions(timeoutContext); const ns = operation.ns; - if (this.loadBalanced && isPinnableCommand(cmd, session)) { + if (this.loadBalanced && isPinnableCommand(cmd, session) && !session?.pinnedConnection) { session?.pin(conn); } diff --git a/test/integration/retryable-writes/retryable_writes.spec.prose.test.ts b/test/integration/retryable-writes/retryable_writes.spec.prose.test.ts index 039d81721eb..827455804df 100644 --- a/test/integration/retryable-writes/retryable_writes.spec.prose.test.ts +++ b/test/integration/retryable-writes/retryable_writes.spec.prose.test.ts @@ -275,14 +275,13 @@ describe('Retryable Writes Spec Prose', () => { 'when a retry attempt fails with an error labeled NoWritesPerformed, drivers MUST return the original error', { requires: { topology: 'replicaset', mongodb: '>=4.2.9' } }, async () => { - const serverCommandStub = sinon.stub(Server.prototype, 'command'); - serverCommandStub.onCall(0).returns( - Promise.reject( - new MongoWriteConcernError({ - errorLabels: ['RetryableWriteError'], - writeConcernError: { errmsg: 'ShutdownInProgress error', code: 91 } - }) - ) + const serverCommandStub = sinon.stub(Server.prototype, 'modernCommand'); + serverCommandStub.onCall(0).rejects( + new MongoWriteConcernError({ + errorLabels: ['RetryableWriteError'], + writeConcernError: { errmsg: 'ShutdownInProgress error', code: 91 }, + ok: 1 + }) ); serverCommandStub.onCall(1).returns( Promise.reject( From 8f3a175ffb0c5cd7ce972930e951c61a1fbefe44 Mon Sep 17 00:00:00 2001 From: bailey Date: Thu, 31 Jul 2025 16:09:11 -0600 Subject: [PATCH 08/10] operation count tests --- test/integration/server-selection/operation_count.test.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/integration/server-selection/operation_count.test.ts b/test/integration/server-selection/operation_count.test.ts index d02c18d02a5..cf2ec4e5d76 100644 --- a/test/integration/server-selection/operation_count.test.ts +++ b/test/integration/server-selection/operation_count.test.ts @@ -120,7 +120,7 @@ describe('Server Operation Count Tests', function () { it('is zero after a successful command', testMetadata, async function () { const server = Array.from(client.topology.s.servers.values())[0]; expect(server.s.operationCount).to.equal(0); - const commandSpy = sinon.spy(server, 'command'); + const commandSpy = sinon.spy(server, 'modernCommand'); const incrementSpy = sinon.spy(server, 'incrementOperationCount'); const decrementSpy = sinon.spy(server, 'decrementOperationCount'); @@ -147,7 +147,7 @@ describe('Server Operation Count Tests', function () { const server = Array.from(client.topology.s.servers.values())[0]; expect(server.s.operationCount).to.equal(0); - const commandSpy = sinon.spy(server, 'command'); + const commandSpy = sinon.spy(server, 'modernCommand'); const error = await collection.insertOne({ count: 1 }).catch(e => e); @@ -171,7 +171,7 @@ describe('Server Operation Count Tests', function () { sinon .stub(ConnectionPool.prototype, 'checkOut') .rejects(new Error('unable to checkout connection')); - const commandSpy = sinon.spy(server, 'command'); + const commandSpy = sinon.spy(server, 'modernCommand'); const error = await collection.insertOne({ count: 1 }).catch(e => e); From f0251256d4fc3b53d2e8cfdca2c3bf3f49930f01 Mon Sep 17 00:00:00 2001 From: bailey Date: Fri, 1 Aug 2025 12:44:39 -0600 Subject: [PATCH 09/10] fix TS issue --- src/operations/execute_operation.ts | 10 +++++----- src/operations/operation.ts | 12 ++++++++++-- src/operations/search_indexes/drop.ts | 1 + 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/src/operations/execute_operation.ts b/src/operations/execute_operation.ts index 9d48b5dfc43..723c38ee93d 100644 --- a/src/operations/execute_operation.ts +++ b/src/operations/execute_operation.ts @@ -295,7 +295,7 @@ async function tryOperation< const result = await server.modernCommand(operation, timeoutContext); return operation.handleOk(result); } catch (error) { - operation.handleError(error); + return operation.handleError(error); } } catch (operationError) { if (!(operationError instanceof MongoError)) throw operationError; @@ -313,8 +313,8 @@ async function tryOperation< } } - if (previousOperationError) throw previousOperationError; - - // @ts-expect-error asdf - return; + throw ( + previousOperationError ?? + new MongoRuntimeError('Tried to propagate retryability error, but no error was found.') + ); } diff --git a/src/operations/operation.ts b/src/operations/operation.ts index 586a29695fa..4171c689c1f 100644 --- a/src/operations/operation.ts +++ b/src/operations/operation.ts @@ -144,8 +144,14 @@ export abstract class ModernizedOperation extends AbstractOperation extends AbstractOperation { if (typeof this.name === 'string') { command.name = this.name; } + return command; } From bafad0553b859ed80812ccccc14cfb549d2f7c88 Mon Sep 17 00:00:00 2001 From: bailey Date: Fri, 1 Aug 2025 13:18:57 -0600 Subject: [PATCH 10/10] update det --- drivers-evergreen-tools | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/drivers-evergreen-tools b/drivers-evergreen-tools index 3052fce9dfc..36c35d01173 160000 --- a/drivers-evergreen-tools +++ b/drivers-evergreen-tools @@ -1 +1 @@ -Subproject commit 3052fce9dfc582cc9ab7a16f5b6b04647ce7ebb0 +Subproject commit 36c35d0117336323fddd21292c2b3a925ccd4f63