From f679a3e0973746c620558c0651763495da525238 Mon Sep 17 00:00:00 2001 From: Luis Carbonell Girones <3169189+lucargir@users.noreply.github.com> Date: Wed, 23 Apr 2025 15:29:29 +0200 Subject: [PATCH 1/9] feat: move translator to config, add support for kinds label and specify group through translator The SQLMeshDagsterTranslator has been simplified to return an AssetKey, the name of the AssetKey or a string equivalent, which deprecates utils.sqlmesh_model_name_to_key and utils.key_to_sqlmesh_model_name. The kinds label is now also added, to show technology labels on the UI, only when Dagster's version allows for it. Groups can now be specified through the translator, instead of having a fixed method. --- README.md | 4 +-- dagster_sqlmesh/asset.py | 7 +++--- dagster_sqlmesh/controller/base.py | 11 +++++++-- dagster_sqlmesh/controller/dagster.py | 35 ++++++++++++++++----------- dagster_sqlmesh/resource.py | 29 ++++++++++++++-------- dagster_sqlmesh/test_asset.py | 4 +-- dagster_sqlmesh/translator.py | 35 ++++++++++++++++----------- dagster_sqlmesh/utils.py | 8 ------ 8 files changed, 77 insertions(+), 56 deletions(-) diff --git a/README.md b/README.md index 8e12f17..ffadbc0 100644 --- a/README.md +++ b/README.md @@ -24,13 +24,13 @@ from dagster import ( AssetExecutionContext, Definitions, ) -from dagster_sqlmesh import sqlmesh_assets, SQLMeshContextConfig, SQLMeshResource +from dagster_sqlmesh import sqlmesh_assets, SQLMeshContextConfig, SQLMeshResource, SQLMeshDagsterTranslator sqlmesh_config = SQLMeshContextConfig(path="/home/foo/sqlmesh_project", gateway="name-of-your-gateway") @sqlmesh_assets(environment="dev", config=sqlmesh_config) def sqlmesh_project(context: AssetExecutionContext, sqlmesh: SQLMeshResource): - yield from sqlmesh.run(context) + yield from sqlmesh.run(context, translator=SQLMeshDagsterTranslator()) defs = Definitions( assets=[sqlmesh_project], diff --git a/dagster_sqlmesh/asset.py b/dagster_sqlmesh/asset.py index 8f614bc..ac49883 100644 --- a/dagster_sqlmesh/asset.py +++ b/dagster_sqlmesh/asset.py @@ -25,10 +25,11 @@ def sqlmesh_assets( # For now we don't set this by default enabled_subsetting: bool = False, ) -> t.Callable[[t.Callable[..., t.Any]], AssetsDefinition]: - controller = DagsterSQLMeshController.setup_with_config(config) if not dagster_sqlmesh_translator: - dagster_sqlmesh_translator = SQLMeshDagsterTranslator() - conversion = controller.to_asset_outs(environment, dagster_sqlmesh_translator) + dagster_sqlmesh_translator = dagster_sqlmesh_translator + + controller = DagsterSQLMeshController.setup_with_config(config) + conversion = controller.to_asset_outs(environment, translator=dagster_sqlmesh_translator) return multi_asset( name=name, diff --git a/dagster_sqlmesh/controller/base.py b/dagster_sqlmesh/controller/base.py index 143a512..c2c877d 100644 --- a/dagster_sqlmesh/controller/base.py +++ b/dagster_sqlmesh/controller/base.py @@ -24,6 +24,7 @@ SnapshotCategorizer, ) from ..events import ConsoleGenerator +from ..translator import SQLMeshDagsterTranslator logger = logging.getLogger(__name__) @@ -400,6 +401,7 @@ class SQLMeshController: config: SQLMeshContextConfig console: EventConsole logger: logging.Logger + translator: SQLMeshDagsterTranslator @classmethod def setup( @@ -407,10 +409,11 @@ def setup( path: str, gateway: str = "local", log_override: logging.Logger | None = None, + translator_override: SQLMeshDagsterTranslator | None = None, ) -> "SQLMeshController": return cls.setup_with_config( config=SQLMeshContextConfig(path=path, gateway=gateway), - log_override=log_override, + log_override=log_override, translator_override=translator_override ) @classmethod @@ -418,12 +421,14 @@ def setup_with_config( cls: type[T], config: SQLMeshContextConfig, log_override: logging.Logger | None = None, + translator_override: SQLMeshDagsterTranslator | None = None, ) -> T: console = EventConsole(log_override=log_override) # type: ignore controller = cls( console=console, config=config, log_override=log_override, + translator_override=translator_override ) return controller @@ -432,10 +437,12 @@ def __init__( config: SQLMeshContextConfig, console: EventConsole, log_override: logging.Logger | None = None, + translator_override: SQLMeshDagsterTranslator | None = None, ) -> None: self.config = config self.console = console self.logger = log_override or logger + self.translator = translator_override or SQLMeshDagsterTranslator() self._context_open = False def set_logger(self, logger: logging.Logger) -> None: @@ -457,7 +464,7 @@ def _create_context(self) -> Context: options["config"] = self.config.sqlmesh_config set_console(self.console) return Context(**options) - + @contextmanager def instance( self, environment: str, component: str = "unknown" diff --git a/dagster_sqlmesh/controller/dagster.py b/dagster_sqlmesh/controller/dagster.py index 87cd3b0..30e7182 100644 --- a/dagster_sqlmesh/controller/dagster.py +++ b/dagster_sqlmesh/controller/dagster.py @@ -1,12 +1,12 @@ # pyright: reportPrivateImportUsage=false import logging +from inspect import signature from dagster import AssetDep, AssetKey, AssetOut from dagster._core.definitions.asset_dep import CoercibleToAssetDep from ..translator import SQLMeshDagsterTranslator from ..types import SQLMeshModelDep, SQLMeshMultiAssetOptions -from ..utils import sqlmesh_model_name_to_key from .base import SQLMeshController logger = logging.getLogger(__name__) @@ -16,18 +16,16 @@ class DagsterSQLMeshController(SQLMeshController): """An extension of the sqlmesh controller specifically for dagster use""" def to_asset_outs( - self, environment: str, translator: SQLMeshDagsterTranslator + self, environment: str, translator: SQLMeshDagsterTranslator | None = None, ) -> SQLMeshMultiAssetOptions: with self.instance(environment, "to_asset_outs") as instance: + translator = translator or SQLMeshDagsterTranslator() context = instance.context output = SQLMeshMultiAssetOptions() depsMap: dict[str, CoercibleToAssetDep] = {} for model, deps in instance.non_external_models_dag(): - asset_key = translator.get_asset_key_from_model( - context, - model, - ) + asset_key = translator.get_asset_key(context=context, fqn=model.fqn) model_deps = [ SQLMeshModelDep(fqn=dep, model=context.get_model(dep)) for dep in deps @@ -38,18 +36,27 @@ def to_asset_outs( for dep in model_deps: if dep.model: internal_asset_deps.add( - translator.get_asset_key_from_model(context, dep.model) + translator.get_asset_key(context, dep.model.fqn) ) else: - table = translator.get_fqn_to_table(context, dep.fqn) - key = translator.get_asset_key_fqn(context, dep.fqn) + table = translator.get_asset_key_str(dep.fqn) + key = translator.get_asset_key(context, dep.fqn) internal_asset_deps.add(key) # create an external dep - depsMap[table.name] = AssetDep(key) - model_key = sqlmesh_model_name_to_key(model.name) - output.outs[model_key] = AssetOut( - key=asset_key, tags=asset_tags, is_required=False - ) + depsMap[table] = AssetDep(key) + model_key = translator.get_asset_key_str(model.fqn) + # If current Dagster supports "kinds", add labels for Dagster UI + if "kinds" in signature(AssetOut).parameters: + output.outs[model_key] = AssetOut( + key=asset_key, tags=asset_tags, is_required=False, + group_name=translator.get_group_name(context, model), + kinds={"sqlmesh", translator._get_context_dialect(context).lower()} + ) + else: + output.outs[model_key] = AssetOut( + key=asset_key, tags=asset_tags, is_required=False, + group_name=translator.get_group_name(context, model) + ) output.internal_asset_deps[model_key] = internal_asset_deps output.deps = list(depsMap.values()) diff --git a/dagster_sqlmesh/resource.py b/dagster_sqlmesh/resource.py index dd5889a..4e21159 100644 --- a/dagster_sqlmesh/resource.py +++ b/dagster_sqlmesh/resource.py @@ -16,7 +16,7 @@ from .config import SQLMeshContextConfig from .controller import PlanOptions, RunOptions from .controller.dagster import DagsterSQLMeshController -from .utils import sqlmesh_model_name_to_key +from .translator import SQLMeshDagsterTranslator class MaterializationTracker: @@ -24,8 +24,9 @@ class MaterializationTracker: order. This is necessary because sqlmesh may skip some materializations that have no changes and those will be reported as completed out of order.""" - def __init__(self, sorted_dag: list[str], logger: logging.Logger) -> None: + def __init__(self, sorted_dag: list[str], logger: logging.Logger, translator: SQLMeshDagsterTranslator) -> None: self.logger = logger + self.translator = translator self._batches: dict[Snapshot, int] = {} self._count: dict[Snapshot, int] = {} self._complete_update_status: dict[str, bool] = {} @@ -114,12 +115,14 @@ def __init__( models_map: dict[str, Model], dag: DAG[t.Any], prefix: str, + translator: SQLMeshDagsterTranslator ) -> None: self._models_map = models_map self._prefix = prefix self._context = context self._logger = context.log - self._tracker = MaterializationTracker(dag.sorted[:], self._logger) + self.translator = translator + self._tracker = MaterializationTracker(sorted_dag=dag.sorted[:], logger=self._logger, translator=self.translator) self._stage = "plan" def process_events(self, event: console.ConsoleEvent) -> None: @@ -143,7 +146,8 @@ def notify_success( # We allow selecting models. That value is mapped to models_map. # If the model is not in models_map, we can skip any notification if model: - output_key = sqlmesh_model_name_to_key(model.name) + # Passing model.fqn to translator + output_key = self.translator.get_asset_key_str(model.fqn) asset_key = self._context.asset_key_for_output(output_key) yield MaterializeResult( asset_key=asset_key, @@ -192,7 +196,7 @@ def report_event(self, event: console.ConsoleEvent) -> None: log_context.info( "Snapshot progress update", { - "asset_key": sqlmesh_model_name_to_key(snapshot.model.name), + "asset_key": self.translator.get_asset_key_str(snapshot.model.name), "progress": f"{done}/{expected}", "duration_ms": duration_ms, }, @@ -263,6 +267,7 @@ def run( self, context: AssetExecutionContext, *, + translator: SQLMeshDagsterTranslator | None = None, environment: str = "dev", start: TimeLike | None = None, end: TimeLike | None = None, @@ -277,9 +282,12 @@ def run( plan_options = plan_options or {} run_options = run_options or {} + if translator is None: + translator = SQLMeshDagsterTranslator() + logger = context.log - controller = self.get_controller(logger) + controller = self.get_controller(logger, translator) with controller.instance(environment) as mesh: dag = mesh.models_dag() @@ -295,7 +303,7 @@ def run( models_map = {} for key, model in models.items(): if ( - sqlmesh_model_name_to_key(model.name) + translator.get_asset_key_str(model.fqn) in context.selected_output_names ): models_map[key] = model @@ -312,7 +320,8 @@ def run( logger.info(f"selected models: {select_models}") event_handler = DagsterSQLMeshEventHandler( - context, models_map, dag, "sqlmesh: " + context=context, models_map=models_map, dag=dag, + prefix="sqlmesh: ", translator=translator ) for event in mesh.plan_and_run( @@ -330,8 +339,8 @@ def run( yield from event_handler.notify_success(mesh.context) def get_controller( - self, log_override: logging.Logger | None = None + self, log_override: logging.Logger | None = None, translator: SQLMeshDagsterTranslator | None = None ) -> DagsterSQLMeshController: return DagsterSQLMeshController.setup_with_config( - self.config, log_override=log_override + self.config, log_override=log_override, translator_override=translator ) diff --git a/dagster_sqlmesh/test_asset.py b/dagster_sqlmesh/test_asset.py index 8a578a7..2be3717 100644 --- a/dagster_sqlmesh/test_asset.py +++ b/dagster_sqlmesh/test_asset.py @@ -1,10 +1,8 @@ -from dagster_sqlmesh.asset import SQLMeshDagsterTranslator from dagster_sqlmesh.conftest import SQLMeshTestContext def test_sqlmesh_context_to_asset_outs(sample_sqlmesh_test_context: SQLMeshTestContext): controller = sample_sqlmesh_test_context.create_controller() - translator = SQLMeshDagsterTranslator() - outs = controller.to_asset_outs("dev", translator) + outs = controller.to_asset_outs("dev") assert len(list(outs.deps)) == 1 assert len(outs.outs) == 9 diff --git a/dagster_sqlmesh/translator.py b/dagster_sqlmesh/translator.py index e3791ba..c4dc066 100644 --- a/dagster_sqlmesh/translator.py +++ b/dagster_sqlmesh/translator.py @@ -1,7 +1,7 @@ +import re +from collections.abc import Sequence -import sqlglot from dagster import AssetKey -from sqlglot import exp from sqlmesh.core.context import Context from sqlmesh.core.model import Model @@ -9,19 +9,26 @@ class SQLMeshDagsterTranslator: """Translates sqlmesh objects for dagster""" - def get_asset_key_from_model(self, context: Context, model: Model) -> AssetKey: + def get_asset_key(self, context: Context, fqn: str) -> AssetKey: """Given the sqlmesh context and a model return the asset key""" - return AssetKey(model.view_name) - - def get_asset_key_fqn(self, context: Context, fqn: str) -> AssetKey: - """Given the sqlmesh context and a fqn of a model return an asset key""" - table = self.get_fqn_to_table(context, fqn) - return AssetKey(table.name) - - def get_fqn_to_table(self, context: Context, fqn: str) -> exp.Table: - """Given the sqlmesh context and a fqn return the table""" - dialect = self._get_context_dialect(context) - return sqlglot.to_table(fqn, dialect=dialect) + path = self.get_asset_key_name(fqn) + return AssetKey(path) + + def get_asset_key_name(self, fqn: str) -> Sequence[str]: + asset_path = re.findall(r"[A-Za-z0-9_\-]+", fqn) + return asset_path + + def get_asset_key_str(self, fqn: str) -> str: + # This is an internal identifier used to map outputs and dependencies + # it will not affect the existing AssetKeys + # Only alphanumeric characters and underscores + path = self.get_asset_key_name(fqn) + + return "__dot__".join(path).replace("-", "__dash__") + + def get_group_name(self, context: Context, model: Model) -> str: + path = self.get_asset_key_name(model.fqn) + return path[-2] def _get_context_dialect(self, context: Context) -> str: return context.engine_adapter.dialect diff --git a/dagster_sqlmesh/utils.py b/dagster_sqlmesh/utils.py index 8a08b2a..5d2cf8c 100644 --- a/dagster_sqlmesh/utils.py +++ b/dagster_sqlmesh/utils.py @@ -1,14 +1,6 @@ from sqlmesh.core.snapshot import SnapshotId -def sqlmesh_model_name_to_key(name: str) -> str: - return name.replace(".", "_dot__") - - -def key_to_sqlmesh_model_name(key: str) -> str: - return key.replace("_dot__", ".") - - def snapshot_id_to_model_name(snapshot_id: SnapshotId) -> str: """Convert a SnapshotId to its model name. From e905905fda6ee8b868391581e0575fd48afd1735 Mon Sep 17 00:00:00 2001 From: Luis Carbonell Girones <3169189+lucargir@users.noreply.github.com> Date: Wed, 30 Apr 2025 11:01:37 +0200 Subject: [PATCH 2/9] chore: merge source for PR --- uv.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uv.lock b/uv.lock index 4b2f579..41d78ab 100644 --- a/uv.lock +++ b/uv.lock @@ -287,7 +287,7 @@ wheels = [ [[package]] name = "dagster-sqlmesh" -version = "0.13.1" +version = "0.15.0" source = { editable = "." } dependencies = [ { name = "dagster" }, From 514446f3e576f13a182d9b30756860f35be99dfd Mon Sep 17 00:00:00 2001 From: Luis Carbonell Girones <3169189+lucargir@users.noreply.github.com> Date: Wed, 30 Apr 2025 18:56:18 +0200 Subject: [PATCH 3/9] fix: rename translator parameters and set defaults --- dagster_sqlmesh/asset.py | 4 +--- dagster_sqlmesh/controller/base.py | 12 ++++++------ dagster_sqlmesh/resource.py | 4 ++-- 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/dagster_sqlmesh/asset.py b/dagster_sqlmesh/asset.py index 71a3e42..3a3d7ce 100644 --- a/dagster_sqlmesh/asset.py +++ b/dagster_sqlmesh/asset.py @@ -23,7 +23,7 @@ def sqlmesh_assets( config: SQLMeshContextConfig, context_factory: ContextFactory[ContextCls] = lambda **kwargs: Context(**kwargs), name: str | None = None, - dagster_sqlmesh_translator: SQLMeshDagsterTranslator | None = None, + dagster_sqlmesh_translator: SQLMeshDagsterTranslator = SQLMeshDagsterTranslator(), compute_kind: str = "sqlmesh", op_tags: t.Mapping[str, t.Any] | None = None, required_resource_keys: set[str] | None = None, @@ -32,8 +32,6 @@ def sqlmesh_assets( enabled_subsetting: bool = False, ) -> t.Callable[[t.Callable[..., t.Any]], AssetsDefinition]: controller = DagsterSQLMeshController.setup_with_config(config=config, context_factory=context_factory) - if not dagster_sqlmesh_translator: - dagster_sqlmesh_translator = dagster_sqlmesh_translator conversion = controller.to_asset_outs(environment, translator=dagster_sqlmesh_translator) diff --git a/dagster_sqlmesh/controller/base.py b/dagster_sqlmesh/controller/base.py index 1ff764b..5daae23 100644 --- a/dagster_sqlmesh/controller/base.py +++ b/dagster_sqlmesh/controller/base.py @@ -427,12 +427,12 @@ def setup( context_factory: ContextFactory[ContextCls], gateway: str = "local", log_override: logging.Logger | None = None, - translator_override: SQLMeshDagsterTranslator | None = None, + translator: SQLMeshDagsterTranslator = SQLMeshDagsterTranslator(), ) -> t.Self: return cls.setup_with_config( config=SQLMeshContextConfig(path=path, gateway=gateway), log_override=log_override, - translator_override=translator_override, + translator=translator, context_factory=context_factory, ) @@ -443,7 +443,7 @@ def setup_with_config( config: SQLMeshContextConfig, context_factory: ContextFactory[ContextCls] = DEFAULT_CONTEXT_FACTORY, log_override: logging.Logger | None = None, - translator_override: SQLMeshDagsterTranslator | None = None, + translator: SQLMeshDagsterTranslator = SQLMeshDagsterTranslator(), ) -> t.Self: console = EventConsole(log_override=log_override) # type: ignore controller = cls( @@ -451,7 +451,7 @@ def setup_with_config( config=config, log_override=log_override, context_factory=context_factory, - translator_override=translator_override + translator=translator ) return controller @@ -461,13 +461,13 @@ def __init__( console: EventConsole, context_factory: ContextFactory[ContextCls], log_override: logging.Logger | None = None, - translator_override: SQLMeshDagsterTranslator | None = None, + translator: SQLMeshDagsterTranslator = SQLMeshDagsterTranslator(), ) -> None: self.config = config self.console = console self.logger = log_override or logger self._context_factory = context_factory - self.translator = translator_override or SQLMeshDagsterTranslator() + self.translator = translator self._context_open = False def set_logger(self, logger: logging.Logger) -> None: diff --git a/dagster_sqlmesh/resource.py b/dagster_sqlmesh/resource.py index 9d3f5ee..56fd1c6 100644 --- a/dagster_sqlmesh/resource.py +++ b/dagster_sqlmesh/resource.py @@ -417,11 +417,11 @@ def get_controller( self, context_factory: ContextFactory[ContextCls], log_override: logging.Logger | None = None, - translator: SQLMeshDagsterTranslator | None = None + translator: SQLMeshDagsterTranslator = SQLMeshDagsterTranslator(), ) -> DagsterSQLMeshController[ContextCls]: return DagsterSQLMeshController.setup_with_config( config=self.config, context_factory=context_factory, log_override=log_override, - translator_override=translator + translator=translator ) From b62f5e2c22e1727fe696d61260cfec98a8bea8d5 Mon Sep 17 00:00:00 2001 From: Luis Carbonell Girones <3169189+lucargir@users.noreply.github.com> Date: Wed, 7 May 2025 17:58:47 +0200 Subject: [PATCH 4/9] fix: remove translator from controller and use helper for internal ref --- README.md | 4 ++-- dagster_sqlmesh/controller/base.py | 10 +--------- dagster_sqlmesh/controller/dagster.py | 9 +++++---- dagster_sqlmesh/resource.py | 18 +++++++++++------- dagster_sqlmesh/translator.py | 14 ++++---------- dagster_sqlmesh/utils.py | 10 ++++++++++ sample/dagster_project/definitions.py | 4 ++-- 7 files changed, 35 insertions(+), 34 deletions(-) diff --git a/README.md b/README.md index ffadbc0..62942f1 100644 --- a/README.md +++ b/README.md @@ -28,9 +28,9 @@ from dagster_sqlmesh import sqlmesh_assets, SQLMeshContextConfig, SQLMeshResourc sqlmesh_config = SQLMeshContextConfig(path="/home/foo/sqlmesh_project", gateway="name-of-your-gateway") -@sqlmesh_assets(environment="dev", config=sqlmesh_config) +@sqlmesh_assets(environment="dev", config=sqlmesh_config, translator=SQLMeshDagsterTranslator()) def sqlmesh_project(context: AssetExecutionContext, sqlmesh: SQLMeshResource): - yield from sqlmesh.run(context, translator=SQLMeshDagsterTranslator()) + yield from sqlmesh.run(context) defs = Definitions( assets=[sqlmesh_project], diff --git a/dagster_sqlmesh/controller/base.py b/dagster_sqlmesh/controller/base.py index 5daae23..6235901 100644 --- a/dagster_sqlmesh/controller/base.py +++ b/dagster_sqlmesh/controller/base.py @@ -23,7 +23,6 @@ SnapshotCategorizer, ) from ..events import ConsoleGenerator -from ..translator import SQLMeshDagsterTranslator logger = logging.getLogger(__name__) @@ -417,7 +416,6 @@ class SQLMeshController(t.Generic[ContextCls]): config: SQLMeshContextConfig console: EventConsole logger: logging.Logger - translator: SQLMeshDagsterTranslator @classmethod def setup( @@ -427,12 +425,10 @@ def setup( context_factory: ContextFactory[ContextCls], gateway: str = "local", log_override: logging.Logger | None = None, - translator: SQLMeshDagsterTranslator = SQLMeshDagsterTranslator(), ) -> t.Self: return cls.setup_with_config( config=SQLMeshContextConfig(path=path, gateway=gateway), log_override=log_override, - translator=translator, context_factory=context_factory, ) @@ -443,15 +439,13 @@ def setup_with_config( config: SQLMeshContextConfig, context_factory: ContextFactory[ContextCls] = DEFAULT_CONTEXT_FACTORY, log_override: logging.Logger | None = None, - translator: SQLMeshDagsterTranslator = SQLMeshDagsterTranslator(), ) -> t.Self: console = EventConsole(log_override=log_override) # type: ignore controller = cls( console=console, config=config, log_override=log_override, - context_factory=context_factory, - translator=translator + context_factory=context_factory ) return controller @@ -461,13 +455,11 @@ def __init__( console: EventConsole, context_factory: ContextFactory[ContextCls], log_override: logging.Logger | None = None, - translator: SQLMeshDagsterTranslator = SQLMeshDagsterTranslator(), ) -> None: self.config = config self.console = console self.logger = log_override or logger self._context_factory = context_factory - self.translator = translator self._context_open = False def set_logger(self, logger: logging.Logger) -> None: diff --git a/dagster_sqlmesh/controller/dagster.py b/dagster_sqlmesh/controller/dagster.py index d5964cb..50566f8 100644 --- a/dagster_sqlmesh/controller/dagster.py +++ b/dagster_sqlmesh/controller/dagster.py @@ -5,6 +5,8 @@ from dagster import AssetDep, AssetKey, AssetOut from dagster._core.definitions.asset_dep import CoercibleToAssetDep +from dagster_sqlmesh.utils import get_asset_key_str + from ..translator import SQLMeshDagsterTranslator from ..types import SQLMeshModelDep, SQLMeshMultiAssetOptions from .base import ContextCls, SQLMeshController @@ -16,10 +18,9 @@ class DagsterSQLMeshController(SQLMeshController[ContextCls]): """An extension of the sqlmesh controller specifically for dagster use""" def to_asset_outs( - self, environment: str, translator: SQLMeshDagsterTranslator | None = None, + self, environment: str, translator: SQLMeshDagsterTranslator = SQLMeshDagsterTranslator(), ) -> SQLMeshMultiAssetOptions: with self.instance(environment, "to_asset_outs") as instance: - translator = translator or SQLMeshDagsterTranslator() context = instance.context output = SQLMeshMultiAssetOptions() depsMap: dict[str, CoercibleToAssetDep] = {} @@ -39,12 +40,12 @@ def to_asset_outs( translator.get_asset_key(context, dep.model.fqn) ) else: - table = translator.get_asset_key_str(dep.fqn) + table = get_asset_key_str(dep.fqn) key = translator.get_asset_key(context, dep.fqn) internal_asset_deps.add(key) # create an external dep depsMap[table] = AssetDep(key) - model_key = translator.get_asset_key_str(model.fqn) + model_key = get_asset_key_str(model.fqn) # If current Dagster supports "kinds", add labels for Dagster UI if "kinds" in signature(AssetOut).parameters: output.outs[model_key] = AssetOut( diff --git a/dagster_sqlmesh/resource.py b/dagster_sqlmesh/resource.py index 56fd1c6..7b63712 100644 --- a/dagster_sqlmesh/resource.py +++ b/dagster_sqlmesh/resource.py @@ -20,6 +20,7 @@ ContextCls, ContextFactory, ) +from dagster_sqlmesh.utils import get_asset_key_str from . import console from .config import SQLMeshContextConfig @@ -177,7 +178,7 @@ def notify_success( # If the model is not in models_map, we can skip any notification if model: # Passing model.fqn to translator - output_key = self.translator.get_asset_key_str(model.fqn) + output_key = get_asset_key_str(model.fqn) if not self._is_testing: # Stupidly dagster when testing cannot use the following # method so we must specifically skip this when testing @@ -231,7 +232,7 @@ def report_event(self, event: console.ConsoleEvent) -> None: log_context.info( "Snapshot progress update", { - "asset_key": self.translator.get_asset_key_str(snapshot.model.name), + "asset_key": get_asset_key_str(snapshot.model.name), "progress": f"{done}/{expected}", "duration_ms": duration_ms, }, @@ -332,7 +333,11 @@ def run( logger = context.log - controller = self.get_controller(context_factory, logger, translator) + controller = self.get_controller( + context_factory=context_factory, + log_override=logger, + translator=translator + ) with controller.instance(environment) as mesh: dag = mesh.models_dag() @@ -404,7 +409,7 @@ def _get_selected_models_from_context( select_models: list[str] = [] models_map = {} for key, model in models.items(): - if translator.get_asset_key_str(model.fqn) in selected_output_names: + if get_asset_key_str(model.fqn) in selected_output_names: models_map[key] = model select_models.append(model.name) return ( @@ -416,12 +421,11 @@ def _get_selected_models_from_context( def get_controller( self, context_factory: ContextFactory[ContextCls], + translator: SQLMeshDagsterTranslator, log_override: logging.Logger | None = None, - translator: SQLMeshDagsterTranslator = SQLMeshDagsterTranslator(), ) -> DagsterSQLMeshController[ContextCls]: return DagsterSQLMeshController.setup_with_config( config=self.config, context_factory=context_factory, - log_override=log_override, - translator=translator + log_override=log_override ) diff --git a/dagster_sqlmesh/translator.py b/dagster_sqlmesh/translator.py index c4dc066..2da090d 100644 --- a/dagster_sqlmesh/translator.py +++ b/dagster_sqlmesh/translator.py @@ -1,7 +1,7 @@ -import re from collections.abc import Sequence from dagster import AssetKey +from sqlglot import exp from sqlmesh.core.context import Context from sqlmesh.core.model import Model @@ -15,16 +15,10 @@ def get_asset_key(self, context: Context, fqn: str) -> AssetKey: return AssetKey(path) def get_asset_key_name(self, fqn: str) -> Sequence[str]: - asset_path = re.findall(r"[A-Za-z0-9_\-]+", fqn) - return asset_path + table = exp.to_table(fqn) + asset_key_name = [table.catalog, table.db, table.name] - def get_asset_key_str(self, fqn: str) -> str: - # This is an internal identifier used to map outputs and dependencies - # it will not affect the existing AssetKeys - # Only alphanumeric characters and underscores - path = self.get_asset_key_name(fqn) - - return "__dot__".join(path).replace("-", "__dash__") + return asset_key_name def get_group_name(self, context: Context, model: Model) -> str: path = self.get_asset_key_name(model.fqn) diff --git a/dagster_sqlmesh/utils.py b/dagster_sqlmesh/utils.py index 5d2cf8c..96b2dba 100644 --- a/dagster_sqlmesh/utils.py +++ b/dagster_sqlmesh/utils.py @@ -1,6 +1,16 @@ +from sqlglot import exp from sqlmesh.core.snapshot import SnapshotId +def get_asset_key_str(fqn: str) -> str: + # This is an internal identifier used to map outputs and dependencies + # it will not affect the existing AssetKeys + # Only alphanumeric characters and underscores + table = exp.to_table(fqn) + asset_key_name = [table.catalog, table.db, table.name] + + return "sqlmesh__" + "_".join(asset_key_name) + def snapshot_id_to_model_name(snapshot_id: SnapshotId) -> str: """Convert a SnapshotId to its model name. diff --git a/sample/dagster_project/definitions.py b/sample/dagster_project/definitions.py index 345de54..8719a9e 100644 --- a/sample/dagster_project/definitions.py +++ b/sample/dagster_project/definitions.py @@ -21,7 +21,7 @@ sqlmesh_config = SQLMeshContextConfig(path=SQLMESH_PROJECT_PATH, gateway="local") -@asset +@asset(key=["db", "sources", "reset_asset"]) def reset_asset() -> MaterializeResult: """An asset used for testing this entire workflow. If the duckdb database is found, this will delete it. This allows us to continously test this dag if @@ -34,7 +34,7 @@ def reset_asset() -> MaterializeResult: return MaterializeResult(metadata={"deleted": deleted}) -@asset(deps=[reset_asset]) +@asset(deps=[reset_asset], key=["db", "sources", "test_source"]) def test_source() -> pl.DataFrame: """Sets up the `test_source` table in duckdb that one of the sample sqlmesh models depends on""" From 6509578900e4789d4dca7843b6288085a562a407 Mon Sep 17 00:00:00 2001 From: Luis Carbonell Girones <3169189+lucargir@users.noreply.github.com> Date: Wed, 7 May 2025 18:15:30 +0200 Subject: [PATCH 5/9] chore: remove unused refs to translator --- dagster_sqlmesh/resource.py | 25 ++++++++++--------------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/dagster_sqlmesh/resource.py b/dagster_sqlmesh/resource.py index 7b63712..9ddba44 100644 --- a/dagster_sqlmesh/resource.py +++ b/dagster_sqlmesh/resource.py @@ -26,7 +26,6 @@ from .config import SQLMeshContextConfig from .controller import PlanOptions, RunOptions from .controller.dagster import DagsterSQLMeshController -from .translator import SQLMeshDagsterTranslator class MaterializationTracker: @@ -34,9 +33,8 @@ class MaterializationTracker: order. This is necessary because sqlmesh may skip some materializations that have no changes and those will be reported as completed out of order.""" - def __init__(self, sorted_dag: list[str], logger: logging.Logger, translator: SQLMeshDagsterTranslator) -> None: + def __init__(self, sorted_dag: list[str], logger: logging.Logger) -> None: self.logger = logger - self.translator = translator self._batches: dict[Snapshot, int] = {} self._count: dict[Snapshot, int] = {} self._complete_update_status: dict[str, bool] = {} @@ -143,15 +141,13 @@ def __init__( models_map: dict[str, Model], dag: DAG[t.Any], prefix: str, - translator: SQLMeshDagsterTranslator, is_testing: bool = False, ) -> None: self._models_map = models_map self._prefix = prefix self._context = context self._logger = context.log - self.translator = translator - self._tracker = MaterializationTracker(sorted_dag=dag.sorted[:], logger=self._logger, translator=self.translator) + self._tracker = MaterializationTracker(sorted_dag=dag.sorted[:], logger=self._logger) self._stage = "plan" self._errors: list[Exception] = [] self._is_testing = is_testing @@ -177,7 +173,7 @@ def notify_success( # We allow selecting models. That value is mapped to models_map. # If the model is not in models_map, we can skip any notification if model: - # Passing model.fqn to translator + # Passing model.fqn to get internal unique asset key output_key = get_asset_key_str(model.fqn) if not self._is_testing: # Stupidly dagster when testing cannot use the following @@ -316,7 +312,6 @@ def run( context: AssetExecutionContext, *, context_factory: ContextFactory[ContextCls] = DEFAULT_CONTEXT_FACTORY, - translator: SQLMeshDagsterTranslator = SQLMeshDagsterTranslator(), environment: str = "dev", start: TimeLike | None = None, end: TimeLike | None = None, @@ -335,8 +330,7 @@ def run( controller = self.get_controller( context_factory=context_factory, - log_override=logger, - translator=translator + log_override=logger ) with controller.instance(environment) as mesh: @@ -348,7 +342,10 @@ def run( [model.fqn for model, _ in mesh.non_external_models_dag()] ) selected_models_set, models_map, select_models = ( - self._get_selected_models_from_context(context, models, translator) + self._get_selected_models_from_context( + context=context, + models=models + ) ) if all_available_models == selected_models_set or select_models is None: @@ -362,7 +359,7 @@ def run( event_handler = DagsterSQLMeshEventHandler( context=context, models_map=models_map, dag=dag, - prefix="sqlmesh: ", translator=translator, is_testing=self.is_testing + prefix="sqlmesh: ", is_testing=self.is_testing ) try: @@ -391,8 +388,7 @@ def run( yield from event_handler.notify_success(mesh.context) def _get_selected_models_from_context( - self, context: AssetExecutionContext, models: MappingProxyType[str, Model], - translator: SQLMeshDagsterTranslator + self, context: AssetExecutionContext, models: MappingProxyType[str, Model] ) -> tuple[set[str], dict[str, Model], list[str] | None]: models_map = models.copy() try: @@ -421,7 +417,6 @@ def _get_selected_models_from_context( def get_controller( self, context_factory: ContextFactory[ContextCls], - translator: SQLMeshDagsterTranslator, log_override: logging.Logger | None = None, ) -> DagsterSQLMeshController[ContextCls]: return DagsterSQLMeshController.setup_with_config( From f3a9f4ea50cbd2257aa15e33a8b10c08c007ded9 Mon Sep 17 00:00:00 2001 From: Luis Carbonell Girones <3169189+lucargir@users.noreply.github.com> Date: Wed, 7 May 2025 18:17:55 +0200 Subject: [PATCH 6/9] fix: make translator required param for controller --- dagster_sqlmesh/controller/dagster.py | 2 +- dagster_sqlmesh/test_asset.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/dagster_sqlmesh/controller/dagster.py b/dagster_sqlmesh/controller/dagster.py index 50566f8..b0f222d 100644 --- a/dagster_sqlmesh/controller/dagster.py +++ b/dagster_sqlmesh/controller/dagster.py @@ -18,7 +18,7 @@ class DagsterSQLMeshController(SQLMeshController[ContextCls]): """An extension of the sqlmesh controller specifically for dagster use""" def to_asset_outs( - self, environment: str, translator: SQLMeshDagsterTranslator = SQLMeshDagsterTranslator(), + self, environment: str, translator: SQLMeshDagsterTranslator, ) -> SQLMeshMultiAssetOptions: with self.instance(environment, "to_asset_outs") as instance: context = instance.context diff --git a/dagster_sqlmesh/test_asset.py b/dagster_sqlmesh/test_asset.py index 76ad329..a41f657 100644 --- a/dagster_sqlmesh/test_asset.py +++ b/dagster_sqlmesh/test_asset.py @@ -1,8 +1,10 @@ +from dagster_sqlmesh.asset import SQLMeshDagsterTranslator from dagster_sqlmesh.conftest import SQLMeshTestContext def test_sqlmesh_context_to_asset_outs(sample_sqlmesh_test_context: SQLMeshTestContext): controller = sample_sqlmesh_test_context.create_controller() - outs = controller.to_asset_outs("dev") + translator = SQLMeshDagsterTranslator() + outs = controller.to_asset_outs("dev", translator) assert len(list(outs.deps)) == 1 assert len(outs.outs) == 10 From da7aa5ee0ab068acecee6ab340389f36be6ed8b4 Mon Sep 17 00:00:00 2001 From: Luis Carbonell Girones <3169189+lucargir@users.noreply.github.com> Date: Wed, 7 May 2025 18:22:03 +0200 Subject: [PATCH 7/9] chore: formatting --- dagster_sqlmesh/controller/base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dagster_sqlmesh/controller/base.py b/dagster_sqlmesh/controller/base.py index 6235901..78e7905 100644 --- a/dagster_sqlmesh/controller/base.py +++ b/dagster_sqlmesh/controller/base.py @@ -445,7 +445,7 @@ def setup_with_config( console=console, config=config, log_override=log_override, - context_factory=context_factory + context_factory=context_factory, ) return controller @@ -481,7 +481,7 @@ def _create_context(self) -> ContextCls: options["config"] = self.config.sqlmesh_config set_console(self.console) return self._context_factory(**options) - + @contextmanager def instance( self, environment: str, component: str = "unknown" From 51643c8551bcef48533a62ac001a8fee1cc54b6b Mon Sep 17 00:00:00 2001 From: Luis Carbonell Girones <3169189+lucargir@users.noreply.github.com> Date: Wed, 7 May 2025 18:29:37 +0200 Subject: [PATCH 8/9] chore: update internal module refs to be absolute --- dagster_sqlmesh/asset.py | 3 +-- dagster_sqlmesh/controller/base.py | 6 +++--- dagster_sqlmesh/controller/dagster.py | 7 +++---- dagster_sqlmesh/resource.py | 9 ++++----- dagster_sqlmesh/test_sqlmesh_context.py | 2 +- 5 files changed, 12 insertions(+), 15 deletions(-) diff --git a/dagster_sqlmesh/asset.py b/dagster_sqlmesh/asset.py index 3a3d7ce..bb539a2 100644 --- a/dagster_sqlmesh/asset.py +++ b/dagster_sqlmesh/asset.py @@ -4,6 +4,7 @@ from dagster import AssetsDefinition, RetryPolicy, multi_asset from sqlmesh import Context +from dagster_sqlmesh.config import SQLMeshContextConfig from dagster_sqlmesh.controller import ( ContextCls, ContextFactory, @@ -11,8 +12,6 @@ ) from dagster_sqlmesh.translator import SQLMeshDagsterTranslator -from .config import SQLMeshContextConfig - logger = logging.getLogger(__name__) diff --git a/dagster_sqlmesh/controller/base.py b/dagster_sqlmesh/controller/base.py index 78e7905..51e671b 100644 --- a/dagster_sqlmesh/controller/base.py +++ b/dagster_sqlmesh/controller/base.py @@ -13,8 +13,8 @@ from sqlmesh.utils.dag import DAG from sqlmesh.utils.date import TimeLike -from ..config import SQLMeshContextConfig -from ..console import ( +from dagster_sqlmesh.config import SQLMeshContextConfig +from dagster_sqlmesh.console import ( ConsoleEvent, ConsoleEventHandler, ConsoleException, @@ -22,7 +22,7 @@ Plan, SnapshotCategorizer, ) -from ..events import ConsoleGenerator +from dagster_sqlmesh.events import ConsoleGenerator logger = logging.getLogger(__name__) diff --git a/dagster_sqlmesh/controller/dagster.py b/dagster_sqlmesh/controller/dagster.py index b0f222d..bc858fc 100644 --- a/dagster_sqlmesh/controller/dagster.py +++ b/dagster_sqlmesh/controller/dagster.py @@ -5,12 +5,11 @@ from dagster import AssetDep, AssetKey, AssetOut from dagster._core.definitions.asset_dep import CoercibleToAssetDep +from dagster_sqlmesh.controller.base import ContextCls, SQLMeshController +from dagster_sqlmesh.translator import SQLMeshDagsterTranslator +from dagster_sqlmesh.types import SQLMeshModelDep, SQLMeshMultiAssetOptions from dagster_sqlmesh.utils import get_asset_key_str -from ..translator import SQLMeshDagsterTranslator -from ..types import SQLMeshModelDep, SQLMeshMultiAssetOptions -from .base import ContextCls, SQLMeshController - logger = logging.getLogger(__name__) diff --git a/dagster_sqlmesh/resource.py b/dagster_sqlmesh/resource.py index 9ddba44..99609e7 100644 --- a/dagster_sqlmesh/resource.py +++ b/dagster_sqlmesh/resource.py @@ -15,18 +15,17 @@ from sqlmesh.utils.date import TimeLike from sqlmesh.utils.errors import SQLMeshError +from dagster_sqlmesh import console +from dagster_sqlmesh.config import SQLMeshContextConfig +from dagster_sqlmesh.controller import PlanOptions, RunOptions from dagster_sqlmesh.controller.base import ( DEFAULT_CONTEXT_FACTORY, ContextCls, ContextFactory, ) +from dagster_sqlmesh.controller.dagster import DagsterSQLMeshController from dagster_sqlmesh.utils import get_asset_key_str -from . import console -from .config import SQLMeshContextConfig -from .controller import PlanOptions, RunOptions -from .controller.dagster import DagsterSQLMeshController - class MaterializationTracker: """Tracks sqlmesh materializations and notifies dagster in the correct diff --git a/dagster_sqlmesh/test_sqlmesh_context.py b/dagster_sqlmesh/test_sqlmesh_context.py index 075a544..75858a9 100644 --- a/dagster_sqlmesh/test_sqlmesh_context.py +++ b/dagster_sqlmesh/test_sqlmesh_context.py @@ -2,7 +2,7 @@ import polars -from .testing import SQLMeshTestContext +from dagster_sqlmesh.testing import SQLMeshTestContext logger = logging.getLogger(__name__) From b166bbee46826875551ba2632fb1dce5431d53d9 Mon Sep 17 00:00:00 2001 From: Luis Carbonell Girones <3169189+lucargir@users.noreply.github.com> Date: Thu, 8 May 2025 09:45:01 +0200 Subject: [PATCH 9/9] fix: undo singleton for translator --- dagster_sqlmesh/asset.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dagster_sqlmesh/asset.py b/dagster_sqlmesh/asset.py index bb539a2..c697b1b 100644 --- a/dagster_sqlmesh/asset.py +++ b/dagster_sqlmesh/asset.py @@ -22,7 +22,7 @@ def sqlmesh_assets( config: SQLMeshContextConfig, context_factory: ContextFactory[ContextCls] = lambda **kwargs: Context(**kwargs), name: str | None = None, - dagster_sqlmesh_translator: SQLMeshDagsterTranslator = SQLMeshDagsterTranslator(), + dagster_sqlmesh_translator: SQLMeshDagsterTranslator | None = None, compute_kind: str = "sqlmesh", op_tags: t.Mapping[str, t.Any] | None = None, required_resource_keys: set[str] | None = None, @@ -31,7 +31,8 @@ def sqlmesh_assets( enabled_subsetting: bool = False, ) -> t.Callable[[t.Callable[..., t.Any]], AssetsDefinition]: controller = DagsterSQLMeshController.setup_with_config(config=config, context_factory=context_factory) - + if not dagster_sqlmesh_translator: + dagster_sqlmesh_translator = SQLMeshDagsterTranslator() conversion = controller.to_asset_outs(environment, translator=dagster_sqlmesh_translator) return multi_asset(