Convert WASBS to ABFSS experimental workflow #4031

Open · wants to merge 16 commits into main
19 changes: 17 additions & 2 deletions src/databricks/labs/ucx/hive_metastore/locations.py
@@ -20,7 +20,7 @@

logger = logging.getLogger(__name__)

_EXTERNAL_FILE_LOCATION_SCHEMES = ("s3", "s3a", "s3n", "gcs", "abfss")
_EXTERNAL_FILE_LOCATION_SCHEMES = ("s3", "s3a", "s3n", "gcs", "abfss", "wasbs")


@dataclass
@@ -215,6 +215,20 @@ def _get_dbfs_root(self) -> ExternalLocation | None:
return None
return None

@staticmethod
def wasbs_to_abfss(location: str | None) -> str | None:
"""
Converts a wasbs:// location to abfss://
"""
if not location:
return None
if not location.startswith("wasbs://"):
Reviewer comment (Contributor): This and the next if are both checking for wasbs. Can be consolidated.
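A minimal sketch of the consolidation the reviewer suggests, relying on the urlparse call the method already uses (illustrative only, not part of this diff):

    @staticmethod
    def wasbs_to_abfss(location: str | None) -> str | None:
        """Converts a wasbs:// location to abfss://"""
        if not location:
            return None
        parsed = urlparse(location)
        if parsed.scheme != "wasbs":
            # a single scheme check replaces both the startswith() guard and the scheme comparison
            return location
        return f"abfss://{parsed.netloc.replace('.blob.core.windows.net', '.dfs.core.windows.net')}{parsed.path}"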

return location
parsed = urlparse(location)
if parsed.scheme == "wasbs":
return f"abfss://{parsed.netloc.replace('.blob.core.windows.net','.dfs.core.windows.net')}{parsed.path}"
return location
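As an illustration of the mapping (a hypothetical call, not part of this diff):

    assert ExternalLocations.wasbs_to_abfss(
        "wasbs://container1@account1.blob.core.windows.net/test/table1"
    ) == "abfss://container1@account1.dfs.core.windows.net/test/table1"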

def _external_locations(self) -> Iterable[ExternalLocation]:
trie = LocationTrie()
for table in self._tables_crawler.snapshot():
@@ -252,6 +266,7 @@ def _resolve_location(self, table: Table) -> Table:
return table
location = self._resolve_jdbc(table)
location = self.resolve_mount(location)
location = self.wasbs_to_abfss(location)
return dataclasses.replace(table, location=location)

def resolve_mount(self, location: str | None) -> str | None:
@@ -331,7 +346,7 @@ def _get_ext_location_definitions(missing_locations: list[ExternalLocation]) ->
else:
res_name = loc.location[prefix_len:].rstrip("/").replace("/", "_")
if res_name == "":
# if the cloud storage url doesn't match the above condition or incorrect (example wasb://)
# if the cloud storage url doesn't match the above condition or is malformed
# don't generate a tf script and ignore it
logger.warning(f"unsupported storage format {loc.location}")
continue
92 changes: 92 additions & 0 deletions src/databricks/labs/ucx/hive_metastore/table_migrate.py
@@ -96,6 +96,28 @@ def convert_managed_hms_to_external(
logger.info("No managed hms table found to convert to external")
return tasks

def convert_wasbs_to_adls_gen2(self):
"""
Converts Hive metastore Azure wasbs tables to abfss using the Spark JVM.
"""

self._spark = self._spark_session
tables_to_migrate = self._table_mapping.get_tables_to_migrate(self._tables_crawler, False)
tables_in_scope = filter(lambda t: t.src.what == What.EXTERNAL_SYNC, tables_to_migrate)
tasks = []
for table in tables_in_scope:
if table.src.location and table.src.location.startswith("wasbs://"):
tasks.append(
partial(
self._convert_wasbs_table_to_abfss,
table.src,
)
)
Threads.strict("convert tables to abfss", tasks)
if not tasks:
logger.info("No wasbs table found to convert to abfss")
return tasks

def migrate_tables(
self,
what: What,
@@ -294,6 +316,12 @@ def _catalog_type(self):
def _catalog_table(self):
return self._spark._jvm.org.apache.spark.sql.catalyst.catalog.CatalogTable # pylint: disable=protected-access

@cached_property
def _catalog_storage(self):
return (
self._spark._jvm.org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat # pylint: disable=protected-access
)

@staticmethod
def _get_entity_storage_locations(table_metadata):
"""Obtain the entityStorageLocations property for table metadata, if the property is present."""
@@ -350,10 +378,74 @@ def _convert_hms_table_to_external(self, src_table: Table) -> bool:
logger.info(f"Converted {src_table.name} to External Table type.")
return True

def _convert_wasbs_table_to_abfss(self, src_table: Table) -> bool:
"""
Converts a Hive metastore Azure wasbs table to abfss via the Spark catalog's alterTable API.
"""
logger.info(f"Changing HMS managed table {src_table.name} to External Table type.")
inventory_table = self._tables_crawler.full_name
database = self._spark._jvm.scala.Some(src_table.database) # pylint: disable=protected-access
table_identifier = self._table_identifier(src_table.name, database)
new_table_location = ExternalLocations.wasbs_to_abfss(src_table.location)
if not new_table_location:
logger.warning(f"Invalid wasbs location for table {src_table.name}, skipping conversion.")
return False
try:
old_table = self._catalog.getTableMetadata(table_identifier)
entity_storage_locations = self._get_entity_storage_locations(old_table)
table_location = old_table.storage()
new_location = self._catalog_storage(
self._spark._jvm.scala.Some( # pylint: disable=protected-access
self._spark._jvm.java.net.URI(new_table_location) # pylint: disable=protected-access
),
table_location.inputFormat(),
table_location.outputFormat(),
table_location.serde(),
table_location.compressed(),
table_location.properties(),
)
new_table = self._catalog_table(
old_table.identifier(),
old_table.tableType(),
new_location,
old_table.schema(),
old_table.provider(),
old_table.partitionColumnNames(),
old_table.bucketSpec(),
old_table.owner(),
old_table.createTime(),
old_table.lastAccessTime(),
old_table.createVersion(),
old_table.properties(),
old_table.stats(),
old_table.viewText(),
old_table.comment(),
old_table.unsupportedFeatures(),
old_table.tracksPartitionsInCatalog(),
old_table.schemaPreservesCase(),
old_table.ignoredProperties(),
old_table.viewOriginalText(),
# From DBR 16, there's a new constructor argument: entityStorageLocations (Seq[EntityStorageLocation])
# (We can't detect whether the argument is needed by the constructor, but assume that if the accessor
# is present on the source table then the argument is needed.)
*([entity_storage_locations] if entity_storage_locations is not None else []),
)
self._catalog.alterTable(new_table)
except Exception as e: # pylint: disable=broad-exception-caught
Reviewer comment (Contributor): Unless this is intentionally meant to be broad exceptions, we should narrow it down to Py4JJavaError, ValueError that could occur here.
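A sketch of the narrowed handler the reviewer suggests, assuming Py4JJavaError from py4j is the error surfaced by failed JVM calls (illustrative only, not part of this diff):

    from py4j.protocol import Py4JJavaError

    try:
        ...  # build the new CatalogStorageFormat/CatalogTable and call alterTable, as above
    except (Py4JJavaError, ValueError) as e:
        logger.warning(f"Error converting HMS table {src_table.name} to abfss: {e}", exc_info=True)
        return False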

logger.warning(f"Error converting HMS table {src_table.name} to abfss: {e}", exc_info=True)
return False
self._update_table_location(src_table, inventory_table, new_table_location)
logger.info(f"Converted {src_table.name} to External Table type.")
return True

def _update_table_status(self, src_table: Table, inventory_table: str):
update_sql = f"UPDATE {escape_sql_identifier(inventory_table)} SET object_type = 'EXTERNAL' WHERE catalog='hive_metastore' AND database='{src_table.database}' AND name='{src_table.name}';"
self._sql_backend.execute(update_sql)

def _update_table_location(self, src_table: Table, inventory_table: str, new_location: str):
Reviewer comment (Contributor): needs return type to enforce linting
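A sketch of the annotated signature the reviewer asks for (the method only issues an UPDATE statement, so None is the natural return type):

    def _update_table_location(self, src_table: Table, inventory_table: str, new_location: str) -> None:
        ...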

update_sql = f"UPDATE {escape_sql_identifier(inventory_table)} SET location = '{new_location}' WHERE catalog='hive_metastore' AND database='{src_table.database}' AND name='{src_table.name}';"
self._sql_backend.execute(update_sql)

def _migrate_managed_as_external_table(self, src_table: Table, rule: Rule):
target_table_key = rule.as_uc_table_key
table_migrate_sql = src_table.sql_migrate_as_external(target_table_key)
10 changes: 10 additions & 0 deletions src/databricks/labs/ucx/hive_metastore/workflows.py
@@ -336,3 +336,13 @@ def update_tables_history_log(self, ctx: RuntimeContext) -> None:
def record_workflow_run(self, ctx: RuntimeContext) -> None:
"""Record the workflow run of this workflow."""
ctx.workflow_run_recorder.record()


class ConvertWASBSToADLSGen2(Workflow):
def __init__(self):
super().__init__('convert-wasbs-to-adls-gen2-experimental')

@job_task(job_cluster="user_isolation", depends_on=[Assessment.crawl_tables])
def convert_wasbs_to_adls_gen2(self, ctx: RuntimeContext):
"""This workflow task converts WASBS paths to ADLS Gen2 paths in the Hive Metastore."""
ctx.tables_migrator.convert_wasbs_to_adls_gen2()
2 changes: 2 additions & 0 deletions src/databricks/labs/ucx/runtime.py
@@ -16,6 +16,7 @@
MigrateTablesInMounts,
MigrateHiveSerdeTablesInPlace,
MigrateExternalTablesCTAS,
ConvertWASBSToADLSGen2,
)
from databricks.labs.ucx.progress.workflows import MigrationProgress
from databricks.labs.ucx.recon.workflows import MigrationRecon
@@ -47,6 +48,7 @@ def all(cls):
RemoveWorkspaceLocalGroups(),
ScanTablesInMounts(),
MigrateTablesInMounts(),
ConvertWASBSToADLSGen2(),
PermissionsMigrationAPI(),
MigrationRecon(),
Failing(),
18 changes: 18 additions & 0 deletions tests/unit/hive_metastore/tables/wasbs_to_adls.json
@@ -0,0 +1,18 @@
{
"src": {
"catalog": "hive_metastore",
"database": "db1_src",
"name": "wasbs_src",
"object_type": "EXTERNAL",
"table_format": "DELTA",
"location": "wasbs://bucket/test/table1"
},
"rule": {
"workspace_name": "workspace",
"catalog_name": "ucx_default",
"src_schema": "db1_src",
"dst_schema": "db1_dst",
"src_table": "wasbs_src",
"dst_table": "wasbs_dst"
}
}
5 changes: 5 additions & 0 deletions tests/unit/hive_metastore/test_locations.py
@@ -29,6 +29,7 @@
"s3n://bucket-name",
"gcs://test_location2/test2/table2",
"abfss://cont1@storagetest1.dfs.core.windows.net/test2/table3",
"wasbs://container2@test.blob.core.windows.net/test3/table4",
],
)
def test_location_trie_valid_and_full_location(location):
@@ -170,6 +171,8 @@ def test_external_locations():
table_factory(["s3://us-east-1-dev-account-staging-uc-ext-loc-bucket-23/testloc/Table3", ""]),
table_factory(["s3://us-east-1-dev-account-staging-uc-ext-loc-bucket-23/anotherloc/Table4", ""]),
table_factory(["s3://root_location", ""]),
table_factory(["abfss://container1@storagetest1.dfs.core.windows.net/test2/table3", ""]),
table_factory(["wasbs://container2@storagetest2.blob.core.windows.net/test3/table4", ""]),
table_factory(["gcs://test_location2/a/b/table2", ""]),
table_factory(["dbfs:/mnt/ucx/database1/table1", ""]),
table_factory(["/dbfs/mnt/ucx/database2/table2", ""]),
@@ -218,6 +221,8 @@ def test_external_locations():
sql_backend = MockBackend()
crawler = ExternalLocations(Mock(), sql_backend, "test", tables_crawler, mounts_crawler)
assert crawler.snapshot() == [
ExternalLocation('abfss://container1@storagetest1.dfs.core.windows.net/test2', 1),
ExternalLocation('abfss://container2@storagetest2.dfs.core.windows.net/test3', 1),
ExternalLocation('gcs://test_location2/a/b', 1),
ExternalLocation(
'jdbc:databricks://dbc-test1-aa11.cloud.databricks.com;httpPath=/sql/1.0/warehouses/65b52fb5bd86a7be', 1
21 changes: 21 additions & 0 deletions tests/unit/hive_metastore/test_table_migrate.py
@@ -226,6 +226,27 @@ def test_migrate_managed_table_as_external_tables_with_conversion(ws, mock_pyspark):
]


def test_convert_wasbs_to_adls_gen2(ws, mock_pyspark):
errors = {}
rows = {r"SYNC .*": MockBackend.rows("status_code", "description")[("SUCCESS", "test")]}
crawler_backend = MockBackend(fails_on_first=errors, rows=rows)
backend = MockBackend(fails_on_first=errors, rows=rows)
table_crawler = TablesCrawler(crawler_backend, "inventory_database")
table_mapping = mock_table_mapping(["wasbs_to_adls"])
migration_status_refresher = TableMigrationStatusRefresher(ws, backend, "inventory_database", table_crawler)
migrate_grants = create_autospec(MigrateGrants)
external_locations = create_autospec(ExternalLocations)
table_migrate = TablesMigrator(
table_crawler, ws, backend, table_mapping, migration_status_refresher, migrate_grants, external_locations
)
table_migrate.convert_wasbs_to_adls_gen2()
migrate_grants.apply.assert_not_called()
external_locations.resolve_mount.assert_not_called()
assert backend.queries == [
"UPDATE `hive_metastore`.`inventory_database`.`tables` SET location = 'abfss://bucket/test/table1' WHERE catalog='hive_metastore' AND database='db1_src' AND name='wasbs_src';"
]


def test_migrate_managed_table_as_external_tables_without_conversion(ws, mock_pyspark):
errors = {}
rows = {r"SYNC .*": MockBackend.rows("status_code", "description")[("SUCCESS", "test")]}