Skip to content

Commit d3202d3

Browse files
committed
refactor(command): improve command execution with real-time output streaming
- Refactored `_run_command` method in `DagsterTestContext` to utilize threading and queues for real-time output streaming from subprocesses. - Added a helper function `stream_output` to handle output from stdout and stderr concurrently. - Updated test file to utilize `asset_materialisation` method for asset handling, improving clarity in test setup. - Removed commented-out code to clean up the test file.
1 parent b084e9a commit d3202d3

File tree

2 files changed

+72
-16
lines changed

2 files changed

+72
-16
lines changed

dagster_sqlmesh/conftest.py

Lines changed: 65 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,7 @@ class DagsterTestContext:
479479

480480
project_path: str
481481

482-
def _run_command (self, cmd: list[str]) -> None:
482+
def _run_command(self, cmd: list[str]) -> None:
483483
"""Execute a command and stream its output in real-time.
484484
485485
Args:
@@ -488,32 +488,82 @@ def _run_command (self, cmd: list[str]) -> None:
488488
Raises:
489489
subprocess.CalledProcessError: If the command returns non-zero exit code
490490
"""
491+
import queue
492+
import threading
493+
import typing as t
494+
495+
def stream_output(
496+
pipe: t.IO[str], output_queue: queue.Queue[str | None]
497+
) -> None:
498+
"""Stream output from a pipe to a queue."""
499+
try:
500+
while True:
501+
char = pipe.read(1)
502+
if not char:
503+
break
504+
output_queue.put(char)
505+
finally:
506+
output_queue.put(None) # Signal EOF
507+
491508
print(f"Running command: {' '.join(cmd)}")
492509
process = subprocess.Popen(
493510
cmd,
494511
stdout=subprocess.PIPE,
495512
stderr=subprocess.PIPE,
496513
text=True,
497-
bufsize=1,
498514
universal_newlines=True,
499515
)
500516

501-
# Stream output in real-time
502-
while True:
503-
stdout_line = process.stdout.readline() if process.stdout else ""
504-
stderr_line = process.stderr.readline() if process.stderr else ""
517+
if not process.stdout or not process.stderr:
518+
raise RuntimeError("Failed to open subprocess pipes")
505519

506-
if stdout_line:
507-
print(stdout_line.rstrip())
508-
if stderr_line:
509-
print(stderr_line.rstrip())
520+
# Create queues for stdout and stderr
521+
stdout_queue: queue.Queue[str | None] = queue.Queue()
522+
stderr_queue: queue.Queue[str | None] = queue.Queue()
510523

511-
process_finished = not stdout_line and not stderr_line and process.poll() is not None
512-
if process_finished:
513-
break
524+
# Start threads to read from pipes
525+
stdout_thread = threading.Thread(
526+
target=stream_output, args=(process.stdout, stdout_queue)
527+
)
528+
stderr_thread = threading.Thread(
529+
target=stream_output, args=(process.stderr, stderr_queue)
530+
)
514531

515-
process_failed = process.returncode != 0
516-
if process_failed:
532+
stdout_thread.daemon = True
533+
stderr_thread.daemon = True
534+
stdout_thread.start()
535+
stderr_thread.start()
536+
537+
# Read from queues and print output
538+
stdout_done = False
539+
stderr_done = False
540+
541+
while not (stdout_done and stderr_done):
542+
# Handle stdout
543+
try:
544+
char = stdout_queue.get_nowait()
545+
if char is None:
546+
stdout_done = True
547+
else:
548+
print(char, end="", flush=True)
549+
except queue.Empty:
550+
pass
551+
552+
# Handle stderr
553+
try:
554+
char = stderr_queue.get_nowait()
555+
if char is None:
556+
stderr_done = True
557+
else:
558+
print(char, end="", flush=True)
559+
except queue.Empty:
560+
pass
561+
562+
stdout_thread.join()
563+
stderr_thread.join()
564+
process.wait()
565+
566+
if process.returncode != 0:
517567
raise subprocess.CalledProcessError(process.returncode, cmd)
518568

519569
def asset_materialisation(

dagster_sqlmesh/controller/tests_plan_and_run/test_model_code_change.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,13 @@ def test_given_model_chain_when_running_with_different_flags_then_behaves_as_exp
190190
# "Expected last_updated_at timestamps to remain unchanged when no model changes were made"
191191
# )
192192

193-
sample_dagster_test_context.init_test_source()
193+
# sample_dagster_test_context.init_test_source()
194+
195+
sample_dagster_test_context.asset_materialisation(assets=["seed_model_1"])
196+
197+
# sample_dagster_test_context.asset_materialisation(assets=["test_source", "seed_model_1", "seed_model_2", "staging_model_1", "staging_model_2", "intermediate_model_1", "full_model"])
198+
199+
# sample_dagster_test_context.asset_materialisation(assets=["intermediate_model_1"])
194200

195201

196202
if __name__ == "__main__":

0 commit comments

Comments
 (0)