diff options
author | Egor Karavaev | 2021-06-07 19:11:17 +0000 |
---|---|---|
committer | Blaž Hrastnik | 2021-06-08 01:56:46 +0000 |
commit | ea6667070f05227f5d87eaf977ca0c7ddc982c0a (patch) | |
tree | 3a99d1fc8c2247e4a7cc04339c9f7e62c037bd7e /helix-lsp/src/transport.rs | |
parent | 960bc9f13446e1fbcc36f1a3be3541ec1b9ec5bd (diff) |
helix-lsp cleanup
Diffstat (limited to 'helix-lsp/src/transport.rs')
-rw-r--r-- | helix-lsp/src/transport.rs | 180 |
1 files changed, 87 insertions, 93 deletions
diff --git a/helix-lsp/src/transport.rs b/helix-lsp/src/transport.rs index dd42f7bb..c69c1ccf 100644 --- a/helix-lsp/src/transport.rs +++ b/helix-lsp/src/transport.rs @@ -1,15 +1,9 @@ -use std::collections::HashMap; -use std::io; - -use log::{error, info}; - -use crate::Error; - -type Result<T> = core::result::Result<T, Error>; - +use crate::Result; use jsonrpc_core as jsonrpc; +use log::{error, info}; +use serde::{Deserialize, Serialize}; use serde_json::Value; - +use std::collections::HashMap; use tokio::{ io::{AsyncBufRead, AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}, process::{ChildStderr, ChildStdin, ChildStdout}, @@ -26,12 +20,11 @@ pub enum Payload { Response(jsonrpc::Output), } -use serde::{Deserialize, Serialize}; /// A type representing all possible values sent from the server to the client. #[derive(Debug, PartialEq, Clone, Deserialize, Serialize)] #[serde(deny_unknown_fields)] #[serde(untagged)] -enum Message { +enum ServerMessage { /// A regular JSON-RPC request output (single response). Output(jsonrpc::Output), /// A JSON-RPC request or notification. @@ -39,34 +32,32 @@ enum Message { } pub struct Transport { - incoming: UnboundedSender<jsonrpc::Call>, - outgoing: UnboundedReceiver<Payload>, + client_tx: UnboundedSender<jsonrpc::Call>, + client_rx: UnboundedReceiver<Payload>, pending_requests: HashMap<jsonrpc::Id, Sender<Result<Value>>>, - headers: HashMap<String, String>, - writer: BufWriter<ChildStdin>, - reader: BufReader<ChildStdout>, - stderr: BufReader<ChildStderr>, + server_stdin: BufWriter<ChildStdin>, + server_stdout: BufReader<ChildStdout>, + server_stderr: BufReader<ChildStderr>, } impl Transport { pub fn start( - reader: BufReader<ChildStdout>, - writer: BufWriter<ChildStdin>, - stderr: BufReader<ChildStderr>, + server_stdout: BufReader<ChildStdout>, + server_stdin: BufWriter<ChildStdin>, + server_stderr: BufReader<ChildStderr>, ) -> (UnboundedReceiver<jsonrpc::Call>, UnboundedSender<Payload>) { - let (incoming, rx) = unbounded_channel(); - let (tx, outgoing) = unbounded_channel(); + let (client_tx, rx) = unbounded_channel(); + let (tx, client_rx) = unbounded_channel(); let transport = Self { - reader, - writer, - stderr, - incoming, - outgoing, + server_stdout, + server_stdin, + server_stderr, + client_tx, + client_rx, pending_requests: HashMap::default(), - headers: HashMap::default(), }; tokio::spawn(transport.duplex()); @@ -74,105 +65,104 @@ impl Transport { (rx, tx) } - async fn recv( + async fn recv_server_message( reader: &mut (impl AsyncBufRead + Unpin + Send), - headers: &mut HashMap<String, String>, - ) -> core::result::Result<Message, std::io::Error> { - // read headers + buffer: &mut String, + ) -> Result<ServerMessage> { + let mut content_length = None; loop { - let mut header = String::new(); - // detect pipe closed if 0 - reader.read_line(&mut header).await?; - let header = header.trim(); + buffer.truncate(0); + reader.read_line(buffer).await?; + let header = buffer.trim(); if header.is_empty() { break; } - let parts: Vec<&str> = header.split(": ").collect(); - if parts.len() != 2 { - return Err(std::io::Error::new( - std::io::ErrorKind::Other, - "Failed to parse header", - )); + let mut parts = header.split(": "); + + match (parts.next(), parts.next(), parts.next()) { + (Some("Content-Length"), Some(value), None) => { + content_length = Some(value.parse().unwrap()); + } + (Some(_), Some(_), None) => {} + _ => { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Failed to parse header", + ) + .into()); + } } - headers.insert(parts[0].to_string(), parts[1].to_string()); } - // find content-length - let content_length = headers.get("Content-Length").unwrap().parse().unwrap(); + let content_length = content_length.unwrap(); + //TODO: reuse vector let mut content = vec![0; content_length]; reader.read_exact(&mut content).await?; let msg = String::from_utf8(content).unwrap(); - // read data - info!("<- {}", msg); // try parsing as output (server response) or call (server request) - let output: serde_json::Result<Message> = serde_json::from_str(&msg); + let output: serde_json::Result<ServerMessage> = serde_json::from_str(&msg); Ok(output?) } - async fn err( + async fn recv_server_error( err: &mut (impl AsyncBufRead + Unpin + Send), - ) -> core::result::Result<(), std::io::Error> { - let mut line = String::new(); - err.read_line(&mut line).await?; - error!("err <- {}", line); + buffer: &mut String, + ) -> Result<()> { + buffer.truncate(0); + err.read_line(buffer).await?; + error!("err <- {}", buffer); Ok(()) } - pub async fn send_payload(&mut self, payload: Payload) -> io::Result<()> { - match payload { + async fn send_payload_to_server(&mut self, payload: Payload) -> Result<()> { + //TODO: reuse string + let json = match payload { Payload::Request { chan, value } => { self.pending_requests.insert(value.id.clone(), chan); - - let json = serde_json::to_string(&value)?; - self.send(json).await - } - Payload::Notification(value) => { - let json = serde_json::to_string(&value)?; - self.send(json).await + serde_json::to_string(&value)? } - Payload::Response(error) => { - let json = serde_json::to_string(&error)?; - self.send(json).await - } - } + Payload::Notification(value) => serde_json::to_string(&value)?, + Payload::Response(error) => serde_json::to_string(&error)?, + }; + self.send_string_to_server(json).await } - pub async fn send(&mut self, request: String) -> io::Result<()> { + async fn send_string_to_server(&mut self, request: String) -> Result<()> { info!("-> {}", request); // send the headers - self.writer + self.server_stdin .write_all(format!("Content-Length: {}\r\n\r\n", request.len()).as_bytes()) .await?; // send the body - self.writer.write_all(request.as_bytes()).await?; + self.server_stdin.write_all(request.as_bytes()).await?; - self.writer.flush().await?; + self.server_stdin.flush().await?; Ok(()) } - async fn recv_msg(&mut self, msg: Message) -> anyhow::Result<()> { + async fn process_server_message(&mut self, msg: ServerMessage) -> Result<()> { match msg { - Message::Output(output) => self.recv_response(output).await?, - Message::Call(call) => { - self.incoming.send(call).unwrap(); + ServerMessage::Output(output) => self.process_request_response(output).await?, + ServerMessage::Call(call) => { + self.client_tx.send(call).unwrap(); // let notification = Notification::parse(&method, params); } }; Ok(()) } - async fn recv_response(&mut self, output: jsonrpc::Output) -> io::Result<()> { + async fn process_request_response(&mut self, output: jsonrpc::Output) -> Result<()> { let (id, result) = match output { jsonrpc::Output::Success(jsonrpc::Success { id, result, .. }) => { info!("<- {}", result); @@ -200,29 +190,33 @@ impl Transport { Ok(()) } - pub async fn duplex(mut self) { + async fn duplex(mut self) { + let mut recv_buffer = String::new(); + let mut err_buffer = String::new(); loop { tokio::select! { // client -> server - msg = self.outgoing.recv() => { - if msg.is_none() { - break; + msg = self.client_rx.recv() => { + match msg { + Some(msg) => { + self.send_payload_to_server(msg).await.unwrap() + }, + None => break } - let msg = msg.unwrap(); - - self.send_payload(msg).await.unwrap(); } - // server <- client - msg = Self::recv(&mut self.reader, &mut self.headers) => { - if msg.is_err() { - error!("err: <- {:?}", msg); - break; + // server -> client + msg = Self::recv_server_message(&mut self.server_stdout, &mut recv_buffer) => { + match msg { + Ok(msg) => { + self.process_server_message(msg).await.unwrap(); + } + Err(_) => { + error!("err: <- {:?}", msg); + break; + }, } - let msg = msg.unwrap(); - - self.recv_msg(msg).await.unwrap(); } - _msg = Self::err(&mut self.stderr) => {} + _msg = Self::recv_server_error(&mut self.server_stderr, &mut err_buffer) => {} } } } |