Skip to content

Commit 32c8b1c

Browse files
committed
Add query.max-select-rows property
1 parent e6c903a commit 32c8b1c

File tree

7 files changed

+108
-2
lines changed

7 files changed

+108
-2
lines changed

core/trino-main/src/main/java/io/trino/SystemSessionProperties.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.List;
3737
import java.util.Optional;
3838
import java.util.OptionalInt;
39+
import java.util.OptionalLong;
3940

4041
import static com.google.common.base.Preconditions.checkArgument;
4142
import static io.airlift.units.DataSize.Unit.MEGABYTE;
@@ -220,6 +221,7 @@ public final class SystemSessionProperties
220221
public static final String CLOSE_IDLE_WRITERS_TRIGGER_DURATION = "close_idle_writers_trigger_duration";
221222
public static final String COLUMNAR_FILTER_EVALUATION_ENABLED = "columnar_filter_evaluation_enabled";
222223
public static final String SPOOLING_ENABLED = "spooling_enabled";
224+
public static final String QUERY_MAX_SELECT_ROWS = "query_max_select_rows";
223225

224226
private final List<PropertyMetadata<?>> sessionProperties;
225227

@@ -419,6 +421,12 @@ public SystemSessionProperties(
419421
"Temporary: Maximum number of stages a query can have",
420422
queryManagerConfig.getMaxStageCount(),
421423
true),
424+
longProperty(
425+
QUERY_MAX_SELECT_ROWS,
426+
"Maximum rows returned by a top-level SELECT query",
427+
queryManagerConfig.getQueryMaxSelectRows().isPresent() ? queryManagerConfig.getQueryMaxSelectRows().getAsLong() : null,
428+
value -> validateNonNegativeLongValue(value, QUERY_MAX_SELECT_ROWS),
429+
false),
422430
booleanProperty(
423431
DICTIONARY_AGGREGATION,
424432
"Enable optimization for aggregations on dictionaries",
@@ -1298,6 +1306,12 @@ public static int getQueryMaxStageCount(Session session)
12981306
return session.getSystemProperty(QUERY_MAX_STAGE_COUNT, Integer.class);
12991307
}
13001308

1309+
public static OptionalLong getQueryMaxSelectRows(Session session)
1310+
{
1311+
Long value = session.getSystemProperty(QUERY_MAX_SELECT_ROWS, Long.class);
1312+
return value == null ? OptionalLong.empty() : OptionalLong.of(value);
1313+
}
1314+
13011315
public static boolean isUseTableScanNodePartitioning(Session session)
13021316
{
13031317
return session.getSystemProperty(USE_TABLE_SCAN_NODE_PARTITIONING, Boolean.class);

core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import jakarta.validation.constraints.NotNull;
2929

3030
import java.util.Optional;
31+
import java.util.OptionalLong;
3132
import java.util.concurrent.TimeUnit;
3233

3334
import static com.google.common.base.Preconditions.checkArgument;
@@ -99,6 +100,8 @@ public class QueryManagerConfig
99100
private Optional<DataSize> queryMaxWritePhysicalSize = Optional.empty();
100101
private int queryReportedRuleStatsLimit = 10;
101102
private int dispatcherQueryPoolSize = DISPATCHER_THREADPOOL_MAX_SIZE;
103+
@Min(0)
104+
private Long queryMaxSelectRows;
102105

103106
private int requiredWorkers = 1;
104107
private Duration requiredWorkersMaxWait = new Duration(5, TimeUnit.MINUTES);
@@ -1228,4 +1231,17 @@ public QueryManagerConfig setFaultTolerantExecutionAdaptiveJoinReorderingMinSize
12281231
this.faultTolerantExecutionAdaptiveJoinReorderingMinSizeThreshold = faultTolerantExecutionAdaptiveJoinReorderingMinSizeThreshold;
12291232
return this;
12301233
}
1234+
1235+
public OptionalLong getQueryMaxSelectRows()
1236+
{
1237+
return queryMaxSelectRows == null ? OptionalLong.empty() : OptionalLong.of(queryMaxSelectRows);
1238+
}
1239+
1240+
@Config("query.max-select-rows")
1241+
@ConfigDescription("Maximum rows returned by a top-level SELECT query")
1242+
public QueryManagerConfig setQueryMaxSelectRows(Long queryMaxSelectRows)
1243+
{
1244+
this.queryMaxSelectRows = queryMaxSelectRows;
1245+
return this;
1246+
}
12311247
}

