Skip to content

Commit cb8278c

Browse files
authored
Fixes Dagster Resource (#6)
* Fixes Dagster Resource * dev publish * Fixed broken usage of instance * fix * Better CI
1 parent b89d02c commit cb8278c

File tree

6 files changed

+33
-13
lines changed

6 files changed

+33
-13
lines changed

.github/workflows/ci-default.yml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,5 +69,8 @@ jobs:
6969
- name: Test (Python)
7070
run: |
7171
poetry run pytest
72-
73-
72+
73+
- name: Run the sample dagster project
74+
run: |
75+
poetry run dagster job execute -f sample/dagster_project/definitions.py -j all_assets_job
76+

dagster_sqlmesh/controller/base.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -350,13 +350,19 @@ def setup_with_config(
350350
controller = cls(
351351
console=console,
352352
config=config,
353+
log_override=log_override,
353354
)
354355
return controller
355356

356-
def __init__(self, config: SQLMeshContextConfig, console: EventConsole):
357+
def __init__(
358+
self,
359+
config: SQLMeshContextConfig,
360+
console: EventConsole,
361+
log_override: t.Optional[logging.Logger] = None,
362+
):
357363
self.config = config
358364
self.console = console
359-
self.logger = logger
365+
self.logger = log_override or logger
360366
self._context_open = False
361367

362368
def set_logger(self, logger: logging.Logger):
@@ -379,7 +385,10 @@ def _create_context(self):
379385
return Context(**options)
380386

381387
@contextmanager
382-
def instance(self, environment: str):
388+
def instance(self, environment: str, component: str = "unknown"):
389+
self.logger.info(
390+
f"Opening sqlmesh instance for env={environment} component={component}"
391+
)
383392
if self._context_open:
384393
raise Exception("Only one sqlmesh instance at a time")
385394

@@ -390,6 +399,9 @@ def instance(self, environment: str):
390399
environment, self.console, self.config, context, self.logger
391400
)
392401
finally:
402+
self.logger.info(
403+
f"Closing sqlmesh instance for env={environment} component={component}"
404+
)
393405
self._context_open = False
394406
context.close()
395407

@@ -398,7 +410,7 @@ def run(
398410
environment: str,
399411
**run_options: t.Unpack[RunOptions],
400412
):
401-
with self.instance(environment) as mesh:
413+
with self.instance(environment, "run") as mesh:
402414
yield from mesh.run(**run_options)
403415

404416
def plan(
@@ -408,7 +420,7 @@ def plan(
408420
default_catalog: t.Optional[str],
409421
plan_options: PlanOptions,
410422
):
411-
with self.instance(environment) as mesh:
423+
with self.instance(environment, "plan") as mesh:
412424
yield from mesh.plan(categorizer, default_catalog, **plan_options)
413425

414426
def plan_and_run(
@@ -419,7 +431,7 @@ def plan_and_run(
419431
plan_options: t.Optional[PlanOptions] = None,
420432
run_options: t.Optional[RunOptions] = None,
421433
):
422-
with self.instance(environment) as mesh:
434+
with self.instance(environment, "plan_and_run") as mesh:
423435
yield from mesh.plan_and_run(
424436
categorizer=categorizer,
425437
default_catalog=default_catalog,

dagster_sqlmesh/controller/dagster.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ class DagsterSQLMeshController(SQLMeshController):
2222
def to_asset_outs(
2323
self, environment: str, translator: SQLMeshDagsterTranslator
2424
) -> SQLMeshMultiAssetOptions:
25-
with self.instance(environment) as instance:
25+
with self.instance(environment, "to_asset_outs") as instance:
2626
context = instance.context
2727
dag = context.dag
2828
output = SQLMeshMultiAssetOptions()

dagster_sqlmesh/resource.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -248,12 +248,13 @@ def run(
248248
context, models_map, dag, "sqlmesh: "
249249
)
250250

251-
for event in controller.plan_and_run(
252-
environment=environment,
251+
for event in mesh.plan_and_run(
253252
plan_options=plan_options,
254253
run_options=run_options,
255254
):
256255
yield from event_handler.process_events(mesh.context, event)
257256

258257
def get_controller(self, log_override: t.Optional[logging.Logger] = None):
259-
return SQLMeshController.setup(self.config, log_override=log_override)
258+
return SQLMeshController.setup_with_config(
259+
self.config, log_override=log_override
260+
)

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "dagster-sqlmesh"
3-
version = "0.3.0"
3+
version = "0.3.1"
44
description = ""
55
authors = ["Reuven Gonzales <reuven@karibalabs.co>"]
66
license = "Apache 2.0"

sample/dagster_project/definitions.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
asset,
77
AssetExecutionContext,
88
Definitions,
9+
define_asset_job,
910
)
1011
from dagster_duckdb_polars import DuckDBPolarsIOManager
1112
import polars as pl
@@ -55,6 +56,8 @@ def sqlmesh_project(context: AssetExecutionContext, sqlmesh: SQLMeshResource):
5556
yield from sqlmesh.run(context)
5657

5758

59+
all_assets_job = define_asset_job(name="all_assets_job")
60+
5861
defs = Definitions(
5962
assets=[sqlmesh_project, test_source, reset_asset],
6063
resources={
@@ -64,4 +67,5 @@ def sqlmesh_project(context: AssetExecutionContext, sqlmesh: SQLMeshResource):
6467
schema="sources",
6568
),
6669
},
70+
jobs=[all_assets_job],
6771
)

0 commit comments

Comments
 (0)