@@ -37,18 +37,34 @@ def setup_debug_logging_for_tests() -> None:
37
37
38
38
39
39
@pytest .fixture
40
- def sample_sqlmesh_project () -> t .Iterator [str ]:
41
- """Creates a temporary sqlmesh project by copying the sample project """
40
+ def sample_project_root () -> t .Iterator [str ]:
41
+ """Creates a temporary project directory containing both SQLMesh and Dagster projects """
42
42
with tempfile .TemporaryDirectory () as tmp_dir :
43
- project_dir = shutil .copytree (
44
- "sample/sqlmesh_project" , os .path .join (tmp_dir , "project" )
45
- )
46
- db_path = os .path .join (project_dir , "db.db" )
47
- if os .path .exists (db_path ):
48
- os .remove (os .path .join (project_dir , "db.db" ))
43
+ project_dir = shutil .copytree ("sample" , tmp_dir , dirs_exist_ok = True )
44
+ yield project_dir
45
+
46
+
47
+ @pytest .fixture
48
+ def sample_sqlmesh_project (sample_project_root : str ) -> t .Iterator [str ]:
49
+ """Returns path to the SQLMesh project within the sample project"""
50
+ sqlmesh_project_dir = os .path .join (sample_project_root , "sqlmesh_project" )
51
+ db_path = os .path .join (sqlmesh_project_dir , "db.db" )
52
+ if os .path .exists (db_path ):
53
+ os .remove (db_path )
54
+ yield sqlmesh_project_dir
55
+
56
+
57
+ @pytest .fixture
58
+ def sample_dagster_project (sample_project_root : str ) -> t .Iterator [str ]:
59
+ """Returns path to the Dagster project within the sample project"""
60
+ dagster_project_dir = os .path .join (sample_project_root , "dagster_project" )
61
+ sqlmesh_project_dir = os .path .join (sample_project_root , "sqlmesh_project" )
62
+
63
+ db_path = os .path .join (sqlmesh_project_dir , "db.db" )
64
+ if os .path .exists (db_path ):
65
+ os .remove (db_path )
49
66
50
- # Initialize the "source" data
51
- yield str (project_dir )
67
+ yield dagster_project_dir
52
68
53
69
54
70
@dataclass
@@ -463,6 +479,43 @@ class DagsterTestContext:
463
479
464
480
project_path : str
465
481
482
+ def _run_command (self , cmd : list [str ]) -> None :
483
+ """Execute a command and stream its output in real-time.
484
+
485
+ Args:
486
+ cmd: List of command parts to execute
487
+
488
+ Raises:
489
+ subprocess.CalledProcessError: If the command returns non-zero exit code
490
+ """
491
+ print (f"Running command: { ' ' .join (cmd )} " )
492
+ process = subprocess .Popen (
493
+ cmd ,
494
+ stdout = subprocess .PIPE ,
495
+ stderr = subprocess .PIPE ,
496
+ text = True ,
497
+ bufsize = 1 ,
498
+ universal_newlines = True ,
499
+ )
500
+
501
+ # Stream output in real-time
502
+ while True :
503
+ stdout_line = process .stdout .readline () if process .stdout else ""
504
+ stderr_line = process .stderr .readline () if process .stderr else ""
505
+
506
+ if stdout_line :
507
+ print (stdout_line .rstrip ())
508
+ if stderr_line :
509
+ print (stderr_line .rstrip ())
510
+
511
+ process_finished = not stdout_line and not stderr_line and process .poll () is not None
512
+ if process_finished :
513
+ break
514
+
515
+ process_failed = process .returncode != 0
516
+ if process_failed :
517
+ raise subprocess .CalledProcessError (process .returncode , cmd )
518
+
466
519
def asset_materialisation (
467
520
self ,
468
521
assets : list [str ],
@@ -476,8 +529,6 @@ def asset_materialisation(
476
529
plan_options: Optional SQLMesh plan options to pass to the config
477
530
run_options: Optional SQLMesh run options to pass to the config
478
531
"""
479
-
480
- # Construct the base config
481
532
config : dict [str , t .Any ] = {
482
533
"resources" : {
483
534
"sqlmesh" : {
@@ -494,7 +545,6 @@ def asset_materialisation(
494
545
k : v for k , v in plan_options .items () if v is not None
495
546
}
496
547
497
- # Add run options if provided
498
548
if run_options :
499
549
config ["resources" ]["sqlmesh" ]["config" ]["run_options_override" ] = {
500
550
k : v for k , v in run_options .items () if v is not None
@@ -503,7 +553,6 @@ def asset_materialisation(
503
553
# Convert config to JSON string, escaping backslashes for Windows paths
504
554
config_json = json .dumps (config ).replace ("\\ " , "\\ \\ " )
505
555
506
- # Construct the command
507
556
cmd = [
508
557
sys .executable ,
509
558
"-m" ,
@@ -518,8 +567,7 @@ def asset_materialisation(
518
567
config_json ,
519
568
]
520
569
521
- # Run the command
522
- subprocess .run (cmd , check = True )
570
+ self ._run_command (cmd )
523
571
524
572
def reset_assets (self ) -> None :
525
573
"""Resets the assets to the original state"""
@@ -530,27 +578,6 @@ def init_test_source(self) -> None:
530
578
self .asset_materialisation (assets = ["test_source" ])
531
579
532
580
533
-
534
- @pytest .fixture
535
- def sample_dagster_project () -> t .Iterator [str ]:
536
- """Creates a temporary dagster project by copying the sample project"""
537
- with tempfile .TemporaryDirectory () as tmp_dir :
538
- project_dir = shutil .copytree (
539
- "sample" ,
540
- tmp_dir ,
541
- )
542
- dagster_project_dir = os .path .join (project_dir , "dagster_project" )
543
- sqlmesh_project_dir = os .path .join (project_dir , "sqlmesh_project" )
544
-
545
- db_path = os .path .join (sqlmesh_project_dir , "db.db" )
546
- if os .path .exists (db_path ):
547
- os .remove (os .path .join (sqlmesh_project_dir , "db.db" ))
548
-
549
- # Initialize the "source" data
550
- yield str (dagster_project_dir )
551
-
552
-
553
-
554
581
@pytest .fixture
555
582
def sample_dagster_test_context (
556
583
sample_dagster_project : str ,
0 commit comments