Skip to content

Commit ff9a785

Browse files
refactor(NODE-7075): introduce ModernizedOperation and ModernizedCommandOperation to operations hierarchy (#4604)
1 parent be7f808 commit ff9a785

File tree

13 files changed

+355
-61
lines changed

13 files changed

+355
-61
lines changed

src/index.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -543,7 +543,12 @@ export type {
543543
export type { InsertManyResult, InsertOneOptions, InsertOneResult } from './operations/insert';
544544
export type { CollectionInfo, ListCollectionsOptions } from './operations/list_collections';
545545
export type { ListDatabasesOptions, ListDatabasesResult } from './operations/list_databases';
546-
export type { AbstractOperation, Hint, OperationOptions } from './operations/operation';
546+
export type {
547+
AbstractOperation,
548+
Hint,
549+
ModernizedOperation,
550+
OperationOptions
551+
} from './operations/operation';
547552
export type { ProfilingLevelOptions } from './operations/profiling_level';
548553
export type { RemoveUserOptions } from './operations/remove_user';
549554
export type { RenameOptions } from './operations/rename';

src/operations/command.ts

Lines changed: 94 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { type Connection } from '..';
12
import type { BSONSerializeOptions, Document } from '../bson';
23
import { type MongoDBResponseConstructor } from '../cmap/wire_protocol/responses';
34
import { MongoInvalidArgumentError } from '../error';
@@ -9,14 +10,14 @@ import {
910
} from '../explain';
1011
import { ReadConcern } from '../read_concern';
1112
import type { ReadPreference } from '../read_preference';
12-
import type { Server } from '../sdam/server';
13+
import type { Server, ServerCommandOptions } from '../sdam/server';
1314
import { MIN_SECONDARY_WRITE_WIRE_VERSION } from '../sdam/server_selection';
1415
import type { ClientSession } from '../sessions';
1516
import { type TimeoutContext } from '../timeout';
1617
import { commandSupportsReadConcern, maxWireVersion, MongoDBNamespace } from '../utils';
1718
import { WriteConcern, type WriteConcernOptions } from '../write_concern';
1819
import type { ReadConcernLike } from './../read_concern';
19-
import { AbstractOperation, Aspect, type OperationOptions } from './operation';
20+
import { AbstractOperation, Aspect, ModernizedOperation, type OperationOptions } from './operation';
2021

2122
/** @public */
2223
export interface CollationOptions {
@@ -183,3 +184,94 @@ export abstract class CommandOperation<T> extends AbstractOperation<T> {
183184
return await server.command(this.ns, cmd, options, responseType);
184185
}
185186
}
187+
188+
/** @internal */
189+
export abstract class ModernizedCommandOperation<T> extends ModernizedOperation<T> {
190+
override options: CommandOperationOptions;
191+
readConcern?: ReadConcern;
192+
writeConcern?: WriteConcern;
193+
explain?: Explain;
194+
195+
constructor(parent?: OperationParent, options?: CommandOperationOptions) {
196+
super(options);
197+
this.options = options ?? {};
198+
199+
// NOTE: this was explicitly added for the add/remove user operations, it's likely
200+
// something we'd want to reconsider. Perhaps those commands can use `Admin`
201+
// as a parent?
202+
const dbNameOverride = options?.dbName || options?.authdb;
203+
if (dbNameOverride) {
204+
this.ns = new MongoDBNamespace(dbNameOverride, '$cmd');
205+
} else {
206+
this.ns = parent
207+
? parent.s.namespace.withCollection('$cmd')
208+
: new MongoDBNamespace('admin', '$cmd');
209+
}
210+
211+
this.readConcern = ReadConcern.fromOptions(options);
212+
this.writeConcern = WriteConcern.fromOptions(options);
213+
214+
if (this.hasAspect(Aspect.EXPLAINABLE)) {
215+
this.explain = Explain.fromOptions(options);
216+
if (this.explain) validateExplainTimeoutOptions(this.options, this.explain);
217+
} else if (options?.explain != null) {
218+
throw new MongoInvalidArgumentError(`Option "explain" is not supported on this command`);
219+
}
220+
}
221+
222+
override get canRetryWrite(): boolean {
223+
if (this.hasAspect(Aspect.EXPLAINABLE)) {
224+
return this.explain == null;
225+
}
226+
return super.canRetryWrite;
227+
}
228+
229+
abstract buildCommandDocument(connection: Connection, session?: ClientSession): Document;
230+
231+
override buildOptions(timeoutContext: TimeoutContext): ServerCommandOptions {
232+
return {
233+
...this.options,
234+
...this.bsonOptions,
235+
timeoutContext,
236+
readPreference: this.readPreference,
237+
session: this.session
238+
};
239+
}
240+
241+
override buildCommand(connection: Connection, session?: ClientSession): Document {
242+
const command = this.buildCommandDocument(connection, session);
243+
244+
const serverWireVersion = maxWireVersion(connection);
245+
const inTransaction = this.session && this.session.inTransaction();
246+
247+
if (this.readConcern && commandSupportsReadConcern(command) && !inTransaction) {
248+
Object.assign(command, { readConcern: this.readConcern });
249+
}
250+
251+
if (this.trySecondaryWrite && serverWireVersion < MIN_SECONDARY_WRITE_WIRE_VERSION) {
252+
command.omitReadPreference = true;
253+
}
254+
255+
if (this.writeConcern && this.hasAspect(Aspect.WRITE_OPERATION) && !inTransaction) {
256+
WriteConcern.apply(command, this.writeConcern);
257+
}
258+
259+
if (
260+
this.options.collation &&
261+
typeof this.options.collation === 'object' &&
262+
!this.hasAspect(Aspect.SKIP_COLLATION)
263+
) {
264+
Object.assign(command, { collation: this.options.collation });
265+
}
266+
267+
if (typeof this.options.maxTimeMS === 'number') {
268+
command.maxTimeMS = this.options.maxTimeMS;
269+
}
270+
271+
if (this.hasAspect(Aspect.EXPLAINABLE) && this.explain) {
272+
return decorateWithExplain(command, this.explain);
273+
}
274+
275+
return command;
276+
}
277+
}

src/operations/drop.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1+
import { MongoServerError } from '..';
12
import type { Document } from '../bson';
23
import { CursorTimeoutContext } from '../cursor/abstract_cursor';
34
import type { Db } from '../db';
4-
import { MONGODB_ERROR_CODES, MongoServerError } from '../error';
5+
import { MONGODB_ERROR_CODES } from '../error';
56
import type { Server } from '../sdam/server';
67
import type { ClientSession } from '../sessions';
78
import { TimeoutContext } from '../timeout';

src/operations/execute_operation.ts

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import type { Topology } from '../sdam/topology';
2626
import type { ClientSession } from '../sessions';
2727
import { TimeoutContext } from '../timeout';
2828
import { abortable, supportsRetryableWrites } from '../utils';
29-
import { AbstractOperation, Aspect } from './operation';
29+
import { AbstractOperation, Aspect, ModernizedOperation } from './operation';
3030

3131
const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
3232
const MMAPv1_RETRY_WRITES_ERROR_MESSAGE =
@@ -85,6 +85,8 @@ export async function executeOperation<
8585
throw new MongoInvalidArgumentError('ClientSession must be from the same MongoClient');
8686
}
8787

88+
operation.session ??= session;
89+
8890
const readPreference = operation.readPreference ?? ReadPreference.primary;
8991
const inTransaction = !!session?.inTransaction();
9092

@@ -231,6 +233,8 @@ async function tryOperation<
231233
let previousOperationError: MongoError | undefined;
232234
let previousServer: ServerDescription | undefined;
233235

236+
const isModernOperation = operation instanceof ModernizedOperation;
237+
234238
for (let tries = 0; tries < maxTries; tries++) {
235239
if (previousOperationError) {
236240
if (hasWriteAspect && previousOperationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
@@ -276,12 +280,24 @@ async function tryOperation<
276280
}
277281
}
278282

283+
operation.server = server;
284+
279285
try {
280286
// If tries > 0 and we are command batching we need to reset the batch.
281287
if (tries > 0 && operation.hasAspect(Aspect.COMMAND_BATCHING)) {
282288
operation.resetBatch();
283289
}
284-
return await operation.execute(server, session, timeoutContext);
290+
291+
if (!isModernOperation) {
292+
return await operation.execute(server, session, timeoutContext);
293+
}
294+
295+
try {
296+
const result = await server.modernCommand(operation, timeoutContext);
297+
return operation.handleOk(result);
298+
} catch (error) {
299+
return operation.handleError(error);
300+
}
285301
} catch (operationError) {
286302
if (!(operationError instanceof MongoError)) throw operationError;
287303
if (

src/operations/insert.ts

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,21 @@
1+
import { type Connection } from '..';
12
import type { Document } from '../bson';
23
import type { BulkWriteOptions } from '../bulk/common';
4+
import { MongoDBResponse } from '../cmap/wire_protocol/responses';
35
import type { Collection } from '../collection';
46
import { MongoServerError } from '../error';
57
import type { InferIdType } from '../mongo_types';
68
import type { Server } from '../sdam/server';
79
import type { ClientSession } from '../sessions';
810
import { type TimeoutContext } from '../timeout';
911
import { maybeAddIdToDocuments, type MongoDBNamespace } from '../utils';
10-
import { CommandOperation, type CommandOperationOptions } from './command';
12+
import { type CommandOperationOptions, ModernizedCommandOperation } from './command';
1113
import { Aspect, defineAspects } from './operation';
12-
1314
/** @internal */
14-
export class InsertOperation extends CommandOperation<Document> {
15+
export class InsertOperation extends ModernizedCommandOperation<Document> {
16+
override SERVER_COMMAND_RESPONSE_TYPE = MongoDBResponse;
1517
override options: BulkWriteOptions;
18+
1619
documents: Document[];
1720

1821
constructor(ns: MongoDBNamespace, documents: Document[], options: BulkWriteOptions) {
@@ -26,11 +29,7 @@ export class InsertOperation extends CommandOperation<Document> {
2629
return 'insert' as const;
2730
}
2831

29-
override async execute(
30-
server: Server,
31-
session: ClientSession | undefined,
32-
timeoutContext: TimeoutContext
33-
): Promise<Document> {
32+
override buildCommandDocument(_connection: Connection, _session?: ClientSession): Document {
3433
const options = this.options ?? {};
3534
const ordered = typeof options.ordered === 'boolean' ? options.ordered : true;
3635
const command: Document = {
@@ -49,7 +48,7 @@ export class InsertOperation extends CommandOperation<Document> {
4948
command.comment = options.comment;
5049
}
5150

52-
return await super.executeCommand(server, session, command, timeoutContext);
51+
return command;
5352
}
5453
}
5554

@@ -91,6 +90,20 @@ export class InsertOneOperation extends InsertOperation {
9190
insertedId: this.documents[0]._id
9291
};
9392
}
93+
94+
override handleOk(response: InstanceType<typeof this.SERVER_COMMAND_RESPONSE_TYPE>): Document {
95+
const res = super.handleOk(response);
96+
if (res.code) throw new MongoServerError(res);
97+
if (res.writeErrors) {
98+
// This should be a WriteError but we can't change it now because of error hierarchy
99+
throw new MongoServerError(res.writeErrors[0]);
100+
}
101+
102+
return {
103+
acknowledged: this.writeConcern?.w !== 0,
104+
insertedId: this.documents[0]._id
105+
};
106+
}
94107
}
95108

96109
/** @public */

src/operations/operation.ts

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
import { type Connection, type MongoError } from '..';
12
import { type BSONSerializeOptions, type Document, resolveBSONOptions } from '../bson';
3+
import { type MongoDBResponse } from '../cmap/wire_protocol/responses';
24
import { type Abortable } from '../mongo_types';
35
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
4-
import type { Server } from '../sdam/server';
6+
import type { Server, ServerCommandOptions } from '../sdam/server';
57
import type { ClientSession } from '../sessions';
68
import { type TimeoutContext } from '../timeout';
79
import type { MongoDBNamespace } from '../utils';
@@ -108,6 +110,10 @@ export abstract class AbstractOperation<TResult = any> {
108110
return this._session;
109111
}
110112

113+
set session(session: ClientSession) {
114+
this._session = session;
115+
}
116+
111117
clearSession() {
112118
this._session = undefined;
113119
}
@@ -125,6 +131,59 @@ export abstract class AbstractOperation<TResult = any> {
125131
}
126132
}
127133

134+
/** @internal */
135+
export abstract class ModernizedOperation<TResult> extends AbstractOperation<TResult> {
136+
abstract SERVER_COMMAND_RESPONSE_TYPE: typeof MongoDBResponse;
137+
138+
/** this will never be used - but we must implement it to satisfy AbstractOperation's interface */
139+
override execute(
140+
_server: Server,
141+
_session: ClientSession | undefined,
142+
_timeoutContext: TimeoutContext
143+
): Promise<TResult> {
144+
throw new Error('cannot execute!!');
145+
}
146+
147+
/**
148+
* Build a raw command document.
149+
*/
150+
abstract buildCommand(connection: Connection, session?: ClientSession): Document;
151+
152+
/**
153+
* Builds an instance of `ServerCommandOptions` to be used for operation execution.
154+
*/
155+
abstract buildOptions(timeoutContext: TimeoutContext): ServerCommandOptions;
156+
157+
/**
158+
* Given an instance of a MongoDBResponse, map the response to the correct result type. For
159+
* example, a `CountOperation` might map the response as follows:
160+
*
161+
* ```typescript
162+
* override handleOk(response: InstanceType<typeof this.SERVER_COMMAND_RESPONSE_TYPE>): TResult {
163+
* return response.toObject(this.bsonOptions).n ?? 0;
164+
* }
165+
*
166+
* // or, with type safety:
167+
* override handleOk(response: InstanceType<typeof this.SERVER_COMMAND_RESPONSE_TYPE>): TResult {
168+
* return response.getNumber('n') ?? 0;
169+
* }
170+
* ```
171+
*/
172+
handleOk(response: InstanceType<typeof this.SERVER_COMMAND_RESPONSE_TYPE>): TResult {
173+
return response.toObject(this.bsonOptions) as TResult;
174+
}
175+
176+
/**
177+
* Optional.
178+
*
179+
* If the operation performs error handling, such as wrapping, renaming the error, or squashing errors
180+
* this method can be overridden.
181+
*/
182+
handleError(error: MongoError): TResult | never {
183+
throw error;
184+
}
185+
}
186+
128187
export function defineAspects(
129188
operation: { aspects?: Set<symbol> },
130189
aspects: symbol | symbol[] | Set<symbol>

0 commit comments

Comments
 (0)