Skip to content

fix: implement missing functions from abstract class #13

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions dagster_sqlmesh/console.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import unittest
import logging

from sqlglot.expressions import Alter
from sqlmesh.core.console import Console
from sqlmesh.core.plan import EvaluatablePlan
from sqlmesh.core.context_diff import ContextDiff
Expand All @@ -16,6 +17,7 @@
SnapshotChangeCategory,
SnapshotInfoLike,
)
from sqlmesh.utils.concurrency import NodeExecutionFailedError

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -193,11 +195,35 @@ class LogError:
message: str


@dataclass
class LogWarning:
message: str


@dataclass
class LogSuccess:
message: str


@dataclass
class LogFailedModels:
errors: t.List[NodeExecutionFailedError]


@dataclass
class LogSkippedModels:
snapshot_names: t.Set[str]


@dataclass
class LogDestructiveChange:
snapshot_name: str
dropped_column_names: t.List[str]
alter_expressions: t.List[Alter]
dialect: str
error: bool = True


@dataclass
class LoadingStart:
message: t.Optional[str] = None
Expand Down Expand Up @@ -254,7 +280,11 @@ class ConsoleException:
ShowSQL,
LogStatusUpdate,
LogError,
LogWarning,
LogSuccess,
LogFailedModels,
LogSkippedModels,
LogDestructiveChange,
LoadingStart,
LoadingStop,
ShowSchemaDiff,
Expand Down Expand Up @@ -444,9 +474,32 @@ def log_status_update(self, message: str) -> None:
def log_error(self, message: str) -> None:
self.publish(LogError(message))

def log_warning(self, message):
self.publish(LogWarning(message))

def log_success(self, message: str) -> None:
self.publish(LogSuccess(message))

def log_failed_models(self, errors):
self.publish(LogFailedModels(errors))

def log_skipped_models(self, snapshot_names):
self.publish(LogSkippedModels(snapshot_names))

def log_destructive_change(
self,
snapshot_name,
dropped_column_names,
alter_expressions,
dialect,
error=True,
):
self.publish(
LogDestructiveChange(
snapshot_name, dropped_column_names, alter_expressions, dialect, error
)
)

def loading_start(self, message: t.Optional[str] = None) -> uuid.UUID:
event_id = uuid.uuid4()
self.publish(LoadingStart(message, event_id))
Expand Down
4 changes: 2 additions & 2 deletions dagster_sqlmesh/controller/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from sqlmesh.core.context import Context
from sqlmesh.core.plan import PlanBuilder
from sqlmesh.core.config import CategorizerConfig
from sqlmesh.core.console import Console
from sqlmesh.core.console import Console, set_console
from sqlmesh.core.model import Model

from ..events import ConsoleGenerator
Expand Down Expand Up @@ -378,10 +378,10 @@ def _create_context(self):
options: t.Dict[str, t.Any] = dict(
paths=self.config.path,
gateway=self.config.gateway,
console=self.console,
)
if self.config.sqlmesh_config:
options["config"] = self.config.sqlmesh_config
set_console(self.console)
return Context(**options)

@contextmanager
Expand Down
4 changes: 2 additions & 2 deletions dagster_sqlmesh/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import typing as t

from sqlmesh.core.scheduler import Scheduler
from sqlmesh.core.scheduler import Scheduler, CompletionStatus


class DagsterSQLMeshScheduler(Scheduler):
Expand All @@ -11,7 +11,7 @@ def __init__(self, selected_snapshots: t.Optional[t.Set[str]], *args, **kwargs):
super().__init__(*args, **kwargs)
self._selected_snapshots: t.Set[str] = selected_snapshots or set()

def run(self, *args, **kwargs) -> bool:
def run(self, *args, **kwargs) -> CompletionStatus:
if len(self._selected_snapshots) > 0:
kwargs["selected_snapshots"] = self._selected_snapshots
return super().run(*args, **kwargs)
Loading