Skip to content

Commit d39a0a4

Browse files
committed
Improve file delete performance for expire_snapshots
- Add ForwardingFileIoFactory so the delete-specific ExecutorService is not injected in multiple places
- Remove direct usage of ForwardingFileIo in favor of a FileIoFactory implementation
- Add parallelized deletes in ForwardingFileIo
1 parent 6abf6a9 commit d39a0a4

File tree

49 files changed

+383
-150
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

49 files changed

+383
-150
lines changed

docs/src/main/sphinx/connector/iceberg.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,9 @@ implementation is used:
228228
- Number of threads used for retrieving metadata. Currently, only table loading
229229
is parallelized.
230230
- `8`
231+
* - `iceberg.file-delete-threads`
232+
- Number of threads to use for deleting files when running the `expire_snapshots` procedure.
233+
- Double the number of processors on the coordinator node.
231234
* - `iceberg.bucket-execution`
232235
- Enable bucket-aware execution. This allows the engine to use physical
233236
bucketing information to optimize queries by reducing data exchanges.
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.trino.plugin.iceberg;

import com.google.inject.BindingAnnotation;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

/**
 * Guice binding annotation qualifying the {@code ExecutorService} that is
 * dedicated to deleting files (used by the {@code expire_snapshots} procedure),
 * so it can be injected distinctly from the other Iceberg executors.
 */
@Retention(RUNTIME)
@Target({FIELD, PARAMETER, METHOD})
@BindingAnnotation
public @interface ForIcebergFileDelete
{}

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ public class IcebergConfig
9191
private Set<String> queryPartitionFilterRequiredSchemas = ImmutableSet.of();
9292
private int splitManagerThreads = Math.min(Runtime.getRuntime().availableProcessors() * 2, 32);
9393
private int planningThreads = Math.min(Runtime.getRuntime().availableProcessors(), 16);
94+
private int fileDeleteThreads = Runtime.getRuntime().availableProcessors() * 2;
9495
private List<String> allowedExtraProperties = ImmutableList.of();
9596
private boolean incrementalRefreshEnabled = true;
9697
private boolean metadataCacheEnabled = true;
@@ -510,6 +511,20 @@ public IcebergConfig setPlanningThreads(String planningThreads)
510511
return this;
511512
}
512513

514+
@Min(0)
515+
public int getFileDeleteThreads()
516+
{
517+
return fileDeleteThreads;
518+
}
519+
520+
@Config("iceberg.file-delete-threads")
521+
@ConfigDescription("Number of threads to use for deleting files when running expire_snapshots procedure")
522+
public IcebergConfig setFileDeleteThreads(String fileDeleteThreads)
523+
{
524+
this.fileDeleteThreads = ThreadCount.valueOf(fileDeleteThreads).getThreadCount();
525+
return this;
526+
}
527+
513528
public List<String> getAllowedExtraProperties()
514529
{
515530
return allowedExtraProperties;

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergExecutorModule.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,4 +83,17 @@ public ExecutorService createPlanningExecutor(CatalogName catalogName, IcebergCo
8383
config.getPlanningThreads(),
8484
daemonThreadsNamed("iceberg-planning-" + catalogName + "-%s"));
8585
}
86+
87+
@Provides
88+
@Singleton
89+
@ForIcebergFileDelete
90+
public ExecutorService createFileDeleteExecutor(CatalogName catalogName, IcebergConfig config)
91+
{
92+
if (config.getFileDeleteThreads() == 0) {
93+
return newDirectExecutorService();
94+
}
95+
return newFixedThreadPool(
96+
config.getFileDeleteThreads(),
97+
daemonThreadsNamed("iceberg-file-delete-" + catalogName + "-%s"));
98+
}
8699
}

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java

Lines changed: 5 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2177,34 +2177,11 @@ private void executeExpireSnapshots(ConnectorSession session, IcebergTableExecut
21772177
IcebergConfig.EXPIRE_SNAPSHOTS_MIN_RETENTION,
21782178
IcebergSessionProperties.EXPIRE_SNAPSHOTS_MIN_RETENTION);
21792179

