Skip to content

Commit dfeafdb

Browse files
committed
fix: fixes issues with not raising errors for runs and plans
Some errors don't raise errors immediately, this catches those class of errors
1 parent 092e46e commit dfeafdb

File tree

4 files changed

+143
-27
lines changed

4 files changed

+143
-27
lines changed

dagster_sqlmesh/resource.py

Lines changed: 52 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,9 @@ def __init__(
146146
self._prefix = prefix
147147
self._context = context
148148
self._logger = context.log
149-
self._tracker = MaterializationTracker(sorted_dag=dag.sorted[:], logger=self._logger)
149+
self._tracker = MaterializationTracker(
150+
sorted_dag=dag.sorted[:], logger=self._logger
151+
)
150152
self._stage = "plan"
151153
self._errors: list[Exception] = []
152154
self._is_testing = is_testing
@@ -328,8 +330,7 @@ def run(
328330
logger = context.log
329331

330332
controller = self.get_controller(
331-
context_factory=context_factory,
332-
log_override=logger
333+
context_factory=context_factory, log_override=logger
333334
)
334335

335336
with controller.instance(environment) as mesh:
@@ -341,10 +342,7 @@ def run(
341342
[model.fqn for model, _ in mesh.non_external_models_dag()]
342343
)
343344
selected_models_set, models_map, select_models = (
344-
self._get_selected_models_from_context(
345-
context=context,
346-
models=models
347-
)
345+
self._get_selected_models_from_context(context=context, models=models)
348346
)
349347

350348
if all_available_models == selected_models_set or select_models is None:
@@ -356,11 +354,32 @@ def run(
356354
else:
357355
logger.info(f"selected models: {select_models}")
358356

359-
event_handler = DagsterSQLMeshEventHandler(
360-
context=context, models_map=models_map, dag=dag,
361-
prefix="sqlmesh: ", is_testing=self.is_testing
357+
event_handler = self.create_event_handler(
358+
context=context,
359+
models_map=models_map,
360+
dag=dag,
361+
prefix="sqlmesh: ",
362+
is_testing=self.is_testing,
362363
)
363364

365+
def raise_for_sqlmesh_errors(
366+
event_handler: DagsterSQLMeshEventHandler,
367+
additional_errors: list[Exception] | None = None,
368+
) -> None:
369+
additional_errors = additional_errors or []
370+
errors = event_handler.errors
371+
if len(errors) + len(additional_errors) == 0:
372+
return
373+
for error in errors:
374+
logger.error(
375+
f"sqlmesh encountered the following error during sqlmesh {event_handler.stage}: {error}"
376+
)
377+
raise PlanOrRunFailedError(
378+
event_handler.stage,
379+
f"sqlmesh failed during {event_handler.stage} with {len(event_handler.errors) + 1} errors",
380+
[*errors, *additional_errors],
381+
)
382+
364383
try:
365384
for event in mesh.plan_and_run(
366385
start=start,
@@ -376,16 +395,30 @@ def run(
376395
event_handler.process_events(event)
377396
except SQLMeshError as e:
378397
logger.error(f"sqlmesh error: {e}")
379-
errors = event_handler.errors
380-
for error in errors:
381-
logger.error(f"sqlmesh encountered the following error during sqlmesh {event_handler.stage}: {error}")
382-
raise PlanOrRunFailedError(
383-
event_handler.stage,
384-
f"sqlmesh failed during {event_handler.stage} with {len(event_handler.errors) + 1} errors",
385-
[e, *event_handler.errors],
386-
)
398+
raise_for_sqlmesh_errors(event_handler, [GenericSQLMeshError(str(e))])
399+
# Some errors do not raise exceptions immediately, so we need to check
400+
# the event handler for any errors that may have been collected.
401+
raise_for_sqlmesh_errors(event_handler)
402+
387403
yield from event_handler.notify_success(mesh.context)
388404

405+
def create_event_handler(
406+
self,
407+
*,
408+
context: AssetExecutionContext,
409+
dag: DAG[str],
410+
models_map: dict[str, Model],
411+
prefix: str,
412+
is_testing: bool,
413+
) -> DagsterSQLMeshEventHandler:
414+
return DagsterSQLMeshEventHandler(
415+
context=context,
416+
dag=dag,
417+
models_map=models_map,
418+
prefix=prefix,
419+
is_testing=is_testing,
420+
)
421+
389422
def _get_selected_models_from_context(
390423
self, context: AssetExecutionContext, models: MappingProxyType[str, Model]
391424
) -> tuple[set[str], dict[str, Model], list[str] | None]:
@@ -421,5 +454,5 @@ def get_controller(
421454
return DagsterSQLMeshController.setup_with_config(
422455
config=self.config,
423456
context_factory=context_factory,
424-
log_override=log_override
457+
log_override=log_override,
425458
)

dagster_sqlmesh/test_resource.py

Lines changed: 52 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
import typing as t
2+
13
import dagster as dg
24

3-
from dagster_sqlmesh.resource import PlanOrRunFailedError
5+
from dagster_sqlmesh.resource import DagsterSQLMeshEventHandler, PlanOrRunFailedError
46
from dagster_sqlmesh.testing import setup_testing_sqlmesh_test_context
57

68

@@ -14,10 +16,19 @@ def test_sqlmesh_resource_should_report_no_errors(
1416
variables={"enable_model_failure": False}
1517
)
1618
test_context.initialize_test_source()
17-
resource = test_context.create_resource()
19+
resource = test_context.create_resource()
1820

19-
for result in resource.run(dg_context):
20-
pass
21+
success = True
22+
try:
23+
for result in resource.run(dg_context):
24+
pass
25+
except PlanOrRunFailedError as e:
26+
success = False
27+
print(f"Plan or run failed with errors: {e.errors}")
28+
except Exception as e:
29+
success = False
30+
print(f"An unexpected error occurred: {e}")
31+
assert success, "Expected no errors, but an error was raised during the run."
2132

2233

2334
def test_sqlmesh_resource_properly_reports_errors(
@@ -48,3 +59,40 @@ def test_sqlmesh_resource_properly_reports_errors(
4859

4960
assert caught_failure, "Expected an error to be raised, but it was not."
5061

62+
63+
def test_sqlmesh_resource_properly_reports_errors_not_thrown(
64+
sample_sqlmesh_project: str, sample_sqlmesh_db_path: str
65+
):
66+
dg_context = dg.build_asset_context()
67+
test_context = setup_testing_sqlmesh_test_context(
68+
db_path=sample_sqlmesh_db_path,
69+
project_path=sample_sqlmesh_project,
70+
variables={"enable_model_failure": False}
71+
)
72+
test_context.initialize_test_source()
73+
resource = test_context.create_resource()
74+
def event_handler_factory(*args: t.Any, **kwargs: t.Any) -> DagsterSQLMeshEventHandler:
75+
"""Custom event handler factory for the SQLMesh resource."""
76+
handler = DagsterSQLMeshEventHandler(*args, **kwargs)
77+
# Load it with an error
78+
handler._errors = [Exception("testerror")]
79+
return handler
80+
resource.set_event_handler_factory(event_handler_factory)
81+
82+
caught_failure = False
83+
try:
84+
for result in resource.run(dg_context):
85+
pass
86+
except PlanOrRunFailedError as e:
87+
caught_failure = True
88+
89+
expected_error_found = False
90+
for err in e.errors:
91+
print(f"Found error: {err}")
92+
if "testerror" in str(err):
93+
expected_error_found = True
94+
break
95+
assert expected_error_found, "Expected error 'testerror' not found in the error list."
96+
97+
assert caught_failure, "Expected an error to be raised, but it was not."
98+

dagster_sqlmesh/testing/context.py

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from dagster_sqlmesh.controller.base import PlanOptions, RunOptions
1818
from dagster_sqlmesh.controller.dagster import DagsterSQLMeshController
1919
from dagster_sqlmesh.events import ConsoleRecorder
20-
from dagster_sqlmesh.resource import SQLMeshResource
20+
from dagster_sqlmesh.resource import DagsterSQLMeshEventHandler, SQLMeshResource
2121

2222
logger = logging.getLogger(__name__)
2323

@@ -49,6 +49,41 @@ def setup_testing_sqlmesh_test_context(
4949
return SQLMeshTestContext(db_path=db_path, context_config=context_config)
5050

5151

52+
class TestSQLMeshResource(SQLMeshResource):
53+
"""A test SQLMesh resource that can be used in tests.
54+
55+
This resource is a subclass of SQLMeshResource and is used to run SQLMesh in tests.
56+
It allows for easy setup and teardown of the SQLMesh context.
57+
"""
58+
59+
def __init__(self, config: SQLMeshContextConfig, is_testing: bool = False):
60+
super().__init__(config=config, is_testing=is_testing)
61+
def default_event_handler_factory(*args: t.Any, **kwargs: t.Any) -> DagsterSQLMeshEventHandler:
62+
"""Default event handler factory for the SQLMesh resource."""
63+
return DagsterSQLMeshEventHandler(*args, **kwargs)
64+
self._event_handler_factory = default_event_handler_factory
65+
66+
def set_event_handler_factory(self, event_handler_factory: t.Callable[..., DagsterSQLMeshEventHandler]) -> None:
67+
"""Set the event handler for the SQLMesh resource.
68+
69+
Args:
70+
event_handler (DagsterSQLMeshEventHandler): The event handler to set.
71+
"""
72+
self._event_handler_factory = event_handler_factory
73+
74+
def create_event_handler(self, *args: t.Any, **kwargs: t.Any) -> DagsterSQLMeshEventHandler:
75+
"""Create a new event handler for the SQLMesh resource.
76+
77+
Args:
78+
*args: Positional arguments to pass to the event handler.
79+
**kwargs: Keyword arguments to pass to the event handler.
80+
81+
Returns:
82+
DagsterSQLMeshEventHandler: The created event handler.
83+
"""
84+
return self._event_handler_factory(*args, **kwargs)
85+
86+
5287
@dataclass
5388
class SQLMeshTestContext:
5489
"""A test context for running SQLMesh"""
@@ -61,8 +96,8 @@ def create_controller(self) -> DagsterSQLMeshController[Context]:
6196
config=self.context_config,
6297
)
6398

64-
def create_resource(self) -> SQLMeshResource:
65-
return SQLMeshResource(
99+
def create_resource(self) -> TestSQLMeshResource:
100+
return TestSQLMeshResource(
66101
config=self.context_config, is_testing=True,
67102
)
68103

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)