Skip to content

Upgrades to sqlmesh 0.174.2 #36

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
803 changes: 263 additions & 540 deletions dagster_sqlmesh/console.py

Large diffs are not rendered by default.

19 changes: 9 additions & 10 deletions dagster_sqlmesh/controller/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from typing import TypeVar

from sqlmesh.core.config import CategorizerConfig
from sqlmesh.core.console import Console, set_console
from sqlmesh.core.console import set_console
from sqlmesh.core.context import Context
from sqlmesh.core.model import Model
from sqlmesh.core.plan import PlanBuilder
Expand All @@ -19,8 +19,8 @@
ConsoleEvent,
ConsoleEventHandler,
ConsoleException,
DebugEventConsole,
EventConsole,
Plan,
SnapshotCategorizer,
)
from ..events import ConsoleGenerator
Expand Down Expand Up @@ -215,8 +215,12 @@ def run_sqlmesh_thread(
self.logger.debug("waiting for events")
for event in generator.events(thread):
match event:
case ConsoleException(e):
case ConsoleException(exception=e):
raise e
case Plan(plan_builder=plan_builder, auto_apply=auto_apply):
if auto_apply:
plan_builder.apply()
yield event
case _:
yield event

Expand Down Expand Up @@ -275,7 +279,7 @@ def run_sqlmesh_thread(

for event in generator.events(thread):
match event:
case ConsoleException(e):
case ConsoleException(exception=e):
raise e
case _:
yield event
Expand Down Expand Up @@ -400,25 +404,20 @@ def setup(
cls,
path: str,
gateway: str = "local",
debug_console: Console | None = None,
log_override: logging.Logger | None = None,
) -> "SQLMeshController":
return cls.setup_with_config(
config=SQLMeshContextConfig(path=path, gateway=gateway),
debug_console=debug_console,
log_override=log_override,
)

@classmethod
def setup_with_config(
cls: type[T],
config: SQLMeshContextConfig,
debug_console: Console | None = None,
log_override: logging.Logger | None = None,
) -> T:
console = EventConsole(log_override=log_override)
if debug_console:
console = DebugEventConsole(debug_console)
console = EventConsole(log_override=log_override) # type: ignore
controller = cls(
console=console,
config=config,
Expand Down
1 change: 1 addition & 0 deletions dagster_sqlmesh/controller/dagster.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# pyright: reportPrivateImportUsage=false
import logging

from dagster import AssetDep, AssetKey, AssetOut
Expand Down
14 changes: 7 additions & 7 deletions dagster_sqlmesh/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,29 +93,29 @@ def __init__(

def __call__(self, event: console.ConsoleEvent) -> None:
match event:
case console.StartPlanEvaluation(evaluatable_plan):
case console.StartPlanEvaluation(plan=evaluatable_plan):
self.logger.debug("Starting plan evaluation")
print(evaluatable_plan.plan_id)
case console.StartEvaluationProgress(
batches, environment_naming_info, default_catalog
batched_intervals=batches, environment_naming_info=environment_naming_info, default_catalog=default_catalog
):
self.logger.debug("STARTING EVALUATION")
self.logger.debug(batches)
self.logger.debug(environment_naming_info)
self.logger.debug(default_catalog)
case console.UpdatePromotionProgress(snapshot, promoted):
case console.UpdatePromotionProgress(snapshot=snapshot, promoted=promoted):
self.logger.debug("UPDATE PROMOTION PROGRESS")
self.logger.debug(snapshot)
self.logger.debug(promoted)
case console.StopPromotionProgress(success):
case console.StopPromotionProgress(success=success):
self.logger.debug("STOP PROMOTION")
self.logger.debug(success)
self._successful = True
case console.StartSnapshotEvaluationProgress(snapshot):
case console.StartSnapshotEvaluationProgress(snapshot=snapshot):
self.logger.debug("START SNAPSHOT EVALUATION")
self.logger.debug(snapshot.name)
case console.UpdateSnapshotEvaluationProgress(
snapshot, batch_idx, duration_ms
snapshot=snapshot, batch_idx=batch_idx, duration_ms=duration_ms
):
self._updated.append(snapshot)
self.logger.debug("UPDATE SNAPSHOT EVALUATION")
Expand All @@ -124,7 +124,7 @@ def __call__(self, event: console.ConsoleEvent) -> None:
self.logger.debug(duration_ms)
case _:
if self._enable_unknown_event_logging:
self.logger.debug("Unhandled event")
self.logger.debug(f"Unhandled event {event.__class__.__name__}")
self.logger.debug(event)

def _show_summary_for(
Expand Down
16 changes: 8 additions & 8 deletions dagster_sqlmesh/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ def report_event(self, event: console.ConsoleEvent) -> None:
log_context = self.log_context(event)

match event:
case console.StartPlanEvaluation(plan):
case console.StartPlanEvaluation(plan=plan):
self._tracker.init_complete_update_status(plan.environment.snapshots)
log_context.info(
"Starting Plan Evaluation",
Expand All @@ -169,7 +169,7 @@ def report_event(self, event: console.ConsoleEvent) -> None:
case console.StopPlanEvaluation:
log_context.info("Plan evaluation completed")
case console.StartEvaluationProgress(
batches, environment_naming_info, default_catalog
batched_intervals=batches, environment_naming_info=environment_naming_info, default_catalog=default_catalog
):
self.update_stage("run")
log_context.info(
Expand All @@ -185,7 +185,7 @@ def report_event(self, event: console.ConsoleEvent) -> None:
)
self._tracker.plan(batches)
case console.UpdateSnapshotEvaluationProgress(
snapshot, batch_idx, duration_ms
snapshot=snapshot, batch_idx=batch_idx, duration_ms=duration_ms
):
done, expected = self._tracker.update_plan(snapshot, batch_idx)

Expand All @@ -197,25 +197,25 @@ def report_event(self, event: console.ConsoleEvent) -> None:
"duration_ms": duration_ms,
},
)
case console.LogSuccess(success):
case console.LogSuccess(success=success):
self.update_stage("done")
if success:
log_context.info("sqlmesh ran successfully")
else:
log_context.error("sqlmesh failed")
raise Exception("sqlmesh failed during run")
case console.LogError(message):
case console.LogError(message=message):
log_context.error(
f"sqlmesh reported an error: {message}",
)
case console.LogFailedModels(models):
case console.LogFailedModels(models=models):
if len(models) != 0:
failed_models = "\n".join(
[f"{model!s}\n{model.__cause__!s}" for model in models]
)
log_context.error(f"sqlmesh failed models: {failed_models}")
raise Exception("sqlmesh has failed models")
case console.UpdatePromotionProgress(snapshot, promoted):
case console.UpdatePromotionProgress(snapshot=snapshot, promoted=promoted):
log_context.info(
"Promotion progress update",
{
Expand All @@ -224,7 +224,7 @@ def report_event(self, event: console.ConsoleEvent) -> None:
},
)
self._tracker.update_promotion(snapshot, promoted)
case console.StopPromotionProgress(success):
case console.StopPromotionProgress(success=success):
self._tracker.stop_promotion()
if success:
log_context.info("Promotion completed successfully")
Expand Down
3 changes: 2 additions & 1 deletion dagster_sqlmesh/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import typing as t

from sqlmesh.core.scheduler import CompletionStatus, Scheduler
from sqlmesh.core.scheduler import Scheduler
from sqlmesh.utils import CompletionStatus


class DagsterSQLMeshScheduler(Scheduler):
Expand Down
11 changes: 3 additions & 8 deletions dagster_sqlmesh/testing/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import duckdb
import polars
from sqlmesh.core.console import get_console
from sqlmesh.utils.date import TimeLike

from dagster_sqlmesh.config import SQLMeshContextConfig
Expand All @@ -22,12 +21,9 @@ class SQLMeshTestContext:
db_path: str
context_config: SQLMeshContextConfig

def create_controller(self, enable_debug_console: bool = False):
console = None
if enable_debug_console:
console = get_console()
def create_controller(self):
return DagsterSQLMeshController.setup_with_config(
self.context_config, debug_console=console
self.context_config,
)

def query(self, *args: t.Any, **kwargs: t.Any) -> list[t.Any]:
Expand Down Expand Up @@ -69,7 +65,6 @@ def plan_and_run(
*,
environment: str,
execution_time: TimeLike | None = None,
enable_debug_console: bool = False,
start: TimeLike | None = None,
end: TimeLike | None = None,
select_models: list[str] | None = None,
Expand All @@ -93,7 +88,7 @@ def plan_and_run(
TimeLike can be any time-like object that SQLMesh accepts (datetime, str, etc.).
The function creates a controller and recorder to capture all SQLMesh events during execution.
"""
controller = self.create_controller(enable_debug_console=enable_debug_console)
controller = self.create_controller()
recorder = ConsoleRecorder()
# controller.add_event_handler(ConsoleRecorder())
plan_options = PlanOptions(
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "dagster-sqlmesh"
version = "0.11.2"
version = "0.12.0"
description = ""
authors = [
{name = "Reuven Gonzales", email = "reuven@karibalabs.co"}
Expand All @@ -10,7 +10,7 @@ readme = "README.md"
requires-python = ">=3.11,<3.13"
dependencies = [
"dagster>=1.7.8",
"sqlmesh==0.164.0",
"sqlmesh==0.174.2",
"pytest>=8.3.2",
"pyarrow>=18.0.0",
]
Expand Down
Loading