
Added force_refresh parameter for Assessment #4183


Merged
merged 33 commits into from Aug 1, 2025

Commits (33)
7703582
Full refresh all crawlers for each assessment run
pritishpai Jun 25, 2025
841c0a0
Add documentation
pritishpai Jun 25, 2025
e34525b
Make force_refresh of assessment parameter driven
pritishpai Jun 25, 2025
d532555
Add a parameter to the assessment job for force_refresh of data
pritishpai Jun 26, 2025
bb55bf3
Use set for membership test
pritishpai Jun 26, 2025
2b9d42f
Fetching permissions returns groups and users that have access.
pritishpai Jun 30, 2025
913edab
Update readme to reflect parameter change
pritishpai Jul 1, 2025
8992380
Add force_refresh flag to ensure-assessment-run for full refresh of a…
pritishpai Jul 3, 2025
039a2bd
Fetch flag value of force_refresh for assessment rerun
pritishpai Jul 3, 2025
30752af
Add integration test, fetch parameter from current job run
pritishpai Jul 3, 2025
3164a46
Update cli command
pritishpai Jul 3, 2025
14280a4
Fix fmt
pritishpai Jul 4, 2025
d6a4d7f
Add job_parameters to the test run_now to imitate adding it to run_wo…
pritishpai Jul 4, 2025
82c5602
Indicate job_parameters argument is intentionally unused
pritishpai Jul 5, 2025
be1342e
Fix error due to list to set comparison
pritishpai Jul 7, 2025
3e3743d
Use set comprehension directly
pritishpai Jul 7, 2025
3745bca
Fix unit tests and cli command
pritishpai Jul 7, 2025
8237277
Update cli and docs
pritishpai Jul 7, 2025
d5b0331
Revert test change (handled in another PR)
pritishpai Jul 28, 2025
4cd9f77
Add new table to dict instead of recreating the entire dict
pritishpai Jul 28, 2025
54c3e7c
Use add for the dict
pritishpai Jul 28, 2025
8ef23e6
Add force_refresh directly as a parameter and add default value
pritishpai Jul 30, 2025
a8abef5
Finish refactoring force_refresh for other job_tasks
pritishpai Jul 30, 2025
da2bed8
Refactor using next()
pritishpai Jul 30, 2025
4d8a51b
Use default when force_refresh parameter value is None
pritishpai Jul 30, 2025
5a69eba
Add mock iter return value for list_jobs
pritishpai Jul 30, 2025
400251e
Add a fixture for mocking jobs.list_runs as an iterator
pritishpai Jul 30, 2025
eb0e6cb
Remove unused mock
pritishpai Jul 30, 2025
d2d2c53
PR comments for docs
pritishpai Jul 30, 2025
d6eb70f
Directly fetch force_refresh from ctx
pritishpai Jul 31, 2025
9e313ee
Fmt fix
pritishpai Jul 31, 2025
e715b1f
Add force_refresh for the test notebook
pritishpai Aug 1, 2025
6f869c0
Exception handling for test runner notebook
pritishpai Aug 1, 2025
3 changes: 2 additions & 1 deletion docs/ucx/docs/reference/commands/index.mdx
@@ -731,13 +731,14 @@ are displayed. To display `DEBUG` logs, use the `--debug` flag.
### `ensure-assessment-run`

```commandline
databricks labs ucx ensure-assessment-run
databricks labs ucx ensure-assessment-run [--force-refresh true/false]
```

This command ensures that the [assessment workflow](/docs/reference/workflows#assessment-workflow) was run on a workspace.
This command will block until the job finishes.
Failed workflows can be fixed with the [`repair-run` command](/docs/reference/commands#repair-run). Workflows and their status can be
listed with the [`workflows` command](/docs/reference/commands#workflows).
The `--force-refresh` flag forces the assessment workflow to run again, overwriting the previous assessment results, even if the workflow has already completed. By default, re-running the assessment workflow will not update the output of previously run assessments.

### `update-migration-progress`

6 changes: 3 additions & 3 deletions docs/ucx/docs/reference/workflows/index.mdx
@@ -98,9 +98,9 @@ See [this guide](/docs/reference/assessment) for more details.
Proceed to the [group migration workflow](/docs/reference/workflows#group-migration-workflow) below or go back to the
[migration process diagram](/docs/process/).

The UCX assessment workflow is designed to only run once, re-running will **not** update the existing results. If the
inventory and findings for a workspace need to be updated then first reinstall UCX by [uninstalling](/docs/installation#uninstall-ucx)
and [installing](/docs/installation) it again.
> ⚠️ Caution: To fully refresh the UCX assessment workflow and overwrite existing results, set the `force_refresh` parameter to `true` (or `True`).
> For large workspaces, this process can be time-consuming and resource-intensive. Only use this option if a complete reassessment is absolutely necessary.
> Alternatively, you can re-run the assessment using the [`ensure-assessment-run` command](/docs/reference/commands#ensure-assessment-run) with the `--force-refresh` flag set to `true` to overwrite existing results.
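For example, a full re-run can be triggered from the CLI (assuming UCX is already installed in the workspace):

```commandline
databricks labs ucx ensure-assessment-run --force-refresh true
```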

## Group migration workflow

3 changes: 3 additions & 0 deletions labs.yml
@@ -87,6 +87,9 @@ commands:
- name: run-as-collection
description: (Optional) Whether to check (and run if necessary) the assessment for the collection of workspaces
with ucx installed. Default is false.
- name: force-refresh
description: (Optional) Force a full refresh of the assessment job, even if it was run recently.
Default is false.

- name: update-migration-progress
description: trigger the `migration-progress-experimental` job to refresh the inventory that tracks the workspace
52 changes: 32 additions & 20 deletions src/databricks/labs/ucx/assessment/workflows.py
@@ -1,5 +1,7 @@
import logging

from databricks.sdk.service.jobs import JobParameterDefinition

from databricks.labs.ucx.contexts.workflow_task import RuntimeContext
from databricks.labs.ucx.framework.tasks import Workflow, job_task

@@ -9,23 +11,33 @@

class Assessment(Workflow): # pylint: disable=too-many-public-methods
def __init__(self):
super().__init__('assessment')
super().__init__('assessment', [JobParameterDefinition(name="force_refresh", default=False)])

@staticmethod
def _get_force_refresh(ctx: RuntimeContext) -> bool:
"""Extracts the force_refresh parameter from the named parameters of the context."""
force_refresh = False
value = ctx.named_parameters.get("force_refresh")
if isinstance(value, str):
force_refresh = value.lower() in {"true", "1"}
logger.info(f"Using force refresh value of: {force_refresh}")
return force_refresh

@job_task
def crawl_tables(self, ctx: RuntimeContext):
"""Iterates over all tables in the Hive Metastore of the current workspace and persists their metadata, such
as _database name_, _table name_, _table type_, _table location_, etc., in the Delta table named
`$inventory_database.tables`. Note that the `inventory_database` is set in the configuration file. The metadata
stored is then used in the subsequent tasks and workflows to, for example, find all Hive Metastore tables that
cannot easily be migrated to Unity Catalog."""
ctx.tables_crawler.snapshot()
ctx.tables_crawler.snapshot(force_refresh=self._get_force_refresh(ctx))

@job_task
def crawl_udfs(self, ctx: RuntimeContext):
"""Iterates over all UDFs in the Hive Metastore of the current workspace and persists their metadata in the
table named `$inventory_database.udfs`. This inventory is currently used when scanning securable objects for
issues with grants that cannot be migrated to Unit Catalog."""
ctx.udfs_crawler.snapshot()
ctx.udfs_crawler.snapshot(force_refresh=self._get_force_refresh(ctx))

@job_task(job_cluster="tacl")
def setup_tacl(self, ctx: RuntimeContext):
Expand All @@ -40,15 +52,15 @@ def crawl_grants(self, ctx: RuntimeContext):

Note: This job runs on a separate cluster (named `tacl`) as it requires the proper configuration to have the Table
ACLs enabled and available for retrieval."""
ctx.grants_crawler.snapshot()
ctx.grants_crawler.snapshot(force_refresh=self._get_force_refresh(ctx))

@job_task(depends_on=[crawl_tables])
def estimate_table_size_for_migration(self, ctx: RuntimeContext):
"""Scans the previously created Delta table named `$inventory_database.tables` and locate tables that cannot be
"synced". These tables will have to be cloned in the migration process.
Assesses the size of these tables and create `$inventory_database.table_size` table to list these sizes.
The table size is a factor in deciding whether to clone these tables."""
ctx.table_size_crawler.snapshot()
ctx.table_size_crawler.snapshot(force_refresh=self._get_force_refresh(ctx))

@job_task
def crawl_mounts(self, ctx: RuntimeContext):
@@ -58,7 +70,7 @@ def crawl_mounts(self, ctx: RuntimeContext):

The assessment involves scanning the workspace to compile a list of all existing mount points and subsequently
storing this information in the `$inventory.mounts` table. This is crucial for planning the migration."""
ctx.mounts_crawler.snapshot()
ctx.mounts_crawler.snapshot(force_refresh=self._get_force_refresh(ctx))

@job_task(depends_on=[crawl_mounts, crawl_tables])
def guess_external_locations(self, ctx: RuntimeContext):
@@ -70,7 +82,7 @@ def guess_external_locations(self, ctx: RuntimeContext):
- Extracting all the locations associated with tables that do not use DBFS directly, but a mount point instead
- Scanning all these locations to identify folders that can act as shared path prefixes
- These identified external locations will be created subsequently prior to the actual table migration"""
ctx.external_locations.snapshot()
ctx.external_locations.snapshot(force_refresh=self._get_force_refresh(ctx))

@job_task
def assess_jobs(self, ctx: RuntimeContext):
@@ -83,7 +95,7 @@ def assess_jobs(self, ctx: RuntimeContext):
- Clusters with incompatible Spark config tags
- Clusters referencing DBFS locations in one or more config options
"""
ctx.jobs_crawler.snapshot()
ctx.jobs_crawler.snapshot(force_refresh=self._get_force_refresh(ctx))

@job_task
def assess_clusters(self, ctx: RuntimeContext):
@@ -96,7 +108,7 @@ def assess_clusters(self, ctx: RuntimeContext):
- Clusters with incompatible spark config tags
- Clusters referencing DBFS locations in one or more config options
"""
ctx.clusters_crawler.snapshot()
ctx.clusters_crawler.snapshot(force_refresh=self._get_force_refresh(ctx))

@job_task
def assess_pipelines(self, ctx: RuntimeContext):
@@ -109,7 +121,7 @@ def assess_pipelines(self, ctx: RuntimeContext):

Subsequently, a list of all the pipelines with matching configurations are stored in the
`$inventory.pipelines` table."""
ctx.pipelines_crawler.snapshot()
ctx.pipelines_crawler.snapshot(force_refresh=self._get_force_refresh(ctx))

@job_task
def assess_incompatible_submit_runs(self, ctx: RuntimeContext):
@@ -122,7 +134,7 @@ def assess_incompatible_submit_runs(self, ctx: RuntimeContext):
It also combines several submit runs under a single pseudo_id based on hash of the submit run configuration.
Subsequently, a list of all the incompatible runs with failures are stored in the
`$inventory.submit_runs` table."""
ctx.submit_runs_crawler.snapshot()
ctx.submit_runs_crawler.snapshot(force_refresh=self._get_force_refresh(ctx))

@job_task
def crawl_cluster_policies(self, ctx: RuntimeContext):
@@ -133,7 +145,7 @@ def crawl_cluster_policies(self, ctx: RuntimeContext):

Subsequently, a list of all the policies with matching configurations are stored in the
`$inventory.policies` table."""
ctx.policies_crawler.snapshot()
ctx.policies_crawler.snapshot(force_refresh=self._get_force_refresh(ctx))

@job_task(cloud="azure")
def assess_azure_service_principals(self, ctx: RuntimeContext):
@@ -147,7 +159,7 @@ def assess_azure_service_principals(self, ctx: RuntimeContext):
Subsequently, the list of all the Azure Service Principals referred in those configurations are saved
in the `$inventory.azure_service_principals` table."""
if ctx.is_azure:
ctx.azure_service_principal_crawler.snapshot()
ctx.azure_service_principal_crawler.snapshot(force_refresh=self._get_force_refresh(ctx))

@job_task
def assess_global_init_scripts(self, ctx: RuntimeContext):
@@ -156,7 +168,7 @@ def assess_global_init_scripts(self, ctx: RuntimeContext):

It looks in:
- the list of all the global init scripts are saved in the `$inventory.global_init_scripts` table."""
ctx.global_init_scripts_crawler.snapshot()
ctx.global_init_scripts_crawler.snapshot(force_refresh=self._get_force_refresh(ctx))

@job_task
def workspace_listing(self, ctx: RuntimeContext):
@@ -168,7 +180,7 @@ def workspace_listing(self, ctx: RuntimeContext):
if not ctx.config.use_legacy_permission_migration:
logger.info("Skipping workspace listing as legacy permission migration is disabled.")
return
ctx.workspace_listing.snapshot()
ctx.workspace_listing.snapshot(force_refresh=self._get_force_refresh(ctx))

@job_task(depends_on=[crawl_grants, workspace_listing])
def crawl_permissions(self, ctx: RuntimeContext):
@@ -182,22 +194,22 @@ def crawl_permissions(self, ctx: RuntimeContext):
return
permission_manager = ctx.permission_manager
permission_manager.reset()
permission_manager.snapshot()
permission_manager.snapshot(force_refresh=self._get_force_refresh(ctx))

@job_task
def crawl_groups(self, ctx: RuntimeContext):
"""Scans all groups for the local group migration scope"""
ctx.group_manager.snapshot()
ctx.group_manager.snapshot(force_refresh=self._get_force_refresh(ctx))

@job_task
def crawl_redash_dashboards(self, ctx: RuntimeContext):
"""Scans all Redash dashboards."""
ctx.redash_crawler.snapshot()
ctx.redash_crawler.snapshot(force_refresh=self._get_force_refresh(ctx))

@job_task
def crawl_lakeview_dashboards(self, ctx: RuntimeContext):
"""Scans all Lakeview dashboards."""
ctx.lakeview_crawler.snapshot()
ctx.lakeview_crawler.snapshot(force_refresh=self._get_force_refresh(ctx))

@job_task(depends_on=[crawl_redash_dashboards, crawl_lakeview_dashboards])
def assess_dashboards(self, ctx: RuntimeContext):
13 changes: 10 additions & 3 deletions src/databricks/labs/ucx/cli.py
@@ -219,20 +219,27 @@ def validate_external_locations(


@ucx.command
def ensure_assessment_run(w: WorkspaceClient, run_as_collection: bool = False, a: AccountClient | None = None):
def ensure_assessment_run(
w: WorkspaceClient, run_as_collection: bool = False, a: AccountClient | None = None, force_refresh: bool = False
):
"""ensure the assessment job was run on a workspace"""
workspace_contexts = _get_workspace_contexts(w, a, run_as_collection)
for ctx in workspace_contexts:
workspace_id = ctx.workspace_client.get_workspace_id()
logger.info(f"Checking assessment workflow in workspace: {workspace_id}")
deployed_workflows = ctx.deployed_workflows
# Note: will block if the workflow is already underway but not completed.
if deployed_workflows.validate_step("assessment"):
if deployed_workflows.validate_step("assessment") and not force_refresh:
logger.info(f"The assessment workflow has successfully completed in workspace: {workspace_id}")
elif force_refresh:
logger.info(f"Re-running assessment workflow in workspace: {workspace_id}")
deployed_workflows.run_workflow(
"assessment", skip_job_wait=run_as_collection, named_parameters={"force_refresh": "true"}
)
else:
logger.info(f"Starting assessment workflow in workspace: {workspace_id}")
# If running for a collection, don't wait for each assessment job to finish as that will take a long time.
deployed_workflows.run_workflow("assessment", skip_job_wait=run_as_collection)


@ucx.command
10 changes: 8 additions & 2 deletions src/databricks/labs/ucx/framework/tasks.py
@@ -6,7 +6,7 @@
from databricks.labs.lsql.backends import SqlBackend
from databricks.sdk import WorkspaceClient
from databricks.sdk.core import Config
from databricks.sdk.service.jobs import CronSchedule
from databricks.sdk.service.jobs import CronSchedule, JobParameterDefinition

from databricks.labs.ucx.config import WorkspaceConfig

@@ -65,8 +65,9 @@ def parse_args(*argv) -> dict[str, str]:


class Workflow:
def __init__(self, name: str):
def __init__(self, name: str, named_parameters: list[JobParameterDefinition] | None = None):
self._name = name
self._named_parameters = named_parameters

@property
def name(self):
@@ -77,6 +78,11 @@ def schedule(self) -> CronSchedule | None:
"""The default (cron) schedule for this workflow, or None if it is not scheduled."""
return None

@property
def parameters(self) -> list[JobParameterDefinition] | None:
"""Named parameters for this workflow, or None if there are no parameters."""
return self._named_parameters

def tasks(self) -> Iterable[Task]:
# return __task__ from every method in this class that has this attribute
for attr in dir(self):
21 changes: 18 additions & 3 deletions src/databricks/labs/ucx/installer/workflows.py
@@ -111,14 +111,21 @@

from databricks.labs.ucx.runtime import main

force_refresh = "false"
try:
force_refresh = dbutils.widgets.get("force_refresh")
except Exception:
pass

main(f'--config=/Workspace{config_file}',
f'--workflow=' + dbutils.widgets.get('workflow'),
f'--task=' + dbutils.widgets.get('task'),
f'--job_id=' + dbutils.widgets.get('job_id'),
f'--run_id=' + dbutils.widgets.get('run_id'),
f'--start_time=' + dbutils.widgets.get('start_time'),
f'--attempt=' + dbutils.widgets.get('attempt'),
f'--parent_run_id=' + dbutils.widgets.get('parent_run_id'))
f'--parent_run_id=' + dbutils.widgets.get('parent_run_id'),
f'--force_refresh=' + force_refresh)
"""

EXPORT_TO_EXCEL_NOTEBOOK = """# Databricks notebook source
@@ -249,13 +256,20 @@ def __init__(self, ws: WorkspaceClient, install_state: InstallState):
self._ws = ws
self._install_state = install_state

def run_workflow(self, step: str, skip_job_wait: bool = False, max_wait: timedelta = timedelta(minutes=20)) -> int:
def run_workflow(
self,
step: str,
skip_job_wait: bool = False,
max_wait: timedelta = timedelta(minutes=20),
named_parameters: dict[str, str] | None = None,
) -> int:
# this dunder variable is hiding this method from tracebacks, making it cleaner
# for the user to see the actual error without too much noise.
__tracebackhide__ = True # pylint: disable=unused-variable
job_id = int(self._install_state.jobs[step])
logger.debug(f"starting {step} job: {self._ws.config.host}#job/{job_id}")
job_initial_run = self._ws.jobs.run_now(job_id)
logger.info(f"Named parameters for {step} job: {named_parameters}")
job_initial_run = self._ws.jobs.run_now(job_id, job_parameters=named_parameters)
run_id = job_initial_run.run_id
if not run_id:
raise NotFound(f"job run not found for {step}")
@@ -855,6 +869,7 @@ def _job_settings(self, workflow_name: str, remote_wheels: list[str]) -> dict[st
"email_notifications": email_notifications,
"schedule": workflow.schedule,
"tasks": job_tasks,
"parameters": workflow.parameters,
}

def _job_task(self, task: Task, remote_wheels: list[str]) -> jobs.Task:
11 changes: 11 additions & 0 deletions tests/integration/assessment/test_workflows.py
@@ -49,3 +49,14 @@ def test_running_real_assessment_job(
query = f"SELECT * FROM {installation_ctx.inventory_database}.workflow_problems"
workflow_problems_without_path = [problem for problem in sql_backend.fetch(query) if problem["path"] == "UNKNOWN"]
assert not workflow_problems_without_path

post_assessment_table = installation_ctx.make_table(schema_name=source_schema.name, ctas="SELECT 1 AS one")
installation_ctx.deployed_workflows.run_workflow(
workflow, skip_job_wait=False, named_parameters={"force_refresh": "true"}
)
assert installation_ctx.deployed_workflows.validate_step(workflow), f"Workflow failed: {workflow}"
expected_tables.add(post_assessment_table.name)

query = f"SELECT * FROM {installation_ctx.inventory_database}.tables"
tables_after_assessment_rerun = {table.name for table in sql_backend.fetch(query)}
assert tables_after_assessment_rerun == expected_tables
7 changes: 6 additions & 1 deletion tests/unit/assessment/test_workflows.py
@@ -6,7 +6,12 @@
from databricks.labs.ucx.assessment.workflows import Assessment, Failing


def test_assess_azure_service_principals(run_workflow):
@pytest.fixture(autouse=True)
def mock_list_runs(ws):
ws.jobs.list_runs.return_value = iter([])


def test_assess_azure_service_principals(ws, run_workflow):
sql_backend = create_autospec(SqlBackend)
sql_backend.fetch.return_value = [
["1", "secret_scope", "secret_key", "tenant_id", "storage_account"],
1 change: 1 addition & 0 deletions tests/unit/conftest.py
@@ -175,6 +175,7 @@ def inner(cb, **replace) -> RuntimeContext:
get_table_mock
)
# pylint: enable=protected-access

current_task(ctx)
return ctx
