Skip to content

Increase Type Checking #26

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,9 @@ dbt_packages/
# Python
*.pyc

*.db
*.db

sample/dagster_project/storage/
sample/dagster_project/logs/
sample/dagster_project/history/
sample/dagster_project/schedules/
17 changes: 15 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,26 @@ pyright:
pnpm pyright

# Sample project commands
clean-dagster:
rm -rf sample/dagster_project/storage sample/dagster_project/logs sample/dagster_project/history

clean-db:
$(PYTHON_CMD) -c "import duckdb; conn = duckdb.connect('db.db'); [conn.execute(cmd[0]) for cmd in conn.execute(\"\"\"SELECT 'DROP TABLE ' || table_schema || '.' || table_name || ' CASCADE;' as drop_cmd FROM information_schema.tables WHERE table_schema != 'sources' AND table_schema != 'information_schema' AND table_type = 'BASE TABLE'\"\"\").fetchall()]; [conn.execute(cmd[0]) for cmd in conn.execute(\"\"\"SELECT 'DROP VIEW ' || table_schema || '.' || table_name || ' CASCADE;' as drop_cmd FROM information_schema.tables WHERE table_schema != 'sources' AND table_schema != 'information_schema' AND table_type = 'VIEW'\"\"\").fetchall()]; conn.close()"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hrm, I'd prefer if this was just a python script we called but I'll accept this for now.


dagster-dev: clean-dagster
DAGSTER_HOME=$(CURDIR)/sample/dagster_project $(PYTHON_CMD) -m dagster dev -h 0.0.0.0 -w
@DAGSTER_HOME="$(subst \,/,$(CURDIR))/sample/dagster_project" "$(PYTHON_CMD)" -m dagster dev -f sample/dagster_project/definitions.py -h 0.0.0.0

dev: dagster-dev # Alias for dagster-dev

dagster-materialize:
$(PYTHON_CMD) -m dagster asset materialize -f sample/dagster_project/definitions.py --select '*'

.PHONY: init init-python install-python-deps upgrade-python-deps clean test mypy check-pnpm install-node-deps upgrade-node-deps sample-dev dagster-dev dagster-materialize clean-dagster
sqlmesh-plan:
cd sample/sqlmesh_project && $(SQLMESH_CMD) plan

sqlmesh-run:
cd sample/sqlmesh_project && $(SQLMESH_CMD) run

clean-dev: clean-db clean-dagster dev

.PHONY: init init-python install-python-deps upgrade-python-deps clean test mypy check-pnpm install-node-deps upgrade-node-deps sample-dev dagster-dev dagster-materialize clean-dagster clean-db clean-dev
17 changes: 9 additions & 8 deletions dagster_sqlmesh/asset.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import typing as t
import logging
import typing as t

from dagster import (
multi_asset,
AssetsDefinition,
RetryPolicy,
multi_asset,
)

from dagster_sqlmesh.controller import DagsterSQLMeshController
Expand All @@ -19,15 +20,15 @@ def sqlmesh_assets(
*,
environment: str,
config: SQLMeshContextConfig,
name: t.Optional[str] = None,
dagster_sqlmesh_translator: t.Optional[SQLMeshDagsterTranslator] = None,
name: str | None = None,
dagster_sqlmesh_translator: SQLMeshDagsterTranslator | None = None,
compute_kind: str = "sqlmesh",
op_tags: t.Optional[t.Mapping[str, t.Any]] = None,
required_resource_keys: t.Optional[t.Set[str]] = None,
retry_policy: t.Optional[RetryPolicy] = None,
op_tags: t.Mapping[str, t.Any] | None = None,
required_resource_keys: set[str] | None = None,
retry_policy: RetryPolicy | None = None,
# For now we don't set this by default
enabled_subsetting: bool = False,
):
) -> t.Callable[[t.Callable[..., t.Any]], AssetsDefinition]:
controller = DagsterSQLMeshController.setup_with_config(config)
if not dagster_sqlmesh_translator:
dagster_sqlmesh_translator = SQLMeshDagsterTranslator()
Expand Down
12 changes: 6 additions & 6 deletions dagster_sqlmesh/config.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
from typing import Optional, Dict, Any
from dataclasses import dataclass
from typing import Any

from dagster import Config
from sqlmesh.core.config import Config as MeshConfig
from pydantic import Field
from sqlmesh.core.config import Config as MeshConfig


