Skip to content

Commit b62f5e2

Browse files
committed
fix: remove translator from controller and use helper for internal ref
1 parent 514446f commit b62f5e2

File tree

7 files changed

+35
-34
lines changed

7 files changed

+35
-34
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ from dagster_sqlmesh import sqlmesh_assets, SQLMeshContextConfig, SQLMeshResourc
2828

2929
sqlmesh_config = SQLMeshContextConfig(path="/home/foo/sqlmesh_project", gateway="name-of-your-gateway")
3030

31-
@sqlmesh_assets(environment="dev", config=sqlmesh_config)
31+
@sqlmesh_assets(environment="dev", config=sqlmesh_config, translator=SQLMeshDagsterTranslator())
3232
def sqlmesh_project(context: AssetExecutionContext, sqlmesh: SQLMeshResource):
33-
yield from sqlmesh.run(context, translator=SQLMeshDagsterTranslator())
33+
yield from sqlmesh.run(context)
3434

3535
defs = Definitions(
3636
assets=[sqlmesh_project],

dagster_sqlmesh/controller/base.py

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
SnapshotCategorizer,
2424
)
2525
from ..events import ConsoleGenerator
26-
from ..translator import SQLMeshDagsterTranslator
2726

2827
logger = logging.getLogger(__name__)
2928

@@ -417,7 +416,6 @@ class SQLMeshController(t.Generic[ContextCls]):
417416
config: SQLMeshContextConfig
418417
console: EventConsole
419418
logger: logging.Logger
420-
translator: SQLMeshDagsterTranslator
421419

