Skip to content

Commit ae1b8b7

Browse files
committed
feat(tests): enhance testing capabilities with new Dagster context
- Introduced `DagsterTestContext` for improved asset materialization in tests. - Updated `sample_sqlmesh_project` and `sample_dagster_project` fixtures to create temporary projects for testing. - Modified `SQLMeshResource` to support plan and run options overrides. - Refactored existing test cases to utilize the new context and improve clarity. - Updated `Makefile` to change asset materialization command and configuration for better usability.
1 parent cef6ce8 commit ae1b8b7

File tree

4 files changed

+273
-148
lines changed

4 files changed

+273
-148
lines changed

Makefile

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,16 +57,18 @@ clean-dagster:
5757
rm -rf sample/dagster_project/storage sample/dagster_project/logs sample/dagster_project/history
5858

5959
clean-db:
60-
$(PYTHON_CMD) -c "import duckdb; conn = duckdb.connect('db.db'); [conn.execute(cmd[0]) for cmd in conn.execute(\"\"\"SELECT 'DROP TABLE ' || table_schema || '.' || table_name || ' CASCADE;' as drop_cmd FROM information_schema.tables WHERE table_schema != 'sources' AND table_schema != 'information_schema' AND table_type = 'BASE TABLE'\"\"\").fetchall()]; [conn.execute(cmd[0]) for cmd in conn.execute(\"\"\"SELECT 'DROP VIEW ' || table_schema || '.' || table_name || ' CASCADE;' as drop_cmd FROM information_schema.tables WHERE table_schema != 'sources' AND table_schema != 'information_schema' AND table_type = 'VIEW'\"\"\").fetchall()]; conn.close()"
60+
"$(PYTHON_CMD)" -c "import duckdb; conn = duckdb.connect('db.db'); [conn.execute(cmd[0]) for cmd in conn.execute(\"\"\"SELECT 'DROP TABLE ' || table_schema || '.' || table_name || ' CASCADE;' as drop_cmd FROM information_schema.tables WHERE table_schema != 'sources' AND table_schema != 'information_schema' AND table_type = 'BASE TABLE'\"\"").fetchall()]; [conn.execute(cmd[0]) for cmd in conn.execute(\"\"\"SELECT 'DROP VIEW ' || table_schema || '.' || table_name || ' CASCADE;' as drop_cmd FROM information_schema.tables WHERE table_schema != 'sources' AND table_schema != 'information_schema' AND table_type = 'VIEW'\"\"").fetchall()]; conn.close()"
6161

6262
dagster-dev: clean-dagster
6363
@DAGSTER_HOME="$(subst \,/,$(CURDIR))/sample/dagster_project" "$(PYTHON_CMD)" -m dagster dev -f sample/dagster_project/definitions.py -h 0.0.0.0
6464

6565
dev: dagster-dev # Alias for dagster-dev
6666

6767
dagster-materialize:
68-
"$(PYTHON_CMD)" -m dagster asset materialize -f "$(CURDIR)/sample/dagster_project/definitions.py" --select "test_source"
69-
68+
"$(PYTHON_CMD)" -m dagster asset materialize \
69+
-f "$(CURDIR)/sample/dagster_project/definitions.py" \
70+
--select "intermediate_model_1" \
71+
--config-json '{"resources": {"sqlmesh": {"config": {"config": {"gateway": "local", "path": "C:\\\\Users\\\\kevin\\\\git_repos\\\\dagster-sqlmesh - Copy\\\\sample\\\\sqlmesh_project"}, "plan_options_override": {"skip_backfill": false}}}}}'
7072

7173
sqlmesh-plan:
7274
cd sample/sqlmesh_project && $(SQLMESH_CMD) plan

dagster_sqlmesh/conftest.py

Lines changed: 110 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
import json
12
import logging
23
import os
34
import shutil
5+
import subprocess
46
import sys
57
import tempfile
68
import typing as t
@@ -35,7 +37,7 @@ def setup_debug_logging_for_tests() -> None:
3537

3638

