Skip to content

Commit 1c37cdf

Browse files
committed
Allow using temporary staging path in Iceberg for writing sorted files
1 parent 5b81408 commit 1c37cdf

File tree

11 files changed

+171
-23
lines changed

11 files changed

+171
-23
lines changed

docs/src/main/sphinx/connector/iceberg.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,22 @@ implementation is used:
217217
- Enable [sorted writing](iceberg-sorted-files) to tables with a specified sort order. Equivalent
218218
session property is `sorted_writing_enabled`.
219219
- `true`
220+
* - `iceberg.temporary-staging-directory-enabled`
221+
- Controls whether the temporary staging directory configured at
222+
`iceberg.temporary-staging-directory-path` is used for write operations.
223+
Temporary staging directory is never used for writes to non-sorted tables on
224+
S3, encrypted HDFS or external location. Writes to sorted tables will
225+
utilize this path for staging temporary files during sorting operation. When
226+
disabled, the target storage will be used for staging while writing sorted
227+
tables which can be inefficient when writing to object stores like S3. Equivalent
228+
session property is `sorted_writing_temporary_staging_directory_enabled`.
229+
- `false`
230+
* - `iceberg.temporary-staging-directory-path`
231+
- Controls the location of temporary staging directory that is used for write
232+
operations. The `${USER}` placeholder can be used to use a different
233+
location for each user. Equivalent session property is
234+
`sorted_writing_temporary_staging_directory_path`.
235+
- `/tmp/presto-${USER}`
220236
* - `iceberg.allowed-extra-properties`
221237
- List of extra properties that are allowed to be set on Iceberg tables.
222238
Use `*` to allow all properties.

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ public class IcebergConfig
8787
private boolean hideMaterializedViewStorageTable = true;
8888
private Optional<String> materializedViewsStorageSchema = Optional.empty();
8989
private boolean sortedWritingEnabled = true;
90+
private boolean temporaryStagingDirectoryEnabled;
91+
private String temporaryStagingDirectoryPath = "/tmp/presto-${USER}";
9092
private boolean queryPartitionFilterRequired;
9193
private Set<String> queryPartitionFilterRequiredSchemas = ImmutableSet.of();
9294
private int splitManagerThreads = Math.min(Runtime.getRuntime().availableProcessors() * 2, 32);
@@ -454,6 +456,33 @@ public IcebergConfig setSortedWritingEnabled(boolean sortedWritingEnabled)
454456
return this;
455457
}
456458

