From 1787dd152a67d7ce3411cc302c498572c8d17bc5 Mon Sep 17 00:00:00 2001 From: "Reuven V. Gonzales" Date: Sat, 28 Dec 2024 05:21:59 +0000 Subject: [PATCH 1/5] Fixes Dagster Resource --- dagster_sqlmesh/resource.py | 4 +++- pyproject.toml | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/dagster_sqlmesh/resource.py b/dagster_sqlmesh/resource.py index 2220d75..cc0e389 100644 --- a/dagster_sqlmesh/resource.py +++ b/dagster_sqlmesh/resource.py @@ -256,4 +256,6 @@ def run( yield from event_handler.process_events(mesh.context, event) def get_controller(self, log_override: t.Optional[logging.Logger] = None): - return SQLMeshController.setup(self.config, log_override=log_override) + return SQLMeshController.setup_with_config( + self.config, log_override=log_override + ) diff --git a/pyproject.toml b/pyproject.toml index a804212..85abcbd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "dagster-sqlmesh" -version = "0.3.0" +version = "0.3.1" description = "" authors = ["Reuven Gonzales "] license = "Apache 2.0" From 687a86668615d97a66c54ba654e81c53adce5506 Mon Sep 17 00:00:00 2001 From: Reuven Gonzales Date: Fri, 27 Dec 2024 21:28:02 -0800 Subject: [PATCH 2/5] dev publish --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 85abcbd..a992d90 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "dagster-sqlmesh" -version = "0.3.1" +version = "0.3.1-dev1" description = "" authors = ["Reuven Gonzales "] license = "Apache 2.0" From b551c4b10f7f82f6b7d5366f5120727e5215ecea Mon Sep 17 00:00:00 2001 From: Reuven Gonzales Date: Fri, 27 Dec 2024 23:46:19 -0800 Subject: [PATCH 3/5] Fixed broken usage of instance --- dagster_sqlmesh/controller/base.py | 25 +++++++++++++++++++------ dagster_sqlmesh/controller/dagster.py | 2 +- dagster_sqlmesh/resource.py | 3 +-- pyproject.toml | 2 +- 4 files changed, 22 insertions(+), 10 deletions(-) diff --git a/dagster_sqlmesh/controller/base.py b/dagster_sqlmesh/controller/base.py index 9cd2e60..8ebe239 100644 --- a/dagster_sqlmesh/controller/base.py +++ b/dagster_sqlmesh/controller/base.py @@ -1,4 +1,5 @@ from dataclasses import dataclass +from math import log import typing as t import logging import threading @@ -350,13 +351,19 @@ def setup_with_config( controller = cls( console=console, config=config, + log_override=log_override, ) return controller - def __init__(self, config: SQLMeshContextConfig, console: EventConsole): + def __init__( + self, + config: SQLMeshContextConfig, + console: EventConsole, + log_override: t.Optional[logging.Logger] = None, + ): self.config = config self.console = console - self.logger = logger + self.logger = log_override or logger self._context_open = False def set_logger(self, logger: logging.Logger): @@ -379,7 +386,10 @@ def _create_context(self): return Context(**options) @contextmanager - def instance(self, environment: str): + def instance(self, environment: str, component: str = "unknown"): + self.logger.info( + f"Opening sqlmesh instance for env={environment} component={component}" + ) if self._context_open: raise Exception("Only one sqlmesh instance at a time") @@ -390,6 +400,9 @@ def instance(self, environment: str): environment, self.console, self.config, context, self.logger ) finally: + self.logger.info( + f"Closing sqlmesh instance for env={environment} component={component}" + ) self._context_open = False context.close() @@ -398,7 +411,7 @@ def run( environment: str, **run_options: t.Unpack[RunOptions], ): - with self.instance(environment) as mesh: + with self.instance(environment, "run") as mesh: yield from mesh.run(**run_options) def plan( @@ -408,7 +421,7 @@ def plan( default_catalog: t.Optional[str], plan_options: PlanOptions, ): - with self.instance(environment) as mesh: + with self.instance(environment, "plan") as mesh: yield from mesh.plan(categorizer, default_catalog, **plan_options) def plan_and_run( @@ -419,7 +432,7 @@ def plan_and_run( plan_options: t.Optional[PlanOptions] = None, run_options: t.Optional[RunOptions] = None, ): - with self.instance(environment) as mesh: + with self.instance(environment, "plan_and_run") as mesh: yield from mesh.plan_and_run( categorizer=categorizer, default_catalog=default_catalog, diff --git a/dagster_sqlmesh/controller/dagster.py b/dagster_sqlmesh/controller/dagster.py index 74087ba..9c221a4 100644 --- a/dagster_sqlmesh/controller/dagster.py +++ b/dagster_sqlmesh/controller/dagster.py @@ -22,7 +22,7 @@ class DagsterSQLMeshController(SQLMeshController): def to_asset_outs( self, environment: str, translator: SQLMeshDagsterTranslator ) -> SQLMeshMultiAssetOptions: - with self.instance(environment) as instance: + with self.instance(environment, "to_asset_outs") as instance: context = instance.context dag = context.dag output = SQLMeshMultiAssetOptions() diff --git a/dagster_sqlmesh/resource.py b/dagster_sqlmesh/resource.py index cc0e389..4e6142b 100644 --- a/dagster_sqlmesh/resource.py +++ b/dagster_sqlmesh/resource.py @@ -248,8 +248,7 @@ def run( context, models_map, dag, "sqlmesh: " ) - for event in controller.plan_and_run( - environment=environment, + for event in mesh.plan_and_run( plan_options=plan_options, run_options=run_options, ): diff --git a/pyproject.toml b/pyproject.toml index a992d90..85abcbd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "dagster-sqlmesh" -version = "0.3.1-dev1" +version = "0.3.1" description = "" authors = ["Reuven Gonzales "] license = "Apache 2.0" From ac83f75888e878b65c9f02ce5bee3b088e4c88ac Mon Sep 17 00:00:00 2001 From: Reuven Gonzales Date: Fri, 27 Dec 2024 23:50:03 -0800 Subject: [PATCH 4/5] fix --- dagster_sqlmesh/controller/base.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dagster_sqlmesh/controller/base.py b/dagster_sqlmesh/controller/base.py index 8ebe239..a7b685d 100644 --- a/dagster_sqlmesh/controller/base.py +++ b/dagster_sqlmesh/controller/base.py @@ -1,5 +1,4 @@ from dataclasses import dataclass -from math import log import typing as t import logging import threading From 87fb56006eada624ed6c82fa0efb122cf20afe33 Mon Sep 17 00:00:00 2001 From: Reuven Gonzales Date: Sat, 28 Dec 2024 00:02:22 -0800 Subject: [PATCH 5/5] Better CI --- .github/workflows/ci-default.yml | 7 +++++-- sample/dagster_project/definitions.py | 4 ++++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci-default.yml b/.github/workflows/ci-default.yml index f2fc2c4..f0d207c 100644 --- a/.github/workflows/ci-default.yml +++ b/.github/workflows/ci-default.yml @@ -69,5 +69,8 @@ jobs: - name: Test (Python) run: | poetry run pytest - - + + - name: Run the sample dagster project + run: | + poetry run dagster job execute -f sample/dagster_project/definitions.py -j all_assets_job + \ No newline at end of file diff --git a/sample/dagster_project/definitions.py b/sample/dagster_project/definitions.py index ad33421..da72d4d 100644 --- a/sample/dagster_project/definitions.py +++ b/sample/dagster_project/definitions.py @@ -6,6 +6,7 @@ asset, AssetExecutionContext, Definitions, + define_asset_job, ) from dagster_duckdb_polars import DuckDBPolarsIOManager import polars as pl @@ -55,6 +56,8 @@ def sqlmesh_project(context: AssetExecutionContext, sqlmesh: SQLMeshResource): yield from sqlmesh.run(context) +all_assets_job = define_asset_job(name="all_assets_job") + defs = Definitions( assets=[sqlmesh_project, test_source, reset_asset], resources={ @@ -64,4 +67,5 @@ def sqlmesh_project(context: AssetExecutionContext, sqlmesh: SQLMeshResource): schema="sources", ), }, + jobs=[all_assets_job], )