
Commit 996b782

Addressed issues, added tests.
1 parent 5a09b25 commit 996b782

File tree

src/databricks/labs/ucx/assessment/jobs.py
src/databricks/labs/ucx/progress/workflows.py
src/databricks/labs/ucx/source_code/linters/jobs.py
tests/unit/source_code/test_jobs.py

4 files changed: +90 -7 lines changed


src/databricks/labs/ucx/assessment/jobs.py

Lines changed: 3 additions & 0 deletions

@@ -42,6 +42,8 @@ class JobInfo:
     job_name: str | None = None
     creator: str | None = None
     """User-name of the creator of the pipeline, if known."""
+    last_run: int | None = None
+    """Timestamp of the last run of the pipeline, if known."""
 
     __id_attributes__: ClassVar[tuple[str, ...]] = ("job_id",)
 
@@ -54,6 +56,7 @@ def from_job(cls, job: Job):
             failures="[]",
             job_name=job_name,
             creator=job.creator_user_name or None,
+            last_run=None,
         )
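
Note that from_job leaves the new last_run field as None; nothing in this commit populates it yet. Presumably it is meant to carry an epoch-millisecond run timestamp, matching the millisecond arithmetic added to lint_job below. A hypothetical sketch of filling it from a job's newest run (the helper name is illustrative, not part of the commit):

from databricks.sdk import WorkspaceClient


def latest_run_ms(ws: WorkspaceClient, job_id: int) -> int | None:
    """Start time of the newest run, as epoch milliseconds, or None if the job never ran."""
    # list_runs yields runs newest-first; limit=1 fetches at most one.
    newest = next(iter(ws.jobs.list_runs(job_id=job_id, limit=1)), None)
    return newest.start_time if newest else None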

src/databricks/labs/ucx/progress/workflows.py

Lines changed: 1 addition & 1 deletion

@@ -181,7 +181,7 @@ def assess_workflows(self, ctx: RuntimeContext):
         """Scans all jobs for migration issues in notebooks.
         Also stores direct filesystem accesses for display in the migration dashboard."""
         # TODO: Ensure these are captured in the history log.
-        ctx.workflow_linter.refresh_report(ctx.sql_backend, ctx.inventory_database)
+        ctx.workflow_linter.refresh_report(ctx.sql_backend, ctx.inventory_database, last_run_days=30)
 
     @job_task(
         depends_on=[
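
The call site pins the linting window to 30 days. If the window ever needs to vary per installation, a hypothetical variant reads it from config with the same default; linting_window_days is an invented attribute here, not part of ucx:

# Hypothetical: source the window from config instead of hard-coding it.
last_run_days = getattr(ctx.config, "linting_window_days", None) or 30  # invented attribute
ctx.workflow_linter.refresh_report(ctx.sql_backend, ctx.inventory_database, last_run_days=last_run_days)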

src/databricks/labs/ucx/source_code/linters/jobs.py

Lines changed: 21 additions & 5 deletions

@@ -1,5 +1,4 @@
 import dataclasses
-import functools
 import logging
 
 from collections.abc import Iterable
@@ -56,10 +55,13 @@ def __init__(
         self._directfs_crawler = directfs_crawler
         self._used_tables_crawler = used_tables_crawler
 
-    def refresh_report(self, sql_backend: SqlBackend, inventory_database: str) -> None:
+    def refresh_report(
+        self, sql_backend: SqlBackend, inventory_database: str, /, last_run_days: int | None = None
+    ) -> None:
         tasks = []
         for job in self._jobs_crawler.snapshot():
-            tasks.append(functools.partial(self.lint_job, job.job_id))
+            tasks.append(lambda job_id=job.job_id: self.lint_job(job_id, last_run_days=last_run_days))
+            # TODO: Limit Scope
         logger.info(f"Running {len(tasks)} linting tasks in parallel...")
         job_results, errors = Threads.gather('linting workflows', tasks)
         job_problems: list[JobProblem] = []
@@ -82,12 +84,26 @@ def refresh_report(self, sql_backend: SqlBackend, inventory_database: str) -> None:
         error_messages = "\n".join([str(error) for error in errors])
         logger.warning(f"Errors occurred during linting:\n{error_messages}")
 
-    def lint_job(self, job_id: int) -> tuple[list[JobProblem], list[DirectFsAccess], list[UsedTable]]:
+    def lint_job(
+        self, job_id: int, /, last_run_days: int | None = None
+    ) -> tuple[list[JobProblem], list[DirectFsAccess], list[UsedTable]]:
         try:
             job = self._ws.jobs.get(job_id)
         except NotFound:
             logger.warning(f'Could not find job: {job_id}')
-            return ([], [], [])
+            return [], [], []
+
+        if last_run_days:
+            current_day_ms = int(datetime.now().timestamp() * 1000)
+            last_run_day_ms = current_day_ms - (last_run_days * 24 * 60 * 60 * 1000)
+            runs = self._ws.jobs.list_runs(
+                job_id=job_id,
+                limit=1,
+                start_time_from=last_run_day_ms,
+            )
+            if not list(runs):
+                logger.warning(f'Could not find job runs in the last {last_run_days} days: {job_id}')
+                return [], [], []
 
         problems, dfsas, tables = self._lint_job(job)
         if len(problems) > 0:

tests/unit/source_code/test_jobs.py

Lines changed: 65 additions & 1 deletion

@@ -10,7 +10,7 @@
 from databricks.sdk import WorkspaceClient
 from databricks.sdk.errors import NotFound
 from databricks.sdk.service import compute, jobs
-from databricks.sdk.service.jobs import Job, SparkPythonTask
+from databricks.sdk.service.jobs import Job, SparkPythonTask, BaseRun, RunResultState
 from databricks.sdk.service.pipelines import (
     GetPipelineResponse,
     FileLibrary,
@@ -589,3 +589,67 @@ def test_workflow_linter_refresh_report(dependency_resolver, mock_path_lookup, migration_index) -> None:
     sql_backend.has_rows_written_for('test.workflow_problems')
     sql_backend.has_rows_written_for('hive_metastore.test.used_tables_in_paths')
     sql_backend.has_rows_written_for('hive_metastore.test.directfs_in_paths')
+
+
+def test_workflow_linter_refresh_report_time_bound(dependency_resolver, mock_path_lookup, migration_index) -> None:
+    ws = create_autospec(WorkspaceClient)
+    ws.workspace.get_status.return_value = ObjectInfo(object_id=123, language=Language.PYTHON)
+    some_things = mock_path_lookup.resolve(Path("functional/zoo.py"))
+    assert some_things is not None
+    ws.workspace.download.return_value = some_things.read_bytes()
+    notebook_task = jobs.NotebookTask(
+        notebook_path=some_things.absolute().as_posix(),
+        base_parameters={"a": "b", "c": "dbfs:/mnt/foo"},
+    )
+    task = jobs.Task(
+        task_key="test",
+        job_cluster_key="main",
+        notebook_task=notebook_task,
+    )
+    settings = jobs.JobSettings(
+        tasks=[task],
+        name='some',
+        job_clusters=[
+            jobs.JobCluster(
+                job_cluster_key="main",
+                new_cluster=compute.ClusterSpec(
+                    spark_version="15.2.x-photon-scala2.12",
+                    node_type_id="Standard_F4s",
+                    num_workers=2,
+                    data_security_mode=compute.DataSecurityMode.LEGACY_TABLE_ACL,
+                    spark_conf={"spark.databricks.cluster.profile": "singleNode"},
+                ),
+            ),
+        ],
+    )
+    ws.jobs.list.return_value = [Job(job_id=1, settings=settings), Job(job_id=2, settings=settings)]
+    ws.jobs.get.side_effect = [Job(job_id=1, settings=settings), Job(job_id=2, settings=settings)]
+    ws.jobs.list_runs.side_effect = [
+        [
+            BaseRun(
+                state=jobs.RunState(result_state=RunResultState.SUCCESS),
+                run_id=1,
+                job_id=2,
+                run_page_url="http://example.com",
+            )
+        ],
+        [],
+    ]
+    sql_backend = MockBackend()
+    jobs_crawler = JobsCrawler(ws, sql_backend, 'test')
+    directfs_crawler = DirectFsAccessCrawler.for_paths(sql_backend, "test")
+    used_tables_crawler = UsedTablesCrawler.for_paths(sql_backend, "test")
+    linter = WorkflowLinter(
+        ws,
+        jobs_crawler,
+        dependency_resolver,
+        mock_path_lookup,
+        migration_index,
+        directfs_crawler,
+        used_tables_crawler,
+    )
+    linter.refresh_report(sql_backend, 'test', last_run_days=30)
+
+    sql_backend.has_rows_written_for('test.workflow_problems')
+    sql_backend.has_rows_written_for('hive_metastore.test.used_tables_in_paths')
+    sql_backend.has_rows_written_for('hive_metastore.test.directfs_in_paths')
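
The fixture relies on unittest.mock sequencing: when side_effect is set to a list, each successive call to the mock returns the next element, so the first list_runs call (job 1) reports a recent run while the second (job 2) reports none, and only job 1 should reach the linter. A self-contained sketch of that pattern, with placeholder return values:

from unittest.mock import create_autospec

from databricks.sdk import WorkspaceClient

ws = create_autospec(WorkspaceClient)
ws.jobs.list_runs.side_effect = [["recent-run"], []]  # first call, then second call
assert ws.jobs.list_runs(job_id=1, limit=1, start_time_from=0) == ["recent-run"]
assert ws.jobs.list_runs(job_id=2, limit=1, start_time_from=0) == []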
