Skip to content

Commit 9e77d23

Browse files
committed
Improve file delete performance for expire_snapshots
1 parent fd2b5a2 commit 9e77d23

File tree

11 files changed

+84
-32
lines changed

11 files changed

+84
-32
lines changed
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
15+
package io.trino.plugin.iceberg;
16+
17+
import com.google.inject.BindingAnnotation;
18+
19+
import java.lang.annotation.Retention;
20+
import java.lang.annotation.Target;
21+
22+
import static java.lang.annotation.ElementType.FIELD;
23+
import static java.lang.annotation.ElementType.METHOD;
24+
import static java.lang.annotation.ElementType.PARAMETER;
25+
import static java.lang.annotation.RetentionPolicy.RUNTIME;
26+
27+
@Retention(RUNTIME)
28+
@Target({FIELD, PARAMETER, METHOD})
29+
@BindingAnnotation
30+
public @interface ForIcebergFileDelete
31+
{}

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ public class IcebergConfig
9191
private Set<String> queryPartitionFilterRequiredSchemas = ImmutableSet.of();
9292
private int splitManagerThreads = Math.min(Runtime.getRuntime().availableProcessors() * 2, 32);
9393
private int planningThreads = Math.min(Runtime.getRuntime().availableProcessors(), 16);
94+
private int fileDeleteThreads = Math.min(Runtime.getRuntime().availableProcessors(), 8);
9495
private List<String> allowedExtraProperties = ImmutableList.of();
9596
private boolean incrementalRefreshEnabled = true;
9697
private boolean metadataCacheEnabled = true;
@@ -510,6 +511,20 @@ public IcebergConfig setPlanningThreads(String planningThreads)
510511
return this;
511512
}
512513

514+
@Min(0)
515+
public int getFileDeleteThreads()
516+
{
517+
return fileDeleteThreads;
518+
}
519+
520+
@Config("iceberg.file-delete-threads")
521+
@ConfigDescription("Number of threads to use for deleting files when expire_snapshots")
522+
public IcebergConfig setFileDeleteThreads(String fileDeleteThreads)
523+
{
524+
this.fileDeleteThreads = ThreadCount.valueOf(fileDeleteThreads).getThreadCount();
525+
return this;
526+
}
527+
513528
public List<String> getAllowedExtraProperties()
514529
{
515530
return allowedExtraProperties;

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergExecutorModule.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,4 +83,17 @@ public ExecutorService createPlanningExecutor(CatalogName catalogName, IcebergCo
8383
config.getPlanningThreads(),
8484
daemonThreadsNamed("iceberg-planning-" + catalogName + "-%s"));
8585
}
86+
87+
@Provides
88+
@Singleton
89+
@ForIcebergFileDelete
90+
public ExecutorService createFileDeleteExecutor(CatalogName catalogName, IcebergConfig config)
91+
{
92+
if (config.getFileDeleteThreads() == 0) {
93+
return newDirectExecutorService();
94+
}
95+
return newFixedThreadPool(
96+
config.getFileDeleteThreads(),
97+
daemonThreadsNamed("iceberg-file-delete-" + catalogName + "-%s"));
98+
}
8699
}

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java

Lines changed: 10 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -455,6 +455,7 @@ public class IcebergMetadata
455455
private final ExecutorService icebergScanExecutor;
456456
private final Executor metadataFetchingExecutor;
457457
private final ExecutorService icebergPlanningExecutor;
458+
private final ExecutorService fileDeleteExecutor;
458459
private final Map<IcebergTableHandle, AtomicReference<TableStatistics>> tableStatisticsCache = new ConcurrentHashMap<>();
459460

