Skip to content

Commit 29d7eae

Browse files
committed
chore: merge source for PR
2 parents e905905 + 7ad32cf commit 29d7eae

File tree

13 files changed

+304
-92
lines changed

13 files changed

+304
-92
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changelog
22

3+
## 0.16.0 (2025-04-30)
4+
5+
* feat: update sqlmesh pin to 0.178.2 (#45) ([27245d9](https://github.com/opensource-observer/oso/commit/27245d9)), closes [#45](https://github.com/opensource-observer/oso/issues/45)
6+
37
## 0.15.0 (2025-04-23)
48

59
* feat: allow passing in custom context (#42) ([63bd837](https://github.com/opensource-observer/oso/commit/63bd837)), closes [#42](https://github.com/opensource-observer/oso/issues/42)

dagster_sqlmesh/conftest.py

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,12 @@
66
import typing as t
77

88
import pytest
9-
from sqlmesh.core.config import (
10-
Config as SQLMeshConfig,
11-
DuckDBConnectionConfig,
12-
GatewayConfig,
13-
ModelDefaultsConfig,
14-
)
159

1610
from dagster_sqlmesh.config import SQLMeshContextConfig
17-
from dagster_sqlmesh.testing import SQLMeshTestContext
11+
from dagster_sqlmesh.testing import (
12+
SQLMeshTestContext,
13+
setup_testing_sqlmesh_context_config,
14+
)
1815

1916
logger = logging.getLogger(__name__)
2017

@@ -41,23 +38,19 @@ def sample_sqlmesh_project() -> t.Iterator[str]:
4138
# Initialize the "source" data
4239
yield str(project_dir)
4340

41+
@pytest.fixture
42+
def sample_sqlmesh_db_path(sample_sqlmesh_project: str) -> t.Iterator[str]:
43+
db_path = os.path.join(sample_sqlmesh_project, "db.db")
44+
yield db_path
45+
46+
@pytest.fixture
47+
def sample_sqlmesh_test_context_config(sample_sqlmesh_project: str, sample_sqlmesh_db_path: str) -> t.Iterator[SQLMeshContextConfig]:
48+
yield setup_testing_sqlmesh_context_config(db_path=sample_sqlmesh_db_path, project_path=sample_sqlmesh_project)
4449

4550
@pytest.fixture
4651
def sample_sqlmesh_test_context(
47-
sample_sqlmesh_project: str,
52+
sample_sqlmesh_project: str, sample_sqlmesh_test_context_config: SQLMeshContextConfig, sample_sqlmesh_db_path: str
4853
) -> t.Iterator[SQLMeshTestContext]:
49-
db_path = os.path.join(sample_sqlmesh_project, "db.db")
50-
config = SQLMeshConfig(
51-
gateways={
52-
"local": GatewayConfig(connection=DuckDBConnectionConfig(database=db_path)),
53-
},
54-
default_gateway="local",
55-
model_defaults=ModelDefaultsConfig(dialect="duckdb"),
56-
)
57-
config_as_dict = config.dict()
58-
context_config = SQLMeshContextConfig(
59-
path=sample_sqlmesh_project, gateway="local", config_override=config_as_dict
60-
)
61-
test_context = SQLMeshTestContext(db_path=db_path, context_config=context_config)
54+
test_context = SQLMeshTestContext(db_path=sample_sqlmesh_db_path, context_config=sample_sqlmesh_test_context_config)
6255
test_context.initialize_test_source()
6356
yield test_context

dagster_sqlmesh/console.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -405,14 +405,14 @@ def publish_known_event(self, event_name: str, **kwargs: t.Any) -> None:
405405

406406
def publish(self, event: ConsoleEvent) -> None:
407407
self.logger.debug(
408-
f"EventConsole[{self.id}]: sending event to {len(self._handlers)}"
408+
f"EventConsole[{self.id}]: sending event {event.__class__.__name__} to {len(self._handlers)}"
409409
)
410410
for handler in self._handlers.values():
411411
handler(event)
412412

413413
def publish_unknown_event(self, event_name: str, **kwargs: t.Any) -> None:
414414
self.logger.debug(
415-
f"EventConsole[{self.id}]: sending unknown event to {len(self._handlers)}"
415+
f"EventConsole[{self.id}]: sending unknown '{event_name}' event to {len(self._handlers)} handlers"
416416
)
417417
self.logger.debug(f"EventConsole[{self.id}]: unknown event {event_name} {kwargs}")
418418

dagster_sqlmesh/controller/base.py

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,17 @@ def run_sqlmesh_thread(
179179
default_catalog: str,
180180
) -> None:
181181
logger.debug("dagster-sqlmesh: thread started")
182+
183+
def auto_execute_plan(event: ConsoleEvent):
184+
if isinstance(event, Plan):
185+
try:
186+
event.plan_builder.apply()
187+
except Exception as e:
188+
controller.console.exception(e)
189+
return None
190+
182191
try:
192+
controller.console.add_handler(auto_execute_plan)
183193
builder = t.cast(
184194
PlanBuilder,
185195
context.plan_builder(
@@ -192,7 +202,7 @@ def run_sqlmesh_thread(
192202
builder,
193203
auto_apply=True,
194204
default_catalog=default_catalog,
195-
)
205+
)
196206
except Exception as e:
197207
controller.console.exception(e)
198208
except: # noqa: E722
@@ -219,16 +229,18 @@ def run_sqlmesh_thread(
219229
thread.start()
220230

221231
self.logger.debug("waiting for events")
222-
for event in generator.events(thread):
223-
match event:
224-
case ConsoleException(exception=e):
225-
raise e
226-
case Plan(plan_builder=plan_builder, auto_apply=auto_apply):
227-
if auto_apply:
228-
plan_builder.apply()
229-
yield event
230-
case _:
231-
yield event
232+
try:
233+
for event in generator.events(thread):
234+
match event:
235+
case ConsoleException(exception=e):
236+
raise e
237+
case _:
238+
yield event
239+
except Exception as e:
240+
import traceback
241+
print("An exception occurred:")
242+
print(traceback.format_exc())
243+
raise
232244

233245
thread.join()
234246

0 commit comments

Comments
 (0)