Skip to content

refactor(NODE-7075): introduce ModernizedOperation and ModernizedCommandOperation to operations hierarchy #4604

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,12 @@ export type {
export type { InsertManyResult, InsertOneOptions, InsertOneResult } from './operations/insert';
export type { CollectionInfo, ListCollectionsOptions } from './operations/list_collections';
export type { ListDatabasesOptions, ListDatabasesResult } from './operations/list_databases';
export type { AbstractOperation, Hint, OperationOptions } from './operations/operation';
export type {
AbstractOperation,
Hint,
ModernizedOperation,
OperationOptions
} from './operations/operation';
export type { ProfilingLevelOptions } from './operations/profiling_level';
export type { RemoveUserOptions } from './operations/remove_user';
export type { RenameOptions } from './operations/rename';
Expand Down
96 changes: 94 additions & 2 deletions src/operations/command.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { type Connection } from '..';
import type { BSONSerializeOptions, Document } from '../bson';
import { type MongoDBResponseConstructor } from '../cmap/wire_protocol/responses';
import { MongoInvalidArgumentError } from '../error';
Expand All @@ -9,14 +10,14 @@ import {
} from '../explain';
import { ReadConcern } from '../read_concern';
import type { ReadPreference } from '../read_preference';
import type { Server } from '../sdam/server';
import type { Server, ServerCommandOptions } from '../sdam/server';
import { MIN_SECONDARY_WRITE_WIRE_VERSION } from '../sdam/server_selection';
import type { ClientSession } from '../sessions';
import { type TimeoutContext } from '../timeout';
import { commandSupportsReadConcern, maxWireVersion, MongoDBNamespace } from '../utils';
import { WriteConcern, type WriteConcernOptions } from '../write_concern';
import type { ReadConcernLike } from './../read_concern';
import { AbstractOperation, Aspect, type OperationOptions } from './operation';
import { AbstractOperation, Aspect, ModernizedOperation, type OperationOptions } from './operation';

/** @public */
export interface CollationOptions {
Expand Down Expand Up @@ -183,3 +184,94 @@ export abstract class CommandOperation<T> extends AbstractOperation<T> {
return await server.command(this.ns, cmd, options, responseType);
}
}

/** @internal */
export abstract class ModernizedCommandOperation<T> extends ModernizedOperation<T> {
override options: CommandOperationOptions;
readConcern?: ReadConcern;
writeConcern?: WriteConcern;
explain?: Explain;

constructor(parent?: OperationParent, options?: CommandOperationOptions) {
super(options);
this.options = options ?? {};

// NOTE: this was explicitly added for the add/remove user operations, it's likely
// something we'd want to reconsider. Perhaps those commands can use `Admin`
// as a parent?
const dbNameOverride = options?.dbName || options?.authdb;
if (dbNameOverride) {
this.ns = new MongoDBNamespace(dbNameOverride, '$cmd');
} else {
this.ns = parent
? parent.s.namespace.withCollection('$cmd')
: new MongoDBNamespace('admin', '$cmd');
}

this.readConcern = ReadConcern.fromOptions(options);
this.writeConcern = WriteConcern.fromOptions(options);

if (this.hasAspect(Aspect.EXPLAINABLE)) {
this.explain = Explain.fromOptions(options);
if (this.explain) validateExplainTimeoutOptions(this.options, this.explain);
} else if (options?.explain != null) {
throw new MongoInvalidArgumentError(`Option "explain" is not supported on this command`);
}
}

override get canRetryWrite(): boolean {
if (this.hasAspect(Aspect.EXPLAINABLE)) {
return this.explain == null;
}
return super.canRetryWrite;
}

abstract buildCommandDocument(connection: Connection, session?: ClientSession): Document;

override buildOptions(timeoutContext: TimeoutContext): ServerCommandOptions {
return {
...this.options,
...this.bsonOptions,
timeoutContext,
readPreference: this.readPreference,
session: this.session
};
}

override buildCommand(connection: Connection, session?: ClientSession): Document {
const command = this.buildCommandDocument(connection, session);

const serverWireVersion = maxWireVersion(connection);
const inTransaction = this.session && this.session.inTransaction();

if (this.readConcern && commandSupportsReadConcern(command) && !inTransaction) {
Object.assign(command, { readConcern: this.readConcern });
}

if (this.trySecondaryWrite && serverWireVersion < MIN_SECONDARY_WRITE_WIRE_VERSION) {
command.omitReadPreference = true;
}

if (this.writeConcern && this.hasAspect(Aspect.WRITE_OPERATION) && !inTransaction) {
WriteConcern.apply(command, this.writeConcern);
}

if (
this.options.collation &&
typeof this.options.collation === 'object' &&
!this.hasAspect(Aspect.SKIP_COLLATION)
) {
Object.assign(command, { collation: this.options.collation });
}

if (typeof this.options.maxTimeMS === 'number') {
command.maxTimeMS = this.options.maxTimeMS;
}

if (this.hasAspect(Aspect.EXPLAINABLE) && this.explain) {
return decorateWithExplain(command, this.explain);
}

return command;
}
}
3 changes: 2 additions & 1 deletion src/operations/drop.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { MongoServerError } from '..';
import type { Document } from '../bson';
import { CursorTimeoutContext } from '../cursor/abstract_cursor';
import type { Db } from '../db';
import { MONGODB_ERROR_CODES, MongoServerError } from '../error';
import { MONGODB_ERROR_CODES } from '../error';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import { TimeoutContext } from '../timeout';
Expand Down
20 changes: 18 additions & 2 deletions src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import { TimeoutContext } from '../timeout';
import { abortable, supportsRetryableWrites } from '../utils';
import { AbstractOperation, Aspect } from './operation';
import { AbstractOperation, Aspect, ModernizedOperation } from './operation';

const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
const MMAPv1_RETRY_WRITES_ERROR_MESSAGE =
Expand Down Expand Up @@ -85,6 +85,8 @@ export async function executeOperation<
throw new MongoInvalidArgumentError('ClientSession must be from the same MongoClient');
}