2180-
long expireTimestampMillis = session.getStart().toEpochMilli() - retention.toMillis();
2181-
TrinoFileSystem fileSystem = fileSystemFactory.create(session.getIdentity(), table.io().properties());
2182-
List<Location> pathsToDelete = new ArrayList<>();
2183-
// deleteFunction is not accessed from multiple threads unless .executeDeleteWith() is used
2184-
Consumer<String> deleteFunction = path -> {
2185-
pathsToDelete.add(Location.of(path));
2186-
if (pathsToDelete.size() == DELETE_BATCH_SIZE) {
2187-
try {
2188-
fileSystem.deleteFiles(pathsToDelete);
2189-
pathsToDelete.clear();
2190-
}
2191-
catch (IOException e) {
2192-
throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed to delete files during snapshot expiration", e);
2193-
}
2194-
}
2195-
};
2196-
2197-
try {
2198-
table.expireSnapshots()
2199-
.expireOlderThan(expireTimestampMillis)
2200-
.deleteWith(deleteFunction)
2201-
.commit();
2202-
2203-
fileSystem.deleteFiles(pathsToDelete);
2204-
}
2205-
catch (IOException e) {
2206-
throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed to delete files during snapshot expiration", e);
2207-
}
2180+
// ForwardingFileIo handles bulk operations so no separate function implementation is needed
2181+
table.expireSnapshots()
2182+
.expireOlderThan(session.getStart().toEpochMilli() - retention.toMillis())
2183+
.planWith(icebergScanExecutor)
2184+
.commit();
22082185
}
22092186

22102187
private static void validateTableExecuteParameters(

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import io.trino.plugin.hive.parquet.ParquetWriterConfig;
3636
import io.trino.plugin.iceberg.cache.IcebergCacheKeyProvider;
3737
import io.trino.plugin.iceberg.catalog.rest.DefaultIcebergFileSystemFactory;
38+
import io.trino.plugin.iceberg.fileio.FileIoModule;
3839
import io.trino.plugin.iceberg.functions.IcebergFunctionProvider;
3940
import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionProcessorProviderFactory;
4041
import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionProvider;
@@ -138,9 +139,9 @@ public void configure(Binder binder)
138139

139140
newOptionalBinder(binder, IcebergFileSystemFactory.class).setDefault().to(DefaultIcebergFileSystemFactory.class).in(Scopes.SINGLETON);
140141
newOptionalBinder(binder, CacheKeyProvider.class).setBinding().to(IcebergCacheKeyProvider.class).in(Scopes.SINGLETON);
141-
142142
binder.bind(IcebergConnector.class).in(Scopes.SINGLETON);
143143

144144
binder.install(new IcebergExecutorModule());
145+
binder.install(new FileIoModule());
145146
}
146147
}

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import io.trino.plugin.iceberg.IcebergMaterializedViewDefinition;
2424
import io.trino.plugin.iceberg.IcebergUtil;
2525
import io.trino.plugin.iceberg.PartitionTransforms.ColumnTransform;
26-
import io.trino.plugin.iceberg.fileio.ForwardingFileIo;
26+
import io.trino.plugin.iceberg.fileio.ForwardingFileIoFactory;
2727
import io.trino.plugin.iceberg.fileio.ForwardingOutputFile;
2828
import io.trino.spi.TrinoException;
2929
import io.trino.spi.catalog.CatalogName;
@@ -121,23 +121,26 @@ public abstract class AbstractTrinoCatalog
121121
protected static final String TRINO_QUERY_ID_NAME = HiveMetadata.TRINO_QUERY_ID_NAME;
122122

123123
private final CatalogName catalogName;
124+
private final boolean useUniqueTableLocation;
124125
protected final TypeManager typeManager;
125126
protected final IcebergTableOperationsProvider tableOperationsProvider;
126-
private final TrinoFileSystemFactory fileSystemFactory;
127-
private final boolean useUniqueTableLocation;
127+
protected final TrinoFileSystemFactory fileSystemFactory;
128+
protected final ForwardingFileIoFactory fileIoFactory;
128129

129130
protected AbstractTrinoCatalog(
130131
CatalogName catalogName,
132+
boolean useUniqueTableLocation,
131133
TypeManager typeManager,
132134
IcebergTableOperationsProvider tableOperationsProvider,
133135
TrinoFileSystemFactory fileSystemFactory,
134-
boolean useUniqueTableLocation)
136+
ForwardingFileIoFactory fileIoFactory)
135137
{
136138
this.catalogName = requireNonNull(catalogName, "catalogName is null");
139+
this.useUniqueTableLocation = useUniqueTableLocation;
137140
this.typeManager = requireNonNull(typeManager, "typeManager is null");
138141
this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationsProvider is null");
139142
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
140-
this.useUniqueTableLocation = useUniqueTableLocation;
143+
this.fileIoFactory = requireNonNull(fileIoFactory, "fileIoFactory is null");
141144
}
142145

