From 32f813b58f14e245d2f4c875cca171b700a8dceb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=8Dcaro=20Guerra?= Date: Mon, 24 Feb 2025 12:42:06 -0700 Subject: [PATCH] feat: log sqlmesh errors to dagster --- dagster_sqlmesh/resource.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/dagster_sqlmesh/resource.py b/dagster_sqlmesh/resource.py index 4e6142b..0499e66 100644 --- a/dagster_sqlmesh/resource.py +++ b/dagster_sqlmesh/resource.py @@ -87,7 +87,7 @@ def warning(self, message: str, obj: t.Optional[t.Dict[str, t.Any]] = None): self.log("warning", message, obj) def error(self, message: str, obj: t.Optional[t.Dict[str, t.Any]] = None): - self.log("warning", message, obj) + self.log("error", message, obj) def log(self, level: str | int, message: str, obj: t.Optional[t.Dict[str, t.Any]]): self._handler.log(level, message, self.ensure_standard_obj(obj)) @@ -180,7 +180,6 @@ def report_event(self, event: console.ConsoleEvent): "duration_ms": duration_ms, }, ) - case console.LogSuccess(success): self.update_stage("done") if success: @@ -188,7 +187,10 @@ def report_event(self, event: console.ConsoleEvent): else: log_context.error("sqlmesh failed") raise Exception("sqlmesh failed during run") - + case console.LogError(message): + log_context.error( + message, + ) case _: log_context.debug("Received event") @@ -201,6 +203,10 @@ def log( message: str, obj: t.Optional[t.Dict[str, t.Any]] = None, ): + if level == "error": + self._logger.error(message) + return + obj = obj or {} final_obj = obj.copy() final_obj["message"] = message