core/trino-main/src/main/java/io/trino/sql/analyzer/Analyzer.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import io.trino.sql.tree.NodeRef;
3232
import io.trino.sql.tree.Parameter;
3333
import io.trino.sql.tree.Statement;
34+
import io.trino.util.StatementUtils;
3435

3536
import java.util.List;
3637
import java.util.Map;
@@ -39,6 +40,8 @@
3940
import static io.trino.sql.analyzer.ExpressionTreeUtils.extractAggregateFunctions;
4041
import static io.trino.sql.analyzer.ExpressionTreeUtils.extractExpressions;
4142
import static io.trino.sql.analyzer.ExpressionTreeUtils.extractWindowExpressions;
43+
import static io.trino.sql.analyzer.QueryType.DESCRIBE;
44+
import static io.trino.sql.analyzer.QueryType.EXPLAIN;
4245
import static io.trino.sql.analyzer.QueryType.OTHERS;
4346
import static io.trino.sql.analyzer.SemanticExceptions.semanticException;
4447
import static io.trino.tracing.ScopedSpan.scopedSpan;
@@ -84,7 +87,16 @@ public Analysis analyze(Statement statement)
8487
.setParent(Context.current().with(session.getQuerySpan()))
8588
.startSpan();
8689
try (var _ = scopedSpan(span)) {
87-
return analyze(statement, OTHERS);
90+
// derive high-level type of the *original* statement before it gets rewritten
91+
QueryType queryType = StatementUtils.getQueryType(statement)
92+
.map(original -> switch (original) {
93+
case DESCRIBE -> DESCRIBE;
94+
case EXPLAIN -> EXPLAIN;
95+
default -> OTHERS;
96+
})
97+
.orElse(OTHERS);
98+
99+
return analyze(statement, queryType);
88100
}
89101
}
90102

core/trino-main/src/main/java/io/trino/sql/planner/QueryPlanner.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.google.common.collect.Iterables;
2121
import com.google.common.collect.Sets;
2222
import io.trino.Session;
23+
import io.trino.SystemSessionProperties;
2324
import io.trino.metadata.Metadata;
2425
import io.trino.metadata.ResolvedFunction;
2526
import io.trino.metadata.TableHandle;
@@ -29,6 +30,7 @@
2930
import io.trino.spi.connector.ColumnSchema;
3031
import io.trino.spi.connector.RowChangeParadigm;
3132
import io.trino.spi.connector.SortOrder;
33+
import io.trino.spi.resourcegroups.QueryType;
3234
import io.trino.spi.type.DecimalType;
3335
import io.trino.spi.type.Int128;
3436
import io.trino.spi.type.RowType;
@@ -110,6 +112,7 @@
110112
import io.trino.sql.tree.WindowFrame;
111113
import io.trino.sql.tree.WindowOperation;
112114
import io.trino.type.Reals;
115+
import io.trino.util.StatementUtils;
113116

114117
import java.util.ArrayList;
115118
import java.util.Arrays;
@@ -121,6 +124,7 @@
121124
import java.util.List;
122125
import java.util.Map;
123126
import java.util.Optional;
127+
import java.util.OptionalLong;
124128
import java.util.Set;
125129
import java.util.function.Function;
126130
import java.util.stream.Collectors;
@@ -239,7 +243,12 @@ public RelationPlan plan(Query query)
239243
Optional<OrderingScheme> orderingScheme = orderingScheme(builder, query.getOrderBy(), analysis.getOrderByExpressions(query));
240244
builder = sort(builder, orderingScheme);
241245
builder = offset(builder, query.getOffset());
242-
builder = limit(builder, query.getLimit(), orderingScheme);
246+
if (analysis.getStatement() == query) {
247+
builder = limitWithTopLevelSelectCheck(builder, query, orderingScheme);
248+
}
249+
else {
250+
builder = limit(builder, query.getLimit(), orderingScheme);
251+
}
243252
builder = builder.appendProjections(outputs, symbolAllocator, idAllocator);
244253

245254
return new RelationPlan(
@@ -2247,6 +2256,41 @@ private PlanBuilder offset(PlanBuilder subPlan, Optional<Offset> offset)
22472256
analysis.getOffset(offset.get())));
22482257
}
22492258

