File tree Expand file tree Collapse file tree 5 files changed +8
-6
lines changed Expand file tree Collapse file tree 5 files changed +8
-6
lines changed Original file line number Diff line number Diff line change 17
17
# Define a SQLMesh Asset
18
18
def sqlmesh_assets (
19
19
* ,
20
+ environment : str ,
20
21
config : SQLMeshContextConfig ,
21
22
name : t .Optional [str ] = None ,
22
23
dagster_sqlmesh_translator : t .Optional [SQLMeshDagsterTranslator ] = None ,
@@ -28,7 +29,7 @@ def sqlmesh_assets(
28
29
controller = DagsterSQLMeshController .setup (config )
29
30
if not dagster_sqlmesh_translator :
30
31
dagster_sqlmesh_translator = SQLMeshDagsterTranslator ()
31
- conversion = controller .to_asset_outs (dagster_sqlmesh_translator )
32
+ conversion = controller .to_asset_outs (environment , dagster_sqlmesh_translator )
32
33
33
34
return multi_asset (
34
35
name = name ,
Original file line number Diff line number Diff line change @@ -126,7 +126,7 @@ def run(
126
126
plan_options ["end" ] = end
127
127
run_options ["end" ] = end
128
128
129
- for _context , event in controller .plan_and_run (
129
+ for event in controller .plan_and_run (
130
130
environment ,
131
131
plan_options = plan_options ,
132
132
run_options = run_options ,
Original file line number Diff line number Diff line change @@ -20,9 +20,10 @@ class DagsterSQLMeshController(SQLMeshController):
20
20
"""An extension of the sqlmesh controller specifically for dagster use"""
21
21
22
22
def to_asset_outs (
23
- self , translator : SQLMeshDagsterTranslator
23
+ self , environment : str , translator : SQLMeshDagsterTranslator
24
24
) -> SQLMeshMultiAssetOptions :
25
- with self .context () as context :
25
+ with self .instance (environment ) as instance :
26
+ context = instance .context
26
27
dag = context .dag
27
28
output = SQLMeshMultiAssetOptions ()
28
29
depsMap : t .Dict [str , CoercibleToAssetDep ] = {}
Original file line number Diff line number Diff line change 7
7
def test_sqlmesh_context_to_asset_outs (sample_sqlmesh_test_context : SQLMeshTestContext ):
8
8
controller = sample_sqlmesh_test_context .create_controller ()
9
9
translator = SQLMeshDagsterTranslator ()
10
- outs = controller .to_asset_outs (translator )
10
+ outs = controller .to_asset_outs ("dev" , translator )
11
11
assert len (list (outs .deps )) == 1
12
12
assert len (outs .outs ) == 7
Original file line number Diff line number Diff line change @@ -50,7 +50,7 @@ def test_source() -> pl.DataFrame:
50
50
)
51
51
52
52
53
- @sqlmesh_assets (config = sqlmesh_config )
53
+ @sqlmesh_assets (environment = "dev" , config = sqlmesh_config )
54
54
def sqlmesh_project (context : AssetExecutionContext , sqlmesh : SQLMeshResource ):
55
55
yield from sqlmesh .run (context )
56
56
You can’t perform that action at this time.
0 commit comments