Skip to content

Commit fe14877

Browse files
authored
Upgrades to sqlmesh 0.174.2 (#36)
* fix!: Update sqlmesh version to 0.174.2 * fix pyright reporting private import * typing fixes * more type fixes * bump version * remove debugging statements
1 parent 9d0be7f commit fe14877

File tree

9 files changed

+344
-584
lines changed

9 files changed

+344
-584
lines changed

dagster_sqlmesh/console.py

Lines changed: 263 additions & 540 deletions
Large diffs are not rendered by default.

dagster_sqlmesh/controller/base.py

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from typing import TypeVar
88

99
from sqlmesh.core.config import CategorizerConfig
10-
from sqlmesh.core.console import Console, set_console
10+
from sqlmesh.core.console import set_console
1111
from sqlmesh.core.context import Context
1212
from sqlmesh.core.model import Model
1313
from sqlmesh.core.plan import PlanBuilder
@@ -19,8 +19,8 @@
1919
ConsoleEvent,
2020
ConsoleEventHandler,
2121
ConsoleException,
22-
DebugEventConsole,
2322
EventConsole,
23+
Plan,
2424
SnapshotCategorizer,
2525
)
2626
from ..events import ConsoleGenerator
@@ -215,8 +215,12 @@ def run_sqlmesh_thread(
215215
self.logger.debug("waiting for events")
216216
for event in generator.events(thread):
217217
match event:
218-
case ConsoleException(e):
218+
case ConsoleException(exception=e):
219219
raise e
220+
case Plan(plan_builder=plan_builder, auto_apply=auto_apply):
221+
if auto_apply:
222+
plan_builder.apply()
223+
yield event
220224
case _:
221225
yield event
222226

@@ -275,7 +279,7 @@ def run_sqlmesh_thread(
275279

276280
for event in generator.events(thread):
277281
match event:
278-
case ConsoleException(e):
282+
case ConsoleException(exception=e):
279283
raise e
280284
case _:
281285
yield event
@@ -400,25 +404,20 @@ def setup(
400404
cls,
401405
path: str,
402406
gateway: str = "local",
403-
debug_console: Console | None = None,
404407
log_override: logging.Logger | None = None,
405408
) -> "SQLMeshController":
406409
return cls.setup_with_config(
407410
config=SQLMeshContextConfig(path=path, gateway=gateway),
408-
debug_console=debug_console,
409411
log_override=log_override,
410412
)
411413

412414
@classmethod
413415
def setup_with_config(
414416
cls: type[T],
415417
config: SQLMeshContextConfig,
416-
debug_console: Console | None = None,
417418
log_override: logging.Logger | None = None,
418419
) -> T:
419-
console = EventConsole(log_override=log_override)
420-
if debug_console:
421-
console = DebugEventConsole(debug_console)
420+
console = EventConsole(log_override=log_override) # type: ignore
422421
controller = cls(
423422
console=console,
424423
config=config,

dagster_sqlmesh/controller/dagster.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
# pyright: reportPrivateImportUsage=false
12
import logging
23

34
from dagster import AssetDep, AssetKey, AssetOut

dagster_sqlmesh/events.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -93,29 +93,29 @@ def __init__(
9393

9494
def __call__(self, event: console.ConsoleEvent) -> None:
9595
match event:
96-
case console.StartPlanEvaluation(evaluatable_plan):
96+
case console.StartPlanEvaluation(plan=evaluatable_plan):
9797
self.logger.debug("Starting plan evaluation")
9898
print(evaluatable_plan.plan_id)
9999
case console.StartEvaluationProgress(
100-
batches, environment_naming_info, default_catalog
100+
batched_intervals=batches, environment_naming_info=environment_naming_info, default_catalog=default_catalog
101101
):
102102
self.logger.debug("STARTING EVALUATION")
103103
self.logger.debug(batches)
104104
self.logger.debug(environment_naming_info)
105105
self.logger.debug(default_catalog)
106-
case console.UpdatePromotionProgress(snapshot, promoted):
106+
case console.UpdatePromotionProgress(snapshot=snapshot, promoted=promoted):
107107
self.logger.debug("UPDATE PROMOTION PROGRESS")
108108
self.logger.debug(snapshot)
109109
self.logger.debug(promoted)
110-
case console.StopPromotionProgress(success):
110+
case console.StopPromotionProgress(success=success):
111111
self.logger.debug("STOP PROMOTION")
112112
self.logger.debug(success)
113113
self._successful = True
114-
case console.StartSnapshotEvaluationProgress(snapshot):
114+
case console.StartSnapshotEvaluationProgress(snapshot=snapshot):
115115
self.logger.debug("START SNAPSHOT EVALUATION")
116116
self.logger.debug(snapshot.name)
117117
case console.UpdateSnapshotEvaluationProgress(
118-
snapshot, batch_idx, duration_ms
118+
snapshot=snapshot, batch_idx=batch_idx, duration_ms=duration_ms
119119
):
120120
self._updated.append(snapshot)
121121
self.logger.debug("UPDATE SNAPSHOT EVALUATION")
@@ -124,7 +124,7 @@ def __call__(self, event: console.ConsoleEvent) -> None:
124124
self.logger.debug(duration_ms)
125125
case _:
126126
if self._enable_unknown_event_logging:
127-
self.logger.debug("Unhandled event")
127+
self.logger.debug(f"Unhandled event {event.__class__.__name__}")
128128
self.logger.debug(event)
129129

130130
def _show_summary_for(

dagster_sqlmesh/resource.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ def report_event(self, event: console.ConsoleEvent) -> None:
158158
log_context = self.log_context(event)
159159

160160
match event:
161-
case console.StartPlanEvaluation(plan):
161+
case console.StartPlanEvaluation(plan=plan):
162162
self._tracker.init_complete_update_status(plan.environment.snapshots)
163163
log_context.info(
164164
"Starting Plan Evaluation",
@@ -169,7 +169,7 @@ def report_event(self, event: console.ConsoleEvent) -> None:
169169
case console.StopPlanEvaluation:
170170
log_context.info("Plan evaluation completed")
171171
case console.StartEvaluationProgress(
172-
batches, environment_naming_info, default_catalog
172+
batched_intervals=batches, environment_naming_info=environment_naming_info, default_catalog=default_catalog
173173
):
174174
self.update_stage("run")
175175
log_context.info(
@@ -185,7 +185,7 @@ def report_event(self, event: console.ConsoleEvent) -> None:
185185
)
186186
self._tracker.plan(batches)
187187
case console.UpdateSnapshotEvaluationProgress(
188-
snapshot, batch_idx, duration_ms
188+
snapshot=snapshot, batch_idx=batch_idx, duration_ms=duration_ms
189189
):
190190
done, expected = self._tracker.update_plan(snapshot, batch_idx)
191191

@@ -197,25 +197,25 @@ def report_event(self, event: console.ConsoleEvent) -> None:
197197
"duration_ms": duration_ms,
198198
},
199199
)
200-
case console.LogSuccess(success):
200+
case console.LogSuccess(success=success):
201201
self.update_stage("done")
202202
if success:
203203
log_context.info("sqlmesh ran successfully")
204204
else:
205205
log_context.error("sqlmesh failed")
206206
raise Exception("sqlmesh failed during run")
207-
case console.LogError(message):
207+
case console.LogError(message=message):
208208
log_context.error(
209209
f"sqlmesh reported an error: {message}",
210210
)
211-
case console.LogFailedModels(models):
211+
case console.LogFailedModels(models=models):
212212
if len(models) != 0:
213213
failed_models = "\n".join(
214214
[f"{model!s}\n{model.__cause__!s}" for model in models]
215215
)
216216
log_context.error(f"sqlmesh failed models: {failed_models}")
217217
raise Exception("sqlmesh has failed models")
218-
case console.UpdatePromotionProgress(snapshot, promoted):
218+
case console.UpdatePromotionProgress(snapshot=snapshot, promoted=promoted):
219219
log_context.info(
220220
"Promotion progress update",
221221
{
@@ -224,7 +224,7 @@ def report_event(self, event: console.ConsoleEvent) -> None:
224224
},
225225
)
226226
self._tracker.update_promotion(snapshot, promoted)
227-
case console.StopPromotionProgress(success):
227+
case console.StopPromotionProgress(success=success):
228228
self._tracker.stop_promotion()
229229
if success:
230230
log_context.info("Promotion completed successfully")

dagster_sqlmesh/scheduler.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import typing as t
22

3-
from sqlmesh.core.scheduler import CompletionStatus, Scheduler
3+
from sqlmesh.core.scheduler import Scheduler
4+
from sqlmesh.utils import CompletionStatus
45

56

67
class DagsterSQLMeshScheduler(Scheduler):

dagster_sqlmesh/testing/context.py

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
import duckdb
66
import polars
7-
from sqlmesh.core.console import get_console
87
from sqlmesh.utils.date import TimeLike
98

109
from dagster_sqlmesh.config import SQLMeshContextConfig
@@ -22,12 +21,9 @@ class SQLMeshTestContext:
2221
db_path: str
2322
context_config: SQLMeshContextConfig
2423

25-
def create_controller(self, enable_debug_console: bool = False):
26-
console = None
27-
if enable_debug_console:
28-
console = get_console()
24+
def create_controller(self):
2925
return DagsterSQLMeshController.setup_with_config(
30-
self.context_config, debug_console=console
26+
self.context_config,
3127
)
3228

3329
def query(self, *args: t.Any, **kwargs: t.Any) -> list[t.Any]:
@@ -69,7 +65,6 @@ def plan_and_run(
6965
*,
7066
environment: str,
7167
execution_time: TimeLike | None = None,
72-
enable_debug_console: bool = False,
7368
start: TimeLike | None = None,
7469
end: TimeLike | None = None,
7570
select_models: list[str] | None = None,
@@ -93,7 +88,7 @@ def plan_and_run(
9388
TimeLike can be any time-like object that SQLMesh accepts (datetime, str, etc.).
9489
The function creates a controller and recorder to capture all SQLMesh events during execution.
9590
"""
96-
controller = self.create_controller(enable_debug_console=enable_debug_console)
91+
controller = self.create_controller()
9792
recorder = ConsoleRecorder()
9893
# controller.add_event_handler(ConsoleRecorder())
9994
plan_options = PlanOptions(

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "dagster-sqlmesh"
3-
version = "0.11.2"
3+
version = "0.12.0"
44
description = ""
55
authors = [
66
{name = "Reuven Gonzales", email = "reuven@karibalabs.co"}
@@ -10,7 +10,7 @@ readme = "README.md"
1010
requires-python = ">=3.11,<3.13"
1111
dependencies = [
1212
"dagster>=1.7.8",
13-
"sqlmesh==0.164.0",
13+
"sqlmesh==0.174.2",
1414
"pytest>=8.3.2",
1515
"pyarrow>=18.0.0",
1616
]

0 commit comments

Comments
 (0)