Skip to content

Commit 39bdcdb

Browse files
WIP
1 parent 253ffa5 commit 39bdcdb

18 files changed

+95
-79
lines changed

src/cmap/connection.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -582,6 +582,9 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
582582
this.throwIfAborted();
583583
}
584584
} catch (error) {
585+
if (options.session != null) {
586+
updateSessionFromResponse(options.session, MongoDBResponse.empty);
587+
}
585588
if (this.shouldEmitAndLogCommand) {
586589
this.emitAndLogCommand(
587590
this.monitorCommands,

src/operations/execute_operation.ts

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ import {
3737
supportsRetryableWrites
3838
} from '../utils';
3939
import { AggregateOperation } from './aggregate';
40-
import { AbstractOperation, Aspect } from './operation';
40+
import { AbstractOperation, Aspect, RetryContext } from './operation';
4141

4242
const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
4343
const MMAPv1_RETRY_WRITES_ERROR_MESSAGE =
@@ -254,13 +254,18 @@ async function executeOperationWithRetries<
254254
2 // backoff rate
255255
);
256256

257+
const retryContext =
258+
operation.retryContext ??
259+
new RetryContext(willRetry ? (timeoutContext.csotEnabled() ? Infinity : 2) : 1);
257260
for (
258-
let attempt = 0, maxAttempts = willRetry ? (timeoutContext.csotEnabled() ? Infinity : 2) : 1;
259-
attempt <= maxAttempts;
261+
let attempt = 0;
262+
attempt <= retryContext.maxAttempts;
260263
attempt++,
261-
maxAttempts = previousOperationError?.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)
264+
retryContext.maxAttempts = previousOperationError?.hasErrorLabel(
265+
MongoErrorLabel.SystemOverloadedError
266+
)
262267
? 5
263-
: maxAttempts
268+
: retryContext.maxAttempts
264269
) {
265270
if (previousOperationError) {
266271
if (hasWriteAspect && previousOperationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
@@ -275,8 +280,8 @@ async function executeOperationWithRetries<
275280
// bulk write commands are retryable if all operations in the batch are retryable
276281
(operation.hasAspect(Aspect.COMMAND_BATCHING) && operation.canRetryWrite) ||
277282
// if we have a retryable read or write operation, we can retry
278-
(hasWriteAspect && isRetryableWriteError(previousOperationError)) ||
279-
(hasReadAspect && isRetryableReadError(previousOperationError)) ||
283+
(hasWriteAspect && willRetryWrite && isRetryableWriteError(previousOperationError)) ||
284+
(hasReadAspect && willRetryRead && isRetryableReadError(previousOperationError)) ||
280285
// if we have a retryable, system overloaded error, we can retry
281286
(previousOperationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError) &&
282287
previousOperationError.hasErrorLabel(MongoErrorLabel.RetryableError));

src/operations/operation.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ export interface OperationOptions extends BSONSerializeOptions {
4545
timeoutMS?: number;
4646
}
4747

48+
export class RetryContext {
49+
constructor(public maxAttempts: number) {}
50+
}
51+
4852
/**
4953
* This class acts as a parent class for any operation and is responsible for setting this.options,
5054
* as well as setting and getting a session.
@@ -66,6 +70,8 @@ export abstract class AbstractOperation<TResult = any> {
6670
/** Specifies the time an operation will run until it throws a timeout error. */
6771
timeoutMS?: number;
6872

73+
retryContext?: RetryContext;
74+
6975
private _session: ClientSession | undefined;
7076

7177
static aspects?: Set<symbol>;

src/sessions.ts

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import {
2323
import type { MongoClient, MongoOptions } from './mongo_client';
2424
import { TypedEventEmitter } from './mongo_types';
2525
import { executeOperation } from './operations/execute_operation';
26+
import { RetryContext } from './operations/operation';
2627
import { RunCommandOperation } from './operations/run_command';
2728
import { ReadConcernLevel } from './read_concern';
2829
import { ReadPreference } from './read_preference';
@@ -466,7 +467,11 @@ export class ClientSession
466467
} else {
467468
const wcKeys = Object.keys(wc);
468469
if (wcKeys.length > 2 || (!wcKeys.includes('wtimeoutMS') && !wcKeys.includes('wTimeoutMS')))
469-
// if the write concern was specified with wTimeoutMS, then we set both wtimeoutMS and wTimeoutMS, guaranteeing at least two keys, so if we have more than two keys, then we can automatically assume that we should add the write concern to the command. If it has 2 or fewer keys, we need to check that those keys aren't the wtimeoutMS or wTimeoutMS options before we add the write concern to the command
470+
// if the write concern was specified with wTimeoutMS, then we set both wtimeoutMS
471+
// and wTimeoutMS, guaranteeing at least two keys, so if we have more than two keys,
472+
// then we can automatically assume that we should add the write concern to the command.
473+
// If it has 2 or fewer keys, we need to check that those keys aren't the wtimeoutMS
474+
// or wTimeoutMS options before we add the write concern to the command
470475
WriteConcern.apply(command, { ...wc, wtimeoutMS: undefined });
471476
}
472477
}
@@ -487,11 +492,14 @@ export class ClientSession
487492
command.recoveryToken = this.transaction.recoveryToken;
488493
}
489494

495+
const retryContext = new RetryContext(5);
496+
490497
const operation = new RunCommandOperation(new MongoDBNamespace('admin'), command, {
491498
session: this,
492499
readPreference: ReadPreference.primary,
493500
bypassPinningCheck: true
494501
});
502+
operation.retryContext = retryContext;
495503

496504
const timeoutContext =
497505
this.timeoutContext ??
@@ -516,15 +524,13 @@ export class ClientSession
516524
this.unpin({ force: true });
517525

518526
try {
519-
await executeOperation(
520-
this.client,
521-
new RunCommandOperation(new MongoDBNamespace('admin'), command, {
522-
session: this,
523-
readPreference: ReadPreference.primary,
524-
bypassPinningCheck: true
525-
}),
526-
timeoutContext
527-
);
527+
const op = new RunCommandOperation(new MongoDBNamespace('admin'), command, {
528+
session: this,
529+
readPreference: ReadPreference.primary,
530+
bypassPinningCheck: true
531+
});
532+
op.retryContext = retryContext;
533+
await executeOperation(this.client, op, timeoutContext);
528534
return;
529535
} catch (retryCommitError) {
530536
// If the retry failed, we process that error instead of the original
@@ -957,6 +963,11 @@ export class ServerSession {
957963
id: ServerSessionId;
958964
lastUse: number;
959965
txnNumber: number;
966+
967+
/*
968+
* Indicates that a network error has been encountered while using this session.
969+
* Once a session is marked as dirty, it is always dirty.
970+
*/
960971
isDirty: boolean;
961972

962973
/** @internal */
@@ -1050,16 +1061,15 @@ export class ServerSessionPool {
10501061
* @param session - The session to release to the pool
10511062
*/
10521063
release(session: ServerSession): void {
1053-
const sessionTimeoutMinutes = this.client.topology?.logicalSessionTimeoutMinutes ?? 10;
1064+
if (this.client.topology?.loadBalanced) {
1065+
if (session.isDirty) return;
10541066

1055-
if (this.client.topology?.loadBalanced && !sessionTimeoutMinutes) {
10561067
this.sessions.unshift(session);
1057-
}
1058-
1059-
if (!sessionTimeoutMinutes) {
10601068
return;
10611069
}
10621070

1071+
const sessionTimeoutMinutes = this.client.topology?.logicalSessionTimeoutMinutes ?? 10;
1072+
10631073
this.sessions.prune(session => session.hasTimedOut(sessionTimeoutMinutes));
10641074

10651075
if (!session.hasTimedOut(sessionTimeoutMinutes)) {
@@ -1147,9 +1157,9 @@ export function applySession(
11471157
command.autocommit = false;
11481158

11491159
if (session.transaction.state === TxnState.STARTING_TRANSACTION) {
1150-
session.transaction.transition(TxnState.TRANSACTION_IN_PROGRESS);
11511160
command.startTransaction = true;
11521161

1162+
// TODO: read concern only applied if it is not the same as the server's default
11531163
const readConcern =
11541164
session.transaction.options.readConcern || session?.clientOptions?.readConcern;
11551165
if (readConcern) {
@@ -1185,4 +1195,17 @@ export function updateSessionFromResponse(session: ClientSession, document: Mong
11851195
session.snapshotTime = atClusterTime;
11861196
}
11871197
}
1198+
1199+
if (session.transaction.state === TxnState.STARTING_TRANSACTION) {
1200+
if (document.ok === 1) {
1201+
session.transaction.transition(TxnState.TRANSACTION_IN_PROGRESS);
1202+
} else {
1203+
const error = new MongoServerError(document.toObject());
1204+
const isBackpressureError = error.hasErrorLabel(MongoErrorLabel.RetryableError);
1205+
1206+
if (!isBackpressureError) {
1207+
session.transaction.transition(TxnState.TRANSACTION_IN_PROGRESS);
1208+
}
1209+
}
1210+
}
11881211
}

sync.sh

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11

22

3-
cp ~/dev/specifications/source/client-backpressure/tests/* ~/dev/node-mongodb-native/test/spec/client-backpressure
3+
cp ~/dev/specifications/source/client-backpressure/tests/* ~/dev/node-mongodb-native/test/spec/client-backpressure
4+
cp ~/dev/specifications/source/transactions/tests/unified/backpressure* ~/dev/node-mongodb-native/test/spec/transactions/unified/

test/integration/client-backpressure/client-backpressure.spec.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,6 @@ function shouldSkip({ description }: Test) {
1111
return skippedTests[description] ?? false;
1212
}
1313

14-
describe('Client Backpressure (spec)', function () {
14+
describe.only('Client Backpressure (spec)', function () {
1515
runUnifiedSuite(loadSpecTests('client-backpressure'), shouldSkip);
1616
});

test/integration/transactions/transactions.spec.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ const SKIPPED_TESTS = [
1313
'client bulkWrite with writeConcern in a transaction causes a transaction error'
1414
];
1515

16-
describe('Transactions Spec Unified Tests', function () {
16+
describe.only('Transactions Spec Unified Tests', function () {
1717
runUnifiedSuite(loadSpecTests(path.join('transactions', 'unified')), test => {
1818
return SKIPPED_TESTS.includes(test.description)
1919
? 'TODO(NODE-5924): Skipping failing transaction tests'

test/spec/client-backpressure/backpressure-retry-loop.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,15 @@
3131
},
3232
{
3333
"database": {
34-
"id": "database",
34+
"id": "internal_db",
3535
"client": "internal_client",
3636
"databaseName": "retryable-writes-tests"
3737
}
3838
},
3939
{
4040
"collection": {
4141
"id": "retryable-writes-tests",
42-
"database": "database",
42+
"database": "internal_db",
4343
"collectionName": "coll"
4444
}
4545
},

test/spec/client-backpressure/backpressure-retry-loop.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ createEntities:
2323

2424
-
2525
database:
26-
id: &internal_db database
26+
id: &internal_db internal_db
2727
client: *internal_client
2828
databaseName: &database_name retryable-writes-tests
2929

test/spec/client-backpressure/backpressure-retry-loop.yml.template

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ createEntities:
2323

2424
-
2525
database:
26-
id: &internal_db database
26+
id: &internal_db internal_db
2727
client: *internal_client
2828
databaseName: &database_name retryable-writes-tests
2929

0 commit comments

Comments
 (0)