Skip to content

Commit c965815

Browse files
author
annie-mac
committed
add throughput control group config in diagnostics
1 parent c713a38 commit c965815

File tree

14 files changed

+410
-103
lines changed

14 files changed

+410
-103
lines changed

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/throughputControl/ThroughputControlTests.java

Lines changed: 255 additions & 89 deletions
Large diffs are not rendered by default.

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/throughputControl/sdk/ThroughputRequestThrottlerTests.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public void processRequest() {
5353
this.assertRequestThrottlerState(requestThrottler, availableThroughput, scheduledThroughput);
5454

5555
// Request2: will get throttled since there is no available throughput
56-
requestMock.requestContext.throughputControlCycleId = StringUtils.EMPTY;
56+
requestMock.requestContext.throughputControlRequestContext.setThroughputControlCycleId(StringUtils.EMPTY);
5757
TestPublisher requestPublisher2 = TestPublisher.create();
5858
StepVerifier.create(requestThrottler.processRequest(requestMock, requestPublisher2.mono()))
5959
.verifyError(RequestRateTooLargeException.class);
@@ -66,7 +66,7 @@ public void processRequest() {
6666
assertThat(requestThrottler.getAvailableThroughput()).isEqualTo(availableThroughput);
6767

6868
// Request 3: will get throttled since there is no available throughput
69-
requestMock.requestContext.throughputControlCycleId = StringUtils.EMPTY;
69+
requestMock.requestContext.throughputControlRequestContext.setThroughputControlCycleId(StringUtils.EMPTY);
7070
TestPublisher requestPublisher3 = TestPublisher.create();
7171
StepVerifier.create(requestThrottler.processRequest(requestMock, requestPublisher3.mono()))
7272
.verifyErrorSatisfies((t) -> {
@@ -87,7 +87,7 @@ public void processRequest() {
8787
Mockito.doReturn(mockHeaders).when(bulkRequestMock).getHeaders();
8888
bulkRequestMock.requestContext = new DocumentServiceRequestContext();
8989

90-
bulkRequestMock.requestContext.throughputControlCycleId = StringUtils.EMPTY;
90+
bulkRequestMock.requestContext.throughputControlRequestContext.setThroughputControlCycleId(StringUtils.EMPTY);
9191
TestPublisher requestPublisher4 = TestPublisher.create();
9292
StepVerifier.create(requestThrottler.processRequest(bulkRequestMock, requestPublisher4.mono()))
9393
.verifyErrorSatisfies((t) -> {
@@ -105,7 +105,7 @@ public void processRequest() {
105105
assertThat(requestThrottler.getAvailableThroughput()).isEqualTo(availableThroughput);
106106

107107
// Request 5: will pass the request, and record the charge from exception
108-
requestMock.requestContext.throughputControlCycleId = StringUtils.EMPTY;
108+
requestMock.requestContext.throughputControlRequestContext.setThroughputControlCycleId(StringUtils.EMPTY);
109109
NotFoundException notFoundException = Mockito.mock(NotFoundException.class);
110110
Mockito.doReturn(requestChargePerRequest).when(notFoundException).getRequestCharge();
111111
TestPublisher requestPublisher5 = TestPublisher.create();
@@ -137,7 +137,7 @@ public void responseOutOfCycle() {
137137
TestPublisher<StoreResponse> requestPublisher1 = TestPublisher.create();
138138
StepVerifier.create(requestThrottler.processRequest(requestMock, requestPublisher1.mono()))
139139
.then(() -> {
140-
requestMock.requestContext.throughputControlCycleId = UUID.randomUUID().toString();
140+
requestMock.requestContext.throughputControlRequestContext.setThroughputControlCycleId(UUID.randomUUID().toString());
141141
requestPublisher1.emit(responseMock);
142142
})
143143
.expectNext(responseMock)

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosContainer.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -949,7 +949,6 @@ public CosmosScripts getScripts() {
949949
}
950950

951951
// TODO: should make partitionkey public in CosmosAsyncItem and fix the below call
952-
953952
private <T> CosmosPagedIterable<T> getCosmosPagedIterable(CosmosPagedFlux<T> cosmosPagedFlux) {
954953
return new CosmosPagedIterable<>(cosmosPagedFlux);
955954
}

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientSideRequestStatistics.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,8 @@ public void recordGatewayResponse(
270270
gatewayStatistics.faultInjectionRuleId = storeResponseDiagnostics.getFaultInjectionRuleId();
271271
gatewayStatistics.faultInjectionEvaluationResults = storeResponseDiagnostics.getFaultInjectionEvaluationResults();
272272
gatewayStatistics.endpoint = storeResponseDiagnostics.getEndpoint();
273+
gatewayStatistics.requestThroughputControlGroupName = storeResponseDiagnostics.getRequestThroughputControlGroupName();
274+
gatewayStatistics.requestThroughputControlGroupConfig = storeResponseDiagnostics.getRequestThroughputControlGroupConfig();
273275

274276
this.activityId = storeResponseDiagnostics.getActivityId() != null ? storeResponseDiagnostics.getActivityId() :
275277
rxDocumentServiceRequest.getActivityId().toString();
@@ -910,6 +912,8 @@ public static class GatewayStatistics {
910912
private PerPartitionCircuitBreakerInfoHolder perPartitionCircuitBreakerInfoHolder;
911913
private PerPartitionFailoverInfoHolder perPartitionFailoverInfoHolder;
912914
private String endpoint;
915+
private String requestThroughputControlGroupName;
916+
private String requestThroughputControlGroupConfig;
913917

914918
public String getSessionToken() {
915919
return sessionToken;
@@ -979,6 +983,14 @@ public String getEndpoint() {
979983
return this.endpoint;
980984
}
981985

986+
public String getRequestThroughputControlGroupName() {
987+
return this.requestThroughputControlGroupName;
988+
}
989+
990+
public String getRequestThroughputControlGroupConfig() {
991+
return this.requestThroughputControlGroupConfig;
992+
}
993+
982994
public static class GatewayStatisticsSerializer extends StdSerializer<GatewayStatistics> {
983995
private static final long serialVersionUID = 1L;
984996

@@ -1016,6 +1028,8 @@ public void serialize(GatewayStatistics gatewayStatistics,
10161028
this.writeNonNullObjectField(jsonGenerator, "perPartitionCircuitBreakerInfoHolder", gatewayStatistics.getPerPartitionCircuitBreakerInfoHolder());
10171029
this.writeNonNullObjectField(jsonGenerator, "perPartitionFailoverInfoHolder", gatewayStatistics.getPerPartitionFailoverInfoHolder());
10181030

1031+
this.writeNonNullStringField(jsonGenerator, "requestTCG", gatewayStatistics.getRequestThroughputControlGroupName());
1032+
this.writeNonNullStringField(jsonGenerator, "requestTCGConfig", gatewayStatistics.getRequestThroughputControlGroupConfig());
10191033
jsonGenerator.writeEndObject();
10201034
}
10211035

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.azure.cosmos.implementation.directconnectivity.Uri;
1919
import com.azure.cosmos.implementation.routing.PartitionKeyInternal;
2020
import com.azure.cosmos.implementation.routing.RegionalRoutingContext;
21+
import com.azure.cosmos.implementation.throughputControl.ThroughputControlRequestContext;
2122

2223
import java.util.ArrayList;
2324
import java.util.List;
@@ -54,7 +55,7 @@ public class DocumentServiceRequestContext implements Cloneable {
5455
public volatile PartitionKeyInternal effectivePartitionKey;
5556
public volatile CosmosDiagnostics cosmosDiagnostics;
5657
public volatile String resourcePhysicalAddress;
57-
public volatile String throughputControlCycleId;
58+
public ThroughputControlRequestContext throughputControlRequestContext;
5859
public volatile boolean replicaAddressValidationEnabled = Configs.isReplicaAddressValidationEnabled();
5960
private final Set<Uri> failedEndpoints = ConcurrentHashMap.newKeySet();
6061
private CosmosEndToEndOperationLatencyPolicyConfig endToEndOperationLatencyPolicyConfig;
@@ -132,6 +133,10 @@ public void addToFailedEndpoints(Exception exception, Uri address) {
132133
}
133134
}
134135

136+
public void setThroughputControlRequestContext(ThroughputControlRequestContext throughputControlRequestContext) {
137+
this.throughputControlRequestContext = throughputControlRequestContext;
138+
}
139+
135140
@Override
136141
public DocumentServiceRequestContext clone() {
137142
DocumentServiceRequestContext context = new DocumentServiceRequestContext();
@@ -158,7 +163,7 @@ public DocumentServiceRequestContext clone() {
158163
context.performedBackgroundAddressRefresh = this.performedBackgroundAddressRefresh;
159164
context.cosmosDiagnostics = this.cosmosDiagnostics;
160165
context.resourcePhysicalAddress = this.resourcePhysicalAddress;
161-
context.throughputControlCycleId = this.throughputControlCycleId;
166+
context.throughputControlRequestContext = this.throughputControlRequestContext;
162167
context.replicaAddressValidationEnabled = this.replicaAddressValidationEnabled;
163168
context.endToEndOperationLatencyPolicyConfig = this.endToEndOperationLatencyPolicyConfig;
164169
context.unavailableRegionsForPartition = this.unavailableRegionsForPartition;

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponseDiagnostics.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ public class StoreResponseDiagnostics {
5252
private final String faultInjectionRuleId;
5353
private final List<String> faultInjectionEvaluationResults;
5454
private final String endpoint;
55+
private final String requestThroughputControlGroupName;
56+
private final String requestThroughputControlGroupConfig;
5557

5658
public static StoreResponseDiagnostics createStoreResponseDiagnostics(
5759
StoreResponse storeResponse,
@@ -91,6 +93,9 @@ private StoreResponseDiagnostics(StoreResponse storeResponse, RxDocumentServiceR
9193
this.faultInjectionRuleId = storeResponse.getFaultInjectionRuleId();
9294
this.faultInjectionEvaluationResults = storeResponse.getFaultInjectionRuleEvaluationResults();
9395
this.endpoint = storeResponse.getEndpoint();
96+
this.requestThroughputControlGroupName = rxDocumentServiceRequest.throughputControlGroupName;
97+
this.requestThroughputControlGroupConfig =
98+
rxDocumentServiceRequest.requestContext.throughputControlRequestContext != null ? rxDocumentServiceRequest.requestContext.throughputControlRequestContext.getConfigString() : null;
9499
}
95100

96101
private StoreResponseDiagnostics(CosmosException e, RxDocumentServiceRequest rxDocumentServiceRequest) {
@@ -122,6 +127,9 @@ private StoreResponseDiagnostics(CosmosException e, RxDocumentServiceRequest rxD
122127
} else {
123128
this.endpoint = "";
124129
}
130+
this.requestThroughputControlGroupName = rxDocumentServiceRequest.throughputControlGroupName;
131+
this.requestThroughputControlGroupConfig =
132+
rxDocumentServiceRequest.requestContext.throughputControlRequestContext != null ? rxDocumentServiceRequest.requestContext.throughputControlRequestContext.getConfigString() : null;
125133
}
126134

127135
public int getStatusCode() {
@@ -205,4 +213,12 @@ public List<String> getFaultInjectionEvaluationResults() {
205213
public String getEndpoint() {
206214
return this.endpoint;
207215
}
216+
217+
public String getRequestThroughputControlGroupName() {
218+
return this.requestThroughputControlGroupName;
219+
}
220+
221+
public String getRequestThroughputControlGroupConfig() {
222+
return this.requestThroughputControlGroupConfig;
223+
}
208224
}

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResultDiagnostics.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,14 @@ public void serialize(StoreResultDiagnostics storeResultDiagnostics,
240240
jsonGenerator.writeObjectField("channelStatistics", storeResponseDiagnostics.getRntbdChannelStatistics());
241241
jsonGenerator.writeObjectField("serviceEndpointStatistics", storeResponseDiagnostics.getRntbdEndpointStatistics());
242242

243+
this.writeNonNullStringField(
244+
jsonGenerator,
245+
"requestTCG",
246+
storeResponseDiagnostics.getRequestThroughputControlGroupName());
247+
this.writeNonNullStringField(
248+
jsonGenerator,
249+
"requestTCGConfig",
250+
storeResponseDiagnostics.getRequestThroughputControlGroupConfig());
243251
jsonGenerator.writeEndObject();
244252
}
245253

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.cosmos.implementation.throughputControl;
5+
6+
public interface IThroughputControlGroup {
7+
String getDiagnosticsString();
8+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.cosmos.implementation.throughputControl;
5+
6+
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
7+
8+
import java.util.concurrent.atomic.AtomicReference;
9+
10+
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument;
11+
12+
public class ThroughputControlRequestContext {
13+
private final String configString;
14+
private final AtomicReference<String> throughputControlCycleId;
15+
16+
public ThroughputControlRequestContext(String configString) {
17+
checkArgument(StringUtils.isNotEmpty(configString), "Argument 'configString' cannot be null or empty.");
18+
this.configString = configString;
19+
this.throughputControlCycleId = new AtomicReference<>();
20+
}
21+
22+
public String getConfigString() {
23+
return this.configString;
24+
}
25+
26+
public String getThroughputControlCycleId() {
27+
return this.throughputControlCycleId.get();
28+
}
29+
30+
public void setThroughputControlCycleId(String throughputControlCycleId) {
31+
this.throughputControlCycleId.set(throughputControlCycleId);
32+
}
33+
}

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/sdk/ThroughputRequestThrottler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,8 @@ public <T> Mono<T> processRequest(RxDocumentServiceRequest request, Mono<T> orig
9292
}));
9393

9494
if (this.availableThroughput.get() > 0) {
95-
if (StringUtils.isEmpty(request.requestContext.throughputControlCycleId)) {
96-
request.requestContext.throughputControlCycleId = this.cycleId;
95+
if (StringUtils.isEmpty(request.requestContext.throughputControlRequestContext.getThroughputControlCycleId())) {
96+
request.requestContext.throughputControlRequestContext.setThroughputControlCycleId(this.cycleId);
9797
}
9898

9999
trackingUnit.increasePassedRequest();
@@ -178,7 +178,7 @@ private <T> void trackRequestCharge (RxDocumentServiceRequest request, T respons
178178
}
179179

180180
// If the response comes back in a different cycle, discard it.
181-
if (StringUtils.equals(this.cycleId, request.requestContext.throughputControlCycleId)) {
181+
if (StringUtils.equals(this.cycleId, request.requestContext.throughputControlRequestContext.getThroughputControlCycleId())) {
182182
this.availableThroughput.getAndAccumulate(requestCharge, (available, consumed) -> available - consumed);
183183
} else {
184184
if (trackingUnit != null) {

0 commit comments

Comments
 (0)