Convert WASBS to ABFSS experimental workflow #4031
@@ -96,6 +96,28 @@ def convert_managed_hms_to_external(
            logger.info("No managed hms table found to convert to external")
        return tasks

    def convert_wasbs_to_adls_gen2(self):
        """
        Converts Hive metastore Azure WASBS tables to ABFSS using the Spark JVM.
        """
        self._spark = self._spark_session
        tables_to_migrate = self._table_mapping.get_tables_to_migrate(self._tables_crawler, False)
        tables_in_scope = filter(lambda t: t.src.what == What.EXTERNAL_SYNC, tables_to_migrate)
        tasks = []
        for table in tables_in_scope:
            if table.src.location and table.src.location.startswith("wasbs://"):
                tasks.append(
                    partial(
                        self._convert_wasbs_table_to_abfss,
                        table.src,
                    )
                )
        Threads.strict("convert tables to abfss", tasks)
        if not tasks:
            logger.info("No wasbs table found to convert to abfss")
        return tasks

    def migrate_tables(
        self,
        what: What,
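
The new workflow filters on the wasbs:// scheme and later rewrites each location via ExternalLocations.wasbs_to_abfss (called in _convert_wasbs_table_to_abfss below). That helper is not part of this diff; as a rough sketch of the mapping, assuming the standard Azure endpoints (Blob Storage at blob.core.windows.net, ADLS Gen2 at dfs.core.windows.net) and that the helper returns None for locations it cannot map:

def wasbs_to_abfss(location: str) -> str | None:
    # Hypothetical sketch; the real helper lives on ExternalLocations and may differ.
    # wasbs://<container>@<account>.blob.core.windows.net/<path>
    #   -> abfss://<container>@<account>.dfs.core.windows.net/<path>
    if not location.startswith("wasbs://") or ".blob.core.windows.net" not in location:
        return None
    converted = "abfss://" + location.removeprefix("wasbs://")
    return converted.replace(".blob.core.windows.net", ".dfs.core.windows.net", 1)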

@@ -294,6 +316,12 @@ def _catalog_type(self):
    def _catalog_table(self):
        return self._spark._jvm.org.apache.spark.sql.catalyst.catalog.CatalogTable  # pylint: disable=protected-access

    @cached_property
    def _catalog_storage(self):
        return (
            self._spark._jvm.org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat  # pylint: disable=protected-access
        )

    @staticmethod
    def _get_entity_storage_locations(table_metadata):
        """Obtain the entityStorageLocations property for table metadata, if the property is present."""

@@ -350,10 +378,74 @@ def _convert_hms_table_to_external(self, src_table: Table) -> bool:
        logger.info(f"Converted {src_table.name} to External Table type.")
        return True

    def _convert_wasbs_table_to_abfss(self, src_table: Table) -> bool:
        """
        Converts a Hive metastore Azure WASBS table to ABFSS using an ALTER TABLE command.
        """
        logger.info(f"Converting WASBS table {src_table.name} to ABFSS.")
        inventory_table = self._tables_crawler.full_name
        database = self._spark._jvm.scala.Some(src_table.database)  # pylint: disable=protected-access
        table_identifier = self._table_identifier(src_table.name, database)
        new_table_location = ExternalLocations.wasbs_to_abfss(src_table.location)
        if not new_table_location:
            logger.warning(f"Invalid wasbs location for table {src_table.name}, skipping conversion.")
            return False
        try:
            old_table = self._catalog.getTableMetadata(table_identifier)
            entity_storage_locations = self._get_entity_storage_locations(old_table)
            table_location = old_table.storage()
            new_location = self._catalog_storage(
                self._spark._jvm.scala.Some(  # pylint: disable=protected-access
                    self._spark._jvm.java.net.URI(new_table_location)  # pylint: disable=protected-access
                ),
                table_location.inputFormat(),
                table_location.outputFormat(),
                table_location.serde(),
                table_location.compressed(),
                table_location.properties(),
            )
            new_table = self._catalog_table(
                old_table.identifier(),
                old_table.tableType(),
                new_location,
                old_table.schema(),
                old_table.provider(),
                old_table.partitionColumnNames(),
                old_table.bucketSpec(),
                old_table.owner(),
                old_table.createTime(),
                old_table.lastAccessTime(),
                old_table.createVersion(),
                old_table.properties(),
                old_table.stats(),
                old_table.viewText(),
                old_table.comment(),
                old_table.unsupportedFeatures(),
                old_table.tracksPartitionsInCatalog(),
                old_table.schemaPreservesCase(),
                old_table.ignoredProperties(),
                old_table.viewOriginalText(),
                # From DBR 16, there's a new constructor argument: entityStorageLocations (Seq[EntityStorageLocation])
                # (We can't detect whether the argument is needed by the constructor, but assume that if the accessor
                # is present on the source table then the argument is needed.)
                *([entity_storage_locations] if entity_storage_locations is not None else []),
            )
            self._catalog.alterTable(new_table)
        except Exception as e:  # pylint: disable=broad-exception-caught

Review comment: Unless this is intentionally meant to catch broad exceptions, we should narrow it down to the Py4JJavaError and ValueError that could occur here.
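One way to narrow it, as a suggestion sketch (assuming JVM-side failures from the py4j bridge surface as Py4JJavaError, importable from py4j.protocol, which ships with PySpark; whether those two cover all failure modes here is an assumption):

        # Sketch: catch the py4j bridge error for JVM failures plus ValueError
        # for bad input, instead of the blanket Exception.
        except (Py4JJavaError, ValueError) as e: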

            logger.warning(f"Error converting HMS table {src_table.name} to abfss: {e}", exc_info=True)
            return False
        self._update_table_location(src_table, inventory_table, new_table_location)
        logger.info(f"Converted {src_table.name} location to ABFSS.")
        return True
    def _update_table_status(self, src_table: Table, inventory_table: str):
        update_sql = f"UPDATE {escape_sql_identifier(inventory_table)} SET object_type = 'EXTERNAL' WHERE catalog='hive_metastore' AND database='{src_table.database}' AND name='{src_table.name}';"
        self._sql_backend.execute(update_sql)

    def _update_table_location(self, src_table: Table, inventory_table: str, new_location: str):

Review comment: needs a return type annotation to enforce linting.
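The annotated signature would be, assuming the method intentionally returns nothing:

    def _update_table_location(self, src_table: Table, inventory_table: str, new_location: str) -> None: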

        update_sql = f"UPDATE {escape_sql_identifier(inventory_table)} SET location = '{new_location}' WHERE catalog='hive_metastore' AND database='{src_table.database}' AND name='{src_table.name}';"
        self._sql_backend.execute(update_sql)

    def _migrate_managed_as_external_table(self, src_table: Table, rule: Rule):
        target_table_key = rule.as_uc_table_key
        table_migrate_sql = src_table.sql_migrate_as_external(target_table_key)

@@ -0,0 +1,18 @@
{
  "src": {
    "catalog": "hive_metastore",
    "database": "db1_src",
    "name": "wasbs_src",
    "object_type": "EXTERNAL",
    "table_format": "DELTA",
    "location": "wasbs://bucket/test/table1"
  },
  "rule": {
    "workspace_name": "workspace",
    "catalog_name": "ucx_default",
    "src_schema": "db1_src",
    "dst_schema": "db1_dst",
    "src_table": "wasbs_src",
    "dst_table": "wasbs_dst"
  }
}
Review comment: This and the next if are both checking for wasbs; they can be consolidated.
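One possible consolidation, sketched on the assumption that the two checks in question are the startswith("wasbs://") filter in convert_wasbs_to_adls_gen2 and the falsy-location guard in _convert_wasbs_table_to_abfss: let the mapping helper perform the scheme check once and filter on its result.

        # Sketch: ExternalLocations.wasbs_to_abfss returns a falsy value for
        # non-wasbs locations, so the explicit startswith check can be dropped.
        for table in tables_in_scope:
            if table.src.location and ExternalLocations.wasbs_to_abfss(table.src.location):
                tasks.append(partial(self._convert_wasbs_table_to_abfss, table.src))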