Improve file delete performance for expire_snapshots #26230
Pull Request Overview
This PR improves the performance of the expire_snapshots procedure in Iceberg by introducing parallel file deletion capabilities and delegating file cleanup operations to Iceberg's native bulk delete functionality.
Key changes include:
- Introduces a new configurable thread pool for file deletion operations
- Replaces custom file deletion logic with Iceberg's native bulk delete operations
- Updates thread pool sizing defaults with reasonable upper bounds
Reviewed Changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 2 comments.
File | Description |
---|---|
ForIcebergFileDelete.java | Adds new binding annotation for file delete executor dependency injection |
IcebergConfig.java | Adds configuration for file delete thread pool size |
IcebergExecutorModule.java | Creates new executor service for file deletion operations |
IcebergMetadata.java | Replaces custom file deletion with Iceberg's native bulk delete operations |
IcebergMetadataFactory.java | Integrates file delete executor into metadata factory |
TestIcebergConfig.java | Updates tests for new configuration and thread pool defaults |
BaseIcebergMinioConnectorSmokeTest.java | Updates test expectations for bulk delete behavior |
Various test files | Adds file delete executor parameter to test constructors |
@tbaeg are you able to share any benchmark results from trying this change?
At the moment, I do not have any benchmarks to share. For our testing environment I'd first need to backport the change. I can try to get something this week.
Looks good except for missing documentation.
@tbaeg if it is not much trouble, could you add in the description of the PR an informal benchmark with and without parallelism for
@@ -228,6 +228,9 @@ implementation is used:
  - Number of threads used for retrieving metadata. Currently, only table loading
    is parallelized.
  - `8`
* - `iceberg.file-delete-threads`
  - Number of threads to use for deleting files when running `expire_snapshots` procedure.
  - Double the number of processors on the coordinator node.
Misc question: do we have any way to restrict certain settings to the coordinator only?
This is outside the scope of the current PR.
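The default documented in the hunk above ("double the number of processors") can be sketched as follows. This is a minimal illustration, not the code from `IcebergConfig`: the class name and the cap `ASSUMED_MAX_THREADS` are hypothetical, since the actual upper bound is not shown in this thread.

```java
final class FileDeleteThreadsDefault
{
    // Hypothetical cap: the PR overview mentions "reasonable upper bounds",
    // but the real value is not visible in this conversation.
    static final int ASSUMED_MAX_THREADS = 32;

    private FileDeleteThreadsDefault() {}

    // Doubles the processor count and clamps it to [1, ASSUMED_MAX_THREADS].
    static int defaultThreads(int availableProcessors)
    {
        return Math.max(1, Math.min(availableProcessors * 2, ASSUMED_MAX_THREADS));
    }

    public static void main(String[] args)
    {
        int processors = Runtime.getRuntime().availableProcessors();
        System.out.println("file delete threads: " + defaultThreads(processors));
    }
}
```

An explicit value would be set through the `iceberg.file-delete-threads` catalog property named in the documentation change.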
// ForwardingFileIo handles bulk operations so no separate function implementation is needed
table.expireSnapshots()
        .expireOlderThan(session.getStart().toEpochMilli() - retention.toMillis())
        .executeDeleteWith(fileDeleteExecutor)
I wonder whether we could somehow ensure that there are no regressions in terms of sending bulk requests to AWS S3.
Assuming something needs to be deleted in each category (i.e. data, manifest, manifest list, statistics), you technically have a higher minimum number of requests. But the improvements should really shine as more files need to be deleted. So in a sense this is a "regression", although I'd categorize it as more of a trade-off for throughput.
One aspect I imagine could become an issue is excessive 500 responses from S3, since each element in a batch delete request technically counts as a request against the rate-limiting quota. This would require testing at a certain scale to verify.
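The "more minimum requests" point in the comment above comes down to rounding: deleting each category separately costs at least one bulk request per non-empty category, while a single combined list rounds up only once. A small sketch of that arithmetic, where the class name, the file counts, and the batch size are all made up for illustration:

```java
import java.util.List;

final class DeleteRequestCount
{
    private DeleteRequestCount() {}

    // Number of bulk requests needed for `files` paths at `batchSize` paths per request.
    static long requests(long files, int batchSize)
    {
        return (files + batchSize - 1) / batchSize;
    }

    // All categories merged into one list before batching.
    static long combined(List<Long> filesPerCategory, int batchSize)
    {
        long total = 0;
        for (long files : filesPerCategory) {
            total += files;
        }
        return requests(total, batchSize);
    }

    // Each category batched on its own, as happens when deletes are issued per file type.
    static long perCategory(List<Long> filesPerCategory, int batchSize)
    {
        long total = 0;
        for (long files : filesPerCategory) {
            total += requests(files, batchSize);
        }
        return total;
    }

    public static void main(String[] args)
    {
        // Hypothetical counts: data, manifest, manifest list, statistics
        List<Long> counts = List.of(3L, 2L, 1L, 1L);
        System.out.println("combined:     " + combined(counts, 10));
        System.out.println("per category: " + perCategory(counts, 10));
    }
}
```

With only a handful of files per category the per-category total is dominated by the one-request minimum; as file counts grow, the two totals converge.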
Apologies, I was wrong about the parallelization in Iceberg. Looking at the code more closely, it only parallelizes when the FileIO implementation does NOT support bulk operations. So, there is effectively no performance change.
There would need to be a change in Iceberg to support parallelized bulk operations, or the ForwardingFileIo would need to handle the parallelization.
Old:
10 deletes per request (manually set the delete batch size) - 412 total requests
2025-07-21T13:00:29.337-0600 INFO ForkJoinPool-1-worker-1 stdout 5024 metadata files found
2025-07-21T13:00:29.338-0600 INFO ForkJoinPool-1-worker-1 stdout 1002 data files found
2025-07-21T13:00:32.795-0600 INFO ForkJoinPool-1-worker-1 stdout Time for expire_snapshots time: 3457
2025-07-21T13:00:32.845-0600 INFO ForkJoinPool-1-worker-1 stdout 2011 metadata files found after expire_snapshots
2025-07-21T13:00:32.845-0600 INFO ForkJoinPool-1-worker-1 stdout 2 data files found after expire_snapshots
New:
10 deletes per request (manually set the delete batch size) - 413 total requests
2025-07-21T12:53:18.326-0600 INFO ForkJoinPool-1-worker-1 stdout 5024 metadata files found
2025-07-21T12:53:18.326-0600 INFO ForkJoinPool-1-worker-1 stdout 1002 data files found
2025-07-21T12:53:21.775-0600 INFO ForkJoinPool-1-worker-1 stdout Time for expire_snapshots time: 3448
2025-07-21T12:53:21.819-0600 INFO ForkJoinPool-1-worker-1 stdout 2011 metadata files found after expire_snapshots
2025-07-21T12:53:21.819-0600 INFO ForkJoinPool-1-worker-1 stdout 2 data files found after expire_snapshots
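The near-identical timings above match the behavior described in this comment: when the FileIO supports bulk operations, the supplied executor is never used. The following is a simplified model of that dispatch, not Iceberg's actual code; `SketchFileIo`, `SketchBulkFileIo`, and `DeleteDispatchSketch` are hypothetical stand-ins for `FileIO`, `SupportsBulkOperations`, and the cleanup logic.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical stand-in for a FileIO that can only delete one path at a time.
interface SketchFileIo
{
    void deleteFile(String path);
}

// Hypothetical stand-in for a FileIO that also supports bulk deletes.
interface SketchBulkFileIo
        extends SketchFileIo
{
    void deleteFiles(Collection<String> paths);
}

final class DeleteDispatchSketch
{
    private DeleteDispatchSketch() {}

    // Returns true only when the executor was actually used.
    static boolean deleteAll(SketchFileIo io, Collection<String> paths, ExecutorService executor)
    {
        if (io instanceof SketchBulkFileIo) {
            // Bulk-capable FileIO: one call on the caller's thread, executor ignored.
            ((SketchBulkFileIo) io).deleteFiles(paths);
            return false;
        }
        // No bulk support: one task per path on the executor.
        List<Future<?>> futures = new ArrayList<>();
        for (String path : paths) {
            futures.add(executor.submit(() -> io.deleteFile(path)));
        }
        for (Future<?> future : futures) {
            try {
                future.get();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            catch (ExecutionException e) {
                throw new RuntimeException(e.getCause());
            }
        }
        return true;
    }
}
```

Under this model, passing an executor changes nothing for a bulk-capable FileIO, which is why parallelism has to live inside the bulk delete itself.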
In our case SupportsBulkOperations is always implemented because of using ForwardingFileIo.
So we have to do the parallelism in io.trino.plugin.iceberg.fileio.ForwardingFileIo#deleteFiles and skip using executeDeleteWith.
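One way to do the parallelism inside a bulk `deleteFiles`, as suggested above, is to split the paths into batches and submit each batch to the delete executor. The sketch below only illustrates that idea under stated assumptions: `ParallelBulkDeleteSketch` and the `bulkDelete` callback are hypothetical and are not taken from `ForwardingFileIo` or the Trino file system API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

final class ParallelBulkDeleteSketch
{
    private ParallelBulkDeleteSketch() {}

    // Splits paths into consecutive batches of at most batchSize elements.
    static List<List<String>> partition(List<String> paths, int batchSize)
    {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<String>> batches = new ArrayList<>();
        for (int start = 0; start < paths.size(); start += batchSize) {
            int end = Math.min(start + batchSize, paths.size());
            batches.add(new ArrayList<>(paths.subList(start, end)));
        }
        return batches;
    }

    // Runs one bulk delete per batch on the executor and waits for all of them.
    // Returns the number of bulk requests issued.
    static int deleteFiles(
            List<String> paths,
            int batchSize,
            ExecutorService executor,
            Consumer<List<String>> bulkDelete)
    {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (List<String> batch : partition(paths, batchSize)) {
            futures.add(CompletableFuture.runAsync(() -> bulkDelete.accept(batch), executor));
        }
        // join() rethrows the first failure wrapped in a CompletionException
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return futures.size();
    }
}
```

The number of requests stays the same as in the sequential case; only their concurrency changes, which is why the S3 rate-limit concern raised earlier still applies.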
Another crude benchmark.
2025-07-22T12:50:13.288-0600 INFO ForkJoinPool-1-worker-1 stdout 5024 metadata files found
2025-07-22T12:50:13.289-0600 INFO ForkJoinPool-1-worker-1 stdout 1002 data files found
2025-07-22T12:50:15.741-0600 INFO ForkJoinPool-1-worker-1 stdout Time for expire_snapshots time: 2452
2025-07-22T12:50:15.797-0600 INFO ForkJoinPool-1-worker-1 stdout 2011 metadata files found after expire_snapshots
2025-07-22T12:50:15.797-0600 INFO ForkJoinPool-1-worker-1 stdout 2 data files found after expire_snapshots
I was trying to get something informal together on a test cluster with more substantial data, but it looks like it might not be possible. I can write a simple test and profile it locally, but I'm not sure how worthwhile that is.
Did a POC that parallelizes
import com.google.inject.Module;
import com.google.inject.Scopes;

public class FileIoModule
Why do we need a new Module now? It doesn't seem justified with just one binding.
Mostly because the lakehouse connector was using it. If I place it in the IcebergModule it doesn't work. Open to suggestions on where to place it.
- Add ForwardingFileIoFactory so the delete-specific ExecutorService is not injected in multiple places
- Remove direct usage of ForwardingFileIo in favor of a FileIoFactory implementation
- Add parallelized deletes in ForwardingFileIo
Description
Improve file delete performance for expire_snapshots.
Release notes
( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
(x) Release notes are required, with the following suggested text: