From 996b7821a27c978310a1af918ba808d99e064488 Mon Sep 17 00:00:00 2001 From: Liran Bareket Date: Mon, 24 Mar 2025 16:23:05 -0400 Subject: [PATCH 1/2] Addressed issues, added tests. --- src/databricks/labs/ucx/assessment/jobs.py | 3 + src/databricks/labs/ucx/progress/workflows.py | 2 +- .../labs/ucx/source_code/linters/jobs.py | 26 ++++++-- tests/unit/source_code/test_jobs.py | 66 ++++++++++++++++++- 4 files changed, 90 insertions(+), 7 deletions(-) diff --git a/src/databricks/labs/ucx/assessment/jobs.py b/src/databricks/labs/ucx/assessment/jobs.py index a0dcd4c9d8..489da76ede 100644 --- a/src/databricks/labs/ucx/assessment/jobs.py +++ b/src/databricks/labs/ucx/assessment/jobs.py @@ -42,6 +42,8 @@ class JobInfo: job_name: str | None = None creator: str | None = None """User-name of the creator of the pipeline, if known.""" + last_run: int | None = None + """Timestamp of the last run of the pipeline, if known.""" __id_attributes__: ClassVar[tuple[str, ...]] = ("job_id",) @@ -54,6 +56,7 @@ def from_job(cls, job: Job): failures="[]", job_name=job_name, creator=job.creator_user_name or None, + last_run=None, ) diff --git a/src/databricks/labs/ucx/progress/workflows.py b/src/databricks/labs/ucx/progress/workflows.py index 8206bcec50..8d27f96288 100644 --- a/src/databricks/labs/ucx/progress/workflows.py +++ b/src/databricks/labs/ucx/progress/workflows.py @@ -181,7 +181,7 @@ def assess_workflows(self, ctx: RuntimeContext): """Scans all jobs for migration issues in notebooks. Also stores direct filesystem accesses for display in the migration dashboard.""" # TODO: Ensure these are captured in the history log. - ctx.workflow_linter.refresh_report(ctx.sql_backend, ctx.inventory_database) + ctx.workflow_linter.refresh_report(ctx.sql_backend, ctx.inventory_database, last_run_days=30) @job_task( depends_on=[ diff --git a/src/databricks/labs/ucx/source_code/linters/jobs.py b/src/databricks/labs/ucx/source_code/linters/jobs.py index 721fdc8a89..1216e8833f 100644 --- a/src/databricks/labs/ucx/source_code/linters/jobs.py +++ b/src/databricks/labs/ucx/source_code/linters/jobs.py @@ -1,5 +1,4 @@ import dataclasses -import functools import logging from collections.abc import Iterable @@ -56,10 +55,13 @@ def __init__( self._directfs_crawler = directfs_crawler self._used_tables_crawler = used_tables_crawler - def refresh_report(self, sql_backend: SqlBackend, inventory_database: str) -> None: + def refresh_report( + self, sql_backend: SqlBackend, inventory_database: str, /, last_run_days: int | None = None + ) -> None: tasks = [] for job in self._jobs_crawler.snapshot(): - tasks.append(functools.partial(self.lint_job, job.job_id)) + tasks.append(lambda: self.lint_job(job.job_id, last_run_days=last_run_days)) + # TODO: Limit Scope logger.info(f"Running {len(tasks)} linting tasks in parallel...") job_results, errors = Threads.gather('linting workflows', tasks) job_problems: list[JobProblem] = [] @@ -82,12 +84,26 @@ def refresh_report(self, sql_backend: SqlBackend, inventory_database: str) -> No error_messages = "\n".join([str(error) for error in errors]) logger.warning(f"Errors occurred during linting:\n{error_messages}") - def lint_job(self, job_id: int) -> tuple[list[JobProblem], list[DirectFsAccess], list[UsedTable]]: + def lint_job( + self, job_id: int, /, last_run_days: int | None = None + ) -> tuple[list[JobProblem], list[DirectFsAccess], list[UsedTable]]: try: job = self._ws.jobs.get(job_id) except NotFound: logger.warning(f'Could not find job: {job_id}') - return ([], [], []) + return [], [], [] + + if last_run_days: + current_day_ms = int(datetime.now().timestamp() * 1000) + last_run_day_ms = current_day_ms - (last_run_days * 24 * 60 * 60 * 1000) + runs = self._ws.jobs.list_runs( + job_id=job_id, + limit=1, + start_time_from=last_run_day_ms, + ) + if not runs: + logger.warning(f'Could not find job runs in the last {last_run_days} days: {job_id}') + return [], [], [] problems, dfsas, tables = self._lint_job(job) if len(problems) > 0: diff --git a/tests/unit/source_code/test_jobs.py b/tests/unit/source_code/test_jobs.py index e14d63fd68..ecff87e46c 100644 --- a/tests/unit/source_code/test_jobs.py +++ b/tests/unit/source_code/test_jobs.py @@ -10,7 +10,7 @@ from databricks.sdk import WorkspaceClient from databricks.sdk.errors import NotFound from databricks.sdk.service import compute, jobs -from databricks.sdk.service.jobs import Job, SparkPythonTask +from databricks.sdk.service.jobs import Job, SparkPythonTask, BaseRun, RunResultState from databricks.sdk.service.pipelines import ( GetPipelineResponse, FileLibrary, @@ -589,3 +589,67 @@ def test_workflow_linter_refresh_report(dependency_resolver, mock_path_lookup, m sql_backend.has_rows_written_for('test.workflow_problems') sql_backend.has_rows_written_for('hive_metastore.test.used_tables_in_paths') sql_backend.has_rows_written_for('hive_metastore.test.directfs_in_paths') + + +def test_workflow_linter_refresh_report_time_bound(dependency_resolver, mock_path_lookup, migration_index) -> None: + ws = create_autospec(WorkspaceClient) + ws.workspace.get_status.return_value = ObjectInfo(object_id=123, language=Language.PYTHON) + some_things = mock_path_lookup.resolve(Path("functional/zoo.py")) + assert some_things is not None + ws.workspace.download.return_value = some_things.read_bytes() + notebook_task = jobs.NotebookTask( + notebook_path=some_things.absolute().as_posix(), + base_parameters={"a": "b", "c": "dbfs:/mnt/foo"}, + ) + task = jobs.Task( + task_key="test", + job_cluster_key="main", + notebook_task=notebook_task, + ) + settings = jobs.JobSettings( + tasks=[task], + name='some', + job_clusters=[ + jobs.JobCluster( + job_cluster_key="main", + new_cluster=compute.ClusterSpec( + spark_version="15.2.x-photon-scala2.12", + node_type_id="Standard_F4s", + num_workers=2, + data_security_mode=compute.DataSecurityMode.LEGACY_TABLE_ACL, + spark_conf={"spark.databricks.cluster.profile": "singleNode"}, + ), + ), + ], + ) + ws.jobs.list.return_value = [Job(job_id=1, settings=settings), Job(job_id=2, settings=settings)] + ws.jobs.get.side_effect = [Job(job_id=1, settings=settings), Job(job_id=2, settings=settings)] + ws.jobs.list_runs.side_effect = [ + [ + BaseRun( + state=jobs.RunState(result_state=RunResultState.SUCCESS), + run_id=1, + job_id=2, + run_page_url="http://example.com", + ) + ], + [], + ] + sql_backend = MockBackend() + jobs_crawler = JobsCrawler(ws, sql_backend, 'test') + directfs_crawler = DirectFsAccessCrawler.for_paths(sql_backend, "test") + used_tables_crawler = UsedTablesCrawler.for_paths(sql_backend, "test") + linter = WorkflowLinter( + ws, + jobs_crawler, + dependency_resolver, + mock_path_lookup, + migration_index, + directfs_crawler, + used_tables_crawler, + ) + linter.refresh_report(sql_backend, 'test', last_run_days=30) + + sql_backend.has_rows_written_for('test.workflow_problems') + sql_backend.has_rows_written_for('hive_metastore.test.used_tables_in_paths') + sql_backend.has_rows_written_for('hive_metastore.test.directfs_in_paths') From 00f21cc1096da508e24793725d2bd4d98730750f Mon Sep 17 00:00:00 2001 From: Liran Bareket Date: Mon, 28 Apr 2025 10:00:39 -0400 Subject: [PATCH 2/2] Fixed linting issue. --- src/databricks/labs/ucx/source_code/linters/jobs.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/databricks/labs/ucx/source_code/linters/jobs.py b/src/databricks/labs/ucx/source_code/linters/jobs.py index 1216e8833f..f8558ad6a4 100644 --- a/src/databricks/labs/ucx/source_code/linters/jobs.py +++ b/src/databricks/labs/ucx/source_code/linters/jobs.py @@ -3,6 +3,7 @@ from collections.abc import Iterable from datetime import datetime, timezone +from functools import partial from pathlib import Path from databricks.labs.blueprint.parallel import Threads @@ -59,8 +60,17 @@ def refresh_report( self, sql_backend: SqlBackend, inventory_database: str, /, last_run_days: int | None = None ) -> None: tasks = [] + + def lint_job_limited(job_id: int) -> tuple[list[JobProblem], list[DirectFsAccess], list[UsedTable]]: + return self.lint_job(job_id, last_run_days=last_run_days) + for job in self._jobs_crawler.snapshot(): - tasks.append(lambda: self.lint_job(job.job_id, last_run_days=last_run_days)) + try: + job_id = int(job.job_id) + except ValueError: + logger.warning(f"Invalid job id: {job.job_id}") + continue + tasks.append(partial(lint_job_limited, job_id)) # TODO: Limit Scope logger.info(f"Running {len(tasks)} linting tasks in parallel...") job_results, errors = Threads.gather('linting workflows', tasks)