From 16326e4d979a0bfcdcce9f3103baf3f646347c3e Mon Sep 17 00:00:00 2001 From: "Reuven V. Gonzales" Date: Fri, 18 Apr 2025 12:50:48 +0000 Subject: [PATCH 1/6] fix!: Update sqlmesh version to 0.174.2 --- dagster_sqlmesh/console.py | 805 ++++++++++------------------- dagster_sqlmesh/controller/base.py | 19 +- dagster_sqlmesh/events.py | 14 +- dagster_sqlmesh/testing/context.py | 11 +- pyproject.toml | 2 +- uv.lock | 57 +- 6 files changed, 334 insertions(+), 574 deletions(-) diff --git a/dagster_sqlmesh/console.py b/dagster_sqlmesh/console.py index e884573..9fd850a 100644 --- a/dagster_sqlmesh/console.py +++ b/dagster_sqlmesh/console.py @@ -1,16 +1,15 @@ +import inspect import logging +import textwrap import typing as t import unittest import uuid -from collections.abc import Callable from dataclasses import dataclass, field from sqlglot.expressions import Alter from sqlmesh.core.console import Console from sqlmesh.core.context_diff import ContextDiff from sqlmesh.core.environment import EnvironmentNamingInfo -from sqlmesh.core.linter.rule import RuleViolation -from sqlmesh.core.model import Model from sqlmesh.core.plan import EvaluatablePlan, PlanBuilder from sqlmesh.core.snapshot import Snapshot, SnapshotChangeCategory, SnapshotInfoLike from sqlmesh.core.table_diff import RowDiff, SchemaDiff, TableDiff @@ -18,248 +17,208 @@ logger = logging.getLogger(__name__) +@dataclass(kw_only=True) +class BaseConsoleEvent: + unknown_args: dict[str, t.Any] = field(default_factory=dict) -@dataclass -class StartMigrationProgress: +@dataclass(kw_only=True) +class StartMigrationProgress(BaseConsoleEvent): total_tasks: int - -@dataclass -class UpdateMigrationProgress: +@dataclass(kw_only=True) +class UpdateMigrationProgress(BaseConsoleEvent): num_tasks: int - -@dataclass -class StopMigrationProgress: +@dataclass(kw_only=True) +class StopMigrationProgress(BaseConsoleEvent): pass +@dataclass(kw_only=True) +class StartPlanEvaluation(BaseConsoleEvent): + plan: EvaluatablePlan -@dataclass -class StartPlanEvaluation: - evaluatable_plan: EvaluatablePlan - - -@dataclass -class StopPlanEvaluation: +@dataclass(kw_only=True) +class StopPlanEvaluation(BaseConsoleEvent): pass - -@dataclass -class StartEvaluationProgress: - batches: dict[Snapshot, int] +@dataclass(kw_only=True) +class StartEvaluationProgress(BaseConsoleEvent): + batched_intervals: dict[Snapshot, int] environment_naming_info: EnvironmentNamingInfo default_catalog: str | None - -@dataclass -class StartSnapshotEvaluationProgress: +@dataclass(kw_only=True) +class StartSnapshotEvaluationProgress(BaseConsoleEvent): snapshot: Snapshot - -@dataclass -class UpdateSnapshotEvaluationProgress: +@dataclass(kw_only=True) +class UpdateSnapshotEvaluationProgress(BaseConsoleEvent): snapshot: Snapshot batch_idx: int duration_ms: int | None - -@dataclass -class StopEvaluationProgress: +@dataclass(kw_only=True) +class StopEvaluationProgress(BaseConsoleEvent): success: bool = True - -@dataclass -class StartCreationProgress: - total_tasks: int +@dataclass(kw_only=True) +class StartCreationProgress(BaseConsoleEvent): + snapshots: list[Snapshot] environment_naming_info: EnvironmentNamingInfo default_catalog: str | None - -@dataclass -class UpdateCreationProgress: +@dataclass(kw_only=True) +class UpdateCreationProgress(BaseConsoleEvent): snapshot: SnapshotInfoLike - -@dataclass -class StopCreationProgress: +@dataclass(kw_only=True) +class StopCreationProgress(BaseConsoleEvent): success: bool = True - -@dataclass -class StartCleanup: +@dataclass(kw_only=True) +class StartCleanup(BaseConsoleEvent): ignore_ttl: bool - -@dataclass -class UpdateCleanupProgress: +@dataclass(kw_only=True) +class UpdateCleanupProgress(BaseConsoleEvent): object_name: str - -@dataclass -class StopCleanup: +@dataclass(kw_only=True) +class StopCleanup(BaseConsoleEvent): success: bool = True - -@dataclass -class StartPromotionProgress: +@dataclass(kw_only=True) +class StartPromotionProgress(BaseConsoleEvent): total_tasks: int environment_naming_info: EnvironmentNamingInfo default_catalog: str | None - -@dataclass -class UpdatePromotionProgress: +@dataclass(kw_only=True) +class UpdatePromotionProgress(BaseConsoleEvent): snapshot: SnapshotInfoLike promoted: bool - -@dataclass -class StopPromotionProgress: +@dataclass(kw_only=True) +class StopPromotionProgress(BaseConsoleEvent): success: bool = True - -@dataclass -class UpdateSnapshotMigrationProgress: +@dataclass(kw_only=True) +class UpdateSnapshotMigrationProgress(BaseConsoleEvent): num_tasks: int - -@dataclass -class LogMigrationStatus: +@dataclass(kw_only=True) +class LogMigrationStatus(BaseConsoleEvent): success: bool = True - -@dataclass -class StartSnapshotMigrationProgress: +@dataclass(kw_only=True) +class StartSnapshotMigrationProgress(BaseConsoleEvent): total_tasks: int - -@dataclass -class StopSnapshotMigrationProgress: +@dataclass(kw_only=True) +class StopSnapshotMigrationProgress(BaseConsoleEvent): success: bool = True - -@dataclass -class StartEnvMigrationProgress: +@dataclass(kw_only=True) +class StartEnvMigrationProgress(BaseConsoleEvent): total_tasks: int - -@dataclass -class UpdateEnvMigrationProgress: +@dataclass(kw_only=True) +class UpdateEnvMigrationProgress(BaseConsoleEvent): num_tasks: int - -@dataclass -class StopEnvMigrationProgress: +@dataclass(kw_only=True) +class StopEnvMigrationProgress(BaseConsoleEvent): success: bool = True - -@dataclass -class ShowModelDifferenceSummary: +@dataclass(kw_only=True) +class ShowModelDifferenceSummary(BaseConsoleEvent): context_diff: ContextDiff environment_naming_info: EnvironmentNamingInfo default_catalog: str | None no_diff: bool = True - -@dataclass -class PlanEvent: +@dataclass(kw_only=True) +class Plan(BaseConsoleEvent): plan_builder: PlanBuilder auto_apply: bool default_catalog: str | None no_diff: bool = False no_prompts: bool = False - -@dataclass -class LogTestResults: +@dataclass(kw_only=True) +class LogTestResults(BaseConsoleEvent): result: unittest.result.TestResult output: str | None target_dialect: str - -@dataclass -class ShowSQL: +@dataclass(kw_only=True) +class ShowSQL(BaseConsoleEvent): sql: str - -@dataclass -class LogStatusUpdate: +@dataclass(kw_only=True) +class LogStatusUpdate(BaseConsoleEvent): message: str - -@dataclass -class LogError: +@dataclass(kw_only=True) +class LogError(BaseConsoleEvent): message: str - -@dataclass -class LogWarning: +@dataclass(kw_only=True) +class LogWarning(BaseConsoleEvent): short_message: str long_message: str | None = None - -@dataclass -class LogSuccess: +@dataclass(kw_only=True) +class LogSuccess(BaseConsoleEvent): message: str - -@dataclass -class LogFailedModels: +@dataclass(kw_only=True) +class LogFailedModels(BaseConsoleEvent): errors: list[NodeExecutionFailedError[str]] - -@dataclass -class LogSkippedModels: +@dataclass(kw_only=True) +class LogSkippedModels(BaseConsoleEvent): snapshot_names: set[str] - -@dataclass -class LogDestructiveChange: +@dataclass(kw_only=True) +class LogDestructiveChange(BaseConsoleEvent): snapshot_name: str dropped_column_names: list[str] alter_expressions: list[Alter] dialect: str error: bool = True - -@dataclass -class LoadingStart: +@dataclass(kw_only=True) +class LoadingStart(BaseConsoleEvent): message: str | None = None id: uuid.UUID = field(default_factory=uuid.uuid4) - -@dataclass -class LoadingStop: +@dataclass(kw_only=True) +class LoadingStop(BaseConsoleEvent): id: uuid.UUID - -@dataclass -class ShowSchemaDiff: +@dataclass(kw_only=True) +class ShowSchemaDiff(BaseConsoleEvent): schema_diff: SchemaDiff - -@dataclass -class ShowRowDiff: +@dataclass(kw_only=True) +class ShowRowDiff(BaseConsoleEvent): row_diff: RowDiff show_sample: bool = True skip_grain_check: bool = False - -@dataclass -class ConsoleException: +@dataclass(kw_only=True) +class ConsoleException(BaseConsoleEvent): exception: Exception - -@dataclass -class PrintEnvironments: +@dataclass(kw_only=True) +class PrintEnvironments(BaseConsoleEvent): environments_summary: dict[str, int] - -@dataclass -class ShowTableDiffSummary: +@dataclass(kw_only=True) +class ShowTableDiffSummary(BaseConsoleEvent): table_diff: TableDiff - ConsoleEvent = ( StartPlanEvaluation | StopPlanEvaluation @@ -273,7 +232,7 @@ class ShowTableDiffSummary: | StartCleanup | UpdateCleanupProgress | StopCleanup - | StartPromotionProgress + #| StartPromotionProgress | UpdatePromotionProgress | StopPromotionProgress | UpdateSnapshotMigrationProgress @@ -283,7 +242,7 @@ class ShowTableDiffSummary: | UpdateEnvMigrationProgress | StopEnvMigrationProgress | ShowModelDifferenceSummary - | PlanEvent + | Plan | LogTestResults | ShowSQL | LogStatusUpdate @@ -306,27 +265,123 @@ class ShowTableDiffSummary: | ShowTableDiffSummary ) -ConsoleEventHandler = Callable[[ConsoleEvent], None] +ConsoleEventHandler = t.Callable[[ConsoleEvent], None] SnapshotCategorizer = t.Callable[ [Snapshot, PlanBuilder, str | None], SnapshotChangeCategory ] +T = t.TypeVar("T") + +def get_console_event_by_name( + event_name: str, +) -> type[ConsoleEvent] | None: + """Get the console event class by name.""" + known_events_classes = t.get_args(ConsoleEvent) + console_event_map: dict[str, type[ConsoleEvent]] = { + event.__name__: event for event in known_events_classes + } + return console_event_map.get(event_name) + +class IntrospectingConsole(Console): + """An event console that dynamically implements methods based on the current + sqlmesh console object. If a method is specified it's validated against the + current sqlmesh version's implementation""" + + events: t.ClassVar[list[type[ConsoleEvent]]] + + def __init_subclass__(cls): + super().__init_subclass__() + + known_events_classes = cls.events + known_events: list[str] = [] + for known_event in known_events_classes: + assert inspect.isclass(known_event), "event must be a class" + known_events.append(known_event.__name__) + + + # Iterate through all the available abstract methods in console + for method_name in Console.__abstractmethods__: + # Check if the method is not already implemented + if hasattr(cls, method_name): + if not getattr(getattr(cls, method_name), '__isabstractmethod__', False): + logger.debug(f"Skipping {method_name} as it is abstract") + continue + logger.debug(f"Checking {method_name}") + + # if the method doesn't exist we automatically create a method by + # inspecting the method's arguments. Anything that matches "known" + # events has it's values checked. The dataclass should define the + # required fields and everything else should be sent to a catchall + # argument in the dataclass for the event + + # Convert method name from snake_case to camel case + camel_case_method_name = "".join( + word.capitalize() + for i, word in enumerate(method_name.split("_")) + ) -class EventConsole(Console): - """ - A console implementation that manages and publishes events related to - SQLMesh operations. The sqlmesh console implementation is mostly for it's - CLI application and doesn't take into account using sqlmesh as a library. - This event pub/sub interface allows us to capture events and choose how we - wish to handle it with N number of handlers. - - This class extends the Console class and provides functionality to handle - various events during SQLMesh processes such as plan evaluation, creation, - promotion, migration, and testing. - """ - - categorizer: SnapshotCategorizer | None = None + if camel_case_method_name in known_events: + logger.debug(f"Creating {method_name} for {camel_case_method_name}") + signature = inspect.signature(getattr(Console, method_name)) + handler = cls.create_event_handler(method_name, camel_case_method_name, signature) + setattr(cls, method_name, handler) + else: + logger.debug(f"Creating {method_name} for unknown event") + signature = inspect.signature(getattr(Console, method_name)) + handler = cls.create_unknown_event_handler(method_name, signature) + setattr(cls, method_name, handler) + + @classmethod + def create_event_handler(cls, method_name: str, event_name: str, signature: inspect.Signature): + func_signature, call_params = cls.create_signatures_and_params(signature) + + event_handler_str = textwrap.dedent(f""" + def {method_name}({", ".join(func_signature)}): + self.publish_known_event('{event_name}', {", ".join(call_params)}) + """) + print(event_handler_str) + exec(event_handler_str) + return t.cast(t.Callable[[t.Any], t.Any], locals()[method_name]) + + @classmethod + def create_signatures_and_params(cls, signature: inspect.Signature): + func_signature: list[str] = [] + call_params: list[str] = [] + for param_name, param in signature.parameters.items(): + if param_name == "self": + func_signature.append("self") + continue + + if param.default is inspect._empty: + param_type_name = param.annotation + if not isinstance(param_type_name, str): + param_type_name = param_type_name.__name__ + func_signature.append(f"{param_name}: '{param_type_name}'") + print(func_signature) + else: + default_value = param.default + param_type_name = param.annotation + if not isinstance(param_type_name, str): + param_type_name = param_type_name.__name__ + if isinstance(param.default, str): + default_value = f"'{param.default}'" + func_signature.append(f"{param_name}: '{param_type_name}' = {default_value}") + print(func_signature) + call_params.append(f"{param_name}={param_name}") + return (func_signature, call_params) + + @classmethod + def create_unknown_event_handler(cls, method_name: str, signature: inspect.Signature): + func_signature, call_params = cls.create_signatures_and_params(signature) + + event_handler_str = textwrap.dedent(f""" + def {method_name}({", ".join(func_signature)}): + self.publish_unknown_event('{method_name}', {", ".join(call_params)}) + """) + print(event_handler_str) + exec(event_handler_str) + return t.cast(t.Callable[[t.Any], t.Any], locals()[method_name]) def __init__(self, log_override: logging.Logger | None = None) -> None: self._handlers: dict[str, ConsoleEventHandler] = {} @@ -335,202 +390,22 @@ def __init__(self, log_override: logging.Logger | None = None) -> None: self.logger.debug(f"EventConsole[{self.id}]: created") self.categorizer = None - def add_snapshot_categorizer(self, categorizer: SnapshotCategorizer) -> None: - self.categorizer = categorizer - - def start_plan_evaluation(self, plan: EvaluatablePlan) -> None: - self.publish(StartPlanEvaluation(plan)) - - def stop_plan_evaluation(self) -> None: - self.publish(StopPlanEvaluation()) + def publish_known_event(self, event_name: str, **kwargs: t.Any) -> None: + console_event = get_console_event_by_name(event_name) + assert console_event is not None, f"Event {event_name} not found" + + expected_kwargs_fields = console_event.__dataclass_fields__ + expected_kwargs: dict[str, t.Any] = {} + unknown_args: dict[str, t.Any] = {} + for key, value in kwargs.items(): + if key not in expected_kwargs_fields: + unknown_args[key] = value + else: + expected_kwargs[key] = value + + event = console_event(**expected_kwargs, unknown_args=unknown_args) - def start_evaluation_progress( - self, - batches: dict[Snapshot, int], - environment_naming_info: EnvironmentNamingInfo, - default_catalog: str | None, - ) -> None: - self.publish( - StartEvaluationProgress(batches, environment_naming_info, default_catalog) - ) - - def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None: - self.publish(StartSnapshotEvaluationProgress(snapshot)) - - def update_snapshot_evaluation_progress( - self, snapshot: Snapshot, batch_idx: int, duration_ms: int | None - ) -> None: - self.publish(UpdateSnapshotEvaluationProgress(snapshot, batch_idx, duration_ms)) - - def stop_evaluation_progress(self, success: bool = True) -> None: - self.publish(StopEvaluationProgress(success)) - - def start_creation_progress( - self, - total_tasks: int, - environment_naming_info: EnvironmentNamingInfo, - default_catalog: str | None, - ) -> None: - self.publish( - StartCreationProgress(total_tasks, environment_naming_info, default_catalog) - ) - - def update_creation_progress(self, snapshot: SnapshotInfoLike) -> None: - self.publish(UpdateCreationProgress(snapshot)) - - def stop_creation_progress(self, success: bool = True) -> None: - self.publish(StopCreationProgress(success)) - - def start_cleanup(self, ignore_ttl: bool) -> bool: - event = StartCleanup(ignore_ttl) self.publish(event) - return True # Assuming the cleanup should always proceed, or modify as needed - - def update_cleanup_progress(self, object_name: str) -> None: - self.publish(UpdateCleanupProgress(object_name)) - - def stop_cleanup(self, success: bool = True) -> None: - self.publish(StopCleanup(success)) - - def start_promotion_progress( - self, - total_tasks: int, - environment_naming_info: EnvironmentNamingInfo, - default_catalog: str | None, - ) -> None: - self.publish( - StartPromotionProgress( - total_tasks, environment_naming_info, default_catalog - ) - ) - - def update_promotion_progress( - self, snapshot: SnapshotInfoLike, promoted: bool - ) -> None: - self.publish(UpdatePromotionProgress(snapshot, promoted)) - - def stop_promotion_progress(self, success: bool = True) -> None: - self.publish(StopPromotionProgress(success)) - - def start_snapshot_migration_progress(self, total_tasks: int) -> None: - self.publish(StartSnapshotMigrationProgress(total_tasks)) - - def update_snapshot_migration_progress(self, num_tasks: int) -> None: - self.publish(UpdateSnapshotMigrationProgress(num_tasks)) - - def log_migration_status(self, success: bool = True) -> None: - self.publish(LogMigrationStatus(success)) - - def stop_snapshot_migration_progress(self, success: bool = True) -> None: - self.publish(StopSnapshotMigrationProgress(success)) - - def start_env_migration_progress(self, total_tasks: int) -> None: - self.publish(StartEnvMigrationProgress(total_tasks)) - - def update_env_migration_progress(self, num_tasks: int) -> None: - self.publish(UpdateEnvMigrationProgress(num_tasks)) - - def stop_env_migration_progress(self, success: bool = True) -> None: - self.publish(StopEnvMigrationProgress(success)) - - def show_model_difference_summary( - self, - context_diff: ContextDiff, - environment_naming_info: EnvironmentNamingInfo, - default_catalog: str | None, - no_diff: bool = True, - ) -> None: - self.publish( - ShowModelDifferenceSummary( - context_diff, - environment_naming_info, - default_catalog, - no_diff, - ) - ) - - def plan( - self, - plan_builder: PlanBuilder, - auto_apply: bool, - default_catalog: str | None, - no_diff: bool = False, - no_prompts: bool = False, - ) -> None: - self.logger.debug("building plan created") - plan = plan_builder.build() - self.logger.debug(f"plan created: {plan}") - - for snapshot in plan.uncategorized: - if self.categorizer: - plan_builder.set_choice( - snapshot, self.categorizer(snapshot, plan_builder, default_catalog) - ) - - if auto_apply: - plan_builder.apply() - - def log_test_results( - self, - result: unittest.result.TestResult, - output: str | None, - target_dialect: str, - ) -> None: - self.publish(LogTestResults(result, output, target_dialect)) - - def show_sql(self, sql: str) -> None: - self.publish(ShowSQL(sql)) - - def log_status_update(self, message: str) -> None: - self.publish(LogStatusUpdate(message)) - - def log_error(self, message: str) -> None: - self.publish(LogError(message)) - - def log_warning(self, short_message: str, long_message: str | None = None) -> None: - self.publish(LogWarning(short_message, long_message)) - - def log_success(self, message: str) -> None: - self.publish(LogSuccess(message)) - - def log_failed_models(self, errors: list[NodeExecutionFailedError[str]]) -> None: - self.publish(LogFailedModels(errors)) - - def log_skipped_models(self, snapshot_names: set[str]) -> None: - self.publish(LogSkippedModels(snapshot_names)) - - def log_destructive_change( - self, - snapshot_name: str, - dropped_column_names: list[str], - alter_expressions: list[Alter], - dialect: str, - error: bool = True, - ) -> None: - self.publish( - LogDestructiveChange( - snapshot_name, dropped_column_names, alter_expressions, dialect, error - ) - ) - - def loading_start(self, message: str | None = None) -> uuid.UUID: - event_id = uuid.uuid4() - self.publish(LoadingStart(message, event_id)) - return event_id - - def loading_stop(self, id: uuid.UUID) -> None: - self.publish(LoadingStop(id)) - - def show_schema_diff(self, schema_diff: SchemaDiff) -> None: - self.publish(ShowSchemaDiff(schema_diff)) - - def show_row_diff( - self, - row_diff: RowDiff, - show_sample: bool = True, - skip_grain_check: bool = False, - ) -> None: - self.publish(ShowRowDiff(row_diff, show_sample, skip_grain_check)) def publish(self, event: ConsoleEvent) -> None: self.logger.debug( @@ -539,6 +414,12 @@ def publish(self, event: ConsoleEvent) -> None: for handler in self._handlers.values(): handler(event) + def publish_unknown_event(self, event_name: str, **kwargs: t.Any) -> None: + self.logger.debug( + f"EventConsole[{self.id}]: sending unknown event to {len(self._handlers)}" + ) + self.logger.debug(f"EventConsole[{self.id}]: unknown event {event_name} {kwargs}") + def add_handler(self, handler: ConsoleEventHandler) -> str: handler_id = str(uuid.uuid4()) self.logger.debug(f"EventConsole[{self.id}]: Adding handler {handler_id}") @@ -547,30 +428,49 @@ def add_handler(self, handler: ConsoleEventHandler) -> str: def remove_handler(self, handler_id: str) -> None: del self._handlers[handler_id] + - def exception(self, exc: Exception) -> None: - self.publish(ConsoleException(exc)) +class EventConsole(IntrospectingConsole): + """ + A console implementation that manages and publishes events related to + SQLMesh operations. The sqlmesh console implementation is mostly for it's + CLI application and doesn't take into account using sqlmesh as a library. + This event pub/sub interface allows us to capture events and choose how we + wish to handle it with N number of handlers. - def print_environments(self, environments_summary: dict[str, int]) -> None: - self.publish(PrintEnvironments(environments_summary)) + This class extends the Console class and provides functionality to handle + various events during SQLMesh processes such as plan evaluation, creation, + promotion, migration, and testing. + """ - def show_table_diff_summary(self, table_diff: TableDiff) -> None: - self.publish(ShowTableDiffSummary(table_diff)) + categorizer: SnapshotCategorizer | None = None - def show_linter_violations( - self, - violations: list[RuleViolation], - model: Model, - is_error: bool = False, - ) -> None: - """Show linting violations from SQLMesh. + events: t.ClassVar[list[type[ConsoleEvent]]] = [ + Plan, + StartPlanEvaluation, + StartEvaluationProgress, + UpdatePromotionProgress, + StopPromotionProgress, + StartSnapshotEvaluationProgress, + UpdateSnapshotEvaluationProgress, + LogError, + LogWarning, + LogSuccess, + LogFailedModels, + LogSkippedModels, + LogTestResults, + ConsoleException, + PrintEnvironments, + ShowTableDiffSummary, + ] - Args: - violations: List of linting violations to display - model: The model being linted - is_error: Whether the violations are errors - """ - self.publish(LogWarning("Linting violations found", str(violations))) + def exception(self, exc: Exception) -> None: + self.publish(ConsoleException(exception=exc)) + + def add_snapshot_categorizer( + self, categorizer: SnapshotCategorizer + ) -> None: + self.categorizer = categorizer class DebugEventConsole(EventConsole): @@ -579,178 +479,3 @@ class DebugEventConsole(EventConsole): def __init__(self, console: Console): super().__init__() self._console = console - - def start_plan_evaluation(self, plan: EvaluatablePlan) -> None: - super().start_plan_evaluation(plan) - self._console.start_plan_evaluation(plan) - - def stop_plan_evaluation(self) -> None: - super().stop_plan_evaluation() - self._console.stop_plan_evaluation() - - def start_evaluation_progress( - self, - batches: dict[Snapshot, int], - environment_naming_info: EnvironmentNamingInfo, - default_catalog: str | None, - ) -> None: - super().start_evaluation_progress( - batches, environment_naming_info, default_catalog - ) - self._console.start_evaluation_progress( - batches, environment_naming_info, default_catalog - ) - - def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None: - super().start_snapshot_evaluation_progress(snapshot) - self._console.start_snapshot_evaluation_progress(snapshot) - - def update_snapshot_evaluation_progress( - self, snapshot: Snapshot, batch_idx: int, duration_ms: int | None - ) -> None: - super().update_snapshot_evaluation_progress(snapshot, batch_idx, duration_ms) - self._console.update_snapshot_evaluation_progress( - snapshot, batch_idx, duration_ms - ) - - def stop_evaluation_progress(self, success: bool = True) -> None: - super().stop_evaluation_progress(success) - self._console.stop_evaluation_progress(success) - - def start_creation_progress( - self, - total_tasks: int, - environment_naming_info: EnvironmentNamingInfo, - default_catalog: str | None, - ) -> None: - super().start_creation_progress( - total_tasks, environment_naming_info, default_catalog - ) - self._console.start_creation_progress( - total_tasks, environment_naming_info, default_catalog - ) - - def update_creation_progress(self, snapshot: SnapshotInfoLike) -> None: - super().update_creation_progress(snapshot) - self._console.update_creation_progress(snapshot) - - def stop_creation_progress(self, success: bool = True) -> None: - super().stop_creation_progress(success) - self._console.stop_creation_progress(success) - - def update_cleanup_progress(self, object_name: str) -> None: - super().update_cleanup_progress(object_name) - self._console.update_cleanup_progress(object_name) - - def start_promotion_progress( - self, - total_tasks: int, - environment_naming_info: EnvironmentNamingInfo, - default_catalog: str | None, - ) -> None: - super().start_promotion_progress( - total_tasks, environment_naming_info, default_catalog - ) - self._console.start_promotion_progress( - total_tasks, environment_naming_info, default_catalog - ) - - def update_promotion_progress( - self, snapshot: SnapshotInfoLike, promoted: bool - ) -> None: - super().update_promotion_progress(snapshot, promoted) - self._console.update_promotion_progress(snapshot, promoted) - - def stop_promotion_progress(self, success: bool = True) -> None: - super().stop_promotion_progress(success) - self._console.stop_promotion_progress(success) - - def show_model_difference_summary( - self, - context_diff: ContextDiff, - environment_naming_info: EnvironmentNamingInfo, - default_catalog: str | None, - no_diff: bool = True, - ) -> None: - super().show_model_difference_summary( - context_diff, - environment_naming_info, - default_catalog, - no_diff, - ) - self._console.show_model_difference_summary( - context_diff, - environment_naming_info, - default_catalog, - no_diff, - # ignored_snapshot_ids, - ) - - def plan( - self, - plan_builder: PlanBuilder, - auto_apply: bool, - default_catalog: str | None, - no_diff: bool = False, - no_prompts: bool = False, - ) -> None: - super().plan(plan_builder, auto_apply, default_catalog, no_diff, no_prompts) - self._console.plan( - plan_builder, auto_apply, default_catalog, no_diff, no_prompts - ) - - def log_test_results( - self, - result: unittest.result.TestResult, - output: str | None, - target_dialect: str, - ) -> None: - super().log_test_results(result, output, target_dialect) - self._console.log_test_results(result, output, target_dialect) - - def show_sql(self, sql: str) -> None: - super().show_sql(sql) - self._console.show_sql(sql) - - def log_status_update(self, message: str) -> None: - super().log_status_update(message) - self._console.log_status_update(message) - - def log_error(self, message: str) -> None: - super().log_error(message) - self._console.log_error(message) - - def log_success(self, message: str) -> None: - super().log_success(message) - self._console.log_success(message) - - def loading_start(self, message: str | None = None) -> uuid.UUID: - event_id = super().loading_start(message) - self._console.loading_start(message) - return event_id - - def loading_stop(self, id: uuid.UUID) -> None: - super().loading_stop(id) - self._console.loading_stop(id) - - def show_schema_diff(self, schema_diff: SchemaDiff) -> None: - super().show_schema_diff(schema_diff) - self._console.show_schema_diff(schema_diff) - - def show_row_diff( - self, - row_diff: RowDiff, - show_sample: bool = True, - skip_grain_check: bool = False, - ) -> None: - super().show_row_diff(row_diff, show_sample) - self._console.show_row_diff(row_diff, show_sample, skip_grain_check) - - def show_linter_violations( - self, - violations: list[RuleViolation], - model: Model, - is_error: bool = False, - ) -> None: - super().show_linter_violations(violations, model, is_error) - self._console.show_linter_violations(violations, model, is_error) diff --git a/dagster_sqlmesh/controller/base.py b/dagster_sqlmesh/controller/base.py index 5d66e1d..25db81a 100644 --- a/dagster_sqlmesh/controller/base.py +++ b/dagster_sqlmesh/controller/base.py @@ -7,7 +7,7 @@ from typing import TypeVar from sqlmesh.core.config import CategorizerConfig -from sqlmesh.core.console import Console, set_console +from sqlmesh.core.console import set_console from sqlmesh.core.context import Context from sqlmesh.core.model import Model from sqlmesh.core.plan import PlanBuilder @@ -19,8 +19,8 @@ ConsoleEvent, ConsoleEventHandler, ConsoleException, - DebugEventConsole, EventConsole, + Plan, SnapshotCategorizer, ) from ..events import ConsoleGenerator @@ -215,8 +215,12 @@ def run_sqlmesh_thread( self.logger.debug("waiting for events") for event in generator.events(thread): match event: - case ConsoleException(e): + case ConsoleException(exception=e): raise e + case Plan(plan_builder=plan_builder, auto_apply=auto_apply): + if auto_apply: + plan_builder.apply() + yield event case _: yield event @@ -275,7 +279,7 @@ def run_sqlmesh_thread( for event in generator.events(thread): match event: - case ConsoleException(e): + case ConsoleException(exception=e): raise e case _: yield event @@ -400,12 +404,10 @@ def setup( cls, path: str, gateway: str = "local", - debug_console: Console | None = None, log_override: logging.Logger | None = None, ) -> "SQLMeshController": return cls.setup_with_config( config=SQLMeshContextConfig(path=path, gateway=gateway), - debug_console=debug_console, log_override=log_override, ) @@ -413,12 +415,9 @@ def setup( def setup_with_config( cls: type[T], config: SQLMeshContextConfig, - debug_console: Console | None = None, log_override: logging.Logger | None = None, ) -> T: - console = EventConsole(log_override=log_override) - if debug_console: - console = DebugEventConsole(debug_console) + console = EventConsole(log_override=log_override) # type: ignore controller = cls( console=console, config=config, diff --git a/dagster_sqlmesh/events.py b/dagster_sqlmesh/events.py index a9b36b6..50a62de 100644 --- a/dagster_sqlmesh/events.py +++ b/dagster_sqlmesh/events.py @@ -93,29 +93,29 @@ def __init__( def __call__(self, event: console.ConsoleEvent) -> None: match event: - case console.StartPlanEvaluation(evaluatable_plan): + case console.StartPlanEvaluation(plan=evaluatable_plan): self.logger.debug("Starting plan evaluation") print(evaluatable_plan.plan_id) case console.StartEvaluationProgress( - batches, environment_naming_info, default_catalog + batched_intervals=batches, environment_naming_info=environment_naming_info, default_catalog=default_catalog ): self.logger.debug("STARTING EVALUATION") self.logger.debug(batches) self.logger.debug(environment_naming_info) self.logger.debug(default_catalog) - case console.UpdatePromotionProgress(snapshot, promoted): + case console.UpdatePromotionProgress(snapshot=snapshot, promoted=promoted): self.logger.debug("UPDATE PROMOTION PROGRESS") self.logger.debug(snapshot) self.logger.debug(promoted) - case console.StopPromotionProgress(success): + case console.StopPromotionProgress(success=success): self.logger.debug("STOP PROMOTION") self.logger.debug(success) self._successful = True - case console.StartSnapshotEvaluationProgress(snapshot): + case console.StartSnapshotEvaluationProgress(snapshot=snapshot): self.logger.debug("START SNAPSHOT EVALUATION") self.logger.debug(snapshot.name) case console.UpdateSnapshotEvaluationProgress( - snapshot, batch_idx, duration_ms + snapshot=snapshot, batch_idx=batch_idx, duration_ms=duration_ms ): self._updated.append(snapshot) self.logger.debug("UPDATE SNAPSHOT EVALUATION") @@ -124,7 +124,7 @@ def __call__(self, event: console.ConsoleEvent) -> None: self.logger.debug(duration_ms) case _: if self._enable_unknown_event_logging: - self.logger.debug("Unhandled event") + self.logger.debug(f"Unhandled event {event.__class__.__name__}") self.logger.debug(event) def _show_summary_for( diff --git a/dagster_sqlmesh/testing/context.py b/dagster_sqlmesh/testing/context.py index 2532c12..5ea6363 100644 --- a/dagster_sqlmesh/testing/context.py +++ b/dagster_sqlmesh/testing/context.py @@ -4,7 +4,6 @@ import duckdb import polars -from sqlmesh.core.console import get_console from sqlmesh.utils.date import TimeLike from dagster_sqlmesh.config import SQLMeshContextConfig @@ -22,12 +21,9 @@ class SQLMeshTestContext: db_path: str context_config: SQLMeshContextConfig - def create_controller(self, enable_debug_console: bool = False): - console = None - if enable_debug_console: - console = get_console() + def create_controller(self): return DagsterSQLMeshController.setup_with_config( - self.context_config, debug_console=console + self.context_config, ) def query(self, *args: t.Any, **kwargs: t.Any) -> list[t.Any]: @@ -69,7 +65,6 @@ def plan_and_run( *, environment: str, execution_time: TimeLike | None = None, - enable_debug_console: bool = False, start: TimeLike | None = None, end: TimeLike | None = None, select_models: list[str] | None = None, @@ -93,7 +88,7 @@ def plan_and_run( TimeLike can be any time-like object that SQLMesh accepts (datetime, str, etc.). The function creates a controller and recorder to capture all SQLMesh events during execution. """ - controller = self.create_controller(enable_debug_console=enable_debug_console) + controller = self.create_controller() recorder = ConsoleRecorder() # controller.add_event_handler(ConsoleRecorder()) plan_options = PlanOptions( diff --git a/pyproject.toml b/pyproject.toml index 46d5c5e..4806901 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,7 +10,7 @@ readme = "README.md" requires-python = ">=3.11,<3.13" dependencies = [ "dagster>=1.7.8", - "sqlmesh==0.164.0", + "sqlmesh==0.174.2", "pytest>=8.3.2", "pyarrow>=18.0.0", ] diff --git a/uv.lock b/uv.lock index b5a67d0..15e577f 100644 --- a/uv.lock +++ b/uv.lock @@ -313,7 +313,7 @@ requires-dist = [ { name = "dagster", specifier = ">=1.7.8" }, { name = "pyarrow", specifier = ">=18.0.0" }, { name = "pytest", specifier = ">=8.3.2" }, - { name = "sqlmesh", specifier = "==0.164.0" }, + { name = "sqlmesh", specifier = "==0.174.2" }, ] [package.metadata.requires-dev] @@ -735,6 +735,47 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/62/a1/3d680cbfd5f4b8f15abc1d571870c5fc3e594bb582bc3b64ea099db13e56/jinja2-3.1.6-py3-none-any.whl", hash = "sha256:85ece4451f492d0c13c5dd7c13a64681a86afae63a5f347908daf103ce6d2f67", size = 134899 }, ] +[[package]] +name = "json-stream" +version = "2.3.3" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "json-stream-rs-tokenizer" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/80/06/0a700bbd6aa0cbd97639a0336f1de39cf3adb7555c097da2f3c4ee7aab5a/json_stream-2.3.3.tar.gz", hash = "sha256:894444c68c331174926763e224fb34b7ed3f90749d1c165afd0f5930207534c4", size = 31461 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/bc/87/67c0de941a08cdafd85c2048ecec52ff5568c0191ead465e966bea78db79/json_stream-2.3.3-py3-none-any.whl", hash = "sha256:65f08c5031d7df145c6fe89e434b718c1574b2bb84b8a0eea974de90916a089d", size = 31187 }, +] + +[[package]] +name = "json-stream-rs-tokenizer" +version = "0.4.29" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/44/51/783424ed021177ea0bdedf9967b1595a433066d5f6b18828e432b97cf31d/json_stream_rs_tokenizer-0.4.29.tar.gz", hash = "sha256:cfb63413cd38cf887d374b7e8969ec635eeb0cbfd136be0ea0c5eae2bb6f6932", size = 28405 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/6a/03/0f000c90144a6e298a8e9e3b21010c031e1086f1800ce74733b41fee924e/json_stream_rs_tokenizer-0.4.29-cp311-cp311-macosx_10_12_x86_64.whl", hash = "sha256:1251fafde4eacb079cbb5c14176f7f4a674f9423b9c4d80d3ac4689bf87c5160", size = 298185 }, + { url = "https://files.pythonhosted.org/packages/d4/66/febae2faf251310ef8d50e91da95e97c47ebf819c912f19f13e81941bd67/json_stream_rs_tokenizer-0.4.29-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:a7ff71388e9eeed0f81dd11f71577d68c7167fd9d557c5e7e90e9e1f63af281f", size = 293246 }, + { url = "https://files.pythonhosted.org/packages/90/8a/bc6fe65181c7699e29bdef10c1133e3d8130ee8537017cf834b13243e7af/json_stream_rs_tokenizer-0.4.29-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:bbf77aaadf4b7cd8870eb22a08cad307362735812a71b504917292f28682d897", size = 327917 }, + { url = "https://files.pythonhosted.org/packages/7a/ce/342cf0e8ebe6dce32ca9af8fdcd8030651f3e85c857e0497ec085becaf59/json_stream_rs_tokenizer-0.4.29-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:dee14edfe54b3a8115ae8162800b2a212ac9bbff3147336ed98cfe2646166890", size = 330799 }, + { url = "https://files.pythonhosted.org/packages/a0/ff/c681a0dc619deb69005eca3bf976942c1d70ae2a84512771fa65850874cf/json_stream_rs_tokenizer-0.4.29-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6afadb86e0202fb24b3ce20556c7b3fe4f19609137582d5b3c88830b98c96b3d", size = 347838 }, + { url = "https://files.pythonhosted.org/packages/69/97/a5fb6d87eed6d24035d53e8b9ee67a946121e282e3388dcf16261884f67b/json_stream_rs_tokenizer-0.4.29-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:be7eb20c2704e577709f2daf268a3cf27961af8659a81bc43aa5d8d6e217040e", size = 390678 }, + { url = "https://files.pythonhosted.org/packages/ae/fe/e439c8dceec94acee2d4da91568e746ce24f8f0f46bf75cee3ce666c081f/json_stream_rs_tokenizer-0.4.29-cp311-cp311-win32.whl", hash = "sha256:10bf45d53999007c72d380373ad76763a1bcd292ec4239ec11d6ccb08454d12e", size = 172852 }, + { url = "https://files.pythonhosted.org/packages/38/49/ad6c83d87c8214b2e74ae897089f159f6774564a2753f7a34e1026be157a/json_stream_rs_tokenizer-0.4.29-cp311-cp311-win_amd64.whl", hash = "sha256:6aa81843ae40ac5e45cdc8410f0cb7d83a124d7334d8768d23ea1d846153c632", size = 183279 }, + { url = "https://files.pythonhosted.org/packages/48/06/7473731a5ec648cc82cd40a82814033e776ed638830321a3af631e35a888/json_stream_rs_tokenizer-0.4.29-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:4a3585bafaaa09c96f9ae619ef72ea4b5a2bb2c2bf665e8db4dd4548a421f5a9", size = 297961 }, + { url = "https://files.pythonhosted.org/packages/d2/7f/8cb4a4be24fceaa330d54b47508acea9ac1f393b66d7467197d1fea78286/json_stream_rs_tokenizer-0.4.29-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:11751a056e58ca0c37eabcfad1c7564c6f8f61e30906639d638db89e2276b6a5", size = 293163 }, + { url = "https://files.pythonhosted.org/packages/41/d6/f9a7634949211fe6f9286246b4e9a4a33da35fb513348a0262e9e73d9003/json_stream_rs_tokenizer-0.4.29-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:149ba5e57148b5287429d5cad72e63b18c8234fca19ae22761f5e2d2593573f7", size = 328147 }, + { url = "https://files.pythonhosted.org/packages/e0/ef/eb49861509d4f4a94dfc4246396eb2adb05e8040c818ec6e410673931355/json_stream_rs_tokenizer-0.4.29-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:05bcde5e86dcc16adb81536a645172b754cd704548e0fc29131c88eba9badcca", size = 330630 }, + { url = "https://files.pythonhosted.org/packages/bb/8c/eac091e3d13d1bcd2e689f442458e5d761c72db45b5033920e95c3390cc3/json_stream_rs_tokenizer-0.4.29-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:4702876cf07d3148b5ef91a205546eace2647e3b29afdd0180ef246d57f1f933", size = 347728 }, + { url = "https://files.pythonhosted.org/packages/79/a5/8afcabffc1e87c5cedbf5f8177468e6cd058870433f94860025a8dab0d0d/json_stream_rs_tokenizer-0.4.29-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:17d74f668b22748c97e8921ca6fb74a40b52bce29d168bd4be5949b1b8afb1b0", size = 390757 }, + { url = "https://files.pythonhosted.org/packages/cc/0b/b6f82b245cf6c3a529143d1cf3d8e27149cf53c20747fe332385bee18627/json_stream_rs_tokenizer-0.4.29-cp312-cp312-win32.whl", hash = "sha256:9e1dfd15a5a8c3acee7ee1c328c96330b4015221f78d34c6903b24ea55b11caa", size = 173220 }, + { url = "https://files.pythonhosted.org/packages/0e/38/250093befb206ab76443cd964b001fbb6485d0df10458e4c71a6660f9f52/json_stream_rs_tokenizer-0.4.29-cp312-cp312-win_amd64.whl", hash = "sha256:96da813d0e9bc0731bd3c3e5f7f6fa1b71324719240536e31ac0a2eabd8af74f", size = 183332 }, + { url = "https://files.pythonhosted.org/packages/37/d3/dfb8dd72c52edd22c5b70bac792da99c5844563a9cd18c13dbcbfebe3cae/json_stream_rs_tokenizer-0.4.29-pp311-pypy311_pp73-macosx_10_15_x86_64.whl", hash = "sha256:a0c1bedc771a32c2717b66aad92c3a381abe992d2c9e615e67cccb79073e897f", size = 290369 }, + { url = "https://files.pythonhosted.org/packages/95/0d/0a7840f626541c6d4c771a4812fc7ecffb2ce28136f4ef5d9bf2449a099c/json_stream_rs_tokenizer-0.4.29-pp311-pypy311_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:036b11c1ce3c292c9de4d51408ac67b0f66bb130d294bef1f414b6a93b970e55", size = 320246 }, + { url = "https://files.pythonhosted.org/packages/0f/ad/48c37a38c6ff6d53a275e62c637a2111f7fc277e62c0d23dcedf21fb8e12/json_stream_rs_tokenizer-0.4.29-pp311-pypy311_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:06890ac299e64b368b7eb2dffb97a634298eff6c21c5d70ba3af8b1318522f2e", size = 323339 }, + { url = "https://files.pythonhosted.org/packages/5b/23/dd9ca0915ad740c67805b753e8544eb7100940844a8362350fe2fbe69100/json_stream_rs_tokenizer-0.4.29-pp311-pypy311_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6f88c09c0e04677c15ab3a39c6ab4f8ca93dffe86d502df8b74ba8d4f6c4ed2d", size = 344486 }, + { url = "https://files.pythonhosted.org/packages/1c/de/56580e882f99c9734613e6028d245c318b9dbb757f4f1b95a0f853aafe02/json_stream_rs_tokenizer-0.4.29-pp311-pypy311_pp73-win_amd64.whl", hash = "sha256:f0042d542b29608feee25c40c4d6b84364b560eda9efb8cca958bd803a1a7ce0", size = 176853 }, +] + [[package]] name = "jupyterlab-widgets" version = "3.0.13" @@ -1472,11 +1513,11 @@ wheels = [ [[package]] name = "sqlglot" -version = "26.8.0" +version = "26.13.2" source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/63/45/3a63686fdc1f8709f3f9cce733967c3b9a9fa4f8359821db95c2436d7858/sqlglot-26.8.0.tar.gz", hash = "sha256:d67e4e136198f103b793c8c3347f0068b9d9ee3393682fff9a1f8077edd8e8e3", size = 5325732 } +sdist = { url = "https://files.pythonhosted.org/packages/16/ba/1328ee0e4fb34b04d63307a7e9f16413096672388ad3f75b8998833f314e/sqlglot-26.13.2.tar.gz", hash = "sha256:fd0062311af3f38f7e419ce241e457a2ab082225b78539d713670e3246e9fc46", size = 5348477 } wheels = [ - { url = "https://files.pythonhosted.org/packages/dd/04/d376224261983a734c946fddc49e72848007f8483c2e4a07cd79dbe24547/sqlglot-26.8.0-py3-none-any.whl", hash = "sha256:ef2959ab81f7c8e920e14b3a2538f0488456f07ef31ee4f8a68d9dff76509550", size = 450646 }, + { url = "https://files.pythonhosted.org/packages/be/e1/8f6d1804df5615b3f3da37e68c3d25d05f07eb7963f66682832852ed236a/sqlglot-26.13.2-py3-none-any.whl", hash = "sha256:edf8fc6c9a3cc03c64adb6eb4531660acce449f3987af5ce6fd8d3a041468fbb", size = 457629 }, ] [package.optional-dependencies] @@ -1514,7 +1555,7 @@ wheels = [ [[package]] name = "sqlmesh" -version = "0.164.0" +version = "0.174.2" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "astor" }, @@ -1526,19 +1567,19 @@ dependencies = [ { name = "importlib-metadata", marker = "python_full_version < '3.12'" }, { name = "ipywidgets" }, { name = "jinja2" }, + { name = "json-stream" }, { name = "pandas" }, { name = "pydantic" }, { name = "requests" }, { name = "rich", extra = ["jupyter"] }, { name = "ruamel-yaml" }, - { name = "setuptools", marker = "python_full_version >= '3.12'" }, { name = "sqlglot", extra = ["rs"] }, { name = "tenacity" }, { name = "time-machine" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/24/9a/79c7dae48551d223fa3a7188f80912c37aab0461e32002cc7cd820a9569e/sqlmesh-0.164.0.tar.gz", hash = "sha256:c442cd8da2ce78c8dc333c226d68977541e2b4bd3c65ea37e86447a74e4cf85b", size = 9554288 } +sdist = { url = "https://files.pythonhosted.org/packages/0a/6e/d0961560eb1a045a6ca7ccd1fa419e94759ae3643f3c49976f1d9d971717/sqlmesh-0.174.2.tar.gz", hash = "sha256:e4b4d66447d7c53242bccb48343bfc2553625e646cc9728c5abc0e014deae34f", size = 9677704 } wheels = [ - { url = "https://files.pythonhosted.org/packages/f2/cd/cbdbcf1d03b9483c7fb47d1775962a0b4abcbc85aed4807d8b512467eaf6/sqlmesh-0.164.0-py3-none-any.whl", hash = "sha256:36ce7546f929b86884673fa1be68955f895797f07e0629e312f5cdb1da26416b", size = 5378661 }, + { url = "https://files.pythonhosted.org/packages/cd/11/74b174017db93b6a3b46aecc5992e6133f76653e69cda90013a4de9aaaa8/sqlmesh-0.174.2-py3-none-any.whl", hash = "sha256:b7481b44db0a12ee8e0faf92ee8b8b70dd1123c362e5457c149639042cb35801", size = 5411255 }, ] [[package]] From 1e4dca920337f7b0c1bb8817b393362f7295a19e Mon Sep 17 00:00:00 2001 From: "Reuven V. Gonzales" Date: Fri, 18 Apr 2025 12:52:43 +0000 Subject: [PATCH 2/6] fix pyright reporting private import --- dagster_sqlmesh/controller/dagster.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dagster_sqlmesh/controller/dagster.py b/dagster_sqlmesh/controller/dagster.py index bb66dd6..87cd3b0 100644 --- a/dagster_sqlmesh/controller/dagster.py +++ b/dagster_sqlmesh/controller/dagster.py @@ -1,3 +1,4 @@ +# pyright: reportPrivateImportUsage=false import logging from dagster import AssetDep, AssetKey, AssetOut From 6badd6d716816fb1a4218e280df4e91bd2e316e0 Mon Sep 17 00:00:00 2001 From: "Reuven V. Gonzales" Date: Fri, 18 Apr 2025 12:58:20 +0000 Subject: [PATCH 3/6] typing fixes --- dagster_sqlmesh/console.py | 2 ++ dagster_sqlmesh/resource.py | 16 ++++++++-------- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/dagster_sqlmesh/console.py b/dagster_sqlmesh/console.py index 9fd850a..0e04578 100644 --- a/dagster_sqlmesh/console.py +++ b/dagster_sqlmesh/console.py @@ -448,7 +448,9 @@ class EventConsole(IntrospectingConsole): events: t.ClassVar[list[type[ConsoleEvent]]] = [ Plan, StartPlanEvaluation, + StopPlanEvaluation, StartEvaluationProgress, + StopEvaluationProgress, UpdatePromotionProgress, StopPromotionProgress, StartSnapshotEvaluationProgress, diff --git a/dagster_sqlmesh/resource.py b/dagster_sqlmesh/resource.py index 0a41e41..f192849 100644 --- a/dagster_sqlmesh/resource.py +++ b/dagster_sqlmesh/resource.py @@ -158,7 +158,7 @@ def report_event(self, event: console.ConsoleEvent) -> None: log_context = self.log_context(event) match event: - case console.StartPlanEvaluation(plan): + case console.StartPlanEvaluation(plan=plan): self._tracker.init_complete_update_status(plan.environment.snapshots) log_context.info( "Starting Plan Evaluation", @@ -169,7 +169,7 @@ def report_event(self, event: console.ConsoleEvent) -> None: case console.StopPlanEvaluation: log_context.info("Plan evaluation completed") case console.StartEvaluationProgress( - batches, environment_naming_info, default_catalog + batched_intervals=batches, environment_naming_info=environment_naming_info, default_catalog=default_catalog ): self.update_stage("run") log_context.info( @@ -185,7 +185,7 @@ def report_event(self, event: console.ConsoleEvent) -> None: ) self._tracker.plan(batches) case console.UpdateSnapshotEvaluationProgress( - snapshot, batch_idx, duration_ms + snapshot=snapshot, batch_idx=batch_idx, duration_ms=duration_ms ): done, expected = self._tracker.update_plan(snapshot, batch_idx) @@ -197,25 +197,25 @@ def report_event(self, event: console.ConsoleEvent) -> None: "duration_ms": duration_ms, }, ) - case console.LogSuccess(success): + case console.LogSuccess(success=success): self.update_stage("done") if success: log_context.info("sqlmesh ran successfully") else: log_context.error("sqlmesh failed") raise Exception("sqlmesh failed during run") - case console.LogError(message): + case console.LogError(message=message): log_context.error( f"sqlmesh reported an error: {message}", ) - case console.LogFailedModels(models): + case console.LogFailedModels(models=models): if len(models) != 0: failed_models = "\n".join( [f"{model!s}\n{model.__cause__!s}" for model in models] ) log_context.error(f"sqlmesh failed models: {failed_models}") raise Exception("sqlmesh has failed models") - case console.UpdatePromotionProgress(snapshot, promoted): + case console.UpdatePromotionProgress(snapshot=snapshot, promoted=promoted): log_context.info( "Promotion progress update", { @@ -224,7 +224,7 @@ def report_event(self, event: console.ConsoleEvent) -> None: }, ) self._tracker.update_promotion(snapshot, promoted) - case console.StopPromotionProgress(success): + case console.StopPromotionProgress(success=success): self._tracker.stop_promotion() if success: log_context.info("Promotion completed successfully") From 809ad1d89bed382575fa5af06d5740acedaad166 Mon Sep 17 00:00:00 2001 From: "Reuven V. Gonzales" Date: Fri, 18 Apr 2025 12:59:44 +0000 Subject: [PATCH 4/6] more type fixes --- dagster_sqlmesh/scheduler.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dagster_sqlmesh/scheduler.py b/dagster_sqlmesh/scheduler.py index 89e88f1..8db5fcb 100644 --- a/dagster_sqlmesh/scheduler.py +++ b/dagster_sqlmesh/scheduler.py @@ -1,6 +1,7 @@ import typing as t -from sqlmesh.core.scheduler import CompletionStatus, Scheduler +from sqlmesh.core.scheduler import Scheduler +from sqlmesh.utils import CompletionStatus class DagsterSQLMeshScheduler(Scheduler): From bbe04b352b046a3674ac0f747e90688601ad692d Mon Sep 17 00:00:00 2001 From: "Reuven V. Gonzales" Date: Fri, 18 Apr 2025 13:00:22 +0000 Subject: [PATCH 5/6] bump version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 4806901..960c167 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "dagster-sqlmesh" -version = "0.11.2" +version = "0.12.0" description = "" authors = [ {name = "Reuven Gonzales", email = "reuven@karibalabs.co"} From bd36cc9a49a6caf764161a771da9eda61be4e786 Mon Sep 17 00:00:00 2001 From: "Reuven V. Gonzales" Date: Fri, 18 Apr 2025 13:01:54 +0000 Subject: [PATCH 6/6] remove debugging statements --- dagster_sqlmesh/console.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/dagster_sqlmesh/console.py b/dagster_sqlmesh/console.py index 0e04578..37c8195 100644 --- a/dagster_sqlmesh/console.py +++ b/dagster_sqlmesh/console.py @@ -340,7 +340,6 @@ def create_event_handler(cls, method_name: str, event_name: str, signature: insp def {method_name}({", ".join(func_signature)}): self.publish_known_event('{event_name}', {", ".join(call_params)}) """) - print(event_handler_str) exec(event_handler_str) return t.cast(t.Callable[[t.Any], t.Any], locals()[method_name]) @@ -358,7 +357,6 @@ def create_signatures_and_params(cls, signature: inspect.Signature): if not isinstance(param_type_name, str): param_type_name = param_type_name.__name__ func_signature.append(f"{param_name}: '{param_type_name}'") - print(func_signature) else: default_value = param.default param_type_name = param.annotation @@ -367,7 +365,6 @@ def create_signatures_and_params(cls, signature: inspect.Signature): if isinstance(param.default, str): default_value = f"'{param.default}'" func_signature.append(f"{param_name}: '{param_type_name}' = {default_value}") - print(func_signature) call_params.append(f"{param_name}={param_name}") return (func_signature, call_params) @@ -379,7 +376,6 @@ def create_unknown_event_handler(cls, method_name: str, signature: inspect.Signa def {method_name}({", ".join(func_signature)}): self.publish_unknown_event('{method_name}', {", ".join(call_params)}) """) - print(event_handler_str) exec(event_handler_str) return t.cast(t.Callable[[t.Any], t.Any], locals()[method_name])