Skip to content

Commit 1dd1a12

Browse files
authored
Speedup assessment workflow by making DBFS root table size calculation parallel (#2745)
Previously, DBFS root table sizes were computed sequentially inside the crawl loop; this change collects the per-table size lookups as tasks and runs them in parallel via `Threads.strict`, speeding up the assessment workflow.
1 parent 1951f37 commit 1dd1a12

File tree

5 files changed

+26
-22
lines changed

5 files changed

+26
-22
lines changed

src/databricks/labs/ucx/contexts/workflow_task.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ def pipelines_crawler(self):
7171

7272
@cached_property
7373
def table_size_crawler(self):
74-
return TableSizeCrawler(self.sql_backend, self.inventory_database)
74+
return TableSizeCrawler(self.sql_backend, self.inventory_database, self.config.include_databases)
7575

7676
@cached_property
7777
def policies_crawler(self):

src/databricks/labs/ucx/hive_metastore/table_size.py

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
import logging
22
from collections.abc import Iterable
33
from dataclasses import dataclass
4+
from functools import partial
45

6+
from databricks.labs.blueprint.parallel import Threads
57
from databricks.labs.lsql.backends import SqlBackend
68

79
from databricks.labs.ucx.framework.crawlers import CrawlerBase
810
from databricks.labs.ucx.framework.utils import escape_sql_identifier
911
from databricks.labs.ucx.hive_metastore import TablesCrawler
12+
from databricks.labs.ucx.hive_metastore.tables import Table
1013

1114
logger = logging.getLogger(__name__)
1215

@@ -40,43 +43,43 @@ def _crawl(self) -> Iterable[TableSize]:
4043
"""Crawls and lists tables using table crawler
4144
Identifies DBFS root tables and calculates the size for these.
4245
"""
46+
tasks = []
4347
for table in self._tables_crawler.snapshot():
4448
if not table.kind == "TABLE":
4549
continue
4650
if not table.is_dbfs_root:
4751
continue
48-
size_in_bytes = self._safe_get_table_size(table.key)
49-
if size_in_bytes is None:
50-
continue # table does not exist anymore or is corrupted
51-
52-
yield TableSize(
53-
catalog=table.catalog, database=table.database, name=table.name, size_in_bytes=size_in_bytes
54-
)
52+
tasks.append(partial(self._safe_get_table_size, table))
53+
return Threads.strict('DBFS root table sizes', tasks)
5554

5655
def _try_fetch(self) -> Iterable[TableSize]:
5756
"""Tries to load table information from the database or throws TABLE_OR_VIEW_NOT_FOUND error"""
5857
for row in self._fetch(f"SELECT * FROM {escape_sql_identifier(self.full_name)}"):
5958
yield TableSize(*row)
6059

61-
def _safe_get_table_size(self, table_full_name: str) -> int | None:
62-
logger.debug(f"Evaluating {table_full_name} table size.")
60+
def _safe_get_table_size(self, table: Table) -> TableSize | None:
61+
logger.debug(f"Evaluating {table.key} table size.")
6362
try:
6463
# refresh table statistics to avoid stale stats in HMS
65-
self._backend.execute(f"ANALYZE table {escape_sql_identifier(table_full_name)} compute STATISTICS NOSCAN")
66-
# pylint: disable-next=protected-access
67-
return self._spark._jsparkSession.table(table_full_name).queryExecution().analyzed().stats().sizeInBytes()
64+
self._backend.execute(f"ANALYZE table {table.safe_sql_key} compute STATISTICS NOSCAN")
65+
jvm_df = self._spark._jsparkSession.table(table.safe_sql_key) # pylint: disable=protected-access
66+
size_in_bytes = jvm_df.queryExecution().analyzed().stats().sizeInBytes()
67+
return TableSize(
68+
catalog=table.catalog,
69+
database=table.database,
70+
name=table.name,
71+
size_in_bytes=size_in_bytes,
72+
)
6873
except Exception as e: # pylint: disable=broad-exception-caught
6974
if "[TABLE_OR_VIEW_NOT_FOUND]" in str(e) or "[DELTA_TABLE_NOT_FOUND]" in str(e):
70-
logger.warning(f"Failed to evaluate {table_full_name} table size. Table not found.")
75+
logger.warning(f"Failed to evaluate {table.key} table size. Table not found.")
7176
return None
7277
if "[DELTA_INVALID_FORMAT]" in str(e):
73-
logger.warning(
74-
f"Unable to read Delta table {table_full_name}, please check table structure and try again."
75-
)
78+
logger.warning(f"Unable to read Delta table {table.key}, please check table structure and try again.")
7679
return None
7780
if "[DELTA_MISSING_TRANSACTION_LOG]" in str(e):
78-
logger.warning(f"Delta table {table_full_name} is corrupted: missing transaction log.")
81+
logger.warning(f"Delta table {table.key} is corrupt: missing transaction log.")
7982
return None
80-
logger.error(f"Failed to evaluate {table_full_name} table size: ", exc_info=True)
83+
logger.error(f"Failed to evaluate {table.key} table size: ", exc_info=True)
8184

8285
return None

src/databricks/labs/ucx/hive_metastore/tables.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,7 @@ class MigrationCount:
340340
what_count: dict[What, int]
341341

342342

343-
class TablesCrawler(CrawlerBase):
343+
class TablesCrawler(CrawlerBase[Table]):
344344
def __init__(self, backend: SqlBackend, schema, include_databases: list[str] | None = None):
345345
"""
346346
Initializes a TablesCrawler instance.

src/databricks/labs/ucx/source_code/known.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1830,6 +1830,7 @@
18301830
"databricks-labs-ucx": {
18311831
"databricks.labs.ucx": []
18321832
},
1833+
"databricks-pydabs": {},
18331834
"databricks-sdk": {
18341835
"databricks.sdk": []
18351836
},
@@ -29921,4 +29922,4 @@
2992129922
"zipp.compat.py310": [],
2992229923
"zipp.glob": []
2992329924
}
29924-
}
29925+
}

tests/unit/hive_metastore/test_table_size.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ def test_table_size_when_table_corrupted(mocker, caplog):
137137
results = tsc.snapshot()
138138

139139
assert len(results) == 0
140-
assert "Delta table hive_metastore.db1.table1 is corrupted: missing transaction log" in caplog.text
140+
assert "Delta table hive_metastore.db1.table1 is corrupt: missing transaction log" in caplog.text
141141

142142

143143
def test_table_size_when_delta_invalid_format_error(mocker, caplog):

0 commit comments

Comments (0)