2259+
private PlanBuilder limitWithTopLevelSelectCheck(PlanBuilder subPlan, Query query, Optional<OrderingScheme> orderingScheme)
2260+
{
2261+
Optional<Node> limit = query.getLimit();
2262+
if (limit.isPresent() && analysis.getLimit(limit.get()).isPresent()) {
2263+
Optional<OrderingScheme> tiesResolvingScheme = Optional.empty();
2264+
if (limit.get() instanceof FetchFirst && ((FetchFirst) limit.get()).isWithTies()) {
2265+
tiesResolvingScheme = orderingScheme;
2266+
}
2267+
return subPlan.withNewRoot(
2268+
new LimitNode(
2269+
idAllocator.getNextId(),
2270+
subPlan.getRoot(),
2271+
analysis.getLimit(limit.get()).getAsLong(),
2272+
tiesResolvingScheme,
2273+
false,
2274+
ImmutableList.of()));
2275+
}
2276+
2277+
OptionalLong sessionLimit = SystemSessionProperties.getQueryMaxSelectRows(session);
2278+
if (sessionLimit.isPresent() && !analysis.isDescribe() &&
2279+
StatementUtils.getQueryType(analysis.getStatement())
2280+
.map(type -> type == QueryType.SELECT)
2281+
.orElse(false)) {
2282+
return subPlan.withNewRoot(
2283+
new LimitNode(
2284+
idAllocator.getNextId(),
2285+
subPlan.getRoot(),
2286+
sessionLimit.getAsLong(),
2287+
Optional.empty(),
2288+
false,
2289+
ImmutableList.of()));
2290+
}
2291+
return subPlan;
2292+
}
2293+
22502294
private PlanBuilder limit(PlanBuilder subPlan, Optional<Node> limit, Optional<OrderingScheme> orderingScheme)
22512295
{
22522296
if (limit.isPresent() && analysis.getLimit(limit.get()).isPresent()) {

core/trino-main/src/test/java/io/trino/execution/TestQueryManagerConfig.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ public void testDefaults()
7373
.setDispatcherQueryPoolSize(Integer.toString(max(50, Runtime.getRuntime().availableProcessors() * 10)))
7474
.setQueryMaxScanPhysicalBytes(null)
7575
.setQueryMaxWritePhysicalSize(null)
76+
.setQueryMaxSelectRows(null)
7677
.setRequiredWorkers(1)
7778
.setRequiredWorkersMaxWait(new Duration(5, MINUTES))
7879
.setRetryPolicy(RetryPolicy.NONE)
@@ -157,6 +158,7 @@ public void testExplicitPropertyMappings()
157158
.put("query.dispatcher-query-pool-size", "151")
158159
.put("query.max-scan-physical-bytes", "1kB")
159160
.put("query.max-write-physical-size", "1TB")
161+
.put("query.max-select-rows", "10")
160162
.put("query-manager.required-workers", "333")
161163
.put("query-manager.required-workers-max-wait", "33m")
162164
.put("retry-policy", "QUERY")
@@ -238,6 +240,7 @@ public void testExplicitPropertyMappings()
238240
.setDispatcherQueryPoolSize("151")
239241
.setQueryMaxScanPhysicalBytes(DataSize.of(1, KILOBYTE))
240242
.setQueryMaxWritePhysicalSize(DataSize.of(1, TERABYTE))
243+
.setQueryMaxSelectRows(10L)
241244
.setRequiredWorkers(333)
242245
.setRequiredWorkersMaxWait(new Duration(33, MINUTES))
243246
.setRetryPolicy(RetryPolicy.QUERY)

docs/src/main/sphinx/admin/properties-query-management.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,3 +283,12 @@ The {ref}`retry policy <fte-retry-policy>` to use for
283283
- `TASK` - Retry individual tasks within a query in the event of failure.
284284
Requires configuration of an {ref}`exchange manager <fte-exchange-manager>`.
285285
- `QUERY` - Retry the whole query in the event of failure.
286+
287+
## `query.max-select-rows`
288+
289+
- **Type:** {ref}`prop-type-long`
290+
- **Session property:** `query_max_select_rows`
291+
292+
The maximum number of rows that can be returned for a top-level `SELECT` query.
293+
If this property is set and a query also includes an explicit `LIMIT` clause, the more restrictive
294+
(the minimum) of the two limits is enforced.

docs/src/main/sphinx/admin/properties.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,3 +119,11 @@ Some `integer` type properties enforce their own minimum and maximum values.
119119
The properties of type `string` support a set of values that consist of a
120120
sequence of characters. Allowed values are defined on a property-by-property
121121
basis, refer to the specific property for its supported and default values.
122+
123+
(prop-type-long)=
124+
### `long`
125+
126+
The properties of type `long` support whole numeric values, similar to the `integer` type,
127+
but can accommodate a significantly larger range of numbers. For example, in addition
128+
to values like `100`, it supports very large numbers such as `4000000000`. Negative values are also supported.
129+
Like `integer`, `long` values must be whole numbers; decimal values such as `2.5` are not supported.

0 commit comments

Comments
 (0)