Skip to content

Commit f2382e6

Browse files
authored
CSHARP-5608: CSOT: Command Execution (#1716)
1 parent 99ff1ed commit f2382e6

File tree

54 files changed

+1305
-1139
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+1305
-1139
lines changed

src/MongoDB.Driver/Authentication/MongoDBX509Authenticator.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ public void Authenticate(IConnection connection, ConnectionDescription descripti
5858
try
5959
{
6060
var protocol = CreateAuthenticateProtocol();
61-
protocol.Execute(connection, cancellationToken);
61+
// TODO: CSOT: implement operationContext support for Auth.
62+
protocol.Execute(new OperationContext(Timeout.InfiniteTimeSpan, cancellationToken), connection);
6263
}
6364
catch (MongoCommandException ex)
6465
{
@@ -79,7 +80,8 @@ public async Task AuthenticateAsync(IConnection connection, ConnectionDescriptio
7980
try
8081
{
8182
var protocol = CreateAuthenticateProtocol();
82-
await protocol.ExecuteAsync(connection, cancellationToken).ConfigureAwait(false);
83+
// TODO: CSOT: implement operationContext support for Auth.
84+
await protocol.ExecuteAsync(new OperationContext(Timeout.InfiniteTimeSpan, cancellationToken), connection).ConfigureAwait(false);
8385
}
8486
catch (MongoCommandException ex)
8587
{

src/MongoDB.Driver/Authentication/SaslAuthenticator.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,8 @@ public void Authenticate(IConnection connection, ConnectionDescription descripti
109109
try
110110
{
111111
var protocol = CreateCommandProtocol(command);
112-
result = protocol.Execute(connection, cancellationToken);
112+
// TODO: CSOT: implement operationContext support for Auth.
113+
result = protocol.Execute(new OperationContext(Timeout.InfiniteTimeSpan, cancellationToken), connection);
113114
conversationId ??= result?.GetValue("conversationId").AsInt32;
114115
}
115116
catch (MongoException ex)
@@ -172,7 +173,8 @@ public async Task AuthenticateAsync(IConnection connection, ConnectionDescriptio
172173
try
173174
{
174175
var protocol = CreateCommandProtocol(command);
175-
result = await protocol.ExecuteAsync(connection, cancellationToken).ConfigureAwait(false);
176+
// TODO: CSOT: implement operationContext support for Auth.
177+
result = await protocol.ExecuteAsync(new OperationContext(Timeout.InfiniteTimeSpan, cancellationToken), connection).ConfigureAwait(false);
176178
conversationId ??= result?.GetValue("conversationId").AsInt32;
177179
}
178180
catch (MongoException ex)

src/MongoDB.Driver/Core/Bindings/IChannel.cs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
using System;
1717
using System.Collections.Generic;
18-
using System.Threading;
1918
using System.Threading.Tasks;
2019
using MongoDB.Bson;
2120
using MongoDB.Bson.IO;
@@ -33,6 +32,7 @@ internal interface IChannel : IDisposable
3332
ConnectionDescription ConnectionDescription { get; }
3433

3534
TResult Command<TResult>(
35+
OperationContext operationContext,
3636
ICoreSession session,
3737
ReadPreference readPreference,
3838
DatabaseNamespace databaseNamespace,
@@ -43,10 +43,10 @@ TResult Command<TResult>(
4343
Action<IMessageEncoderPostProcessor> postWriteAction,
4444
CommandResponseHandling responseHandling,
4545
IBsonSerializer<TResult> resultSerializer,
46-
MessageEncoderSettings messageEncoderSettings,
47-
CancellationToken cancellationToken);
46+
MessageEncoderSettings messageEncoderSettings);
4847

4948
Task<TResult> CommandAsync<TResult>(
49+
OperationContext operationContext,
5050
ICoreSession session,
5151
ReadPreference readPreference,
5252
DatabaseNamespace databaseNamespace,
@@ -57,8 +57,7 @@ Task<TResult> CommandAsync<TResult>(
5757
Action<IMessageEncoderPostProcessor> postWriteAction,
5858
CommandResponseHandling responseHandling,
5959
IBsonSerializer<TResult> resultSerializer,
60-
MessageEncoderSettings messageEncoderSettings,
61-
CancellationToken cancellationToken);
60+
MessageEncoderSettings messageEncoderSettings);
6261
}
6362

6463
internal interface IChannelHandle : IChannel

src/MongoDB.Driver/Core/Bindings/ServerChannelSource.cs

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,20 +35,11 @@ public ServerChannelSource(IServer server, ICoreSessionHandle session)
3535
}
3636

3737
// properties
38-
public IServer Server
39-
{
40-
get { return _server; }
41-
}
38+
public IServer Server => _server;
4239

43-
public ServerDescription ServerDescription
44-
{
45-
get { return _server.Description; }
46-
}
40+
public ServerDescription ServerDescription => _server.Description;
4741

48-
public ICoreSessionHandle Session
49-
{
50-
get { return _session; }
51-
}
42+
public ICoreSessionHandle Session => _session;
5243

5344
// methods
5445
public void Dispose()

src/MongoDB.Driver/Core/Clusters/Cluster.cs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ public IServer SelectServer(OperationContext operationContext, IServerSelector s
159159
Ensure.IsNotNull(operationContext, nameof(operationContext));
160160
ThrowIfDisposedOrNotOpen();
161161

162-
operationContext = operationContext.WithTimeout(Settings.ServerSelectionTimeout);
162+
using var serverSelectionOperationContext = operationContext.WithTimeout(Settings.ServerSelectionTimeout);
163163
var expirableClusterDescription = _expirableClusterDescription;
164164
IDisposable serverSelectionWaitQueueDisposer = null;
165165
(selector, var operationCountSelector, var stopwatch) = BeginServerSelection(expirableClusterDescription.ClusterDescription, selector);
@@ -168,16 +168,16 @@ public IServer SelectServer(OperationContext operationContext, IServerSelector s
168168
{
169169
while (true)
170170
{
171-
var result = SelectServer(expirableClusterDescription, selector, operationCountSelector);
172-
if (result != default)
171+
var server = SelectServer(expirableClusterDescription, selector, operationCountSelector);
172+
if (server != null)
173173
{
174-
EndServerSelection(expirableClusterDescription.ClusterDescription, selector, result.ServerDescription, stopwatch);
175-
return result.Server;
174+
EndServerSelection(expirableClusterDescription.ClusterDescription, selector, server.Description, stopwatch);
175+
return server;
176176
}
177177

178-
serverSelectionWaitQueueDisposer ??= _serverSelectionWaitQueue.Enter(operationContext, selector, expirableClusterDescription.ClusterDescription, EventContext.OperationId);
178+
serverSelectionWaitQueueDisposer ??= _serverSelectionWaitQueue.Enter(serverSelectionOperationContext, selector, expirableClusterDescription.ClusterDescription, EventContext.OperationId);
179179

180-
operationContext.WaitTask(expirableClusterDescription.Expired);
180+
serverSelectionOperationContext.WaitTask(expirableClusterDescription.Expired);
181181
expirableClusterDescription = _expirableClusterDescription;
182182
}
183183
}
@@ -197,7 +197,7 @@ public async Task<IServer> SelectServerAsync(OperationContext operationContext,
197197
Ensure.IsNotNull(operationContext, nameof(operationContext));
198198
ThrowIfDisposedOrNotOpen();
199199

200-
operationContext = operationContext.WithTimeout(Settings.ServerSelectionTimeout);
200+
using var serverSelectionOperationContext = operationContext.WithTimeout(Settings.ServerSelectionTimeout);
201201
var expirableClusterDescription = _expirableClusterDescription;
202202
IDisposable serverSelectionWaitQueueDisposer = null;
203203
(selector, var operationCountSelector, var stopwatch) = BeginServerSelection(expirableClusterDescription.ClusterDescription, selector);
@@ -206,16 +206,16 @@ public async Task<IServer> SelectServerAsync(OperationContext operationContext,
206206
{
207207
while (true)
208208
{
209-
var result = SelectServer(expirableClusterDescription, selector, operationCountSelector);
210-
if (result != default)
209+
var server = SelectServer(expirableClusterDescription, selector, operationCountSelector);
210+
if (server != null)
211211
{
212-
EndServerSelection(expirableClusterDescription.ClusterDescription, selector, result.ServerDescription, stopwatch);
213-
return result.Server;
212+
EndServerSelection(expirableClusterDescription.ClusterDescription, selector, server.Description, stopwatch);
213+
return server;
214214
}
215215

216-
serverSelectionWaitQueueDisposer ??= _serverSelectionWaitQueue.Enter(operationContext, selector, expirableClusterDescription.ClusterDescription, EventContext.OperationId);
216+
serverSelectionWaitQueueDisposer ??= _serverSelectionWaitQueue.Enter(serverSelectionOperationContext, selector, expirableClusterDescription.ClusterDescription, EventContext.OperationId);
217217

218-
await operationContext.WaitTaskAsync(expirableClusterDescription.Expired).ConfigureAwait(false);
218+
await serverSelectionOperationContext.WaitTaskAsync(expirableClusterDescription.Expired).ConfigureAwait(false);
219219
expirableClusterDescription = _expirableClusterDescription;
220220
}
221221
}
@@ -306,7 +306,7 @@ private Exception HandleServerSelectionException(ClusterDescription clusterDescr
306306
return exception;
307307
}
308308

309-
private (IClusterableServer Server, ServerDescription ServerDescription) SelectServer(ExpirableClusterDescription clusterDescriptionChangeSource, IServerSelector selector, OperationsCountServerSelector operationCountSelector)
309+
private SelectedServer SelectServer(ExpirableClusterDescription clusterDescriptionChangeSource, IServerSelector selector, OperationsCountServerSelector operationCountSelector)
310310
{
311311
MongoIncompatibleDriverException.ThrowIfNotSupported(clusterDescriptionChangeSource.ClusterDescription);
312312

@@ -320,7 +320,7 @@ private Exception HandleServerSelectionException(ClusterDescription clusterDescr
320320
var selectedServer = clusterDescriptionChangeSource.ConnectedServers.FirstOrDefault(s => EndPointHelper.Equals(s.EndPoint, selectedServerDescription.EndPoint));
321321
if (selectedServer != null)
322322
{
323-
return (selectedServer, selectedServerDescription);
323+
return new(selectedServer, selectedServerDescription);
324324
}
325325
}
326326

src/MongoDB.Driver/Core/Clusters/LoadBalancedCluster.cs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ public IServer SelectServer(OperationContext operationContext, IServerSelector s
176176
Ensure.IsNotNull(operationContext, nameof(operationContext));
177177
ThrowIfDisposed();
178178

179-
var serverSelectionOperationContext = operationContext.WithTimeout(_settings.ServerSelectionTimeout);
179+
using var serverSelectionOperationContext = operationContext.WithTimeout(_settings.ServerSelectionTimeout);
180180

181181
_serverSelectionEventLogger.LogAndPublish(new ClusterSelectingServerEvent(
182182
_description,
@@ -205,10 +205,11 @@ public IServer SelectServer(OperationContext operationContext, IServerSelector s
205205
stopwatch.Elapsed,
206206
null,
207207
EventContext.OperationName));
208+
209+
return new SelectedServer(_server, _server.Description);
208210
}
209211

210-
return _server ??
211-
throw new InvalidOperationException("The server must be created before usage."); // should not be reached
212+
throw new InvalidOperationException("The server must be created before usage."); // should not be reached
212213
}
213214

214215
public async Task<IServer> SelectServerAsync(OperationContext operationContext, IServerSelector selector)
@@ -217,7 +218,7 @@ public async Task<IServer> SelectServerAsync(OperationContext operationContext,
217218
Ensure.IsNotNull(operationContext, nameof(operationContext));
218219
ThrowIfDisposed();
219220

220-
var serverSelectionOperationContext = operationContext.WithTimeout(_settings.ServerSelectionTimeout);
221+
using var serverSelectionOperationContext = operationContext.WithTimeout(_settings.ServerSelectionTimeout);
221222

222223
_serverSelectionEventLogger.LogAndPublish(new ClusterSelectingServerEvent(
223224
_description,
@@ -245,10 +246,11 @@ public async Task<IServer> SelectServerAsync(OperationContext operationContext,
245246
stopwatch.Elapsed,
246247
null,
247248
EventContext.OperationName));
249+
250+
return new SelectedServer(_server, _server.Description);
248251
}
249252

250-
return _server ??
251-
throw new InvalidOperationException("The server must be created before usage."); // should not be reached
253+
throw new InvalidOperationException("The server must be created before usage."); // should not be reached
252254
}
253255

254256
public ICoreSessionHandle StartSession(CoreSessionOptions options = null)

src/MongoDB.Driver/Core/Compression/SnappyCompressor.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
using Snappier;
1717
using System.IO;
18-
using System.Threading;
1918
using MongoDB.Driver.Core.Misc;
2019

2120
namespace MongoDB.Driver.Core.Compression
@@ -34,7 +33,7 @@ public void Compress(Stream input, Stream output)
3433
{
3534
var uncompressedSize = (int)(input.Length - input.Position);
3635
var uncompressedBytes = new byte[uncompressedSize]; // does not include uncompressed message headers
37-
input.ReadBytes(uncompressedBytes, offset: 0, count: uncompressedSize, Timeout.InfiniteTimeSpan, CancellationToken.None);
36+
input.ReadBytes(OperationContext.NoTimeout, uncompressedBytes, offset: 0, count: uncompressedSize);
3837
var maxCompressedSize = Snappy.GetMaxCompressedLength(uncompressedSize);
3938
var compressedBytes = new byte[maxCompressedSize];
4039
var compressedSize = Snappy.Compress(uncompressedBytes, compressedBytes);
@@ -50,7 +49,7 @@ public void Decompress(Stream input, Stream output)
5049
{
5150
var compressedSize = (int)(input.Length - input.Position);
5251
var compressedBytes = new byte[compressedSize];
53-
input.ReadBytes(compressedBytes, offset: 0, count: compressedSize, Timeout.InfiniteTimeSpan, CancellationToken.None);
52+
input.ReadBytes(OperationContext.NoTimeout, compressedBytes, offset: 0, count: compressedSize);
5453
var uncompressedSize = Snappy.GetUncompressedLength(compressedBytes);
5554
var decompressedBytes = new byte[uncompressedSize];
5655
var decompressedSize = Snappy.Decompress(compressedBytes, decompressedBytes);

0 commit comments

Comments
 (0)