143146
@Override
@@ -325,7 +328,7 @@ protected Location createMaterializedViewStorage(
325328
protected void dropMaterializedViewStorage(ConnectorSession session, TrinoFileSystem fileSystem, String storageMetadataLocation)
326329
throws IOException
327330
{
328-
TableMetadata metadata = TableMetadataParser.read(new ForwardingFileIo(fileSystem, isUseFileSizeFromMetadata(session)), storageMetadataLocation);
331+
TableMetadata metadata = TableMetadataParser.read(fileIoFactory.create(fileSystem, isUseFileSizeFromMetadata(session)), storageMetadataLocation);
329332
String storageLocation = metadata.location();
330333
fileSystem.deleteDirectory(Location.of(storageLocation));
331334
}

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/FileMetastoreTableOperationsProvider.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
2020
import io.trino.plugin.iceberg.catalog.TrinoCatalog;
2121
import io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalog;
22-
import io.trino.plugin.iceberg.fileio.ForwardingFileIo;
22+
import io.trino.plugin.iceberg.fileio.ForwardingFileIoFactory;
2323
import io.trino.spi.connector.ConnectorSession;
2424

2525
import java.util.Optional;
@@ -31,11 +31,15 @@ public class FileMetastoreTableOperationsProvider
3131
implements IcebergTableOperationsProvider
3232
{
3333
private final TrinoFileSystemFactory fileSystemFactory;
34+
private final ForwardingFileIoFactory fileIoFactory;
3435

3536
@Inject
36-
public FileMetastoreTableOperationsProvider(TrinoFileSystemFactory fileSystemFactory)
37+
public FileMetastoreTableOperationsProvider(
38+
TrinoFileSystemFactory fileSystemFactory,
39+
ForwardingFileIoFactory fileIoFactory)
3740
{
3841
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
42+
this.fileIoFactory = requireNonNull(fileIoFactory, "fileIoFactory is null");
3943
}
4044

4145
@Override
@@ -48,7 +52,7 @@ public IcebergTableOperations createTableOperations(
4852
Optional<String> location)
4953
{
5054
return new FileMetastoreTableOperations(
51-
new ForwardingFileIo(fileSystemFactory.create(session), isUseFileSizeFromMetadata(session)),
55+
fileIoFactory.create(fileSystemFactory.create(session), isUseFileSizeFromMetadata(session)),
5256
((TrinoHiveCatalog) catalog).getMetastore(),
5357
session,
5458
database,

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueIcebergTableOperationsProvider.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import io.trino.plugin.iceberg.catalog.IcebergTableOperations;
2020
import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
2121
import io.trino.plugin.iceberg.catalog.TrinoCatalog;
22-
import io.trino.plugin.iceberg.fileio.ForwardingFileIo;
22+
import io.trino.plugin.iceberg.fileio.ForwardingFileIoFactory;
2323
import io.trino.spi.connector.ConnectorSession;
2424
import io.trino.spi.type.TypeManager;
2525
import software.amazon.awssdk.services.glue.GlueClient;
@@ -32,23 +32,26 @@
3232
public class GlueIcebergTableOperationsProvider
3333
implements IcebergTableOperationsProvider
3434
{
35+
private final TrinoFileSystemFactory fileSystemFactory;
36+
private final ForwardingFileIoFactory fileIoFactory;
3537
private final TypeManager typeManager;
3638
private final boolean cacheTableMetadata;
37-
private final TrinoFileSystemFactory fileSystemFactory;
3839
private final GlueClient glueClient;
3940
private final GlueMetastoreStats stats;
4041

4142
@Inject
4243
public GlueIcebergTableOperationsProvider(
44+
TrinoFileSystemFactory fileSystemFactory,
45+
ForwardingFileIoFactory fileIoFactory,
4346
TypeManager typeManager,
4447
IcebergGlueCatalogConfig catalogConfig,
45-
TrinoFileSystemFactory fileSystemFactory,
4648
GlueMetastoreStats stats,
4749
GlueClient glueClient)
4850
{
51+
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
52+
this.fileIoFactory = requireNonNull(fileIoFactory, "fileIoFactory is null");
4953
this.typeManager = requireNonNull(typeManager, "typeManager is null");
5054
this.cacheTableMetadata = catalogConfig.isCacheTableMetadata();
51-
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
5255
this.stats = requireNonNull(stats, "stats is null");
5356
this.glueClient = requireNonNull(glueClient, "glueClient is null");
5457
}
@@ -70,7 +73,7 @@ public IcebergTableOperations createTableOperations(
7073
// Share Glue Table cache between Catalog and TableOperations so that, when doing metadata queries (e.g. information_schema.columns)
7174
// the GetTableRequest is issued once per table.
7275
((TrinoGlueCatalog) catalog)::getTable,
73-
new ForwardingFileIo(fileSystemFactory.create(session), isUseFileSizeFromMetadata(session)),
76+
fileIoFactory.create(fileSystemFactory.create(session), isUseFileSizeFromMetadata(session)),
7477
session,
7578
database,
7679
table,

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
import io.trino.plugin.iceberg.catalog.AbstractTrinoCatalog;
3838
import io.trino.plugin.iceberg.catalog.IcebergTableOperations;
3939
import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
40-
import io.trino.plugin.iceberg.fileio.ForwardingFileIo;
40+
import io.trino.plugin.iceberg.fileio.ForwardingFileIoFactory;
4141
import io.trino.spi.TrinoException;
4242
import io.trino.spi.catalog.CatalogName;
4343
import io.trino.spi.connector.CatalogSchemaTableName;
@@ -169,7 +169,6 @@ public class TrinoGlueCatalog
169169

170170
private final String trinoVersion;
171171
private final boolean cacheTableMetadata;
172-
private final TrinoFileSystemFactory fileSystemFactory;
173172
private final Optional<String> defaultSchemaLocation;
174173
private final GlueClient glueClient;
175174
private final GlueMetastoreStats stats;
@@ -195,6 +194,7 @@ public class TrinoGlueCatalog
195194
public TrinoGlueCatalog(
196195
CatalogName catalogName,
197196
TrinoFileSystemFactory fileSystemFactory,
197+
ForwardingFileIoFactory fileIoFactory,
198198
TypeManager typeManager,
199199
boolean cacheTableMetadata,
200200
IcebergTableOperationsProvider tableOperationsProvider,
@@ -207,10 +207,9 @@ public TrinoGlueCatalog(
207207
boolean hideMaterializedViewStorageTable,
208208
Executor metadataFetchingExecutor)
209209
{
210-
super(catalogName, typeManager, tableOperationsProvider, fileSystemFactory, useUniqueTableLocation);
210+
super(catalogName, useUniqueTableLocation, typeManager, tableOperationsProvider, fileSystemFactory, fileIoFactory);
211211
this.trinoVersion = requireNonNull(trinoVersion, "trinoVersion is null");
212212
this.cacheTableMetadata = cacheTableMetadata;
213-
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
214213
this.glueClient = requireNonNull(glueClient, "glueClient is null");
215214
this.stats = requireNonNull(stats, "stats is null");
216215
this.isUsingSystemSecurity = isUsingSystemSecurity;
@@ -872,7 +871,7 @@ private Optional<Table> getTableAndCacheMetadata(ConnectorSession session, Schem
872871
try {
873872
// Cache the TableMetadata while we have the Table retrieved anyway
874873
// Note: this is racy from cache invalidation perspective, but it should not matter here
875-
uncheckedCacheGet(tableMetadataCache, schemaTableName, () -> TableMetadataParser.read(new ForwardingFileIo(fileSystemFactory.create(session), isUseFileSizeFromMetadata(session)), metadataLocation));
874+
uncheckedCacheGet(tableMetadataCache, schemaTableName, () -> TableMetadataParser.read(fileIoFactory.create(fileSystemFactory.create(session), isUseFileSizeFromMetadata(session)), metadataLocation));
876875
}
877876
catch (RuntimeException e) {
878877
LOG.warn(e, "Failed to cache table metadata from table at %s", metadataLocation);
@@ -1408,7 +1407,7 @@ private TableMetadata getMaterializedViewTableMetadata(ConnectorSession session,
14081407
requireNonNull(storageMetadataLocation, "storageMetadataLocation is null");
14091408
return uncheckedCacheGet(tableMetadataCache, storageTableName, () -> {
14101409
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
1411-
return TableMetadataParser.read(new ForwardingFileIo(fileSystem, isUseFileSizeFromMetadata(session)), storageMetadataLocation);
1410+
return TableMetadataParser.read(fileIoFactory.create(fileSystem, isUseFileSizeFromMetadata(session)), storageMetadataLocation);
14121411
});
14131412
}
14141413

0 commit comments

Comments
 (0)