Skip to content
This repository was archived by the owner on Oct 2, 2024. It is now read-only.

Commit 2ae468f

Browse files
committed
Handle child LSP responses as streams
1 parent 87cc71b commit 2ae468f

File tree

3 files changed

+60
-11
lines changed

3 files changed

+60
-11
lines changed

Cargo.lock

Lines changed: 40 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@ readme = "README.md"
1111

1212
[dependencies]
1313
anyhow = "1.0.68"
14+
async-stream = "0.3.3"
1415
clap = { version = "4.1.1", features = ["derive"] }
1516
serde = { version = "1.0.152", features = ["derive"] }
1617
serde_json = "1.0.91"
1718
tokio = { version = "1.24.1", features = ["full"] }
19+
tokio-stream = "0.1.11"
1820
toml_edit = { version = "0.17.1", features = ["easy", "serde"] }
1921
tracing = "0.1.37"
2022
tracing-appender = "0.2.2"

src/main.rs

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::fs;
22
use std::path::PathBuf;
3+
use std::pin::Pin;
34
use std::process::Stdio;
45

56
use anyhow::{bail, Context, Result};
@@ -10,6 +11,7 @@ use tokio::{
1011
process::{ChildStdin, ChildStdout, Command},
1112
sync::{broadcast, mpsc},
1213
};
14+
use tokio_stream::{Stream, StreamExt, StreamMap};
1315
use tracing::{debug, info, trace};
1416
use tracing_subscriber::filter::{EnvFilter, LevelFilter};
1517

@@ -125,18 +127,23 @@ async fn run(config: LspConfig) -> Result<()> {
125127
// read messages from child LSPs
126128
tokio::spawn(async move {
127129
let mut stdout = io::stdout();
128-
loop {
129-
for rx in &mut child_rxs {
130-
if let Some(value) = rx.recv().await {
131-
let message = serde_json::to_string(&value).unwrap();
132-
debug!("received: {}", message);
133-
stdout
134-
.write_all(format!("Content-Length: {}\r\n\r\n", message.len()).as_bytes())
135-
.await
136-
.unwrap();
137-
stdout.write_all(message.as_bytes()).await.unwrap();
130+
let mut map = StreamMap::new();
131+
for (key, mut rx) in child_rxs.into_iter().enumerate() {
132+
let stream = Box::pin(async_stream::stream! {
133+
while let Some(value) = rx.recv().await {
134+
yield value;
138135
}
139-
}
136+
}) as Pin<Box<dyn Stream<Item = Value> + Send>>;
137+
map.insert(key, stream);
138+
}
139+
while let Some((_, value)) = map.next().await {
140+
let message = serde_json::to_string(&value).unwrap();
141+
debug!("received: {}", message);
142+
stdout
143+
.write_all(format!("Content-Length: {}\r\n\r\n", message.len()).as_bytes())
144+
.await
145+
.unwrap();
146+
stdout.write_all(message.as_bytes()).await.unwrap();
140147
}
141148
});
142149

0 commit comments

Comments
 (0)