459+
public boolean isTemporaryStagingDirectoryEnabled()
460+
{
461+
return temporaryStagingDirectoryEnabled;
462+
}
463+
464+
@Config("iceberg.temporary-staging-directory-enabled")
465+
@ConfigDescription("Should use temporary staging directory for write operations")
466+
public IcebergConfig setTemporaryStagingDirectoryEnabled(boolean temporaryStagingDirectoryEnabled)
467+
{
468+
this.temporaryStagingDirectoryEnabled = temporaryStagingDirectoryEnabled;
469+
return this;
470+
}
471+
472+
@NotNull
473+
public String getTemporaryStagingDirectoryPath()
474+
{
475+
return temporaryStagingDirectoryPath;
476+
}
477+
478+
@Config("iceberg.temporary-staging-directory-path")
479+
@ConfigDescription("Location of temporary staging directory for write operations. Use ${USER} placeholder to use different location for each user")
480+
public IcebergConfig setTemporaryStagingDirectoryPath(String temporaryStagingDirectoryPath)
481+
{
482+
this.temporaryStagingDirectoryPath = temporaryStagingDirectoryPath;
483+
return this;
484+
}
485+
457486
@Config("iceberg.query-partition-filter-required")
458487
@ConfigDescription("Require a filter on at least one partition column")
459488
public IcebergConfig setQueryPartitionFilterRequired(boolean queryPartitionFilterRequired)

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,11 @@
6363
import static io.airlift.slice.Slices.wrappedBuffer;
6464
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
6565
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_TOO_MANY_OPEN_PARTITIONS;
66+
import static io.trino.plugin.iceberg.IcebergSessionProperties.getSortedWritingTempStagingDirPath;
67+
import static io.trino.plugin.iceberg.IcebergSessionProperties.getSortedWritingWriterBufferSize;
68+
import static io.trino.plugin.iceberg.IcebergSessionProperties.getSortedWritingWriterMaxOpenFiles;
6669
import static io.trino.plugin.iceberg.IcebergSessionProperties.isSortedWritingEnabled;
70+
import static io.trino.plugin.iceberg.IcebergSessionProperties.isSortedWritingTempStagingDirEnabled;
6771
import static io.trino.plugin.iceberg.IcebergUtil.getTopLevelColumns;
6872
import static io.trino.plugin.iceberg.PartitionTransforms.getColumnTransform;
6973
import static io.trino.plugin.iceberg.util.Timestamps.getTimestampTz;
@@ -116,6 +120,8 @@ public class IcebergPageSink
116120
private final boolean sortedWritingEnabled;
117121
private final DataSize sortingFileWriterBufferSize;
118122
private final Integer sortingFileWriterMaxOpenFiles;
123+
private final boolean sortedWritingTempStagingPathEnabled;
124+
private final String sortedWritingTempStagingPath;
119125
private final Location tempDirectory;
120126
private final TypeManager typeManager;
121127
private final PageSorter pageSorter;
@@ -147,8 +153,6 @@ public IcebergPageSink(
147153
Map<String, String> storageProperties,
148154
int maxOpenWriters,
149155
List<TrinoSortField> sortOrder,
150-
DataSize sortingFileWriterBufferSize,
151-
int sortingFileWriterMaxOpenFiles,
152156
TypeManager typeManager,
153157
PageSorter pageSorter)
154158
{
@@ -169,8 +173,10 @@ public IcebergPageSink(
169173
this.storageProperties = requireNonNull(storageProperties, "storageProperties is null");
170174
this.sortOrder = requireNonNull(sortOrder, "sortOrder is null");
171175
this.sortedWritingEnabled = isSortedWritingEnabled(session);
172-
this.sortingFileWriterBufferSize = requireNonNull(sortingFileWriterBufferSize, "sortingFileWriterBufferSize is null");
173-
this.sortingFileWriterMaxOpenFiles = sortingFileWriterMaxOpenFiles;
176+
this.sortingFileWriterBufferSize = getSortedWritingWriterBufferSize(session);
177+
this.sortingFileWriterMaxOpenFiles = getSortedWritingWriterMaxOpenFiles(session);
178+
this.sortedWritingTempStagingPathEnabled = isSortedWritingTempStagingDirEnabled(session);
179+
this.sortedWritingTempStagingPath = getSortedWritingTempStagingDirPath(session);
174180
this.tempDirectory = Location.of(locationProvider.newDataLocation("trino-tmp-files"));
175181
this.typeManager = requireNonNull(typeManager, "typeManager is null");
176182
this.pageSorter = requireNonNull(pageSorter, "pageSorter is null");
@@ -355,12 +361,10 @@ private int[] getWriterIndexes(Page page)
355361
.orElseGet(() -> locationProvider.newDataLocation(fileName));
356362

357363
if (!sortOrder.isEmpty() && sortedWritingEnabled) {
358-
String tempName = "sorting-file-writer-%s-%s".formatted(session.getQueryId(), randomUUID());
359-
Location tempFilePrefix = tempDirectory.appendPath(tempName);
360364
WriteContext writerContext = createWriter(outputPath, partitionData);
361365
IcebergFileWriter sortedFileWriter = new IcebergSortingFileWriter(
362366
fileSystem,
363-
tempFilePrefix,
367+
createTempFileLocation(),
364368
writerContext.getWriter(),
365369
sortingFileWriterBufferSize,
366370
sortingFileWriterMaxOpenFiles,
@@ -388,6 +392,29 @@ private int[] getWriterIndexes(Page page)
388392
return writerIndexes;
389393
}
390394

395+
private Location createTempFileLocation()
396+
{
397+
String tempFileName = "sorting-file-writer-%s-%s".formatted(session.getQueryId(), randomUUID());
398+
Location tempFilePath;
399+
if (sortedWritingTempStagingPathEnabled) {
400+
String stagingPath = sortedWritingTempStagingPath.replace("${USER}", session.getIdentity().getUser());
401+
Location tempPrefix = setSchemeToFileIfAbsent(Location.of(stagingPath));
402+
tempFilePath = tempPrefix.appendPath(tempFileName);
403+
}
404+
else {
405+
tempFilePath = tempDirectory.appendPath(tempFileName);
406+
}
407+
return tempFilePath;
408+
}
409+
410+
private static Location setSchemeToFileIfAbsent(Location location)
411+
{
412+
if (location.scheme().isPresent()) {
413+
return location;
414+
}
415+
return Location.of("file:///" + location.path());
416+
}
417+
391418
@Override
392419
public void closeIdleWriters()
393420
{

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515

1616
import com.google.inject.Inject;
1717
import io.airlift.json.JsonCodec;
18-
import io.airlift.units.DataSize;
19-
import io.trino.plugin.hive.SortingFileWriterConfig;
2018
import io.trino.plugin.iceberg.procedure.IcebergOptimizeHandle;
2119
import io.trino.plugin.iceberg.procedure.IcebergTableExecuteHandle;
2220
import io.trino.spi.PageIndexerFactory;
@@ -52,8 +50,6 @@ public class IcebergPageSinkProvider
5250
private final JsonCodec<CommitTaskData> jsonCodec;
5351
private final IcebergFileWriterFactory fileWriterFactory;
5452
private final PageIndexerFactory pageIndexerFactory;
55-
private final DataSize sortingFileWriterBufferSize;
56-
private final int sortingFileWriterMaxOpenFiles;
5753
private final TypeManager typeManager;
5854
private final PageSorter pageSorter;
5955

@@ -63,16 +59,13 @@ public IcebergPageSinkProvider(
6359
JsonCodec<CommitTaskData> jsonCodec,
6460
IcebergFileWriterFactory fileWriterFactory,
6561
PageIndexerFactory pageIndexerFactory,
66-
SortingFileWriterConfig sortingFileWriterConfig,
6762
TypeManager typeManager,
6863
PageSorter pageSorter)
6964
{
7065
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
7166
this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null");
7267
this.fileWriterFactory = requireNonNull(fileWriterFactory, "fileWriterFactory is null");
7368
this.pageIndexerFactory = requireNonNull(pageIndexerFactory, "pageIndexerFactory is null");
74-
this.sortingFileWriterBufferSize = sortingFileWriterConfig.getWriterSortBufferSize();
75-
this.sortingFileWriterMaxOpenFiles = sortingFileWriterConfig.getMaxOpenSortFiles();
7669
this.typeManager = requireNonNull(typeManager, "typeManager is null");
7770
this.pageSorter = requireNonNull(pageSorter, "pageSorter is null");
7871
}
@@ -109,8 +102,6 @@ private ConnectorPageSink createPageSink(ConnectorSession session, IcebergWritab
109102
tableHandle.storageProperties(),
110103
maxPartitionsPerWriter(session),
111104
tableHandle.sortOrder(),
112-
sortingFileWriterBufferSize,
113-
sortingFileWriterMaxOpenFiles,
114105
typeManager,
115106
pageSorter);
116107
}
@@ -140,8 +131,6 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
140131
optimizeHandle.tableStorageProperties(),
141132
maxPartitionsPerWriter(session),
142133
optimizeHandle.sortOrder(),
143-
sortingFileWriterBufferSize,
144-
sortingFileWriterMaxOpenFiles,
145134
typeManager,
146135
pageSorter);
147136
case OPTIMIZE_MANIFESTS:

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.trino.orc.OrcWriteValidation.OrcWriteValidationMode;
2121
import io.trino.plugin.base.session.SessionPropertiesProvider;
2222
import io.trino.plugin.hive.HiveCompressionOption;
23+
import io.trino.plugin.hive.SortingFileWriterConfig;
2324
import io.trino.plugin.hive.orc.OrcReaderConfig;
2425
import io.trino.plugin.hive.orc.OrcWriterConfig;
2526
import io.trino.plugin.hive.parquet.ParquetReaderConfig;
@@ -106,6 +107,10 @@ public final class IcebergSessionProperties
106107
public static final String REMOVE_ORPHAN_FILES_MIN_RETENTION = "remove_orphan_files_min_retention";
107108
private static final String MERGE_MANIFESTS_ON_WRITE = "merge_manifests_on_write";
108109
private static final String SORTED_WRITING_ENABLED = "sorted_writing_enabled";
110+
private static final String SORTED_WRITING_WRITER_BUFFER_SIZE = "sorted_writing_write_buffer_size";
111+
private static final String SORTED_WRITING_WRITER_MAX_OPEN_FILES = "sorted_writing_writer_max_open_files";
112+
private static final String SORTED_WRITING_TEMP_STAGING_DIR_ENABLED = "sorted_writing_temporary_staging_directory_enabled";
113+
private static final String SORTED_WRITING_TEMP_STAGING_DIR_PATH = "sorted_writing_temporary_staging_directory_path";
109114
private static final String QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required";
110115
private static final String QUERY_PARTITION_FILTER_REQUIRED_SCHEMAS = "query_partition_filter_required_schemas";
111116
private static final String INCREMENTAL_REFRESH_ENABLED = "incremental_refresh_enabled";
@@ -121,7 +126,8 @@ public IcebergSessionProperties(
121126
OrcReaderConfig orcReaderConfig,
122127
OrcWriterConfig orcWriterConfig,
123128
ParquetReaderConfig parquetReaderConfig,
124-
ParquetWriterConfig parquetWriterConfig)
129+
ParquetWriterConfig parquetWriterConfig,
130+
SortingFileWriterConfig sortingFileWriterConfig)
125131
{
126132
sessionProperties = ImmutableList.<PropertyMetadata<?>>builder()
127133
.add(dataSizeProperty(
@@ -368,6 +374,26 @@ public IcebergSessionProperties(
368374
"Enable sorted writing to tables with a specified sort order",
369375
icebergConfig.isSortedWritingEnabled(),
370376
false))
377+
.add(dataSizeProperty(
378+
SORTED_WRITING_WRITER_BUFFER_SIZE,
379+
"Target size of buffer files used during sorting",
380+
sortingFileWriterConfig.getWriterSortBufferSize(),
381+
false))
382+
.add(integerProperty(
383+
SORTED_WRITING_WRITER_MAX_OPEN_FILES,
384+
"Max number of concurrently open buffer files during sorting",
385+
sortingFileWriterConfig.getMaxOpenSortFiles(),
386+
false))
387+
.add(booleanProperty(
388+
SORTED_WRITING_TEMP_STAGING_DIR_ENABLED,
389+
"Should use (if possible) temporary staging directory for write operations",
390+
icebergConfig.isTemporaryStagingDirectoryEnabled(),
391+
false))
392+
.add(stringProperty(
393+
SORTED_WRITING_TEMP_STAGING_DIR_PATH,
394+
"Location of temporary staging directory for write operations. Use ${USER} placeholder to use different location for each user",
395+
icebergConfig.getTemporaryStagingDirectoryPath(),
396+
false))
371397
.add(booleanProperty(
372398
QUERY_PARTITION_FILTER_REQUIRED,
373399
"Require filter on partition column",
@@ -643,6 +669,26 @@ public static boolean isSortedWritingEnabled(ConnectorSession session)
643669
return session.getProperty(SORTED_WRITING_ENABLED, Boolean.class);
644670
}
645671

672+
public static DataSize getSortedWritingWriterBufferSize(ConnectorSession session)
673+
{
674+
return session.getProperty(SORTED_WRITING_WRITER_BUFFER_SIZE, DataSize.class);
675+
}
676+
677+
public static Integer getSortedWritingWriterMaxOpenFiles(ConnectorSession session)
678+
{
679+
return session.getProperty(SORTED_WRITING_WRITER_MAX_OPEN_FILES, Integer.class);
680+
}
681+
682+
public static boolean isSortedWritingTempStagingDirEnabled(ConnectorSession session)
683+
{
684+
return session.getProperty(SORTED_WRITING_TEMP_STAGING_DIR_ENABLED, Boolean.class);
685+
}
686+
687+
public static String getSortedWritingTempStagingDirPath(ConnectorSession session)
688+
{
689+
return session.getProperty(SORTED_WRITING_TEMP_STAGING_DIR_PATH, String.class);
690+
}
691+
646692
public static boolean isQueryPartitionFilterRequired(ConnectorSession session)
647693
{
648694
return session.getProperty(QUERY_PARTITION_FILTER_REQUIRED, Boolean.class);

plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -566,6 +566,33 @@ public void testFileSortingWithLargerTable()
566566
}
567567
}
568568

569+
@Test
570+
public void testFileSortingWithLargerTableAndTempDirForBufferFiles()
571+
{
572+
// Using a larger table forces buffered data to be written to disk
573+
Session withSmallRowGroups = Session.builder(getSession())
574+
.setCatalogSessionProperty("iceberg", "orc_writer_max_stripe_rows", "200")
575+
.setCatalogSessionProperty("iceberg", "parquet_writer_block_size", "20kB")
576+
.setCatalogSessionProperty("iceberg", "parquet_writer_batch_size", "200")
577+
.setCatalogSessionProperty("iceberg", "sorted_writing_temporary_staging_directory_enabled", "true")
578+
.setCatalogSessionProperty("iceberg", "sorted_writing_write_buffer_size", "2kB")
579+
.setCatalogSessionProperty("iceberg", "sorted_writing_writer_max_open_files", "5")
580+
.build();
581+
try (TestTable table = new TestTable(
582+
getQueryRunner()::execute,
583+
"test_sorted_lineitem_table",
584+
"WITH (sorted_by = ARRAY['comment'], format = '" + format.name() + "') AS TABLE tpch.tiny.lineitem WITH NO DATA")) {
585+
assertUpdate(
586+
withSmallRowGroups,
587+
"INSERT INTO " + table.getName() + " TABLE tpch.tiny.lineitem",
588+
"VALUES 60175");
589+
for (Object filePath : computeActual("SELECT file_path from \"" + table.getName() + "$files\"").getOnlyColumnAsSet()) {
590+
assertThat(isFileSorted(Location.of((String) filePath), "comment")).isTrue();
591+
}
592+
assertQuery("SELECT * FROM " + table.getName(), "SELECT * FROM lineitem");
593+
}
594+
}
595+
569596
@Test
570597
public void testDropTableWithMissingMetadataFile()
571598
throws Exception

plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergTestUtils.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import io.trino.parquet.metadata.ParquetMetadata;
3737
import io.trino.parquet.reader.MetadataReader;
3838
import io.trino.plugin.base.metrics.FileFormatDataSourceStats;
39+
import io.trino.plugin.hive.SortingFileWriterConfig;
3940
import io.trino.plugin.hive.TrinoViewHiveMetastore;
4041
import io.trino.plugin.hive.orc.OrcReaderConfig;
4142
import io.trino.plugin.hive.orc.OrcWriterConfig;
@@ -91,7 +92,8 @@ public final class IcebergTestUtils
9192
new OrcReaderConfig(),
9293
new OrcWriterConfig(),
9394
new ParquetReaderConfig(),
94-
new ParquetWriterConfig()).getSessionProperties())
95+
new ParquetWriterConfig(),
96+
new SortingFileWriterConfig()).getSessionProperties())
9597
.build();
9698

