|
10 | 10 | from sqlmesh.core.console import Console, set_console
|
11 | 11 | from sqlmesh.core.context import Context
|
12 | 12 | from sqlmesh.core.model import Model
|
13 |
| -from sqlmesh.core.plan import PlanBuilder |
| 13 | +from sqlmesh.core.plan import Plan, PlanBuilder |
14 | 14 | from sqlmesh.utils.dag import DAG
|
15 | 15 | from sqlmesh.utils.date import TimeLike
|
16 | 16 |
|
@@ -128,19 +128,117 @@ def __init__(
|
128 | 128 | self.logger = logger
|
129 | 129 |
|
130 | 130 | @contextmanager
|
131 |
| - def console_context( |
132 |
| - self, handler: ConsoleEventHandler |
133 |
| - ) -> t.Iterator[None]: |
| 131 | + def console_context(self, handler: ConsoleEventHandler) -> t.Iterator[None]: |
134 | 132 | id = self.console.add_handler(handler)
|
135 | 133 | yield
|
136 | 134 | self.console.remove_handler(id)
|
137 | 135 |
|
| 136 | + def _get_plan_summary(self, plan: Plan) -> str: |
| 137 | + """ |
| 138 | + Get a summary of the plan and return it as a string. |
| 139 | +
|
| 140 | + Args: |
| 141 | + plan (Plan): The plan to summarize. |
| 142 | +
|
| 143 | + Returns: |
| 144 | + str: A summary of the plan. |
| 145 | + """ |
| 146 | + directly_modified = len(plan.directly_modified) |
| 147 | + indirectly_modified = sum( |
| 148 | + len(deps) for deps in plan.indirectly_modified.values() |
| 149 | + ) |
| 150 | + |
| 151 | + plan_summary = [ |
| 152 | + "SQLMesh Plan Summary:", |
| 153 | + f"• Models: {directly_modified} direct changes, {indirectly_modified} indirect changes", |
| 154 | + "• Direct Modifications:", |
| 155 | + ] |
| 156 | + |
| 157 | + # Add directly modified models |
| 158 | + for model in sorted(plan.directly_modified): |
| 159 | + plan_summary.append(f" - {model}") |
| 160 | + |
| 161 | + # Add indirectly modified models and their parents |
| 162 | + if plan.indirectly_modified: |
| 163 | + plan_summary.append("• Indirect Modifications:") |
| 164 | + for parent, children in sorted(plan.indirectly_modified.items()): |
| 165 | + plan_summary.append(f" - Due to {parent}:") |
| 166 | + for child in sorted(children): |
| 167 | + plan_summary.append(f" • {child}") |
| 168 | + |
| 169 | + # Add restatements if any exist |
| 170 | + if plan.restatements: |
| 171 | + plan_summary.append("• Restatements:") |
| 172 | + for model, interval in sorted(plan.restatements.items()): |
| 173 | + plan_summary.append(f" - {model}: {interval}") |
| 174 | + |
| 175 | + plan_summary.extend( |
| 176 | + [ |
| 177 | + f"• Time Range: {plan.provided_start or 'default'} → {plan.provided_end or 'default'}", |
| 178 | + "• Configuration:", |
| 179 | + f" - Skip Backfill: {plan.skip_backfill}", |
| 180 | + f" - Forward Only: {plan.forward_only}", |
| 181 | + f" - No Gaps: {plan.no_gaps}", |
| 182 | + f" - Include Unmodified: {plan.include_unmodified}", |
| 183 | + f" - Empty Backfill: {plan.empty_backfill}", |
| 184 | + f" - End Bounded: {plan.end_bounded}", |
| 185 | + f" - Is Dev Environment: {plan.is_dev}", |
| 186 | + ] |
| 187 | + ) |
| 188 | + |
| 189 | + if plan.skip_backfill: |
| 190 | + plan_summary.append("• Backfill: DISABLED (skip_backfill=True)") |
| 191 | + else: |
| 192 | + plan_summary.append("• Backfill:") |
| 193 | + if plan.selected_models_to_backfill: |
| 194 | + plan_summary.append( |
| 195 | + f" - User Selected: {sorted(plan.selected_models_to_backfill)}" |
| 196 | + ) |
| 197 | + if plan.models_to_backfill: |
| 198 | + additional = plan.models_to_backfill - ( |
| 199 | + plan.selected_models_to_backfill or set() |
| 200 | + ) |
| 201 | + if additional: |
| 202 | + plan_summary.append(f" - Auto-detected: {sorted(additional)}") |
| 203 | + if not (plan.selected_models_to_backfill or plan.models_to_backfill): |
| 204 | + plan_summary.append(" - None required") |
| 205 | + |
| 206 | + return "\n".join(plan_summary) |
| 207 | + |
| 208 | + def _get_builder( |
| 209 | + self, context: Context, environment: str, plan_options: PlanOptions |
| 210 | + ) -> PlanBuilder: |
| 211 | + return context.plan_builder( |
| 212 | + environment=environment, |
| 213 | + **plan_options, |
| 214 | + ) |
| 215 | + |
| 216 | + def _build_plan( |
| 217 | + self, |
| 218 | + builder: PlanBuilder, |
| 219 | + ) -> Plan: |
| 220 | + """Build a SQLMesh plan without applying it. |
| 221 | +
|
| 222 | + Args: |
| 223 | + builder: PlanBuilder instance to use for building the plan |
| 224 | +
|
| 225 | + Returns: |
| 226 | + Plan |
| 227 | + """ |
| 228 | + plan: Plan = builder.build() |
| 229 | + plan_str = self._get_plan_summary(plan) |
| 230 | + |
| 231 | + logger.debug("dagster-sqlmesh: plan") |
| 232 | + logger.info(f"Plan Summary: {plan_str}") |
| 233 | + |
| 234 | + return plan |
| 235 | + |
138 | 236 | def plan(
|
139 | 237 | self,
|
140 | 238 | categorizer: SnapshotCategorizer | None = None,
|
141 | 239 | default_catalog: str | None = None,
|
142 | 240 | **plan_options: t.Unpack[PlanOptions],
|
143 |
| - ) -> t.Iterator[ConsoleEvent]: |
| 241 | + ) -> t.Generator[ConsoleEvent, None, None]: |
144 | 242 | """
|
145 | 243 | Executes a sqlmesh plan operation in a separate thread and yields
|
146 | 244 | console events.
|
@@ -176,14 +274,22 @@ def run_sqlmesh_thread(
|
176 | 274 | ) -> None:
|
177 | 275 | logger.debug("dagster-sqlmesh: thread started")
|
178 | 276 | try:
|
179 |
| - builder = t.cast( |
180 |
| - PlanBuilder, |
181 |
| - context.plan_builder( |
182 |
| - environment=environment, |
183 |
| - **plan_options, |
184 |
| - ), |
| 277 | + builder: PlanBuilder = self._get_builder( |
| 278 | + context=context, |
| 279 | + environment=environment, |
| 280 | + plan_options=plan_options, |
185 | 281 | )
|
| 282 | + |
| 283 | + plan: Plan = self._build_plan( |
| 284 | + builder=builder, |
| 285 | + ) |
| 286 | + plan_str = self._get_plan_summary(plan) |
| 287 | + |
| 288 | + print(f"plan_str: {plan_str}") |
| 289 | + |
186 | 290 | logger.debug("dagster-sqlmesh: plan")
|
| 291 | + logger.info(f"Plan Summary: {plan_str}") |
| 292 | + |
187 | 293 | controller.console.plan(
|
188 | 294 | builder,
|
189 | 295 | auto_apply=True,
|
@@ -224,9 +330,7 @@ def run_sqlmesh_thread(
|
224 | 330 |
|
225 | 331 | thread.join()
|
226 | 332 |
|
227 |
| - def run( |
228 |
| - self, **run_options: t.Unpack[RunOptions] |
229 |
| - ) -> t.Iterator[ConsoleEvent]: |
| 333 | + def run(self, **run_options: t.Unpack[RunOptions]) -> t.Iterator[ConsoleEvent]: |
230 | 334 | """Executes sqlmesh run in a separate thread with console output.
|
231 | 335 |
|
232 | 336 | This method executes SQLMesh operations in a dedicated thread while
|
@@ -295,7 +399,7 @@ def plan_and_run(
|
295 | 399 | end: TimeLike | None = None,
|
296 | 400 | categorizer: SnapshotCategorizer | None = None,
|
297 | 401 | default_catalog: str | None = None,
|
298 |
| - plan_options: PlanOptions | None= None, |
| 402 | + plan_options: PlanOptions | None = None, |
299 | 403 | run_options: RunOptions | None = None,
|
300 | 404 | skip_run: bool = False,
|
301 | 405 | ) -> t.Iterator[ConsoleEvent]:
|
|
0 commit comments