operation.session ??= session;

const readPreference = operation.readPreference ?? ReadPreference.primary;
const inTransaction = !!session?.inTransaction();

Expand Down Expand Up @@ -231,6 +233,8 @@ async function tryOperation<
let previousOperationError: MongoError | undefined;
let previousServer: ServerDescription | undefined;

const isModernOperation = operation instanceof ModernizedOperation;

for (let tries = 0; tries < maxTries; tries++) {
if (previousOperationError) {
if (hasWriteAspect && previousOperationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
Expand Down Expand Up @@ -276,12 +280,24 @@ async function tryOperation<
}
}

operation.server = server;

try {
// If tries > 0 and we are command batching we need to reset the batch.
if (tries > 0 && operation.hasAspect(Aspect.COMMAND_BATCHING)) {
operation.resetBatch();
}
return await operation.execute(server, session, timeoutContext);

if (!isModernOperation) {
return await operation.execute(server, session, timeoutContext);
}

try {
const result = await server.modernCommand(operation, timeoutContext);
return operation.handleOk(result);
} catch (error) {
return operation.handleError(error);
}
} catch (operationError) {
if (!(operationError instanceof MongoError)) throw operationError;
if (
Expand Down
31 changes: 22 additions & 9 deletions src/operations/insert.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
import { type Connection } from '..';
import type { Document } from '../bson';
import type { BulkWriteOptions } from '../bulk/common';
import { MongoDBResponse } from '../cmap/wire_protocol/responses';
import type { Collection } from '../collection';
import { MongoServerError } from '../error';
import type { InferIdType } from '../mongo_types';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import { type TimeoutContext } from '../timeout';
import { maybeAddIdToDocuments, type MongoDBNamespace } from '../utils';
import { CommandOperation, type CommandOperationOptions } from './command';
import { type CommandOperationOptions, ModernizedCommandOperation } from './command';
import { Aspect, defineAspects } from './operation';

/** @internal */
export class InsertOperation extends CommandOperation<Document> {
export class InsertOperation extends ModernizedCommandOperation<Document> {
override SERVER_COMMAND_RESPONSE_TYPE = MongoDBResponse;
override options: BulkWriteOptions;

documents: Document[];

constructor(ns: MongoDBNamespace, documents: Document[], options: BulkWriteOptions) {
Expand All @@ -26,11 +29,7 @@ export class InsertOperation extends CommandOperation<Document> {
return 'insert' as const;
}

override async execute(
server: Server,
session: ClientSession | undefined,
timeoutContext: TimeoutContext
): Promise<Document> {
override buildCommandDocument(_connection: Connection, _session?: ClientSession): Document {
const options = this.options ?? {};
const ordered = typeof options.ordered === 'boolean' ? options.ordered : true;
const command: Document = {
Expand All @@ -49,7 +48,7 @@ export class InsertOperation extends CommandOperation<Document> {
command.comment = options.comment;
}

return await super.executeCommand(server, session, command, timeoutContext);
return command;
}
}

Expand Down Expand Up @@ -91,6 +90,20 @@ export class InsertOneOperation extends InsertOperation {
insertedId: this.documents[0]._id
};
}

override handleOk(response: InstanceType<typeof this.SERVER_COMMAND_RESPONSE_TYPE>): Document {
const res = super.handleOk(response);
if (res.code) throw new MongoServerError(res);
if (res.writeErrors) {
// This should be a WriteError but we can't change it now because of error hierarchy
throw new MongoServerError(res.writeErrors[0]);
}

return {
acknowledged: this.writeConcern?.w !== 0,
insertedId: this.documents[0]._id
};
}
}

/** @public */
Expand Down
61 changes: 60 additions & 1 deletion src/operations/operation.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { type Connection, type MongoError } from '..';
import { type BSONSerializeOptions, type Document, resolveBSONOptions } from '../bson';
import { type MongoDBResponse } from '../cmap/wire_protocol/responses';
import { type Abortable } from '../mongo_types';
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import type { Server } from '../sdam/server';
import type { Server, ServerCommandOptions } from '../sdam/server';
import type { ClientSession } from '../sessions';
import { type TimeoutContext } from '../timeout';
import type { MongoDBNamespace } from '../utils';
Expand Down Expand Up @@ -108,6 +110,10 @@ export abstract class AbstractOperation<TResult = any> {
return this._session;
}

set session(session: ClientSession) {
this._session = session;
}

clearSession() {
this._session = undefined;
}
Expand All @@ -125,6 +131,59 @@ export abstract class AbstractOperation<TResult = any> {
}
}

/** @internal */
export abstract class ModernizedOperation<TResult> extends AbstractOperation<TResult> {
abstract SERVER_COMMAND_RESPONSE_TYPE: typeof MongoDBResponse;

/** this will never be used - but we must implement it to satisfy AbstractOperation's interface */
override execute(
_server: Server,
_session: ClientSession | undefined,
_timeoutContext: TimeoutContext
): Promise<TResult> {
throw new Error('cannot execute!!');
}

/**
* Build a raw command document.
*/
abstract buildCommand(connection: Connection, session?: ClientSession): Document;

/**
* Builds an instance of `ServerCommandOptions` to be used for operation execution.
*/
abstract buildOptions(timeoutContext: TimeoutContext): ServerCommandOptions;

/**
* Given an instance of a MongoDBResponse, map the response to the correct result type. For
* example, a `CountOperation` might map the response as follows:
*
* ```typescript
* override handleOk(response: InstanceType<typeof this.SERVER_COMMAND_RESPONSE_TYPE>): TResult {
* return response.toObject(this.bsonOptions).n ?? 0;
* }
*
* // or, with type safety:
* override handleOk(response: InstanceType<typeof this.SERVER_COMMAND_RESPONSE_TYPE>): TResult {
* return response.getNumber('n') ?? 0;
* }
* ```
*/
handleOk(response: InstanceType<typeof this.SERVER_COMMAND_RESPONSE_TYPE>): TResult {
return response.toObject(this.bsonOptions) as TResult;
}

/**
* Optional.
*
* If the operation performs error handling, such as wrapping, renaming the error, or squashing errors
* this method can be overridden.
*/
handleError(error: MongoError): TResult | never {
throw error;
}
}

export function defineAspects(
operation: { aspects?: Set<symbol> },
aspects: symbol | symbol[] | Set<symbol>
Expand Down
Loading