@dataclass
class ConfigOverride:
config_as_dict: Dict
config_as_dict: dict[str, Any]

def dict(self):
def dict(self) -> dict[str, Any]:
return self.config_as_dict


Expand All @@ -24,10 +24,10 @@ class SQLMeshContextConfig(Config):

path: str
gateway: str
config_override: Optional[Dict[str, Any]] = Field(default_factory=lambda: None)
config_override: dict[str, Any] | None = Field(default_factory=lambda: None)

@property
def sqlmesh_config(self):
def sqlmesh_config(self) -> MeshConfig | None:
if self.config_override:
return MeshConfig.parse_obj(self.config_override)
return None
43 changes: 24 additions & 19 deletions dagster_sqlmesh/conftest.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,42 @@
import logging
import os
import shutil
import sys
import tempfile
import shutil
import os
from dataclasses import dataclass
import typing as t
from dataclasses import dataclass

import pytest
import duckdb
import polars
from sqlmesh.utils.date import TimeLike
from sqlmesh.core.console import get_console
import pytest
from sqlmesh.core.config import (
Config as SQLMeshConfig,
GatewayConfig,
DuckDBConnectionConfig,
GatewayConfig,
ModelDefaultsConfig,
)
from sqlmesh.core.console import get_console
from sqlmesh.utils.date import TimeLike

from dagster_sqlmesh.config import SQLMeshContextConfig
from dagster_sqlmesh.events import ConsoleRecorder
from dagster_sqlmesh.console import ConsoleEvent
from dagster_sqlmesh.controller.base import PlanOptions, RunOptions
from dagster_sqlmesh.controller.dagster import DagsterSQLMeshController
from dagster_sqlmesh.events import ConsoleRecorder

logger = logging.getLogger(__name__)


@pytest.fixture(scope="session", autouse=True)
def setup_debug_logging_for_tests():
def setup_debug_logging_for_tests() -> None:
root_logger = logging.getLogger(__name__.split(".")[0])
root_logger.setLevel(logging.DEBUG)

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)


@pytest.fixture
def sample_sqlmesh_project():
def sample_sqlmesh_project() -> t.Iterator[str]:
"""Creates a temporary sqlmesh project by copying the sample project"""
with tempfile.TemporaryDirectory() as tmp_dir:
project_dir = shutil.copytree(
Expand All @@ -56,19 +57,21 @@ class SQLMeshTestContext:
db_path: str
context_config: SQLMeshContextConfig

def create_controller(self, enable_debug_console: bool = False):
def create_controller(
self, enable_debug_console: bool = False
) -> DagsterSQLMeshController:
console = None
if enable_debug_console:
console = get_console()
return DagsterSQLMeshController.setup_with_config(
self.context_config, debug_console=console
)

def query(self, *args, **kwargs):
def query(self, *args: t.Any, **kwargs: t.Any) -> t.Any:
conn = duckdb.connect(self.db_path)
return conn.sql(*args, **kwargs).fetchall()

def initialize_test_source(self):
def initialize_test_source(self) -> None:
conn = duckdb.connect(self.db_path)
conn.sql(
"""
Expand Down Expand Up @@ -102,14 +105,14 @@ def plan_and_run(
self,
*,
environment: str,
execution_time: t.Optional[TimeLike] = None,
execution_time: TimeLike | None = None,
enable_debug_console: bool = False,
start: t.Optional[TimeLike] = None,
end: t.Optional[TimeLike] = None,
select_models: t.Optional[t.List[str]] = None,
start: TimeLike | None = None,
end: TimeLike | None = None,
select_models: list[str] | None = None,
restate_selected: bool = False,
skip_run: bool = False,
):
) -> t.Iterator[ConsoleEvent] | None:
"""Runs plan and run on SQLMesh with the given configuration and record all of the generated events.

Args:
Expand Down Expand Up @@ -152,7 +155,9 @@ def plan_and_run(


@pytest.fixture
def sample_sqlmesh_test_context(sample_sqlmesh_project: str):
def sample_sqlmesh_test_context(
sample_sqlmesh_project: str,
) -> t.Iterator[SQLMeshTestContext]:
db_path = os.path.join(sample_sqlmesh_project, "db.db")
config = SQLMeshConfig(
gateways={
Expand Down
Loading