diff options
author | Blaž Hrastnik | 2021-08-07 06:02:12 +0000 |
---|---|---|
committer | Blaž Hrastnik | 2021-08-07 06:04:03 +0000 |
commit | 385a6b5a1adddfc26e917982641530e1a7c7aa81 (patch) | |
tree | fbc0af05a5748ad94479521b7a6fa043702a8659 /helix-lsp/src/transport.rs | |
parent | 8714b71991010565dbbcf7aff5dead6e0a4b2f15 (diff) |
lsp: Refactor duplex to avoid issues with select! + read_exact
read_exact isn't cancellation safe.
Fixes #504
Diffstat (limited to 'helix-lsp/src/transport.rs')
-rw-r--r-- | helix-lsp/src/transport.rs | 132 |
1 files changed, 81 insertions, 51 deletions
diff --git a/helix-lsp/src/transport.rs b/helix-lsp/src/transport.rs index df55bbf6..0710b449 100644 --- a/helix-lsp/src/transport.rs +++ b/helix-lsp/src/transport.rs @@ -5,10 +5,14 @@ use log::{error, info}; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::collections::HashMap; +use std::sync::Arc; use tokio::{ io::{AsyncBufRead, AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}, process::{ChildStderr, ChildStdin, ChildStdout}, - sync::mpsc::{unbounded_channel, Sender, UnboundedReceiver, UnboundedSender}, + sync::{ + mpsc::{unbounded_channel, Sender, UnboundedReceiver, UnboundedSender}, + Mutex, + }, }; #[derive(Debug)] @@ -35,14 +39,7 @@ enum ServerMessage { #[derive(Debug)] pub struct Transport { id: usize, - client_tx: UnboundedSender<(usize, jsonrpc::Call)>, - client_rx: UnboundedReceiver<Payload>, - - pending_requests: HashMap<jsonrpc::Id, Sender<Result<Value>>>, - - server_stdin: BufWriter<ChildStdin>, - server_stdout: BufReader<ChildStdout>, - server_stderr: BufReader<ChildStderr>, + pending_requests: Mutex<HashMap<jsonrpc::Id, Sender<Result<Value>>>>, } impl Transport { @@ -60,15 +57,14 @@ impl Transport { let transport = Self { id, - server_stdout, - server_stdin, - server_stderr, - client_tx, - client_rx, - pending_requests: HashMap::default(), + pending_requests: Mutex::new(HashMap::default()), }; - tokio::spawn(transport.duplex()); + let transport = Arc::new(transport); + + tokio::spawn(Self::recv(transport.clone(), server_stdout, client_tx)); + tokio::spawn(Self::err(transport.clone(), server_stderr)); + tokio::spawn(Self::send(transport, server_stdin, client_rx)); (rx, tx) } @@ -109,12 +105,12 @@ impl Transport { //TODO: reuse vector let mut content = vec![0; content_length]; reader.read_exact(&mut content).await?; - let msg = String::from_utf8(content).context("invalid utf8 from server")?; + let msg = std::str::from_utf8(&content).context("invalid utf8 from server")?; info!("<- {}", msg); // try parsing as output (server response) or call (server request) - let output: serde_json::Result<ServerMessage> = serde_json::from_str(&msg); + let output: serde_json::Result<ServerMessage> = serde_json::from_str(msg); Ok(output?) } @@ -130,40 +126,55 @@ impl Transport { Ok(()) } - async fn send_payload_to_server(&mut self, payload: Payload) -> Result<()> { + async fn send_payload_to_server( + &self, + server_stdin: &mut BufWriter<ChildStdin>, + payload: Payload, + ) -> Result<()> { //TODO: reuse string let json = match payload { Payload::Request { chan, value } => { - self.pending_requests.insert(value.id.clone(), chan); + self.pending_requests + .lock() + .await + .insert(value.id.clone(), chan); serde_json::to_string(&value)? } Payload::Notification(value) => serde_json::to_string(&value)?, Payload::Response(error) => serde_json::to_string(&error)?, }; - self.send_string_to_server(json).await + self.send_string_to_server(server_stdin, json).await } - async fn send_string_to_server(&mut self, request: String) -> Result<()> { + async fn send_string_to_server( + &self, + server_stdin: &mut BufWriter<ChildStdin>, + request: String, + ) -> Result<()> { info!("-> {}", request); // send the headers - self.server_stdin + server_stdin .write_all(format!("Content-Length: {}\r\n\r\n", request.len()).as_bytes()) .await?; // send the body - self.server_stdin.write_all(request.as_bytes()).await?; + server_stdin.write_all(request.as_bytes()).await?; - self.server_stdin.flush().await?; + server_stdin.flush().await?; Ok(()) } - async fn process_server_message(&mut self, msg: ServerMessage) -> Result<()> { + async fn process_server_message( + &self, + client_tx: &UnboundedSender<(usize, jsonrpc::Call)>, + msg: ServerMessage, + ) -> Result<()> { match msg { ServerMessage::Output(output) => self.process_request_response(output).await?, ServerMessage::Call(call) => { - self.client_tx + client_tx .send((self.id, call)) .context("failed to send a message to server")?; // let notification = Notification::parse(&method, params); @@ -172,7 +183,7 @@ impl Transport { Ok(()) } - async fn process_request_response(&mut self, output: jsonrpc::Output) -> Result<()> { + async fn process_request_response(&self, output: jsonrpc::Output) -> Result<()> { let (id, result) = match output { jsonrpc::Output::Success(jsonrpc::Success { id, result, .. }) => { info!("<- {}", result); @@ -186,6 +197,8 @@ impl Transport { let tx = self .pending_requests + .lock() + .await .remove(&id) .expect("pending_request with id not found!"); @@ -200,34 +213,51 @@ impl Transport { Ok(()) } - async fn duplex(mut self) { + async fn recv( + transport: Arc<Self>, + mut server_stdout: BufReader<ChildStdout>, + client_tx: UnboundedSender<(usize, jsonrpc::Call)>, + ) { let mut recv_buffer = String::new(); - let mut err_buffer = String::new(); loop { - tokio::select! { - // client -> server - msg = self.client_rx.recv() => { - match msg { - Some(msg) => { - self.send_payload_to_server(msg).await.unwrap() - }, - None => break - } + match Self::recv_server_message(&mut server_stdout, &mut recv_buffer).await { + Ok(msg) => { + transport + .process_server_message(&client_tx, msg) + .await + .unwrap(); } - // server -> client - msg = Self::recv_server_message(&mut self.server_stdout, &mut recv_buffer) => { - match msg { - Ok(msg) => { - self.process_server_message(msg).await.unwrap(); - } - Err(_) => { - error!("err: <- {:?}", msg); - break; - }, - } + Err(err) => { + error!("err: <- {:?}", err); + break; } - _msg = Self::recv_server_error(&mut self.server_stderr, &mut err_buffer) => {} } } } + + async fn err(_transport: Arc<Self>, mut server_stderr: BufReader<ChildStderr>) { + let mut recv_buffer = String::new(); + loop { + match Self::recv_server_error(&mut server_stderr, &mut recv_buffer).await { + Ok(_) => {} + Err(err) => { + error!("err: <- {:?}", err); + break; + } + } + } + } + + async fn send( + transport: Arc<Self>, + mut server_stdin: BufWriter<ChildStdin>, + mut client_rx: UnboundedReceiver<Payload>, + ) { + while let Some(msg) = client_rx.recv().await { + transport + .send_payload_to_server(&mut server_stdin, msg) + .await + .unwrap() + } + } } |