422420
@classmethod
423421
def setup(
@@ -427,12 +425,10 @@ def setup(
427425
context_factory: ContextFactory[ContextCls],
428426
gateway: str = "local",
429427
log_override: logging.Logger | None = None,
430-
translator: SQLMeshDagsterTranslator = SQLMeshDagsterTranslator(),
431428
) -> t.Self:
432429
return cls.setup_with_config(
433430
config=SQLMeshContextConfig(path=path, gateway=gateway),
434431
log_override=log_override,
435-
translator=translator,
436432
context_factory=context_factory,
437433
)
438434

@@ -443,15 +439,13 @@ def setup_with_config(
443439
config: SQLMeshContextConfig,
444440
context_factory: ContextFactory[ContextCls] = DEFAULT_CONTEXT_FACTORY,
445441
log_override: logging.Logger | None = None,
446-
translator: SQLMeshDagsterTranslator = SQLMeshDagsterTranslator(),
447442
) -> t.Self:
448443
console = EventConsole(log_override=log_override) # type: ignore
449444
controller = cls(
450445
console=console,
451446
config=config,
452447
log_override=log_override,
453-
context_factory=context_factory,
454-
translator=translator
448+
context_factory=context_factory
455449
)
456450
return controller
457451

@@ -461,13 +455,11 @@ def __init__(
461455
console: EventConsole,
462456
context_factory: ContextFactory[ContextCls],
463457
log_override: logging.Logger | None = None,
464-
translator: SQLMeshDagsterTranslator = SQLMeshDagsterTranslator(),
465458
) -> None:
466459
self.config = config
467460
self.console = console
468461
self.logger = log_override or logger
469462
self._context_factory = context_factory
470-
self.translator = translator
471463
self._context_open = False
472464

473465
def set_logger(self, logger: logging.Logger) -> None:

dagster_sqlmesh/controller/dagster.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
from dagster import AssetDep, AssetKey, AssetOut
66
from dagster._core.definitions.asset_dep import CoercibleToAssetDep
77

8+
from dagster_sqlmesh.utils import get_asset_key_str
9+
810
from ..translator import SQLMeshDagsterTranslator
911
from ..types import SQLMeshModelDep, SQLMeshMultiAssetOptions
1012
from .base import ContextCls, SQLMeshController
@@ -16,10 +18,9 @@ class DagsterSQLMeshController(SQLMeshController[ContextCls]):
1618
"""An extension of the sqlmesh controller specifically for dagster use"""
1719

1820
def to_asset_outs(
19-
self, environment: str, translator: SQLMeshDagsterTranslator | None = None,
21+
self, environment: str, translator: SQLMeshDagsterTranslator = SQLMeshDagsterTranslator(),
2022
) -> SQLMeshMultiAssetOptions:
2123
with self.instance(environment, "to_asset_outs") as instance:
22-
translator = translator or SQLMeshDagsterTranslator()
2324
context = instance.context
2425
output = SQLMeshMultiAssetOptions()
2526
depsMap: dict[str, CoercibleToAssetDep] = {}
@@ -39,12 +40,12 @@ def to_asset_outs(
3940
translator.get_asset_key(context, dep.model.fqn)
4041
)
4142
else:
42-
table = translator.get_asset_key_str(dep.fqn)
43+
table = get_asset_key_str(dep.fqn)
4344
key = translator.get_asset_key(context, dep.fqn)
4445
internal_asset_deps.add(key)
4546
# create an external dep
4647
depsMap[table] = AssetDep(key)
47-
model_key = translator.get_asset_key_str(model.fqn)
48+
model_key = get_asset_key_str(model.fqn)
4849
# If current Dagster supports "kinds", add labels for Dagster UI
4950
if "kinds" in signature(AssetOut).parameters:
5051
output.outs[model_key] = AssetOut(

dagster_sqlmesh/resource.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
ContextCls,
2121
ContextFactory,
2222
)
23+
from dagster_sqlmesh.utils import get_asset_key_str
2324

2425
from . import console
2526
from .config import SQLMeshContextConfig
@@ -177,7 +178,7 @@ def notify_success(
177178
# If the model is not in models_map, we can skip any notification
178179
if model:
179180
# Passing model.fqn to translator
180-
output_key = self.translator.get_asset_key_str(model.fqn)
181+
output_key = get_asset_key_str(model.fqn)
181182
if not self._is_testing:
182183
# Stupidly dagster when testing cannot use the following
183184
# method so we must specifically skip this when testing
@@ -231,7 +232,7 @@ def report_event(self, event: console.ConsoleEvent) -> None:
231232
log_context.info(
232233
"Snapshot progress update",
233234
{
234-
"asset_key": self.translator.get_asset_key_str(snapshot.model.name),
235+
"asset_key": get_asset_key_str(snapshot.model.name),
235236
"progress": f"{done}/{expected}",
236237
"duration_ms": duration_ms,
237238
},
@@ -332,7 +333,11 @@ def run(
332333

333334
logger = context.log
334335

335-
controller = self.get_controller(context_factory, logger, translator)
336+
controller = self.get_controller(
337+
context_factory=context_factory,
338+
log_override=logger,
339+
translator=translator
340+
)
336341

337342
with controller.instance(environment) as mesh:
338343
dag = mesh.models_dag()
@@ -404,7 +409,7 @@ def _get_selected_models_from_context(
404409
select_models: list[str] = []
405410
models_map = {}
406411
for key, model in models.items():
407-
if translator.get_asset_key_str(model.fqn) in selected_output_names:
412+
if get_asset_key_str(model.fqn) in selected_output_names:
408413
models_map[key] = model
409414
select_models.append(model.name)
410415
return (
@@ -416,12 +421,11 @@ def _get_selected_models_from_context(
416421
def get_controller(
417422
self,
418423
context_factory: ContextFactory[ContextCls],
424+
translator: SQLMeshDagsterTranslator,
419425
log_override: logging.Logger | None = None,
420-
translator: SQLMeshDagsterTranslator = SQLMeshDagsterTranslator(),
421426
) -> DagsterSQLMeshController[ContextCls]:
422427
return DagsterSQLMeshController.setup_with_config(
423428
config=self.config,
424429
context_factory=context_factory,
425-
log_override=log_override,
426-
translator=translator
430+
log_override=log_override
427431
)

dagster_sqlmesh/translator.py

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
import re
21
from collections.abc import Sequence
32

43
from dagster import AssetKey
4+
from sqlglot import exp
55
from sqlmesh.core.context import Context
66
from sqlmesh.core.model import Model
77

@@ -15,16 +15,10 @@ def get_asset_key(self, context: Context, fqn: str) -> AssetKey:
1515
return AssetKey(path)
1616

1717
def get_asset_key_name(self, fqn: str) -> Sequence[str]:
18-
asset_path = re.findall(r"[A-Za-z0-9_\-]+", fqn)
19-
return asset_path
18+
table = exp.to_table(fqn)
19+
asset_key_name = [table.catalog, table.db, table.name]
2020

21-
def get_asset_key_str(self, fqn: str) -> str:
22-
# This is an internal identifier used to map outputs and dependencies
23-
# it will not affect the existing AssetKeys
24-
# Only alphanumeric characters and underscores
25-
path = self.get_asset_key_name(fqn)
26-
27-
return "__dot__".join(path).replace("-", "__dash__")
21+
return asset_key_name
2822

2923
def get_group_name(self, context: Context, model: Model) -> str:
3024
path = self.get_asset_key_name(model.fqn)

dagster_sqlmesh/utils.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,16 @@
1+
from sqlglot import exp
12
from sqlmesh.core.snapshot import SnapshotId
23

34

5+
def get_asset_key_str(fqn: str) -> str:
6+
# This is an internal identifier used to map outputs and dependencies
7+
# it will not affect the existing AssetKeys
8+
# Only alphanumeric characters and underscores
9+
table = exp.to_table(fqn)
10+
asset_key_name = [table.catalog, table.db, table.name]
11+
12+
return "sqlmesh__" + "_".join(asset_key_name)
13+
414
def snapshot_id_to_model_name(snapshot_id: SnapshotId) -> str:
515
"""Convert a SnapshotId to its model name.
616

sample/dagster_project/definitions.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
sqlmesh_config = SQLMeshContextConfig(path=SQLMESH_PROJECT_PATH, gateway="local")
2222

2323

24-
@asset
24+
@asset(key=["db", "sources", "reset_asset"])
2525
def reset_asset() -> MaterializeResult:
2626
"""An asset used for testing this entire workflow. If the duckdb database is
2727
found, this will delete it. This allows us to continously test this dag if
@@ -34,7 +34,7 @@ def reset_asset() -> MaterializeResult:
3434
return MaterializeResult(metadata={"deleted": deleted})
3535

3636

37-
@asset(deps=[reset_asset])
37+
@asset(deps=[reset_asset], key=["db", "sources", "test_source"])
3838
def test_source() -> pl.DataFrame:
3939
"""Sets up the `test_source` table in duckdb that one of the sample sqlmesh
4040
models depends on"""

0 commit comments

Comments
 (0)