3739
@pytest.fixture
38-
def sample_sqlmesh_project() -> t.Generator[str, None, None]:
40+
def sample_sqlmesh_project() -> t.Iterator[str]:
3941
"""Creates a temporary sqlmesh project by copying the sample project"""
4042
with tempfile.TemporaryDirectory() as tmp_dir:
4143
project_dir = shutil.copytree(
@@ -359,7 +361,7 @@ def plan_and_run(
359361
@pytest.fixture
360362
def sample_sqlmesh_test_context(
361363
sample_sqlmesh_project: str,
362-
) -> t.Generator[SQLMeshTestContext, None, None]:
364+
) -> t.Iterator[SQLMeshTestContext]:
363365
db_path = os.path.join(sample_sqlmesh_project, "db.db")
364366
config = SQLMeshConfig(
365367
gateways={
@@ -417,7 +419,7 @@ def permanent_sqlmesh_project() -> str:
417419
@pytest.fixture
418420
def model_change_test_context(
419421
permanent_sqlmesh_project: str,
420-
) -> t.Generator[SQLMeshTestContext, None, None]:
422+
) -> t.Iterator[SQLMeshTestContext]:
421423
"""FOR DEBUGGING ONLY: Creates a SQLMesh test context specifically for testing model code changes.
422424
423425
This fixture provides a context that allows modifying SQL model files and ensures
@@ -455,5 +457,109 @@ def model_change_test_context(
455457
# test_context.cleanup_modified_files()
456458

457459

460+
@dataclass
461+
class DagsterTestContext:
462+
"""A test context for running Dagster"""
463+
464+
project_path: str
465+
466+
def asset_materialisation(
467+
self,
468+
assets: list[str],
469+
plan_options: PlanOptions | None = None,
470+
run_options: RunOptions | None = None,
471+
) -> None:
472+
"""Materialises the given Dagster assets using CLI command.
473+
474+
Args:
475+
assets: String of comma-separated asset names to materialize
476+
plan_options: Optional SQLMesh plan options to pass to the config
477+
run_options: Optional SQLMesh run options to pass to the config
478+
"""
479+
480+
# Construct the base config
481+
config: dict[str, t.Any] = {
482+
"resources": {
483+
"sqlmesh": {
484+
"config": {
485+
"config": {"gateway": "local", "path": self.project_path}
486+
}
487+
}
488+
}
489+
}
490+
491+
# Add plan options if provided
492+
if plan_options:
493+
config["resources"]["sqlmesh"]["config"]["plan_options_override"] = {
494+
k: v for k, v in plan_options.items() if v is not None
495+
}
496+
497+
# Add run options if provided
498+
if run_options:
499+
config["resources"]["sqlmesh"]["config"]["run_options_override"] = {
500+
k: v for k, v in run_options.items() if v is not None
501+
}
502+
503+
# Convert config to JSON string, escaping backslashes for Windows paths
504+
config_json = json.dumps(config).replace("\\", "\\\\")
505+
506+
# Construct the command
507+
cmd = [
508+
sys.executable,
509+
"-m",
510+
"dagster",
511+
"asset",
512+
"materialize",
513+
"-f",
514+
os.path.join(self.project_path, "definitions.py"),
515+
"--select",
516+
",".join(assets),
517+
"--config-json",
518+
config_json,
519+
]
520+
521+
# Run the command
522+
subprocess.run(cmd, check=True)
523+
524+
def reset_assets(self) -> None:
525+
"""Resets the assets to the original state"""
526+
self.asset_materialisation(assets=["reset_asset"])
527+
528+
def init_test_source(self) -> None:
529+
"""Initialises the test source"""
530+
self.asset_materialisation(assets=["test_source"])
531+
532+
533+
534+
@pytest.fixture
535+
def sample_dagster_project() -> t.Iterator[str]:
536+
"""Creates a temporary dagster project by copying the sample project"""
537+
with tempfile.TemporaryDirectory() as tmp_dir:
538+
project_dir = shutil.copytree(
539+
"sample",
540+
tmp_dir,
541+
)
542+
dagster_project_dir = os.path.join(project_dir, "dagster_project")
543+
sqlmesh_project_dir = os.path.join(project_dir, "sqlmesh_project")
544+
545+
db_path = os.path.join(sqlmesh_project_dir, "db.db")
546+
if os.path.exists(db_path):
547+
os.remove(os.path.join(sqlmesh_project_dir, "db.db"))
548+
549+
# Initialize the "source" data
550+
yield str(dagster_project_dir)
551+
552+
553+
554+
@pytest.fixture
555+
def sample_dagster_test_context(
556+
sample_dagster_project: str,
557+
) -> t.Iterator[DagsterTestContext]:
558+
test_context = DagsterTestContext(
559+
project_path=os.path.join(sample_dagster_project),
560+
)
561+
yield test_context
562+
563+
458564
if __name__ == "__main__":
459-
pytest.main([__file__])
565+
pytest.main([__file__])

0 commit comments

Comments
 (0)