9799
private IcebergTestUtils() {}

plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ public void testDefaults()
7070
.setRegisterTableProcedureEnabled(false)
7171
.setAddFilesProcedureEnabled(false)
7272
.setSortedWritingEnabled(true)
73+
.setTemporaryStagingDirectoryEnabled(false)
74+
.setTemporaryStagingDirectoryPath("/tmp/presto-${USER}")
7375
.setQueryPartitionFilterRequired(false)
7476
.setQueryPartitionFilterRequiredSchemas(ImmutableSet.of())
7577
.setSplitManagerThreads(Integer.toString(Runtime.getRuntime().availableProcessors() * 2))
@@ -113,6 +115,8 @@ public void testExplicitPropertyMappings()
113115
.put("iceberg.register-table-procedure.enabled", "true")
114116
.put("iceberg.add-files-procedure.enabled", "true")
115117
.put("iceberg.sorted-writing-enabled", "false")
118+
.put("iceberg.temporary-staging-directory-enabled", "true")
119+
.put("iceberg.temporary-staging-directory-path", "/tmp/presto")
116120
.put("iceberg.query-partition-filter-required", "true")
117121
.put("iceberg.query-partition-filter-required-schemas", "bronze,silver")
118122
.put("iceberg.split-manager-threads", "42")
@@ -152,6 +156,8 @@ public void testExplicitPropertyMappings()
152156
.setRegisterTableProcedureEnabled(true)
153157
.setAddFilesProcedureEnabled(true)
154158
.setSortedWritingEnabled(false)
159+
.setTemporaryStagingDirectoryEnabled(true)
160+
.setTemporaryStagingDirectoryPath("/tmp/presto")
155161
.setQueryPartitionFilterRequired(true)
156162
.setQueryPartitionFilterRequiredSchemas(ImmutableSet.of("bronze", "silver"))
157163
.setSplitManagerThreads("42")

0 commit comments

Comments
 (0)