Skip to content

Commit a0992f1

Browse files
grantatspotheroraunaqmorarka
authored andcommitted
Reduce memory usage in Iceberg remove orphan files procedure
1 parent 97254f2 commit a0992f1

File tree

2 files changed

+57
-3
lines changed

2 files changed

+57
-3
lines changed

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@
145145
import org.apache.iceberg.FileFormat;
146146
import org.apache.iceberg.FileMetadata;
147147
import org.apache.iceberg.FileScanTask;
148+
import org.apache.iceberg.IcebergManifestUtils;
148149
import org.apache.iceberg.IsolationLevel;
149150
import org.apache.iceberg.ManifestFile;
150151
import org.apache.iceberg.ManifestFiles;
@@ -2260,11 +2261,18 @@ private void removeOrphanFiles(Table table, ConnectorSession session, SchemaTabl
22602261
ImmutableSet.Builder<String> validDataFileNames = ImmutableSet.builder();
22612262

22622263
for (Snapshot snapshot : table.snapshots()) {
2263-
if (snapshot.manifestListLocation() != null) {
2264-
validMetadataFileNames.add(fileName(snapshot.manifestListLocation()));
2264+
String manifestListLocation = snapshot.manifestListLocation();
2265+
List<ManifestFile> allManifests;
2266+
if (manifestListLocation != null) {
2267+
validMetadataFileNames.add(fileName(manifestListLocation));
2268+
allManifests = loadAllManifestsFromManifestList(table, manifestListLocation);
2269+
}
2270+
else {
2271+
// This is to maintain support for V1 tables which have embedded manifest lists
2272+
allManifests = loadAllManifestsFromSnapshot(table, snapshot);
22652273
}
22662274

2267-
for (ManifestFile manifest : loadAllManifestsFromSnapshot(table, snapshot)) {
2275+
for (ManifestFile manifest : allManifests) {
22682276
if (!processedManifestFilePaths.add(manifest.path())) {
22692277
// Already read this manifest
22702278
continue;
@@ -3469,6 +3477,21 @@ private static List<ManifestFile> loadAllManifestsFromSnapshot(Table icebergTabl
34693477
}
34703478
}
34713479

3480+
/**
3481+
* Use instead of loadAllManifestsFromSnapshot when loading manifests from multiple distinct snapshots
3482+
* Each BaseSnapshot object caches manifest files separately, so loading manifests from multiple distinct snapshots
3483+
* results in O(num_snapshots^2) copies of the same manifest file metadata in memory
3484+
*/
3485+
private static List<ManifestFile> loadAllManifestsFromManifestList(Table icebergTable, String manifestListLocation)
3486+
{
3487+
try {
3488+
return IcebergManifestUtils.read(icebergTable.io(), manifestListLocation);
3489+
}
3490+
catch (NotFoundException | UncheckedIOException e) {
3491+
throw new TrinoException(ICEBERG_INVALID_METADATA, "Error accessing manifest file for table %s".formatted(icebergTable.name()), e);
3492+
}
3493+
}
3494+
34723495
private static Set<Integer> identityPartitionColumnsInAllSpecs(Table table)
34733496
{
34743497
// Extract identity partition column source ids common to ALL specs
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package org.apache.iceberg;
15+
16+
import org.apache.iceberg.io.FileIO;
17+
18+
import java.util.List;
19+
20+
public class IcebergManifestUtils
21+
{
22+
private IcebergManifestUtils() {}
23+
24+
public static List<ManifestFile> read(FileIO fileIO, String manifestListLocation)
25+
{
26+
// Avoid using snapshot.allManifests() when processing multiple snapshots,
27+
// as each Snapshot instance internally caches `org.apache.iceberg.BaseSnapshot.allManifests`
28+
// and leads to high memory usage
29+
return ManifestLists.read(fileIO.newInputFile(manifestListLocation));
30+
}
31+
}

0 commit comments

Comments
 (0)