Skip to content

fix: fixes issues with not raising errors for runs and plans #46

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 52 additions & 19 deletions dagster_sqlmesh/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,9 @@ def __init__(
self._prefix = prefix
self._context = context
self._logger = context.log
self._tracker = MaterializationTracker(sorted_dag=dag.sorted[:], logger=self._logger)
self._tracker = MaterializationTracker(
sorted_dag=dag.sorted[:], logger=self._logger
)
self._stage = "plan"
self._errors: list[Exception] = []
self._is_testing = is_testing
Expand Down Expand Up @@ -328,8 +330,7 @@ def run(
logger = context.log

controller = self.get_controller(
context_factory=context_factory,
log_override=logger
context_factory=context_factory, log_override=logger
)

with controller.instance(environment) as mesh:
Expand All @@ -341,10 +342,7 @@ def run(
[model.fqn for model, _ in mesh.non_external_models_dag()]
)
selected_models_set, models_map, select_models = (
self._get_selected_models_from_context(
context=context,
models=models
)
self._get_selected_models_from_context(context=context, models=models)
)

if all_available_models == selected_models_set or select_models is None:
Expand All @@ -356,11 +354,32 @@ def run(
else:
logger.info(f"selected models: {select_models}")

event_handler = DagsterSQLMeshEventHandler(
context=context, models_map=models_map, dag=dag,
prefix="sqlmesh: ", is_testing=self.is_testing
event_handler = self.create_event_handler(
context=context,
models_map=models_map,
dag=dag,
prefix="sqlmesh: ",
is_testing=self.is_testing,
)

def raise_for_sqlmesh_errors(
event_handler: DagsterSQLMeshEventHandler,
additional_errors: list[Exception] | None = None,
) -> None:
additional_errors = additional_errors or []
errors = event_handler.errors
if len(errors) + len(additional_errors) == 0:
return
for error in errors:
logger.error(
f"sqlmesh encountered the following error during sqlmesh {event_handler.stage}: {error}"
)
raise PlanOrRunFailedError(
event_handler.stage,
f"sqlmesh failed during {event_handler.stage} with {len(event_handler.errors) + 1} errors",
[*errors, *additional_errors],
)

try:
for event in mesh.plan_and_run(
start=start,
Expand All @@ -376,16 +395,30 @@ def run(
event_handler.process_events(event)
except SQLMeshError as e:
logger.error(f"sqlmesh error: {e}")
errors = event_handler.errors
for error in errors:
logger.error(f"sqlmesh encountered the following error during sqlmesh {event_handler.stage}: {error}")
raise PlanOrRunFailedError(
event_handler.stage,
f"sqlmesh failed during {event_handler.stage} with {len(event_handler.errors) + 1} errors",
[e, *event_handler.errors],
)
raise_for_sqlmesh_errors(event_handler, [GenericSQLMeshError(str(e))])
# Some errors do not raise exceptions immediately, so we need to check
# the event handler for any errors that may have been collected.
raise_for_sqlmesh_errors(event_handler)

yield from event_handler.notify_success(mesh.context)

def create_event_handler(
self,
*,
context: AssetExecutionContext,
dag: DAG[str],
models_map: dict[str, Model],
prefix: str,
is_testing: bool,
) -> DagsterSQLMeshEventHandler:
return DagsterSQLMeshEventHandler(
context=context,
dag=dag,
models_map=models_map,
prefix=prefix,
is_testing=is_testing,
)

def _get_selected_models_from_context(
self, context: AssetExecutionContext, models: MappingProxyType[str, Model]
) -> tuple[set[str], dict[str, Model], list[str] | None]:
Expand Down Expand Up @@ -421,5 +454,5 @@ def get_controller(
return DagsterSQLMeshController.setup_with_config(
config=self.config,
context_factory=context_factory,
log_override=log_override
log_override=log_override,
)
56 changes: 52 additions & 4 deletions dagster_sqlmesh/test_resource.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import typing as t

import dagster as dg

from dagster_sqlmesh.resource import PlanOrRunFailedError
from dagster_sqlmesh.resource import DagsterSQLMeshEventHandler, PlanOrRunFailedError
from dagster_sqlmesh.testing import setup_testing_sqlmesh_test_context


Expand All @@ -14,10 +16,19 @@ def test_sqlmesh_resource_should_report_no_errors(
variables={"enable_model_failure": False}
)
test_context.initialize_test_source()
resource = test_context.create_resource()
resource = test_context.create_resource()

for result in resource.run(dg_context):
pass
success = True
try:
for result in resource.run(dg_context):
pass
except PlanOrRunFailedError as e:
success = False
print(f"Plan or run failed with errors: {e.errors}")
except Exception as e:
success = False
print(f"An unexpected error occurred: {e}")
assert success, "Expected no errors, but an error was raised during the run."


def test_sqlmesh_resource_properly_reports_errors(
Expand Down Expand Up @@ -48,3 +59,40 @@ def test_sqlmesh_resource_properly_reports_errors(

assert caught_failure, "Expected an error to be raised, but it was not."


def test_sqlmesh_resource_properly_reports_errors_not_thrown(
sample_sqlmesh_project: str, sample_sqlmesh_db_path: str
):
dg_context = dg.build_asset_context()
test_context = setup_testing_sqlmesh_test_context(
db_path=sample_sqlmesh_db_path,
project_path=sample_sqlmesh_project,
variables={"enable_model_failure": False}
)
test_context.initialize_test_source()
resource = test_context.create_resource()
def event_handler_factory(*args: t.Any, **kwargs: t.Any) -> DagsterSQLMeshEventHandler:
"""Custom event handler factory for the SQLMesh resource."""
handler = DagsterSQLMeshEventHandler(*args, **kwargs)
# Load it with an error
handler._errors = [Exception("testerror")]
return handler
resource.set_event_handler_factory(event_handler_factory)

caught_failure = False
try:
for result in resource.run(dg_context):
pass
except PlanOrRunFailedError as e:
caught_failure = True

expected_error_found = False
for err in e.errors:
print(f"Found error: {err}")
if "testerror" in str(err):
expected_error_found = True
break
assert expected_error_found, "Expected error 'testerror' not found in the error list."

assert caught_failure, "Expected an error to be raised, but it was not."

41 changes: 38 additions & 3 deletions dagster_sqlmesh/testing/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from dagster_sqlmesh.controller.base import PlanOptions, RunOptions
from dagster_sqlmesh.controller.dagster import DagsterSQLMeshController
from dagster_sqlmesh.events import ConsoleRecorder
from dagster_sqlmesh.resource import SQLMeshResource
from dagster_sqlmesh.resource import DagsterSQLMeshEventHandler, SQLMeshResource

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -49,6 +49,41 @@ def setup_testing_sqlmesh_test_context(
return SQLMeshTestContext(db_path=db_path, context_config=context_config)


class TestSQLMeshResource(SQLMeshResource):
"""A test SQLMesh resource that can be used in tests.

This resource is a subclass of SQLMeshResource and is used to run SQLMesh in tests.
It allows for easy setup and teardown of the SQLMesh context.
"""

def __init__(self, config: SQLMeshContextConfig, is_testing: bool = False):
super().__init__(config=config, is_testing=is_testing)
def default_event_handler_factory(*args: t.Any, **kwargs: t.Any) -> DagsterSQLMeshEventHandler:
"""Default event handler factory for the SQLMesh resource."""
return DagsterSQLMeshEventHandler(*args, **kwargs)
self._event_handler_factory = default_event_handler_factory

def set_event_handler_factory(self, event_handler_factory: t.Callable[..., DagsterSQLMeshEventHandler]) -> None:
"""Set the event handler for the SQLMesh resource.

Args:
event_handler (DagsterSQLMeshEventHandler): The event handler to set.
"""
self._event_handler_factory = event_handler_factory

def create_event_handler(self, *args: t.Any, **kwargs: t.Any) -> DagsterSQLMeshEventHandler:
"""Create a new event handler for the SQLMesh resource.

Args:
*args: Positional arguments to pass to the event handler.
**kwargs: Keyword arguments to pass to the event handler.

Returns:
DagsterSQLMeshEventHandler: The created event handler.
"""
return self._event_handler_factory(*args, **kwargs)


@dataclass
class SQLMeshTestContext:
"""A test context for running SQLMesh"""
Expand All @@ -61,8 +96,8 @@ def create_controller(self) -> DagsterSQLMeshController[Context]:
config=self.context_config,
)

def create_resource(self) -> SQLMeshResource:
return SQLMeshResource(
def create_resource(self) -> TestSQLMeshResource:
return TestSQLMeshResource(
config=self.context_config, is_testing=True,
)

Expand Down
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.