From 7bf7b49978663fb88462da02406d816148958b42 Mon Sep 17 00:00:00 2001 From: William Hyun Date: Sun, 29 Jun 2025 20:11:37 -0700 Subject: [PATCH 1/3] Initial Commit --- .../metastore/DeltaLakeMetastoreModule.java | 1 + plugin/trino-hive/pom.xml | 44 + .../hive/metastore/HiveMetastoreModule.java | 2 + .../hive/metastore/MetastoreTypeConfig.java | 1 + .../hive/metastore/polaris/AwsProperties.java | 21 + .../polaris/DefaultAwsProperties.java | 28 + .../metastore/polaris/ForPolarisClient.java | 31 + .../metastore/polaris/ForwardingFileIo.java | 92 ++ .../polaris/ForwardingInputFile.java | 87 ++ .../polaris/ForwardingOutputFile.java | 126 +++ .../ForwardingSeekableInputStream.java | 146 +++ .../metastore/polaris/NoneSecurityModule.java | 27 + .../polaris/NoneSecurityProperties.java | 28 + .../polaris/OAuth2SecurityConfig.java | 111 +++ .../polaris/OAuth2SecurityModule.java | 30 + .../polaris/OAuth2SecurityProperties.java | 57 ++ .../PolarisAlreadyExistsException.java | 32 + .../metastore/polaris/PolarisException.java | 36 + .../polaris/PolarisGenericTable.java | 117 +++ .../polaris/PolarisHiveMetastore.java | 903 ++++++++++++++++++ .../polaris/PolarisHiveMetastoreFactory.java | 52 + .../polaris/PolarisMetastoreConfig.java | 286 ++++++ .../polaris/PolarisMetastoreModule.java | 115 +++ .../metastore/polaris/PolarisNamespace.java | 82 ++ .../polaris/PolarisNotFoundException.java | 32 + .../metastore/polaris/PolarisRestClient.java | 657 +++++++++++++ .../polaris/PolarisTableIdentifier.java | 80 ++ .../polaris/PolarisTableMetadata.java | 94 ++ .../metastore/polaris/SecurityProperties.java | 21 + .../io/trino/plugin/hive/TestHivePlugin.java | 24 + .../lakehouse/LakehouseDeltaModule.java | 1 + .../lakehouse/LakehouseIcebergModule.java | 1 + 32 files changed, 3365 insertions(+) create mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/AwsProperties.java create mode 100644 
plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/DefaultAwsProperties.java create mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/ForPolarisClient.java create mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/ForwardingFileIo.java create mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/ForwardingInputFile.java create mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/ForwardingOutputFile.java create mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/ForwardingSeekableInputStream.java create mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/NoneSecurityModule.java create mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/NoneSecurityProperties.java create mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/OAuth2SecurityConfig.java create mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/OAuth2SecurityModule.java create mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/OAuth2SecurityProperties.java create mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisAlreadyExistsException.java create mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisException.java create mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisGenericTable.java create mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisHiveMetastore.java create mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisHiveMetastoreFactory.java create mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisMetastoreConfig.java create mode 100644 
plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisMetastoreModule.java create mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisNamespace.java create mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisNotFoundException.java create mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisRestClient.java create mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisTableIdentifier.java create mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisTableMetadata.java create mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/SecurityProperties.java diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/DeltaLakeMetastoreModule.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/DeltaLakeMetastoreModule.java index 7f7d24a62e2b..642a3b227e53 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/DeltaLakeMetastoreModule.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/DeltaLakeMetastoreModule.java @@ -31,6 +31,7 @@ protected void setup(Binder binder) case THRIFT -> new DeltaLakeThriftMetastoreModule(); case FILE -> new DeltaLakeFileMetastoreModule(); case GLUE -> new DeltaLakeGlueMetastoreModule(); + case POLARIS -> new DeltaLakeFileMetastoreModule(); }); install(new CachingHiveMetastoreModule()); diff --git a/plugin/trino-hive/pom.xml b/plugin/trino-hive/pom.xml index 705f3c9ac6c2..20e0caac0991 100644 --- a/plugin/trino-hive/pom.xml +++ b/plugin/trino-hive/pom.xml @@ -33,6 +33,12 @@ com.google.guava guava + + + org.jspecify + jspecify + + @@ -66,6 +72,11 @@ configuration + + io.airlift + http-client + + io.airlift json @@ -171,6 +182,23 @@ avro + + + org.apache.iceberg + iceberg-api + + + + 
org.apache.iceberg + iceberg-core + + + org.jspecify + jspecify + + + + org.apache.parquet parquet-column @@ -330,6 +358,13 @@ runtime + + org.apache.iceberg + iceberg-bundled-guava + ${dep.iceberg.version} + runtime + + org.apache.parquet parquet-common @@ -584,6 +619,15 @@ + + org.basepom.maven + duplicate-finder-maven-plugin + + + iceberg-build.properties + + + diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/HiveMetastoreModule.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/HiveMetastoreModule.java index f069dec6f132..3bfe0611a785 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/HiveMetastoreModule.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/HiveMetastoreModule.java @@ -22,6 +22,7 @@ import io.trino.plugin.hive.AllowHiveTableRename; import io.trino.plugin.hive.metastore.file.FileMetastoreModule; import io.trino.plugin.hive.metastore.glue.GlueMetastoreModule; +import io.trino.plugin.hive.metastore.polaris.PolarisMetastoreModule; import io.trino.plugin.hive.metastore.thrift.ThriftMetastoreModule; import java.util.Optional; @@ -48,6 +49,7 @@ protected void setup(Binder binder) case THRIFT -> new ThriftMetastoreModule(); case FILE -> new FileMetastoreModule(); case GLUE -> new GlueMetastoreModule(); + case POLARIS -> new PolarisMetastoreModule(); }); } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/MetastoreTypeConfig.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/MetastoreTypeConfig.java index 0a2e3ab0df95..faedb9c7c938 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/MetastoreTypeConfig.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/MetastoreTypeConfig.java @@ -25,6 +25,7 @@ public enum MetastoreType THRIFT, FILE, GLUE, + POLARIS, } private MetastoreType metastoreType = THRIFT; diff --git 
a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/AwsProperties.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/AwsProperties.java new file mode 100644 index 000000000000..a9fe328924ef --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/AwsProperties.java @@ -0,0 +1,21 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive.metastore.polaris; + +import java.util.Map; + +public interface AwsProperties +{ + Map get(); +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/DefaultAwsProperties.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/DefaultAwsProperties.java new file mode 100644 index 000000000000..cf10fda937ae --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/DefaultAwsProperties.java @@ -0,0 +1,28 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive.metastore.polaris; + +import com.google.common.collect.ImmutableMap; + +import java.util.Map; + +public class DefaultAwsProperties + implements AwsProperties +{ + @Override + public Map get() + { + return ImmutableMap.of(); + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/ForPolarisClient.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/ForPolarisClient.java new file mode 100644 index 000000000000..4435099c0f15 --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/ForPolarisClient.java @@ -0,0 +1,31 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.hive.metastore.polaris; + +import com.google.inject.BindingAnnotation; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +@Retention(RUNTIME) +@Target({FIELD, PARAMETER, METHOD}) +@BindingAnnotation +public @interface ForPolarisClient +{ +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/ForwardingFileIo.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/ForwardingFileIo.java new file mode 100644 index 000000000000..b7221703a527 --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/ForwardingFileIo.java @@ -0,0 +1,92 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.hive.metastore.polaris; + +import com.google.common.collect.ImmutableMap; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystem; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Forwarding FileIO implementation that delegates to Trino's TrinoFileSystem + * to avoid Hadoop dependencies in the Polaris metastore integration. + */ +public class ForwardingFileIo + implements org.apache.iceberg.io.FileIO +{ + private final TrinoFileSystem fileSystem; + private final Map properties; + + public ForwardingFileIo(TrinoFileSystem fileSystem) + { + this(fileSystem, ImmutableMap.of()); + } + + public ForwardingFileIo(TrinoFileSystem fileSystem, Map properties) + { + this.fileSystem = requireNonNull(fileSystem, "fileSystem is null"); + this.properties = ImmutableMap.copyOf(requireNonNull(properties, "properties is null")); + } + + @Override + public InputFile newInputFile(String path) + { + return new ForwardingInputFile(fileSystem.newInputFile(Location.of(path))); + } + + @Override + public InputFile newInputFile(String path, long length) + { + return new ForwardingInputFile(fileSystem.newInputFile(Location.of(path), length)); + } + + @Override + public OutputFile newOutputFile(String path) + { + return new ForwardingOutputFile(fileSystem, Location.of(path)); + } + + @Override + public void deleteFile(String path) + { + try { + fileSystem.deleteFile(Location.of(path)); + } + catch (IOException e) { + throw new UncheckedIOException("Failed to delete file: " + path, e); + } + } + + @Override + public Map properties() + { + return properties; + } + + @Override + public void initialize(Map properties) + { + throw new UnsupportedOperationException("ForwardingFileIO does not support initialization by properties"); + } + + @Override + public 
void close() {} +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/ForwardingInputFile.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/ForwardingInputFile.java new file mode 100644 index 000000000000..39ad5c4f0929 --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/ForwardingInputFile.java @@ -0,0 +1,87 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive.metastore.polaris; + +import io.trino.filesystem.TrinoInputFile; +import org.apache.iceberg.exceptions.NotFoundException; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.SeekableInputStream; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.UncheckedIOException; + +import static java.util.Objects.requireNonNull; + +/** + * Forwarding InputFile implementation that delegates to Trino's TrinoInputFile. 
+ */ +public class ForwardingInputFile + implements InputFile +{ + private final TrinoInputFile inputFile; + + public ForwardingInputFile(TrinoInputFile inputFile) + { + this.inputFile = requireNonNull(inputFile, "inputFile is null"); + } + + @Override + public long getLength() + { + try { + return inputFile.length(); + } + catch (IOException e) { + throw new UncheckedIOException("Failed to get status for file: " + location(), e); + } + } + + @Override + public SeekableInputStream newStream() + { + try { + return new ForwardingSeekableInputStream(inputFile.newStream()); + } + catch (FileNotFoundException e) { + throw new NotFoundException(e, "Failed to open input stream for file: %s", location()); + } + catch (IOException e) { + throw new UncheckedIOException("Failed to open input stream for file: " + location(), e); + } + } + + @Override + public String location() + { + return inputFile.location().toString(); + } + + @Override + public boolean exists() + { + try { + return inputFile.exists(); + } + catch (IOException e) { + throw new UncheckedIOException("Failed to check existence for file: " + location(), e); + } + } + + @Override + public String toString() + { + return inputFile.toString(); + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/ForwardingOutputFile.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/ForwardingOutputFile.java new file mode 100644 index 000000000000..a4ba4d17734b --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/ForwardingOutputFile.java @@ -0,0 +1,126 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive.metastore.polaris; + +import com.google.common.io.CountingOutputStream; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.TrinoOutputFile; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.PositionOutputStream; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.UncheckedIOException; + +import static java.util.Objects.requireNonNull; + +/** + * Forwarding OutputFile implementation that delegates to Trino's TrinoOutputFile. + */ +public class ForwardingOutputFile + implements OutputFile +{ + private final TrinoFileSystem fileSystem; + private final TrinoOutputFile outputFile; + + public ForwardingOutputFile(TrinoFileSystem fileSystem, Location location) + { + this.fileSystem = requireNonNull(fileSystem, "fileSystem is null"); + this.outputFile = fileSystem.newOutputFile(location); + } + + @Override + public PositionOutputStream create() + { + try { + // Callers of this method don't have access to memory context, so we skip tracking memory here + return new CountingPositionOutputStream(outputFile.create()); + } + catch (IOException e) { + throw new UncheckedIOException("Failed to create file: " + location(), e); + } + } + + @Override + public PositionOutputStream createOrOverwrite() + { + // Iceberg never overwrites existing files. All callers use unique names. 
+ return create(); + } + + @Override + public String location() + { + return outputFile.location().toString(); + } + + @Override + public InputFile toInputFile() + { + return new ForwardingInputFile(fileSystem.newInputFile(outputFile.location())); + } + + @Override + public String toString() + { + return outputFile.toString(); + } + + private static class CountingPositionOutputStream + extends PositionOutputStream + { + private final CountingOutputStream stream; + + private CountingPositionOutputStream(OutputStream stream) + { + this.stream = new CountingOutputStream(stream); + } + + @Override + public long getPos() + { + return stream.getCount(); + } + + @Override + public void write(int b) + throws IOException + { + stream.write(b); + } + + @Override + public void write(byte[] b, int off, int len) + throws IOException + { + stream.write(b, off, len); + } + + @Override + public void flush() + throws IOException + { + stream.flush(); + } + + @Override + public void close() + throws IOException + { + stream.close(); + } + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/ForwardingSeekableInputStream.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/ForwardingSeekableInputStream.java new file mode 100644 index 000000000000..5218b7e90d1b --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/ForwardingSeekableInputStream.java @@ -0,0 +1,146 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive.metastore.polaris; + +import io.trino.filesystem.TrinoInputStream; +import org.apache.iceberg.io.SeekableInputStream; + +import java.io.IOException; +import java.io.OutputStream; + +import static java.util.Objects.requireNonNull; + +/** + * Forwarding SeekableInputStream implementation that delegates to Trino's TrinoInputStream. + */ +public class ForwardingSeekableInputStream + extends SeekableInputStream +{ + private final TrinoInputStream stream; + + public ForwardingSeekableInputStream(TrinoInputStream stream) + { + this.stream = requireNonNull(stream, "stream is null"); + } + + @Override + public long getPos() + throws IOException + { + return stream.getPosition(); + } + + @Override + public void seek(long pos) + throws IOException + { + stream.seek(pos); + } + + @Override + public int read() + throws IOException + { + return stream.read(); + } + + @Override + public int read(byte[] b) + throws IOException + { + return stream.read(b); + } + + @Override + public int read(byte[] b, int off, int len) + throws IOException + { + return stream.read(b, off, len); + } + + @Override + public byte[] readAllBytes() + throws IOException + { + return stream.readAllBytes(); + } + + @Override + public byte[] readNBytes(int len) + throws IOException + { + return stream.readNBytes(len); + } + + @Override + public int readNBytes(byte[] b, int off, int len) + throws IOException + { + return stream.readNBytes(b, off, len); + } + + @Override + public long skip(long n) + throws IOException + { + return stream.skip(n); + } + + @Override + public void skipNBytes(long n) + throws IOException + { + stream.skipNBytes(n); + } + + @Override + public int available() + throws IOException + { + return stream.available(); + } + + @Override + public void close() + throws IOException + { + stream.close(); + } + + @Override + public void mark(int 
readlimit) + { + stream.mark(readlimit); + } + + @Override + public void reset() + throws IOException + { + stream.reset(); + } + + @Override + public boolean markSupported() + { + return stream.markSupported(); + } + + @Override + public long transferTo(OutputStream out) + throws IOException + { + return stream.transferTo(out); + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/NoneSecurityModule.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/NoneSecurityModule.java new file mode 100644 index 000000000000..302db5d43271 --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/NoneSecurityModule.java @@ -0,0 +1,27 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.hive.metastore.polaris; + +import com.google.inject.Binder; +import io.airlift.configuration.AbstractConfigurationAwareModule; + +public class NoneSecurityModule + extends AbstractConfigurationAwareModule +{ + @Override + protected void setup(Binder binder) + { + binder.bind(SecurityProperties.class).to(NoneSecurityProperties.class); + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/NoneSecurityProperties.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/NoneSecurityProperties.java new file mode 100644 index 000000000000..6b031e5f250d --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/NoneSecurityProperties.java @@ -0,0 +1,28 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.hive.metastore.polaris; + +import com.google.common.collect.ImmutableMap; + +import java.util.Map; + +public class NoneSecurityProperties + implements SecurityProperties +{ + @Override + public Map get() + { + return ImmutableMap.of(); + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/OAuth2SecurityConfig.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/OAuth2SecurityConfig.java new file mode 100644 index 000000000000..574b97561bbf --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/OAuth2SecurityConfig.java @@ -0,0 +1,111 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.hive.metastore.polaris; + +import io.airlift.configuration.Config; +import io.airlift.configuration.ConfigDescription; +import io.airlift.configuration.ConfigSecuritySensitive; +import jakarta.validation.constraints.AssertTrue; +import org.apache.iceberg.rest.auth.OAuth2Properties; + +import java.net.URI; +import java.util.Optional; + +public class OAuth2SecurityConfig +{ + private String credential; + private String scope; + private String token; + private URI serverUri; + private boolean tokenRefreshEnabled = OAuth2Properties.TOKEN_REFRESH_ENABLED_DEFAULT; + + public Optional getCredential() + { + return Optional.ofNullable(credential); + } + + @Config("polaris.oauth2.credential") + @ConfigDescription("The credential to exchange for a token in the OAuth2 client credentials flow with the server") + @ConfigSecuritySensitive + public OAuth2SecurityConfig setCredential(String credential) + { + this.credential = credential; + return this; + } + + public Optional getScope() + { + return Optional.ofNullable(scope); + } + + @Config("polaris.oauth2.scope") + @ConfigDescription("The scope which will be used for interactions with the server") + public OAuth2SecurityConfig setScope(String scope) + { + this.scope = scope; + return this; + } + + public Optional getToken() + { + return Optional.ofNullable(token); + } + + @Config("polaris.oauth2.token") + @ConfigDescription("The Bearer token which will be used for interactions with the server") + @ConfigSecuritySensitive + public OAuth2SecurityConfig setToken(String token) + { + this.token = token; + return this; + } + + public Optional getServerUri() + { + return Optional.ofNullable(serverUri); + } + + @Config("polaris.oauth2.server-uri") + @ConfigDescription("The endpoint to retrieve access token from OAuth2 Server") + public OAuth2SecurityConfig setServerUri(URI serverUri) + { + this.serverUri = serverUri; + return this; + } + + public boolean isTokenRefreshEnabled() + { + return 
tokenRefreshEnabled; + } + + @Config("polaris.oauth2.token-refresh-enabled") + @ConfigDescription("Controls whether a token should be refreshed if information about its expiration time is available") + public OAuth2SecurityConfig setTokenRefreshEnabled(boolean tokenRefreshEnabled) + { + this.tokenRefreshEnabled = tokenRefreshEnabled; + return this; + } + + @AssertTrue(message = "OAuth2 requires a credential or token") + public boolean credentialOrTokenPresent() + { + return credential != null || token != null; + } + + @AssertTrue(message = "Scope is applicable only when using credential") + public boolean scopePresentOnlyWithCredential() + { + return !(token != null && scope != null); + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/OAuth2SecurityModule.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/OAuth2SecurityModule.java new file mode 100644 index 000000000000..e591630eec0a --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/OAuth2SecurityModule.java @@ -0,0 +1,30 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.hive.metastore.polaris; + +import com.google.inject.Binder; +import io.airlift.configuration.AbstractConfigurationAwareModule; + +import static io.airlift.configuration.ConfigBinder.configBinder; + +public class OAuth2SecurityModule + extends AbstractConfigurationAwareModule +{ + @Override + protected void setup(Binder binder) + { + configBinder(binder).bindConfig(OAuth2SecurityConfig.class); + binder.bind(SecurityProperties.class).to(OAuth2SecurityProperties.class); + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/OAuth2SecurityProperties.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/OAuth2SecurityProperties.java new file mode 100644 index 000000000000..78b9ce326da6 --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/OAuth2SecurityProperties.java @@ -0,0 +1,57 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.hive.metastore.polaris; + +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; +import org.apache.iceberg.rest.auth.AuthProperties; +import org.apache.iceberg.rest.auth.OAuth2Properties; + +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +public class OAuth2SecurityProperties + implements SecurityProperties +{ + private final Map securityProperties; + + @Inject + public OAuth2SecurityProperties(OAuth2SecurityConfig securityConfig) + { + requireNonNull(securityConfig, "securityConfig is null"); + + ImmutableMap.Builder propertiesBuilder = ImmutableMap.builder(); + propertiesBuilder.put(AuthProperties.AUTH_TYPE, AuthProperties.AUTH_TYPE_OAUTH2); + securityConfig.getCredential().ifPresent( + credential -> { + propertiesBuilder.put(OAuth2Properties.CREDENTIAL, credential); + securityConfig.getScope() + .ifPresent(scope -> propertiesBuilder.put(OAuth2Properties.SCOPE, scope)); + }); + securityConfig.getToken().ifPresent( + value -> propertiesBuilder.put(OAuth2Properties.TOKEN, value)); + securityConfig.getServerUri().ifPresent( + value -> propertiesBuilder.put(OAuth2Properties.OAUTH2_SERVER_URI, value.toString())); + propertiesBuilder.put(OAuth2Properties.TOKEN_REFRESH_ENABLED, String.valueOf(securityConfig.isTokenRefreshEnabled())); + + this.securityProperties = propertiesBuilder.buildOrThrow(); + } + + @Override + public Map get() + { + return securityProperties; + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisAlreadyExistsException.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisAlreadyExistsException.java new file mode 100644 index 000000000000..409bd2da602f --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisAlreadyExistsException.java @@ -0,0 +1,32 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this 
file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive.metastore.polaris; + +/** + * Exception thrown when attempting to create a resource that already exists in Polaris. + * This typically corresponds to HTTP 409 responses from the Polaris API. + */ +public class PolarisAlreadyExistsException + extends PolarisException +{ + public PolarisAlreadyExistsException(String message) + { + super(message); + } + + public PolarisAlreadyExistsException(String message, Throwable cause) + { + super(message, cause); + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisException.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisException.java new file mode 100644 index 000000000000..d33269acc434 --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisException.java @@ -0,0 +1,36 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.hive.metastore.polaris; + +/** + * Base exception for Polaris-related errors. + */ +public class PolarisException + extends RuntimeException +{ + public PolarisException(String message) + { + super(message); + } + + public PolarisException(String message, Throwable cause) + { + super(message, cause); + } + + public PolarisException(Throwable cause) + { + super(cause); + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisGenericTable.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisGenericTable.java new file mode 100644 index 000000000000..c04fd6aabcb4 --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisGenericTable.java @@ -0,0 +1,117 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive.metastore.polaris; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; + +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +/** + * Represents a generic table in Polaris (non-Iceberg tables like Delta Lake, CSV, etc.). + * Based on the Polaris Generic Table API specification. 
+ */ +public class PolarisGenericTable +{ + private final String name; + private final String format; + private final Optional baseLocation; + private final Optional doc; + private final Map properties; + + @JsonCreator + public PolarisGenericTable( + @JsonProperty("name") String name, + @JsonProperty("format") String format, + @JsonProperty("base-location") String baseLocation, + @JsonProperty("doc") String doc, + @JsonProperty("properties") Map properties) + { + this.name = requireNonNull(name, "name is null"); + this.format = requireNonNull(format, "format is null"); + this.baseLocation = Optional.ofNullable(baseLocation); + this.doc = Optional.ofNullable(doc); + this.properties = properties != null ? ImmutableMap.copyOf(properties) : ImmutableMap.of(); + } + + @JsonProperty + public String getName() + { + return name; + } + + @JsonProperty + public String getFormat() + { + return format; + } + + @JsonProperty("base-location") + public Optional getBaseLocation() + { + return baseLocation; + } + + @JsonProperty + public Optional getDoc() + { + return doc; + } + + @JsonProperty + public Map getProperties() + { + return properties; + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + PolarisGenericTable that = (PolarisGenericTable) obj; + return Objects.equals(name, that.name) && + Objects.equals(format, that.format) && + Objects.equals(baseLocation, that.baseLocation) && + Objects.equals(doc, that.doc) && + Objects.equals(properties, that.properties); + } + + @Override + public int hashCode() + { + return Objects.hash(name, format, baseLocation, doc, properties); + } + + @Override + public String toString() + { + return "PolarisGenericTable{" + + "name='" + name + '\'' + + ", format='" + format + '\'' + + ", baseLocation=" + baseLocation + + ", doc=" + doc + + ", properties=" + properties + + '}'; + } +} diff --git 
a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisHiveMetastore.java new file mode 100644 index 000000000000..0e93ac5fb5d8 --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisHiveMetastore.java @@ -0,0 +1,903 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive.metastore.polaris; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; +import com.google.inject.Inject; +import io.trino.metastore.AcidTransactionOwner; +import io.trino.metastore.Column; +import io.trino.metastore.Database; +import io.trino.metastore.HiveColumnStatistics; +import io.trino.metastore.HiveMetastore; +import io.trino.metastore.HivePartition; +import io.trino.metastore.HivePrincipal; +import io.trino.metastore.HivePrivilegeInfo; +import io.trino.metastore.HiveType; +import io.trino.metastore.Partition; +import io.trino.metastore.PartitionStatistics; +import io.trino.metastore.PartitionWithStatistics; +import io.trino.metastore.PrincipalPrivileges; +import io.trino.metastore.StatisticsUpdateMode; +import io.trino.metastore.StorageFormat; +import io.trino.metastore.Table; +import io.trino.metastore.TableInfo; +import io.trino.spi.TrinoException; 
+import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.connector.TableNotFoundException; +import io.trino.spi.function.LanguageFunction; +import io.trino.spi.predicate.TupleDomain; +import io.trino.spi.security.PrincipalType; +import io.trino.spi.security.RoleGrant; +import io.trino.spi.statistics.ColumnStatisticType; +import io.trino.spi.type.Type; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.catalog.SessionCatalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.rest.RESTSessionCatalog; +import org.apache.iceberg.rest.auth.OAuth2Properties; +import org.apache.iceberg.types.Types; + +import java.util.Collection; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.plugin.hive.HiveErrorCode.HIVE_METASTORE_ERROR; +import static io.trino.spi.StandardErrorCode.ALREADY_EXISTS; +import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; +import static java.util.Objects.requireNonNull; + +public class PolarisHiveMetastore + implements HiveMetastore +{ + private final PolarisRestClient polarisClient; + private final RESTSessionCatalog restSessionCatalog; + private final SecurityProperties securityProperties; + + @Inject + public PolarisHiveMetastore(PolarisRestClient polarisClient, RESTSessionCatalog restSessionCatalog, SecurityProperties securityProperties) + { + this.polarisClient = requireNonNull(polarisClient, "polarisClient is null"); + this.restSessionCatalog = requireNonNull(restSessionCatalog, "restSessionCatalog is null"); + this.securityProperties = requireNonNull(securityProperties, "securityProperties is null"); + } + + @Override 
+ public Optional getDatabase(String databaseName) + { + try { + // Check if namespace exists by trying to list namespaces and find this one + // This uses RESTSessionCatalog which handles OAuth2 properly and identically + List namespaces = polarisClient.listNamespaces(Optional.empty()); + System.out.println("DEBUG: Looking for database '" + databaseName + "' in namespaces: " + + namespaces.stream().map(PolarisNamespace::getName).collect(toImmutableList())); + + Optional namespace = namespaces.stream() + .filter(ns -> ns.getName().equals(databaseName)) + .findFirst(); + + if (namespace.isPresent()) { + System.out.println("DEBUG: Found database: " + namespace.get().getName()); + return Optional.of(Database.builder() + .setDatabaseName(namespace.get().getName()) + .setOwnerName(Optional.of("trino-user")) + .setOwnerType(Optional.of(PrincipalType.USER)) + .setParameters(namespace.get().getProperties()) + .build()); + } + System.out.println("DEBUG: Database '" + databaseName + "' not found"); + return Optional.empty(); + } + catch (RuntimeException e) { + System.out.println("DEBUG: Exception in getDatabase for '" + databaseName + "': " + + e.getClass().getSimpleName() + ": " + e.getMessage()); + throw new TrinoException(HIVE_METASTORE_ERROR, "Failed to get database: " + databaseName, e); + } + } + + @Override + public List getAllDatabases() + { + try { + return polarisClient.listNamespaces(Optional.empty()) + .stream() + .map(PolarisNamespace::getName) + .collect(toImmutableList()); + } + catch (RuntimeException e) { + throw new TrinoException(HIVE_METASTORE_ERROR, "Failed to list databases", e); + } + } + + @Override + public Optional getTable(String databaseName, String tableName) + { + try { + // First, try to load as an Iceberg table using RESTSessionCatalog + TableIdentifier tableId = TableIdentifier.of(databaseName, tableName); + SessionCatalog.SessionContext sessionContext = createSessionContext(); + org.apache.iceberg.Table icebergTable = 
restSessionCatalog.loadTable(sessionContext, tableId); + + // Convert Iceberg table to Hive representation + return Optional.of(convertIcebergToHiveTable(databaseName, tableName, icebergTable)); + } + catch (Exception e) { + // Iceberg table loading failed, try as a generic table + System.out.println("DEBUG: Iceberg table loading failed for " + databaseName + "." + tableName + + ", trying generic table API. Exception: " + e.getClass().getSimpleName() + ": " + e.getMessage()); + + try { + PolarisGenericTable genericTable = polarisClient.loadGenericTable(databaseName, tableName); + System.out.println("DEBUG: Successfully loaded as generic table: " + databaseName + "." + tableName); + return Optional.of(convertGenericToHiveTable(databaseName, genericTable)); + } + catch (Exception ex) { + System.out.println("DEBUG: Generic table loading also failed for " + databaseName + "." + tableName + + ". Exception: " + ex.getClass().getSimpleName() + ": " + ex.getMessage()); + return Optional.empty(); + } + } + } + + /** + * Creates session context using the same credentials as RESTSessionCatalog + * This ensures consistent OAuth2 authentication across all operations + */ + private SessionCatalog.SessionContext createSessionContext() + { + String sessionId = UUID.randomUUID().toString(); + + // Extract OAuth2 credentials exactly like TrinoIcebergRestCatalogFactory does + Map securityProps = securityProperties.get(); + Map credentials = ImmutableMap.builder() + .putAll(Maps.filterKeys(securityProps, + key -> Set.of(OAuth2Properties.TOKEN, OAuth2Properties.CREDENTIAL).contains(key))) + .buildOrThrow(); + + Map properties = ImmutableMap.of(); + + return new SessionCatalog.SessionContext(sessionId, "trino-user", credentials, properties, null); + } + + public Set getSupportedColumnStatistics(Type type) + { + return ImmutableSet.of(); + } + + public PartitionStatistics getTableStatistics(String databaseName, String tableName) + { + return PartitionStatistics.empty(); + } + + public 
Map getPartitionStatistics(String databaseName, String tableName, Set partitionNames) + { + return ImmutableMap.of(); + } + + @Override + public void updateTableStatistics(String databaseName, String tableName, OptionalLong acidWriteId, StatisticsUpdateMode mode, PartitionStatistics statisticsUpdate) + { + throw new UnsupportedOperationException("Table statistics updates are not supported by Polaris"); + } + + @Override + public void updatePartitionStatistics(Table table, StatisticsUpdateMode mode, Map partitionUpdates) + { + throw new UnsupportedOperationException("Partition statistics updates are not supported by Polaris"); + } + + public List getAllTables(String databaseName) + { + try { + // Get both Iceberg and generic tables + List icebergTables = polarisClient.listIcebergTables(databaseName) + .stream() + .map(PolarisTableIdentifier::getName) + .collect(toImmutableList()); + + List genericTables = polarisClient.listGenericTables(databaseName) + .stream() + .map(PolarisTableIdentifier::getName) + .collect(toImmutableList()); + + return ImmutableList.builder() + .addAll(icebergTables) + .addAll(genericTables) + .build(); + } + catch (RuntimeException e) { + throw new TrinoException(HIVE_METASTORE_ERROR, "Failed to list tables in database: " + databaseName, e); + } + } + + @Override + public List getTables(String databaseName) + { + return getAllTables(databaseName).stream() + .map(tableName -> new TableInfo(new SchemaTableName(databaseName, tableName), TableInfo.ExtendedRelationType.TABLE)) + .collect(toImmutableList()); + } + + @Override + public List getTableNamesWithParameters(String databaseName, String parameterKey, Set parameterValues) + { + return ImmutableList.of(); + } + + public List getAllViews(String databaseName) + { + // TODO: implement view support, the Polaris API supports it. 
+ return ImmutableList.of(); + } + + @Override + public void createDatabase(Database database) + { + try { + PolarisNamespace namespace = new PolarisNamespace(database.getDatabaseName(), database.getParameters()); + polarisClient.createNamespace(namespace); + } + catch (RuntimeException e) { + throw new TrinoException(HIVE_METASTORE_ERROR, "Failed to create database: " + database.getDatabaseName(), e); + } + } + + @Override + public void dropDatabase(String databaseName, boolean deleteData) + { + throw new TrinoException(NOT_SUPPORTED, "Drop database is not supported by Polaris"); + } + + @Override + public void renameDatabase(String databaseName, String newDatabaseName) + { + throw new TrinoException(NOT_SUPPORTED, "Rename database is not supported by Polaris"); + } + + @Override + public void setDatabaseOwner(String databaseName, HivePrincipal principal) + { + throw new TrinoException(NOT_SUPPORTED, "Database ownership is not supported by Polaris"); + } + + @Override + public void createTable(Table table, PrincipalPrivileges principalPrivileges) + { + try { + String databaseName = table.getDatabaseName(); + String tableName = table.getTableName(); + + // Detect table format based on table parameters + String tableFormat = detectTableFormat(table); + + switch (tableFormat) { + case "ICEBERG" -> createIcebergTable(databaseName, tableName, table); + case "DELTA" -> createGenericTable(databaseName, table); + default -> throw new TrinoException(NOT_SUPPORTED, "Unsupported table format: " + tableFormat); + } + } + catch (PolarisAlreadyExistsException e) { + throw new TrinoException(ALREADY_EXISTS, "Table already exists: " + table.getDatabaseName() + "." + table.getTableName(), e); + } + catch (RuntimeException e) { + throw new TrinoException(HIVE_METASTORE_ERROR, "Failed to create table: " + table.getDatabaseName() + "." 
+ table.getTableName(), e); + } + } + + private String detectTableFormat(Table table) + { + Map parameters = table.getParameters(); + + // Check for Iceberg table indicators + if (parameters.containsKey("table_type") && "ICEBERG".equals(parameters.get("table_type"))) { + return "ICEBERG"; + } + if (parameters.containsKey("metadata_location")) { + return "ICEBERG"; + } + + // Check for Delta Lake table indicators + if (parameters.containsKey("spark.sql.sources.provider") && "DELTA".equals(parameters.get("spark.sql.sources.provider"))) { + return "DELTA"; + } + if (parameters.containsKey("spark.sql.sources.provider") && "delta".equalsIgnoreCase(parameters.get("spark.sql.sources.provider"))) { + return "DELTA"; + } + + // Check for CSV format + if (parameters.containsKey("format") && "csv".equalsIgnoreCase(parameters.get("format"))) { + return "CSV"; + } + + // For Polaris catalog, default to Iceberg tables (since Polaris is Iceberg-native) + // Generic tables (Delta, CSV) should be explicitly specified + return "ICEBERG"; + } + + private void createIcebergTable(String databaseName, String tableName, Table table) + { + Schema icebergSchema = convertHiveToIcebergSchema(table.getDataColumns()); + TableIdentifier tableIdentifier = TableIdentifier.of(databaseName, tableName); + SessionCatalog.SessionContext sessionContext = createSessionContext(); + Map properties = table.getParameters(); + String location = table.getStorage().getLocation(); + + // Use the RESTSessionCatalog to build and create the table + Transaction transaction = restSessionCatalog.buildTable(sessionContext, tableIdentifier, icebergSchema) + .withLocation(location) + .withProperties(properties) + .createTransaction(); + + transaction.commitTransaction(); + } + + private void createGenericTable(String databaseName, Table table) + { + // For Delta Lake and other generic tables, use the generic table API + PolarisGenericTable genericTable = convertHiveToGenericTable(table); + 
polarisClient.createGenericTable(databaseName, genericTable); + } + + private PolarisGenericTable convertHiveToGenericTable(Table table) + { + String location = table.getStorage().getLocation(); + Map parameters = table.getParameters(); + + // Determine format from table properties + String format = detectTableFormat(table).toLowerCase(Locale.ROOT); + + // Extract comment/description + String doc = table.getParameters().get("comment"); + + // Copy all table properties except internal ones + ImmutableMap.Builder properties = ImmutableMap.builder(); + parameters.forEach((key, value) -> { + if (!key.startsWith("trino.") && !key.equals("comment")) { + properties.put(key, value); + } + }); + + return new PolarisGenericTable( + table.getTableName(), + format, + location, + doc, + properties.buildOrThrow()); + } + + private Schema convertHiveToIcebergSchema(List columns) + { + AtomicInteger columnId = new AtomicInteger(1); + List icebergColumns = columns.stream() + .map(column -> Types.NestedField.optional( + columnId.getAndIncrement(), + column.getName(), + convertHiveTypeToIcebergType(column.getType()), + column.getComment().orElse(null))) + .collect(toImmutableList()); + return new Schema(icebergColumns); + } + + private org.apache.iceberg.types.Type convertHiveTypeToIcebergType(HiveType hiveType) + { + String typeName = hiveType.getHiveTypeName().toString().toLowerCase(Locale.ROOT); + return switch (typeName) { + case "boolean" -> Types.BooleanType.get(); + case "tinyint", "smallint", "int" -> Types.IntegerType.get(); + case "bigint" -> Types.LongType.get(); + case "float" -> Types.FloatType.get(); + case "double" -> Types.DoubleType.get(); + case "string" -> Types.StringType.get(); + case "binary" -> Types.BinaryType.get(); + case "date" -> Types.DateType.get(); + case "timestamp" -> Types.TimestampType.withZone(); + default -> Types.StringType.get(); // Default fallback + }; + } + + @Override + public void dropTable(String databaseName, String tableName, boolean 
deleteData) + { + try { + // Try to drop as Iceberg table first + try { + SessionCatalog.SessionContext sessionContext = createSessionContext(); + TableIdentifier tableId = TableIdentifier.of(databaseName, tableName); + if (deleteData) { + restSessionCatalog.purgeTable(sessionContext, tableId); + } + else { + restSessionCatalog.dropTable(sessionContext, tableId); + } + return; + } + catch (NoSuchTableException ignored) { + // Fall through to try generic table + } + + // Try to drop as generic table + polarisClient.dropGenericTable(databaseName, tableName); + } + catch (PolarisNotFoundException e) { + throw new TableNotFoundException(new SchemaTableName(databaseName, tableName)); + } + catch (RuntimeException e) { + throw new TrinoException(HIVE_METASTORE_ERROR, "Failed to drop table: " + databaseName + "." + tableName, e); + } + } + + @Override + public void replaceTable(String databaseName, String tableName, Table newTable, PrincipalPrivileges principalPrivileges, Map environmentContext) + { + throw new TrinoException(NOT_SUPPORTED, "Replace table is not supported by Polaris"); + } + + @Override + public void renameTable(String databaseName, String tableName, String newDatabaseName, String newTableName) + { + try { + // Try to rename as Iceberg table first + try { + SessionCatalog.SessionContext sessionContext = createSessionContext(); + TableIdentifier from = TableIdentifier.of(databaseName, tableName); + TableIdentifier to = TableIdentifier.of(newDatabaseName, newTableName); + restSessionCatalog.renameTable(sessionContext, from, to); + return; + } + catch (NoSuchTableException ignored) { + // Fall through - table might be a generic table + } + + // Generic table rename is not supported by Polaris API + throw new TrinoException(NOT_SUPPORTED, "Rename table is not supported for Delta Lake tables in Polaris"); + } + catch (RuntimeException e) { + throw new TrinoException(HIVE_METASTORE_ERROR, "Failed to rename table: " + databaseName + "." 
+ tableName, e); + } + } + + @Override + public void commentTable(String databaseName, String tableName, Optional comment) + { + throw new TrinoException(NOT_SUPPORTED, "Table comments are not supported by Polaris"); + } + + @Override + public void setTableOwner(String databaseName, String tableName, HivePrincipal principal) + { + throw new TrinoException(NOT_SUPPORTED, "Table ownership is not supported by Polaris"); + } + + @Override + public void commentColumn(String databaseName, String tableName, String columnName, Optional comment) + { + throw new TrinoException(NOT_SUPPORTED, "Column comments are not supported by Polaris"); + } + + // Partition operations - handled by format-specific connectors + @Override + public Optional getPartition(Table table, List partitionValues) + { + return Optional.empty(); + } + + public Optional> getPartitionNames(String databaseName, String tableName) + { + return Optional.empty(); + } + + @Override + public Optional> getPartitionNamesByFilter(String databaseName, String tableName, List columnNames, TupleDomain partitionKeysFilter) + { + return Optional.empty(); + } + + @Override + public Map> getPartitionsByNames(Table table, List partitionNames) + { + return ImmutableMap.of(); + } + + @Override + public void addPartitions(String databaseName, String tableName, List partitions) + { + throw new UnsupportedOperationException("Partition management is handled by format-specific connectors"); + } + + @Override + public void dropPartition(String databaseName, String tableName, List parts, boolean deleteData) + { + throw new UnsupportedOperationException("Partition management is handled by format-specific connectors"); + } + + @Override + public void alterPartition(String databaseName, String tableName, PartitionWithStatistics partition) + { + throw new UnsupportedOperationException("Partition management is handled by format-specific connectors"); + } + + // Role and privilege operations - not supported by Polaris + @Override + public 
void createRole(String role, String grantor) + { + throw new UnsupportedOperationException("Role management is not supported by Polaris"); + } + + @Override + public void dropRole(String role) + { + throw new UnsupportedOperationException("Role management is not supported by Polaris"); + } + + @Override + public Set listRoles() + { + return ImmutableSet.of(); + } + + @Override + public void grantRoles(Set roles, Set grantees, boolean adminOption, HivePrincipal grantor) + { + throw new UnsupportedOperationException("Role management is not supported by Polaris"); + } + + @Override + public void revokeRoles(Set roles, Set grantees, boolean adminOption, HivePrincipal grantor) + { + throw new UnsupportedOperationException("Role management is not supported by Polaris"); + } + + public Set listGrantedPrincipals(String role) + { + return ImmutableSet.of(); + } + + @Override + public Set listRoleGrants(HivePrincipal principal) + { + return ImmutableSet.of(); + } + + // Transaction operations - not supported by Polaris + @Override + public Optional getConfigValue(String name) + { + return Optional.empty(); + } + + @Override + public long openTransaction(AcidTransactionOwner transactionOwner) + { + throw new UnsupportedOperationException("ACID transactions are not supported by Polaris"); + } + + @Override + public void commitTransaction(long transactionId) + { + throw new UnsupportedOperationException("ACID transactions are not supported by Polaris"); + } + + @Override + public void abortTransaction(long transactionId) + { + throw new UnsupportedOperationException("ACID transactions are not supported by Polaris"); + } + + @Override + public void sendTransactionHeartbeat(long transactionId) + { + throw new UnsupportedOperationException("ACID transactions are not supported by Polaris"); + } + + @Override + public void acquireSharedReadLock(AcidTransactionOwner transactionOwner, String queryId, long transactionId, List fullTables, List partitions) + { + // No-op for Polaris + } 
+ + @Override + public String getValidWriteIds(List tables, long currentTransactionId) + { + throw new UnsupportedOperationException("ACID transactions are not supported by Polaris"); + } + + // Function operations - not supported by Polaris + @Override + public boolean functionExists(String databaseName, String functionName, String signatureToken) + { + return false; + } + + @Override + public Collection getFunctions(String databaseName, String functionName) + { + return ImmutableList.of(); + } + + @Override + public Collection getAllFunctions(String databaseName) + { + return ImmutableList.of(); + } + + @Override + public void createFunction(String databaseName, String functionName, LanguageFunction function) + { + throw new UnsupportedOperationException("Function management is not supported by Polaris"); + } + + public void alterFunction(String databaseName, String functionName, LanguageFunction function) + { + throw new UnsupportedOperationException("Function management is not supported by Polaris"); + } + + @Override + public void dropFunction(String databaseName, String functionName, String signatureToken) + { + throw new UnsupportedOperationException("Function management is not supported by Polaris"); + } + + @Override + public void replaceFunction(String databaseName, String functionName, LanguageFunction function) + { + throw new UnsupportedOperationException("Function management is not supported by Polaris"); + } + + // Column statistics operations - not supported yet + @Override + public Map getTableColumnStatistics(String databaseName, String tableName, Set columnNames) + { + return ImmutableMap.of(); + } + + @Override + public Map> getPartitionColumnStatistics(String databaseName, String tableName, Set partitionNames, Set columnNames) + { + return ImmutableMap.of(); + } + + // Schema modification operations - not supported + @Override + public void addColumn(String databaseName, String tableName, String columnName, HiveType columnType, String 
columnComment) + { + throw new TrinoException(NOT_SUPPORTED, "Add column is not supported by Polaris"); + } + + @Override + public void renameColumn(String databaseName, String tableName, String oldColumnName, String newColumnName) + { + throw new TrinoException(NOT_SUPPORTED, "Rename column is not supported by Polaris"); + } + + @Override + public void dropColumn(String databaseName, String tableName, String columnName) + { + throw new TrinoException(NOT_SUPPORTED, "Drop column is not supported by Polaris"); + } + + // Privilege operations - not supported + @Override + public void grantTablePrivileges(String databaseName, String tableName, String tableOwner, HivePrincipal grantee, HivePrincipal grantor, Set privileges, boolean grantOption) + { + throw new UnsupportedOperationException("Table privileges are not supported by Polaris"); + } + + @Override + public void revokeTablePrivileges(String databaseName, String tableName, String tableOwner, HivePrincipal grantee, HivePrincipal grantor, Set privileges, boolean grantOption) + { + throw new UnsupportedOperationException("Table privileges are not supported by Polaris"); + } + + @Override + public Set listTablePrivileges(String databaseName, String tableName, Optional tableOwner, Optional principal) + { + return ImmutableSet.of(); + } + + // Helper methods for converting Polaris tables to Hive tables + private Table convertIcebergToHiveTable(String databaseName, String tableName, org.apache.iceberg.Table icebergTable) + { + // Convert Iceberg schema to Hive columns + List hiveColumns = convertIcebergSchemaToHiveColumns(icebergTable.schema()); + + // Filter out Iceberg-specific properties to make it look like a regular Hive table + ImmutableMap.Builder cleanParametersBuilder = ImmutableMap.builder(); + icebergTable.properties().entrySet().stream() + .filter(entry -> !isIcebergSpecificProperty(entry.getKey())) + .forEach(entry -> cleanParametersBuilder.put(entry.getKey(), entry.getValue())); + Map cleanParameters = 
cleanParametersBuilder.buildOrThrow(); + + // Get the data location from the current snapshot + String dataLocation = getDataLocationFromSnapshot(icebergTable); + + // Create a standard Parquet external table + return Table.builder() + .setDatabaseName(databaseName) + .setTableName(tableName) + .setOwner(Optional.of("trino-user")) + .setTableType("EXTERNAL_TABLE") + .setDataColumns(hiveColumns) // Real columns from Iceberg schema + .setParameters(cleanParameters) // Clean parameters without Iceberg markers + .withStorage(storage -> storage + .setLocation(dataLocation) // Use actual data location, not base table location + .setStorageFormat(StorageFormat.create( + "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe", + "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat", + "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat")) + .setSerdeParameters(ImmutableMap.of())) + .build(); + } + + /** + * Extract the data location from the current Iceberg snapshot. + * For unpartitioned tables, this typically points to the data directory. 
+ */ + private String getDataLocationFromSnapshot(org.apache.iceberg.Table icebergTable) + { + try { + // Get current snapshot + var currentSnapshot = icebergTable.currentSnapshot(); + if (currentSnapshot == null) { + // No data yet, return base location + return icebergTable.location(); + } + + // For unpartitioned tables, we can use the base location + "/data" + // This is the standard Iceberg convention + String baseLocation = icebergTable.location(); + if (!baseLocation.endsWith("/")) { + baseLocation += "/"; + } + return baseLocation + "data"; + } + catch (Exception e) { + // Fallback to base location if snapshot reading fails + return icebergTable.location(); + } + } + + /** + * Convert Iceberg schema to Hive columns + */ + private List convertIcebergSchemaToHiveColumns(Schema icebergSchema) + { + return icebergSchema.columns().stream() + .map(field -> new Column( + field.name(), + convertIcebergFieldTypeToHiveType(field.type()), + Optional.ofNullable(field.doc()), + ImmutableMap.of())) + .collect(toImmutableList()); + } + + /** + * Convert Iceberg field type to Hive type (reverse of existing convertHiveTypeToIcebergType) + */ + private HiveType convertIcebergFieldTypeToHiveType(org.apache.iceberg.types.Type icebergType) + { + return switch (icebergType.typeId()) { + case BOOLEAN -> HiveType.HIVE_BOOLEAN; + case INTEGER -> HiveType.HIVE_INT; + case LONG -> HiveType.HIVE_LONG; + case FLOAT -> HiveType.HIVE_FLOAT; + case DOUBLE -> HiveType.HIVE_DOUBLE; + case STRING -> HiveType.HIVE_STRING; + case BINARY -> HiveType.HIVE_BINARY; + case DATE -> HiveType.HIVE_DATE; + case TIMESTAMP -> HiveType.HIVE_TIMESTAMP; + default -> HiveType.HIVE_STRING; // Fallback for complex types + }; + } + + /** + * Check if a property is Iceberg-specific and should be filtered out + */ + private boolean isIcebergSpecificProperty(String key) + { + return key.equals("table_type") || + key.equals("metadata_location") || + key.startsWith("write.") || + key.startsWith("commit.") || + 
key.startsWith("snapshot.") || + key.startsWith("current-snapshot-id") || + key.startsWith("format-version"); + } + + private Table convertGenericToHiveTable(String databaseName, PolarisGenericTable genericTable) + { + // For Delta tables, create basic columns since we don't have detailed schema from Polaris + // In a real implementation, you'd want to read the Delta table metadata to get the actual schema + List columns; + if ("delta".equalsIgnoreCase(genericTable.getFormat())) { + // Create some basic columns that are common in Delta tables + // TODO: Ideally, read the actual Delta table schema from _delta_log + columns = ImmutableList.of( + new Column("id", HiveType.HIVE_INT, Optional.empty(), ImmutableMap.of()), + new Column("name", HiveType.HIVE_STRING, Optional.empty(), ImmutableMap.of()), + new Column("value", HiveType.HIVE_STRING, Optional.empty(), ImmutableMap.of())); + } + else { + columns = ImmutableList.of(); + } + + // Use Parquet for Delta tables + StorageFormat storageFormat; + if ("delta".equalsIgnoreCase(genericTable.getFormat())) { + storageFormat = StorageFormat.create( + "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe", + "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat", + "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"); + } + else { + storageFormat = switch (genericTable.getFormat().toLowerCase(Locale.ROOT)) { + case "csv" -> StorageFormat.create( + "org.apache.hadoop.hive.serde2.OpenCSVSerde", + "org.apache.hadoop.mapred.TextInputFormat", + "org.apache.hadoop.mapred.HiveIgnoreKeyTextOutputFormat"); + default -> StorageFormat.create( + "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe", + "org.apache.hadoop.mapred.FileInputFormat", + "org.apache.hadoop.mapred.FileOutputFormat"); + }; + } + + ImmutableMap.Builder parameters = ImmutableMap.builder(); + genericTable.getProperties().entrySet().stream() + .filter(entry -> !isDeltaSpecificProperty(entry.getKey())) + .forEach(entry -> 
parameters.put(entry.getKey(), entry.getValue())); + + // Add comment if present + genericTable.getDoc().ifPresent(doc -> parameters.put("comment", doc)); + + Table.Builder tableBuilder = Table.builder() + .setDatabaseName(databaseName) + .setTableName(genericTable.getName()) + .setOwner(Optional.of("trino-user")) + .setTableType("EXTERNAL_TABLE") + .setDataColumns(columns) + .setParameters(parameters.buildOrThrow()); + + // Set storage information + genericTable.getBaseLocation().ifPresent(location -> + tableBuilder.withStorage(storage -> storage + .setLocation(location) + .setStorageFormat(storageFormat) + .setSerdeParameters(ImmutableMap.of()))); + + return tableBuilder.build(); + } + + /** + * Check if a property is Delta-specific and should be filtered out + */ + private boolean isDeltaSpecificProperty(String key) + { + return key.equals("format") || + key.equals("spark.sql.sources.provider") || + key.startsWith("delta.") || + key.equals("table_type") || + key.equals("has_delta_log"); + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisHiveMetastoreFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisHiveMetastoreFactory.java new file mode 100644 index 000000000000..a9af97bc5ef6 --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisHiveMetastoreFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.hive.metastore.polaris; + +import com.google.inject.Inject; +import io.trino.metastore.HiveMetastore; +import io.trino.metastore.HiveMetastoreFactory; +import io.trino.spi.security.ConnectorIdentity; +import org.apache.iceberg.rest.RESTSessionCatalog; + +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +public class PolarisHiveMetastoreFactory + implements HiveMetastoreFactory +{ + private final PolarisRestClient polarisClient; + private final RESTSessionCatalog restSessionCatalog; + private final SecurityProperties securityProperties; + + @Inject + public PolarisHiveMetastoreFactory(PolarisRestClient polarisClient, RESTSessionCatalog restSessionCatalog, SecurityProperties securityProperties) + { + this.polarisClient = requireNonNull(polarisClient, "polarisClient is null"); + this.restSessionCatalog = requireNonNull(restSessionCatalog, "restSessionCatalog is null"); + this.securityProperties = requireNonNull(securityProperties, "securityProperties is null"); + } + + @Override + public HiveMetastore createMetastore(Optional identity) + { + return new PolarisHiveMetastore(polarisClient, restSessionCatalog, securityProperties); + } + + @Override + public boolean isImpersonationEnabled() + { + return false; + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisMetastoreConfig.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisMetastoreConfig.java new file mode 100644 index 000000000000..83b219fe1570 --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisMetastoreConfig.java @@ -0,0 +1,286 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive.metastore.polaris; + +import io.airlift.configuration.Config; +import io.airlift.configuration.ConfigDescription; +import io.airlift.units.Duration; +import io.airlift.units.MinDuration; +import jakarta.validation.constraints.NotNull; + +import java.net.URI; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +/** + * Configuration for Apache Polaris metastore backend. + */ +public class PolarisMetastoreConfig +{ + public enum Security + { + NONE, + OAUTH2 + } + + private URI uri; + private String prefix = ""; + private String warehouse; + private Duration requestTimeout = new Duration(30, TimeUnit.SECONDS); + private Duration connectTimeout = new Duration(10, TimeUnit.SECONDS); + private int maxRetries = 3; + private Duration retryDelay = new Duration(1, TimeUnit.SECONDS); + + // Security configuration + private Security security = Security.NONE; + + // SSL configuration + private boolean verifySSL = true; + private String trustStorePath; + private String trustStorePassword; + private String keyStorePath; + private String keyStorePassword; + + // Generic table support + private boolean enableGenericTables = true; + private String defaultGenericTableFormat = "delta"; + + // Policy integration + private boolean enablePolicyIntegration = true; + private boolean enforcePolicies; + + @NotNull + public URI getUri() + { + return uri; + } + + @Config("polaris.uri") + @ConfigDescription("URI of the Polaris catalog server") + public PolarisMetastoreConfig setUri(URI uri) + { + this.uri = uri; + return 
this; + } + + public String getPrefix() + { + return prefix; + } + + @Config("polaris.prefix") + @ConfigDescription("Optional prefix for all API requests") + public PolarisMetastoreConfig setPrefix(String prefix) + { + this.prefix = prefix; + return this; + } + + public Optional getWarehouse() + { + return Optional.ofNullable(warehouse); + } + + @Config("polaris.warehouse") + @ConfigDescription("Default warehouse location for tables") + public PolarisMetastoreConfig setWarehouse(String warehouse) + { + this.warehouse = warehouse; + return this; + } + + @MinDuration("1s") + public Duration getRequestTimeout() + { + return requestTimeout; + } + + @Config("polaris.request-timeout") + @ConfigDescription("Timeout for HTTP requests to Polaris") + public PolarisMetastoreConfig setRequestTimeout(Duration requestTimeout) + { + this.requestTimeout = requestTimeout; + return this; + } + + @MinDuration("1s") + public Duration getConnectTimeout() + { + return connectTimeout; + } + + @Config("polaris.connect-timeout") + @ConfigDescription("Timeout for HTTP connections to Polaris") + public PolarisMetastoreConfig setConnectTimeout(Duration connectTimeout) + { + this.connectTimeout = connectTimeout; + return this; + } + + public int getMaxRetries() + { + return maxRetries; + } + + @Config("polaris.max-retries") + @ConfigDescription("Maximum number of retry attempts for failed requests") + public PolarisMetastoreConfig setMaxRetries(int maxRetries) + { + this.maxRetries = maxRetries; + return this; + } + + public Duration getRetryDelay() + { + return retryDelay; + } + + @Config("polaris.retry-delay") + @ConfigDescription("Initial delay between retry attempts") + public PolarisMetastoreConfig setRetryDelay(Duration retryDelay) + { + this.retryDelay = retryDelay; + return this; + } + + public Security getSecurity() + { + return security; + } + + @Config("polaris.security") + @ConfigDescription("Security type: NONE or OAUTH2") + public PolarisMetastoreConfig setSecurity(Security 
security) + { + this.security = security; + return this; + } + + public boolean isVerifySSL() + { + return verifySSL; + } + + @Config("polaris.ssl.verify") + @ConfigDescription("Whether to verify SSL certificates") + public PolarisMetastoreConfig setVerifySSL(boolean verifySSL) + { + this.verifySSL = verifySSL; + return this; + } + + public Optional getTrustStorePath() + { + return Optional.ofNullable(trustStorePath); + } + + @Config("polaris.ssl.trust-store-path") + @ConfigDescription("Path to SSL trust store") + public PolarisMetastoreConfig setTrustStorePath(String trustStorePath) + { + this.trustStorePath = trustStorePath; + return this; + } + + public Optional getTrustStorePassword() + { + return Optional.ofNullable(trustStorePassword); + } + + @Config("polaris.ssl.trust-store-password") + @ConfigDescription("Password for SSL trust store") + public PolarisMetastoreConfig setTrustStorePassword(String trustStorePassword) + { + this.trustStorePassword = trustStorePassword; + return this; + } + + public Optional getKeyStorePath() + { + return Optional.ofNullable(keyStorePath); + } + + @Config("polaris.ssl.key-store-path") + @ConfigDescription("Path to SSL key store") + public PolarisMetastoreConfig setKeyStorePath(String keyStorePath) + { + this.keyStorePath = keyStorePath; + return this; + } + + public Optional getKeyStorePassword() + { + return Optional.ofNullable(keyStorePassword); + } + + @Config("polaris.ssl.key-store-password") + @ConfigDescription("Password for SSL key store") + public PolarisMetastoreConfig setKeyStorePassword(String keyStorePassword) + { + this.keyStorePassword = keyStorePassword; + return this; + } + + public boolean isEnableGenericTables() + { + return enableGenericTables; + } + + @Config("polaris.generic-tables.enabled") + @ConfigDescription("Enable support for generic tables (Delta Lake, etc.)") + public PolarisMetastoreConfig setEnableGenericTables(boolean enableGenericTables) + { + this.enableGenericTables = enableGenericTables; + 
return this; + } + + public String getDefaultGenericTableFormat() + { + return defaultGenericTableFormat; + } + + @Config("polaris.generic-tables.default-format") + @ConfigDescription("Default format for generic tables when not specified") + public PolarisMetastoreConfig setDefaultGenericTableFormat(String defaultGenericTableFormat) + { + this.defaultGenericTableFormat = defaultGenericTableFormat; + return this; + } + + public boolean isEnablePolicyIntegration() + { + return enablePolicyIntegration; + } + + @Config("polaris.policies.enabled") + @ConfigDescription("Enable integration with Polaris policy system") + public PolarisMetastoreConfig setEnablePolicyIntegration(boolean enablePolicyIntegration) + { + this.enablePolicyIntegration = enablePolicyIntegration; + return this; + } + + public boolean isEnforcePolicies() + { + return enforcePolicies; + } + + @Config("polaris.policies.enforce") + @ConfigDescription("Whether to enforce policies at the metastore level") + public PolarisMetastoreConfig setEnforcePolicies(boolean enforcePolicies) + { + this.enforcePolicies = enforcePolicies; + return this; + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisMetastoreModule.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisMetastoreModule.java new file mode 100644 index 000000000000..58f9e018c00f --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisMetastoreModule.java @@ -0,0 +1,115 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive.metastore.polaris; + +import com.google.common.collect.ImmutableMap; +import com.google.inject.Binder; +import com.google.inject.Key; +import com.google.inject.Provides; +import com.google.inject.Scopes; +import com.google.inject.Singleton; +import io.airlift.configuration.AbstractConfigurationAwareModule; +import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.metastore.HiveMetastoreFactory; +import io.trino.metastore.RawHiveMetastoreFactory; +import io.trino.plugin.hive.AllowHiveTableRename; +import io.trino.spi.security.ConnectorIdentity; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.rest.HTTPClient; +import org.apache.iceberg.rest.RESTSessionCatalog; +import org.apache.iceberg.rest.RESTUtil; + +import static io.airlift.configuration.ConfigBinder.configBinder; +import static io.airlift.http.client.HttpClientBinder.httpClientBinder; +import static java.util.Objects.requireNonNull; + +public class PolarisMetastoreModule + extends AbstractConfigurationAwareModule +{ + @Override + protected void setup(Binder binder) + { + // 1. Load configuration for immediate validation and conditional module installation + PolarisMetastoreConfig polarisConfig = buildConfigObject(PolarisMetastoreConfig.class); + requireNonNull(polarisConfig.getUri(), "polaris.uri is required"); + + // 2. Bind configuration for runtime injection + configBinder(binder).bindConfig(PolarisMetastoreConfig.class); + + // 3. Install security module based on configuration + switch (polarisConfig.getSecurity()) { + case OAUTH2 -> { + configBinder(binder).bindConfig(OAuth2SecurityConfig.class); + install(new OAuth2SecurityModule()); + } + case NONE -> install(new NoneSecurityModule()); + } + + // 4. 
Bind AWS properties (currently none, but ready for future SigV4 support) + binder.bind(AwsProperties.class).to(DefaultAwsProperties.class).in(Scopes.SINGLETON); + + // 5. Bind HTTP client for Polaris API calls + httpClientBinder(binder).bindHttpClient("polaris", ForPolarisClient.class); + + // 6. Bind core components + binder.bind(PolarisRestClient.class).in(Scopes.SINGLETON); + + binder.bind(HiveMetastoreFactory.class) + .annotatedWith(RawHiveMetastoreFactory.class) + .to(PolarisHiveMetastoreFactory.class) + .in(Scopes.SINGLETON); + + // 7. Bind standard Hive settings + binder.bind(Key.get(boolean.class, AllowHiveTableRename.class)).toInstance(true); + } + + @Provides + @Singleton + public RESTSessionCatalog createRESTSessionCatalog( + PolarisMetastoreConfig config, + SecurityProperties securityProperties, + AwsProperties awsProperties, + TrinoFileSystemFactory fileSystemFactory) + { + // Build properties map following Iceberg REST catalog pattern + ImmutableMap.Builder properties = ImmutableMap.builder(); + properties.put(CatalogProperties.URI, config.getUri().toString()); + properties.put("prefix", config.getPrefix()); + + // Add warehouse if specified + config.getWarehouse().ifPresent(warehouse -> properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse)); + + // Add security properties from the injected SecurityProperties + properties.putAll(securityProperties.get()); + + // Add AWS properties (empty for now, but ready for future SigV4 support) + properties.putAll(awsProperties.get()); + + // Create RESTSessionCatalog with HTTP client and FileIO factories to avoid Hadoop dependencies + RESTSessionCatalog catalog = new RESTSessionCatalog( + httpConfig -> HTTPClient.builder(httpConfig) + .uri(httpConfig.get(CatalogProperties.URI)) + .withHeaders(RESTUtil.configHeaders(httpConfig)) + .build(), + (context, ioConfig) -> { + ConnectorIdentity currentIdentity = (context.wrappedIdentity() != null) + ? 
((ConnectorIdentity) context.wrappedIdentity()) + : ConnectorIdentity.ofUser("fake"); + return new ForwardingFileIo(fileSystemFactory.create(currentIdentity), ioConfig); + }); + + catalog.initialize("polaris", properties.buildOrThrow()); + return catalog; + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisNamespace.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisNamespace.java new file mode 100644 index 000000000000..857f155476e1 --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisNamespace.java @@ -0,0 +1,82 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive.metastore.polaris; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; + +import java.util.Map; +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +/** + * Represents a Polaris namespace (equivalent to a database in Hive). 
+ */ +public class PolarisNamespace +{ + private final String name; + private final Map properties; + + @JsonCreator + public PolarisNamespace( + @JsonProperty("name") String name, + @JsonProperty("properties") Map properties) + { + this.name = requireNonNull(name, "name is null"); + this.properties = ImmutableMap.copyOf(requireNonNull(properties, "properties is null")); + } + + @JsonProperty + public String getName() + { + return name; + } + + @JsonProperty + public Map getProperties() + { + return properties; + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + PolarisNamespace that = (PolarisNamespace) obj; + return Objects.equals(name, that.name) && + Objects.equals(properties, that.properties); + } + + @Override + public int hashCode() + { + return Objects.hash(name, properties); + } + + @Override + public String toString() + { + return "PolarisNamespace{" + + "name='" + name + '\'' + + ", properties=" + properties + + '}'; + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisNotFoundException.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisNotFoundException.java new file mode 100644 index 000000000000..cad2e02d9b2c --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisNotFoundException.java @@ -0,0 +1,32 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.trino.plugin.hive.metastore.polaris;

/**
 * Exception thrown when a resource is not found in Polaris.
 * This typically corresponds to HTTP 404 responses from the Polaris API.
 */
public class PolarisNotFoundException
        extends PolarisException
{
    // Message-only form; cause form below preserves the underlying HTTP/REST failure
    public PolarisNotFoundException(String message)
    {
        super(message);
    }

    public PolarisNotFoundException(String message, Throwable cause)
    {
        super(message, cause);
    }
}
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisRestClient.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisRestClient.java
new file mode 100644
index 000000000000..444cccf4c93b
--- /dev/null
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisRestClient.java
@@ -0,0 +1,657 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
+ */ +package io.trino.plugin.hive.metastore.polaris; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import com.google.inject.Inject; +import io.airlift.http.client.BodyGenerator; +import io.airlift.http.client.HttpClient; +import io.airlift.http.client.Request; +import io.airlift.http.client.Response; +import io.airlift.http.client.ResponseHandler; +import io.airlift.http.client.StaticBodyGenerator; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.connector.TableNotFoundException; +import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.Schema; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SessionCatalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.RESTException; +import org.apache.iceberg.rest.RESTSessionCatalog; +import org.apache.iceberg.rest.auth.OAuth2Properties; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.airlift.http.client.Request.Builder.prepareDelete; +import static io.airlift.http.client.Request.Builder.prepareGet; +import static io.airlift.http.client.Request.Builder.preparePost; +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.joining; + +/** + * REST client for Apache Polaris catalog API. 
+ * + * This client: + * - Delegates standard Iceberg operations to RESTSessionCatalog + * - Uses Trino HttpClient for Generic Table operations with matching OAuth2 authentication + */ +public class PolarisRestClient +{ + private final RESTSessionCatalog restSessionCatalog; + private final HttpClient httpClient; + private final PolarisMetastoreConfig config; + private final SecurityProperties securityProperties; + private final ObjectMapper objectMapper; + + @Inject + public PolarisRestClient( + RESTSessionCatalog restSessionCatalog, + @ForPolarisClient HttpClient httpClient, + PolarisMetastoreConfig config, + SecurityProperties securityProperties, + ObjectMapper objectMapper) + { + this.restSessionCatalog = requireNonNull(restSessionCatalog, "restSessionCatalog is null"); + this.httpClient = requireNonNull(httpClient, "httpClient is null"); + this.config = requireNonNull(config, "config is null"); + this.securityProperties = requireNonNull(securityProperties, "securityProperties is null"); + this.objectMapper = requireNonNull(objectMapper, "objectMapper is null"); + } + + // ICEBERG OPERATIONS (via RESTSessionCatalog) + + /** + * Lists Iceberg tables using the standard REST catalog + */ + public List listIcebergTables(String namespaceName) + { + try { + SessionCatalog.SessionContext sessionContext = createSessionContext(); + Namespace namespace = Namespace.of(namespaceName.split("\\.")); + + return restSessionCatalog.listTables(sessionContext, namespace).stream() + .map(id -> new PolarisTableIdentifier(id.namespace().toString(), id.name())) + .collect(toImmutableList()); + } + catch (RESTException e) { + throw new PolarisException("Failed to list Iceberg tables: " + e.getMessage(), e); + } + } + + /** + * Loads Iceberg table metadata using the standard REST catalog + */ + public PolarisTableMetadata loadIcebergTable(String namespaceName, String tableName) + { + try { + SessionCatalog.SessionContext sessionContext = createSessionContext(); + TableIdentifier tableId 
= TableIdentifier.of(namespaceName, tableName); + + org.apache.iceberg.Table table = restSessionCatalog.loadTable(sessionContext, tableId); + return convertIcebergTableToPolaris(table); + } + catch (NoSuchTableException e) { + throw new PolarisNotFoundException("Iceberg table not found: " + namespaceName + "." + tableName); + } + catch (RESTException e) { + throw new PolarisException("Failed to load Iceberg table: " + e.getMessage(), e); + } + } + + // NAMESPACE OPERATIONS (via RESTSessionCatalog) + + /** + * Lists namespaces using the standard REST catalog + */ + public List listNamespaces(Optional parent) + { + try { + SessionCatalog.SessionContext sessionContext = createSessionContext(); + Namespace parentNamespace = parent.map(p -> Namespace.of(p.split("\\."))).orElse(Namespace.empty()); + + return restSessionCatalog.listNamespaces(sessionContext, parentNamespace).stream() + .map(ns -> new PolarisNamespace(ns.toString(), ImmutableMap.of())) + .collect(toImmutableList()); + } + catch (RESTException e) { + throw new PolarisException("Failed to list namespaces: " + e.getMessage(), e); + } + } + + /** + * Creates namespace using the standard REST catalog + */ + public void createNamespace(PolarisNamespace namespace) + { + try { + SessionCatalog.SessionContext sessionContext = createSessionContext(); + org.apache.iceberg.catalog.Namespace icebergNamespace = org.apache.iceberg.catalog.Namespace.of(namespace.getName()); + + restSessionCatalog.createNamespace(sessionContext, icebergNamespace, namespace.getProperties()); + } + catch (RESTException e) { + throw new PolarisException("Failed to create namespace: " + namespace.getName(), e); + } + } + + // AUTHENTICATION & HTTP UTILITIES + + /** + * Gets authentication headers by reusing the same OAuth2 credentials as RESTSessionCatalog + * This ensures identical authentication behavior between Iceberg and Generic table operations + */ + private Map getAuthHeaders() + { + try { + // Extract OAuth2 credentials exactly like 
TrinoIcebergRestCatalogFactory does + Map securityProps = securityProperties.get(); + System.out.println("DEBUG: Security properties: " + securityProps.keySet()); + + Map credentials = Maps.filterKeys(securityProps, + key -> Set.of(OAuth2Properties.TOKEN, OAuth2Properties.CREDENTIAL).contains(key)); + System.out.println("DEBUG: Filtered credentials: " + credentials.keySet()); + System.out.println("DEBUG: OAuth2Properties.TOKEN = '" + OAuth2Properties.TOKEN + "'"); + System.out.println("DEBUG: OAuth2Properties.CREDENTIAL = '" + OAuth2Properties.CREDENTIAL + "'"); + + // If we have a direct token, use it + if (credentials.containsKey(OAuth2Properties.TOKEN)) { + System.out.println("DEBUG: Using direct token"); + return ImmutableMap.of("Authorization", "Bearer " + credentials.get(OAuth2Properties.TOKEN)); + } + + // If we have credentials, perform OAuth2 token exchange like RESTSessionCatalog does + if (credentials.containsKey(OAuth2Properties.CREDENTIAL)) { + System.out.println("DEBUG: Performing OAuth2 token exchange"); + String token = performOAuth2TokenExchange(credentials.get(OAuth2Properties.CREDENTIAL)); + return ImmutableMap.of("Authorization", "Bearer " + token); + } + + // No authentication credentials found, return empty headers + System.out.println("DEBUG: No authentication credentials found"); + return ImmutableMap.of(); + } + catch (Exception e) { + System.out.println("DEBUG: Exception in getAuthHeaders: " + e.getClass().getSimpleName() + ": " + e.getMessage()); + e.printStackTrace(); + throw new PolarisException("Failed to get authentication headers", e); + } + } + + /** + * Performs OAuth2 token exchange using the same flow as RESTSessionCatalog + */ + private String performOAuth2TokenExchange(String credential) + { + try { + // Parse credential (format: "client_id:client_secret") + String[] parts = credential.split(":", 2); + if (parts.length != 2) { + throw new IllegalArgumentException("Invalid credential format. 
Expected 'client_id:client_secret'"); + } + String clientId = parts[0]; + String clientSecret = parts[1]; + + // Build OAuth2 token request + Map tokenRequest = ImmutableMap.of( + "grant_type", "client_credentials", + "client_id", clientId, + "client_secret", clientSecret, + "scope", securityProperties.get().getOrDefault(OAuth2Properties.SCOPE, "PRINCIPAL_ROLE:ALL")); + + // Create form-encoded body + String body = tokenRequest.entrySet().stream() + .map(entry -> entry.getKey() + "=" + entry.getValue()) + .collect(joining("&")); + + // Make token request + URI tokenUri = URI.create(securityProperties.get().get(OAuth2Properties.OAUTH2_SERVER_URI)); + Request request = preparePost() + .setUri(tokenUri) + .setHeader("Content-Type", "application/x-www-form-urlencoded") + .setBodyGenerator(StaticBodyGenerator.createStaticBodyGenerator(body, UTF_8)) + .build(); + + // Execute token request + return httpClient.execute(request, new ResponseHandler() + { + @Override + public String handleException(Request request, Exception exception) + { + throw new PolarisException("OAuth2 token exchange failed", exception); + } + + @Override + public String handle(Request request, Response response) + { + if (response.getStatusCode() != 200) { + throw new PolarisException("OAuth2 token exchange failed with status: " + response.getStatusCode()); + } + + try { + try (InputStream inputStream = response.getInputStream()) { + String responseBody = new String(inputStream.readAllBytes(), UTF_8); + Map tokenResponse = objectMapper.readValue(responseBody, Map.class); + return (String) tokenResponse.get("access_token"); + } + } + catch (Exception e) { + throw new PolarisException("Failed to parse OAuth2 token response", e); + } + } + }); + } + catch (Exception e) { + throw new PolarisException("OAuth2 token exchange failed", e); + } + } + + /** + * Creates session context using the same credentials as RESTSessionCatalog + * This ensures identical OAuth2 token handling + */ + private 
SessionCatalog.SessionContext createSessionContext() + { + String sessionId = UUID.randomUUID().toString(); + + // Extract OAuth2 credentials exactly like TrinoIcebergRestCatalogFactory does + Map securityProps = securityProperties.get(); + Map credentials = ImmutableMap.builder() + .putAll(Maps.filterKeys(securityProps, + key -> Set.of(OAuth2Properties.TOKEN, OAuth2Properties.CREDENTIAL).contains(key))) + .buildOrThrow(); + + Map properties = ImmutableMap.of( + "catalog", config.getPrefix(), + "warehouse", config.getUri().toString()); + + return new SessionCatalog.SessionContext(sessionId, "polaris-user", credentials, properties, null); + } + + // GENERIC TABLE OPERATIONS (via HttpClient) + + /** + * Lists Generic (Delta Lake, CSV, etc.) tables using Polaris-specific API + */ + public List listGenericTables(String namespaceName) + { + URI uri = buildUri("/polaris/v1/" + config.getPrefix() + "/namespaces/" + encodeNamespace(namespaceName) + "/generic-tables"); + + Request request = prepareGet() + .setUri(uri) + .addHeaders(buildHeaders(getAuthHeaders())) + .build(); + + return execute(request, new ResponseHandler, RuntimeException>() + { + @Override + public List handleException(Request request, Exception exception) + { + throw new PolarisException("Failed to list generic tables", exception); + } + + @Override + public List handle(Request request, Response response) + { + if (response.getStatusCode() != 200) { + throw new PolarisException("Failed to list generic tables: " + response.getStatusCode()); + } + try { + ListGenericTablesResponse listResponse = objectMapper.readValue(response.getInputStream(), ListGenericTablesResponse.class); + return listResponse.toPolarisTableIdentifiers(); + } + catch (IOException e) { + throw new PolarisException("Failed to parse generic tables response", e); + } + } + }); + } + + /** + * Loads a Generic table using Polaris-specific API + */ + public PolarisGenericTable loadGenericTable(String namespaceName, String tableName) + { + 
URI uri = buildUri("/polaris/v1/" + config.getPrefix() + "/namespaces/" + encodeNamespace(namespaceName) + "/generic-tables/" + tableName); + + Request request = prepareGet() + .setUri(uri) + .addHeaders(buildHeaders(getAuthHeaders())) + .build(); + + return execute(request, new ResponseHandler() + { + @Override + public PolarisGenericTable handleException(Request request, Exception exception) + { + throw new PolarisException("Failed to load generic table: " + tableName, exception); + } + + @Override + public PolarisGenericTable handle(Request request, Response response) + { + if (response.getStatusCode() == 404) { + throw new TableNotFoundException(new SchemaTableName(namespaceName, tableName)); + } + if (response.getStatusCode() != 200) { + throw new PolarisException("Failed to load generic table: " + response.getStatusCode()); + } + try { + try (InputStream inputStream = response.getInputStream()) { + String responseBody = new String(inputStream.readAllBytes(), UTF_8); + System.out.println("DEBUG: Generic table response body: " + responseBody); + + com.fasterxml.jackson.databind.JsonNode rootNode = objectMapper.readTree(responseBody); + com.fasterxml.jackson.databind.JsonNode tableNode = rootNode.get("table"); + PolarisGenericTable table = objectMapper.treeToValue(tableNode, PolarisGenericTable.class); + System.out.println("DEBUG: Successfully parsed generic table: " + table.getName()); + return table; + } + } + catch (IOException e) { + System.out.println("DEBUG: Jackson parsing error: " + e.getClass().getSimpleName() + ": " + e.getMessage()); + if (e.getCause() != null) { + System.out.println("DEBUG: Caused by: " + e.getCause().getClass().getSimpleName() + ": " + e.getCause().getMessage()); + } + throw new PolarisException("Failed to parse load generic table response", e); + } + } + }); + } + + /** + * Creates a Generic table using Polaris-specific API + */ + public void createGenericTable(String databaseName, PolarisGenericTable genericTable) + { + URI uri = 
buildUri("/polaris/v1/" + config.getPrefix() + "/namespaces/" + encodeNamespace(databaseName) + "/generic-tables"); + + CreateGenericTableRequest request = new CreateGenericTableRequest( + genericTable.getName(), + genericTable.getFormat(), + genericTable.getBaseLocation().orElse(null), + genericTable.getDoc().orElse(null), + genericTable.getProperties()); + + Request httpRequest = preparePost() + .setUri(uri) + .addHeaders(buildHeaders(getAuthHeaders())) + .addHeader("Content-Type", "application/json") + .setBodyGenerator(createJsonBodyGenerator(request)) + .build(); + + execute(httpRequest, new ResponseHandler() + { + @Override + public Void handleException(Request request, Exception exception) + { + throw new PolarisException("Failed to create generic table: " + genericTable.getName(), exception); + } + + @Override + public Void handle(Request request, Response response) + { + if (response.getStatusCode() == 409) { + throw new PolarisAlreadyExistsException("Generic table already exists: " + genericTable.getName()); + } + if (response.getStatusCode() != 200 && response.getStatusCode() != 201) { + throw new PolarisException("Failed to create generic table: " + response.getStatusCode()); + } + return null; + } + }); + } + + /** + * Drops a Generic table using Polaris-specific API + */ + public void dropGenericTable(String databaseName, String tableName) + { + URI uri = buildUri("/polaris/v1/" + config.getPrefix() + "/namespaces/" + encodeNamespace(databaseName) + "/generic-tables/" + tableName); + + Request request = prepareDelete() + .setUri(uri) + .addHeaders(buildHeaders(getAuthHeaders())) + .build(); + + execute(request, new ResponseHandler() + { + @Override + public Void handleException(Request request, Exception exception) + { + throw new PolarisException("Failed to drop generic table: " + tableName, exception); + } + + @Override + public Void handle(Request request, Response response) + { + if (response.getStatusCode() == 404) { + throw new 
PolarisNotFoundException("Generic table not found: " + tableName); + } + if (response.getStatusCode() != 204) { + throw new PolarisException("Failed to drop generic table: " + response.getStatusCode()); + } + return null; + } + }); + } + + // HELPER METHODS + + /** + * Converts Iceberg Table to PolarisTableMetadata + */ + private PolarisTableMetadata convertIcebergTableToPolaris(org.apache.iceberg.Table table) + { + // Extract metadata from Iceberg table + TableOperations ops = ((HasTableOperations) table).operations(); + String location = ops.current().location(); + Map properties = ops.current().properties(); + + // Convert Iceberg schema to map representation + Schema icebergSchema = ops.current().schema(); + Map schemaMap = ImmutableMap.of( + "type", "struct", + "schema-id", icebergSchema.schemaId(), + "fields", icebergSchema.columns().stream() + .map(field -> ImmutableMap.of( + "id", field.fieldId(), + "name", field.name(), + "required", field.isRequired(), + "type", field.type().toString())) + .collect(toImmutableList())); + + return new PolarisTableMetadata( + location, + schemaMap, + properties); + } + + /** + * Builds headers for HTTP requests + */ + private com.google.common.collect.Multimap buildHeaders(Map headers) + { + com.google.common.collect.ImmutableMultimap.Builder builder = com.google.common.collect.ImmutableMultimap.builder(); + headers.forEach(builder::put); + return builder.build(); + } + + /** + * Executes HTTP request with error handling + */ + private T execute(Request request, ResponseHandler responseHandler) + { + try { + return httpClient.execute(request, responseHandler); + } + catch (Exception e) { + throw new PolarisException("Request failed: " + e.getMessage(), e); + } + } + + /** + * Creates JSON body generator for HTTP requests + */ + private BodyGenerator createJsonBodyGenerator(Object object) + { + try { + String json = objectMapper.writeValueAsString(object); + return StaticBodyGenerator.createStaticBodyGenerator(json, UTF_8); 
+ } + catch (Exception e) { + throw new PolarisException("Failed to serialize request body", e); + } + } + + /** + * Builds URI for API requests by simply concatenating base URI with path + */ + private URI buildUri(String path) + { + return URI.create(config.getUri() + path); + } + + /** + * Encodes namespace for URL path + */ + private String encodeNamespace(String namespace) + { + try { + return java.net.URLEncoder.encode(namespace, UTF_8); + } + catch (Exception e) { + // Fallback to simple replacement for common cases + return namespace.replace(".", "%2E"); + } + } + + private static class ListGenericTablesResponse + { + private final List identifiers; + + @JsonCreator + public ListGenericTablesResponse(@JsonProperty("identifiers") List identifiers) + { + this.identifiers = identifiers != null ? ImmutableList.copyOf(identifiers) : ImmutableList.of(); + } + + public List toPolarisTableIdentifiers() + { + return identifiers.stream() + .map(TableIdentifierDto::toPolarisTableIdentifier) + .collect(toImmutableList()); + } + } + + private static class TableIdentifierDto + { + private final List namespace; + private final String name; + + @JsonCreator + public TableIdentifierDto( + @JsonProperty("namespace") List namespace, + @JsonProperty("name") String name) + { + this.namespace = requireNonNull(namespace, "namespace is null"); + this.name = requireNonNull(name, "name is null"); + } + + public PolarisTableIdentifier toPolarisTableIdentifier() + { + return new PolarisTableIdentifier(String.join(".", namespace), name); + } + } + + private static class LoadGenericTableResponse + { + private final PolarisGenericTable table; + + @JsonCreator + public LoadGenericTableResponse(@JsonProperty("table") PolarisGenericTable table) + { + this.table = requireNonNull(table, "table is null"); + } + + public PolarisGenericTable getTable() + { + return table; + } + } + + private static class CreateGenericTableRequest + { + private final String name; + private final String format; + 
private final String baseLocation; + private final String doc; + private final Map properties; + + public CreateGenericTableRequest(String name, String format, String baseLocation, String doc, Map properties) + { + this.name = requireNonNull(name, "name is null"); + this.format = requireNonNull(format, "format is null"); + this.baseLocation = baseLocation; // Optional + this.doc = doc; // Optional + this.properties = properties != null ? ImmutableMap.copyOf(properties) : ImmutableMap.of(); + } + + @JsonProperty("name") + public String getName() + { + return name; + } + + @JsonProperty("format") + public String getFormat() + { + return format; + } + + @JsonProperty("base-location") + public String getBaseLocation() + { + return baseLocation; + } + + @JsonProperty("doc") + public String getDoc() + { + return doc; + } + + @JsonProperty("properties") + public Map getProperties() + { + return properties; + } + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisTableIdentifier.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisTableIdentifier.java new file mode 100644 index 000000000000..5f751d75f74c --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/polaris/PolarisTableIdentifier.java @@ -0,0 +1,80 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package io.trino.plugin.hive.metastore.polaris;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Objects;

import static java.util.Objects.requireNonNull;

/**
 * Immutable identifier for a Polaris table: a dot-joined namespace plus a table name.
 */
public class PolarisTableIdentifier
{
    private final String namespace;
    private final String name;

    @JsonCreator
    public PolarisTableIdentifier(
            @JsonProperty("namespace") String namespace,
            @JsonProperty("name") String name)
    {
        this.namespace = requireNonNull(namespace, "namespace is null");
        this.name = requireNonNull(name, "name is null");
    }

    @JsonProperty
    public String getNamespace()
    {
        return namespace;
    }

    @JsonProperty
    public String getName()
    {
        return name;
    }

    @Override
    public boolean equals(Object obj)
    {
        if (this == obj) {
            return true;
        }
        // Both fields are non-null by construction, so plain equals is safe
        return obj instanceof PolarisTableIdentifier that
                && namespace.equals(that.namespace)
                && name.equals(that.name);
    }

    @Override
    public int hashCode()
    {
        return Objects.hash(namespace, name);
    }

    @Override
    public String toString()
    {
        return "PolarisTableIdentifier{" +
                "namespace='" + namespace + '\'' +
                ", name='" + name + '\'' +
                '}';
    }
}
package io.trino.plugin.hive.metastore.polaris;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;

import java.util.Map;
import java.util.Objects;

import static java.util.Objects.requireNonNull;

/**
 * Immutable view of Iceberg table metadata returned by Polaris,
 * corresponding to the standard Iceberg REST API response: the table
 * location, a map-shaped schema, and the table properties.
 */
public class PolarisTableMetadata
{
    private final String location;
    private final Map<String, Object> schema;
    private final Map<String, String> properties;

    @JsonCreator
    public PolarisTableMetadata(
            @JsonProperty("location") String location,
            @JsonProperty("schema") Map<String, Object> schema,
            @JsonProperty("properties") Map<String, String> properties)
    {
        this.location = requireNonNull(location, "location is null");
        // Null maps from the wire are normalized to empty immutable maps
        this.schema = schema == null ? ImmutableMap.of() : ImmutableMap.copyOf(schema);
        this.properties = properties == null ? ImmutableMap.of() : ImmutableMap.copyOf(properties);
    }

    @JsonProperty
    public String getLocation()
    {
        return location;
    }

    @JsonProperty
    public Map<String, Object> getSchema()
    {
        return schema;
    }

    @JsonProperty
    public Map<String, String> getProperties()
    {
        return properties;
    }

    @Override
    public boolean equals(Object obj)
    {
        if (this == obj) {
            return true;
        }
        return obj instanceof PolarisTableMetadata that
                && location.equals(that.location)
                && schema.equals(that.schema)
                && properties.equals(that.properties);
    }

    @Override
    public int hashCode()
    {
        return Objects.hash(location, schema, properties);
    }

    @Override
    public String toString()
    {
        return "PolarisTableMetadata{" +
                "location='" + location + '\'' +
                ", schema=" + schema +
                ", properties=" + properties +
                '}';
    }
}
package io.trino.plugin.hive.metastore.polaris;

import java.util.Map;

/**
 * Supplies security-related configuration properties for the Polaris catalog.
 *
 * <p>PolarisRestClient reads Iceberg OAuth2 keys from this map
 * ({@code OAuth2Properties.TOKEN}, {@code CREDENTIAL}, {@code SCOPE},
 * {@code OAUTH2_SERVER_URI}); implementations such as
 * {@code OAuth2SecurityProperties} and {@code NoneSecurityProperties}
 * provide populated or empty maps respectively.
 */
public interface SecurityProperties
{
    // Returns the security property map; may be empty when no auth is configured
    Map<String, String> get();
}
DeltaLakeFileMetastoreModule(); }); binder.install(new DeltaLakeExecutorModule()); diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseIcebergModule.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseIcebergModule.java index 7ed96043670a..cd0089a74836 100644 --- a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseIcebergModule.java +++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseIcebergModule.java @@ -77,6 +77,7 @@ protected void setup(Binder binder) case THRIFT -> new IcebergHiveMetastoreCatalogModule(); case FILE -> new IcebergFileMetastoreCatalogModule(); case GLUE -> new IcebergGlueCatalogModule(); + case POLARIS -> new IcebergHiveMetastoreCatalogModule(); }); binder.install(new IcebergExecutorModule()); From afeb374aa07b6a426077b31f8acf7c8137f29c5a Mon Sep 17 00:00:00 2001 From: William Hyun Date: Mon, 28 Jul 2025 09:26:56 -0700 Subject: [PATCH 2/3] Dependency Fix --- plugin/trino-hive/pom.xml | 21 --------------------- pom.xml | 37 +++++++++++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 21 deletions(-) diff --git a/plugin/trino-hive/pom.xml b/plugin/trino-hive/pom.xml index 20e0caac0991..f311ca3bdbea 100644 --- a/plugin/trino-hive/pom.xml +++ b/plugin/trino-hive/pom.xml @@ -33,12 +33,6 @@ com.google.guava guava - - - org.jspecify - jspecify - - @@ -191,12 +185,6 @@ org.apache.iceberg iceberg-core - - - org.jspecify - jspecify - - @@ -619,15 +607,6 @@ - - org.basepom.maven - duplicate-finder-maven-plugin - - - iceberg-build.properties - - - diff --git a/pom.xml b/pom.xml index 8e9e48d3b027..f72dbddd97c5 100644 --- a/pom.xml +++ b/pom.xml @@ -2797,6 +2797,43 @@ + + + + org.apache.iceberg + iceberg-api + + + org.apache.iceberg + iceberg-common + + + org.apache.iceberg + iceberg-core + + + + iceberg-build.properties + + + + + + org.apache.iceberg + iceberg-bundled-guava + + + org.jspecify + jspecify + + + + 
org.jspecify.annotations.NonNull + org.jspecify.annotations.NullMarked + org.jspecify.annotations.NullUnmarked + org.jspecify.annotations.Nullable + + From aa611ff1d25bf47008b6a3b43f08897651e0337c Mon Sep 17 00:00:00 2001 From: William Hyun Date: Mon, 28 Jul 2025 10:22:13 -0700 Subject: [PATCH 3/3] Fix Test --- .../io/trino/plugin/hive/TestHivePlugin.java | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePlugin.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePlugin.java index fdaab617d93f..13a929c6049b 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePlugin.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePlugin.java @@ -106,14 +106,16 @@ public void testPolarisMetastore() { ConnectorFactory factory = getHiveConnectorFactory(); - factory.create( - "test", - ImmutableMap.of( - "hive.metastore", "polaris", - "polaris.uri", "http://localhost:8181", - "bootstrap.quiet", "true"), - new TestingConnectorContext()) - .shutdown(); + // This will fail with connection error since no actual Polaris instance is running + assertThatThrownBy(() -> factory.create( + "test", + ImmutableMap.of( + "hive.metastore", "polaris", + "polaris.uri", "http://localhost:8181", + "bootstrap.quiet", "true"), + new TestingConnectorContext())) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("Error occurred while processing GET request"); assertThatThrownBy(() -> factory.create( "test",