460461
private Transaction transaction;
@@ -472,7 +473,8 @@ public IcebergMetadata(
472473
Predicate<String> allowedExtraProperties,
473474
ExecutorService icebergScanExecutor,
474475
Executor metadataFetchingExecutor,
475-
ExecutorService icebergPlanningExecutor)
476+
ExecutorService icebergPlanningExecutor,
477+
ExecutorService fileDeleteExecutor)
476478
{
477479
this.typeManager = requireNonNull(typeManager, "typeManager is null");
478480
this.trinoCatalogHandle = requireNonNull(trinoCatalogHandle, "trinoCatalogHandle is null");
@@ -486,6 +488,7 @@ public IcebergMetadata(
486488
this.icebergScanExecutor = requireNonNull(icebergScanExecutor, "icebergScanExecutor is null");
487489
this.metadataFetchingExecutor = requireNonNull(metadataFetchingExecutor, "metadataFetchingExecutor is null");
488490
this.icebergPlanningExecutor = requireNonNull(icebergPlanningExecutor, "icebergPlanningExecutor is null");
491+
this.fileDeleteExecutor = requireNonNull(fileDeleteExecutor, "fileDeleteExecutor is null");
489492
}
490493

491494
@Override
@@ -2177,34 +2180,12 @@ private void executeExpireSnapshots(ConnectorSession session, IcebergTableExecut
21772180
IcebergConfig.EXPIRE_SNAPSHOTS_MIN_RETENTION,
21782181
IcebergSessionProperties.EXPIRE_SNAPSHOTS_MIN_RETENTION);
21792182

2180-
long expireTimestampMillis = session.getStart().toEpochMilli() - retention.toMillis();
2181-
TrinoFileSystem fileSystem = fileSystemFactory.create(session.getIdentity(), table.io().properties());
2182-
List<Location> pathsToDelete = new ArrayList<>();
2183-
// deleteFunction is not accessed from multiple threads unless .executeDeleteWith() is used
2184-
Consumer<String> deleteFunction = path -> {
2185-
pathsToDelete.add(Location.of(path));
2186-
if (pathsToDelete.size() == DELETE_BATCH_SIZE) {
2187-
try {
2188-
fileSystem.deleteFiles(pathsToDelete);
2189-
pathsToDelete.clear();
2190-
}
2191-
catch (IOException e) {
2192-
throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed to delete files during snapshot expiration", e);
2193-
}
2194-
}
2195-
};
2196-
2197-
try {
2198-
table.expireSnapshots()
2199-
.expireOlderThan(expireTimestampMillis)
2200-
.deleteWith(deleteFunction)
2201-
.commit();
2202-
2203-
fileSystem.deleteFiles(pathsToDelete);
2204-
}
2205-
catch (IOException e) {
2206-
throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed to delete files during snapshot expiration", e);
2207-
}
2183+
// ForwardingFileIo handles bulk operations so no separate function implementation is needed
2184+
table.expireSnapshots()
2185+
.expireOlderThan(session.getStart().toEpochMilli() - retention.toMillis())
2186+
.executeDeleteWith(fileDeleteExecutor)
2187+
.planWith(icebergScanExecutor)
2188+
.commit();
22082189
}
22092190

22102191
private static void validateTableExecuteParameters(

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public class IcebergMetadataFactory
4747
private final ExecutorService icebergScanExecutor;
4848
private final Executor metadataFetchingExecutor;
4949
private final ExecutorService icebergPlanningExecutor;
50+
private final ExecutorService fileDeleteExecutor;
5051

5152
@Inject
5253
public IcebergMetadataFactory(
@@ -60,6 +61,7 @@ public IcebergMetadataFactory(
6061
@ForIcebergSplitManager ExecutorService icebergScanExecutor,
6162
@ForIcebergMetadata ExecutorService metadataExecutorService,
6263
@ForIcebergPlanning ExecutorService icebergPlanningExecutor,
64+
@ForIcebergFileDelete ExecutorService fileDeleteExecutor,
6365
IcebergConfig config)
6466
{
6567
this.typeManager = requireNonNull(typeManager, "typeManager is null");
@@ -85,6 +87,7 @@ public IcebergMetadataFactory(
8587
this.metadataFetchingExecutor = new BoundedExecutor(metadataExecutorService, config.getMetadataParallelism());
8688
}
8789
this.icebergPlanningExecutor = requireNonNull(icebergPlanningExecutor, "icebergPlanningExecutor is null");
90+
this.fileDeleteExecutor = requireNonNull(fileDeleteExecutor, "fileDeleteExecutor is null");
8891
}
8992

9093
public IcebergMetadata create(ConnectorIdentity identity)
@@ -101,6 +104,7 @@ public IcebergMetadata create(ConnectorIdentity identity)
101104
allowedExtraProperties,
102105
icebergScanExecutor,
103106
metadataFetchingExecutor,
104-
icebergPlanningExecutor);
107+
icebergPlanningExecutor,
108+
fileDeleteExecutor);
105109
}
106110
}

plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,9 @@ public void testDefaults()
7272
.setSortedWritingEnabled(true)
7373
.setQueryPartitionFilterRequired(false)
7474
.setQueryPartitionFilterRequiredSchemas(ImmutableSet.of())
75-
.setSplitManagerThreads(Integer.toString(Runtime.getRuntime().availableProcessors() * 2))
76-
.setPlanningThreads(Integer.toString(Runtime.getRuntime().availableProcessors()))
75+
.setSplitManagerThreads(Integer.toString(Math.min(Runtime.getRuntime().availableProcessors() * 2, 32)))
76+
.setPlanningThreads(Integer.toString(Math.min(Runtime.getRuntime().availableProcessors(), 16)))
77+
.setFileDeleteThreads(Integer.toString(Math.min(Runtime.getRuntime().availableProcessors(), 8)))
7778
.setAllowedExtraProperties(ImmutableList.of())
7879
.setIncrementalRefreshEnabled(true)
7980
.setMetadataCacheEnabled(true)
@@ -117,6 +118,7 @@ public void testExplicitPropertyMappings()
117118
.put("iceberg.query-partition-filter-required-schemas", "bronze,silver")
118119
.put("iceberg.split-manager-threads", "42")
119120
.put("iceberg.planning-threads", "42")
121+
.put("iceberg.file-delete-threads", "42")
120122
.put("iceberg.allowed-extra-properties", "propX,propY")
121123
.put("iceberg.incremental-refresh-enabled", "false")
122124
.put("iceberg.metadata-cache.enabled", "false")
@@ -156,6 +158,7 @@ public void testExplicitPropertyMappings()
156158
.setQueryPartitionFilterRequiredSchemas(ImmutableSet.of("bronze", "silver"))
157159
.setSplitManagerThreads("42")
158160
.setPlanningThreads("42")
161+
.setFileDeleteThreads("42")
159162
.setAllowedExtraProperties(ImmutableList.of("propX", "propY"))
160163
.setIncrementalRefreshEnabled(false)
161164
.setMetadataCacheEnabled(false)

plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/BaseTrinoCatalogTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ public void testNonLowercaseNamespace()
152152
_ -> false,
153153
newDirectExecutorService(),
154154
directExecutor(),
155+
newDirectExecutorService(),
155156
newDirectExecutorService());
156157
assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)")
157158
.isFalse();

plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestTrinoGlueCatalog.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ public void testNonLowercaseGlueDatabase()
139139
_ -> false,
140140
newDirectExecutorService(),
141141
directExecutor(),
142+
newDirectExecutorService(),
142143
newDirectExecutorService());
143144
assertThat(icebergMetadata.schemaExists(SESSION, databaseName)).as("icebergMetadata.schemaExists(databaseName)")
144145
.isFalse();

plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestTrinoNessieCatalog.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ public void testNonLowercaseNamespace()
193193
_ -> false,
194194
newDirectExecutorService(),
195195
directExecutor(),
196+
newDirectExecutorService(),
196197
newDirectExecutorService());
197198
assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)")
198199
.isTrue();

plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ public void testNonLowercaseNamespace()
126126
_ -> false,
127127
newDirectExecutorService(),
128128
directExecutor(),
129+
newDirectExecutorService(),
129130
newDirectExecutorService());
130131
assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)")
131132
.isTrue();

0 commit comments

Comments
 (0)