From ea6667070f05227f5d87eaf977ca0c7ddc982c0a Mon Sep 17 00:00:00 2001 From: Egor Karavaev Date: Mon, 7 Jun 2021 22:11:17 +0300 Subject: helix-lsp cleanup --- Cargo.lock | 16 ---- helix-lsp/Cargo.toml | 21 ++---- helix-lsp/src/client.rs | 61 +++++---------- helix-lsp/src/lib.rs | 27 +++---- helix-lsp/src/transport.rs | 180 ++++++++++++++++++++++----------------------- 5 files changed, 125 insertions(+), 180 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9136db2f..7e1db781 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -237,12 +237,6 @@ dependencies = [ "wasi", ] -[[package]] -name = "glob" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" - [[package]] name = "globset" version = "0.4.6" @@ -282,19 +276,15 @@ dependencies = [ "anyhow", "futures-executor", "futures-util", - "glob", "helix-core", "jsonrpc-core", "log", "lsp-types", - "once_cell", - "pathdiff", "serde", "serde_json", "thiserror", "tokio", "tokio-stream", - "url", ] [[package]] @@ -596,12 +586,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "pathdiff" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "877630b3de15c0b64cc52f659345724fbf6bdad9bd9566699fc53688f3c34a34" - [[package]] name = "percent-encoding" version = "2.1.0" diff --git a/helix-lsp/Cargo.toml b/helix-lsp/Cargo.toml index f4d2849f..1384ce67 100644 --- a/helix-lsp/Cargo.toml +++ b/helix-lsp/Cargo.toml @@ -10,19 +10,14 @@ license = "MPL-2.0" [dependencies] helix-core = { path = "../helix-core" } -once_cell = "1.4" - +anyhow = "1.0" +futures-executor = "0.3" +futures-util = { version = "0.3", features = ["std", "async-await"], default-features = false } +jsonrpc-core = { version = "17.1", default-features = false } # don't pull in all of futures +log = "0.4" lsp-types = { version = "0.89", features = ["proposed"] } -tokio = { version = "1", features = ["full"] } -tokio-stream = "0.1.5" -futures-executor = { version = "0.3" } -url = "2" -pathdiff = "0.2" -glob = "0.3" -anyhow = "1" -serde_json = "1.0" serde = { version = "1.0", features = ["derive"] } -jsonrpc-core = { version = "17.1", default-features = false } # don't pull in all of futures -futures-util = { version = "0.3", features = ["std", "async-await"], default-features = false } +serde_json = "1.0" thiserror = "1.0" -log = "~0.4" +tokio = { version = "1.6", features = ["full"] } +tokio-stream = "0.1.6" \ No newline at end of file diff --git a/helix-lsp/src/client.rs b/helix-lsp/src/client.rs index 41c38fd9..9c85beef 100644 --- a/helix-lsp/src/client.rs +++ b/helix-lsp/src/client.rs @@ -4,30 +4,22 @@ use crate::{ }; use helix_core::{find_root, ChangeSet, Rope}; - -// use std::collections::HashMap; -use std::future::Future; -use std::sync::atomic::{AtomicU64, Ordering}; - use jsonrpc_core as jsonrpc; use lsp_types as lsp; use serde_json::Value; - +use std::future::Future; use std::process::Stdio; +use std::sync::atomic::{AtomicU64, Ordering}; use tokio::{ io::{BufReader, BufWriter}, - // prelude::*, process::{Child, Command}, sync::mpsc::{channel, UnboundedReceiver, UnboundedSender}, }; pub struct Client { _process: Child, - - outgoing: UnboundedSender, - // pub incoming: Receiver, - pub request_counter: AtomicU64, - + server_tx: UnboundedSender, + request_counter: AtomicU64, capabilities: Option, offset_encoding: OffsetEncoding, } @@ -43,40 +35,27 @@ impl Client { .kill_on_drop(true) .spawn(); - // use std::io::ErrorKind; - let mut process = match process { - Ok(process) => process, - Err(err) => match err.kind() { - // ErrorKind::NotFound | ErrorKind::PermissionDenied => { - // return Err(Error::Other(err.into())) - // } - _kind => return Err(Error::Other(err.into())), - }, - }; + let mut process = process?; // TODO: do we need bufreader/writer here? or do we use async wrappers on unblock? let writer = BufWriter::new(process.stdin.take().expect("Failed to open stdin")); let reader = BufReader::new(process.stdout.take().expect("Failed to open stdout")); let stderr = BufReader::new(process.stderr.take().expect("Failed to open stderr")); - let (incoming, outgoing) = Transport::start(reader, writer, stderr); + let (server_rx, server_tx) = Transport::start(reader, writer, stderr); let client = Self { _process: process, - - outgoing, - // incoming, + server_tx, request_counter: AtomicU64::new(0), - capabilities: None, - // diagnostics: HashMap::new(), offset_encoding: OffsetEncoding::Utf8, }; // TODO: async client.initialize() // maybe use an arc flag - Ok((client, incoming)) + Ok((client, server_rx)) } fn next_request_id(&self) -> jsonrpc::Id { @@ -106,7 +85,7 @@ impl Client { } /// Execute a RPC request on the language server. - pub async fn request(&self, params: R::Params) -> Result + async fn request(&self, params: R::Params) -> Result where R::Params: serde::Serialize, R::Result: core::fmt::Debug, // TODO: temporary @@ -118,17 +97,20 @@ impl Client { } /// Execute a RPC request on the language server. - pub fn call( + fn call( &self, params: R::Params, ) -> impl Future> where R::Params: serde::Serialize, { - let outgoing = self.outgoing.clone(); + let server_tx = self.server_tx.clone(); let id = self.next_request_id(); async move { + use std::time::Duration; + use tokio::time::timeout; + let params = serde_json::to_value(params)?; let request = jsonrpc::MethodCall { @@ -140,16 +122,13 @@ impl Client { let (tx, mut rx) = channel::>(1); - outgoing + server_tx .send(Payload::Request { chan: tx, value: request, }) .map_err(|e| Error::Other(e.into()))?; - use std::time::Duration; - use tokio::time::timeout; - timeout(Duration::from_secs(2), rx.recv()) .await .map_err(|_| Error::Timeout)? // return Timeout @@ -158,14 +137,14 @@ impl Client { } /// Send a RPC notification to the language server. - pub fn notify( + fn notify( &self, params: R::Params, ) -> impl Future> where R::Params: serde::Serialize, { - let outgoing = self.outgoing.clone(); + let server_tx = self.server_tx.clone(); async move { let params = serde_json::to_value(params)?; @@ -176,7 +155,7 @@ impl Client { params: Self::value_into_params(params), }; - outgoing + server_tx .send(Payload::Notification(notification)) .map_err(|e| Error::Other(e.into()))?; @@ -205,7 +184,7 @@ impl Client { }), }; - self.outgoing + self.server_tx .send(Payload::Response(output)) .map_err(|e| Error::Other(e.into()))?; @@ -216,7 +195,7 @@ impl Client { // General messages // ------------------------------------------------------------------------------------------- - pub async fn initialize(&mut self) -> Result<()> { + pub(crate) async fn initialize(&mut self) -> Result<()> { // TODO: delay any requests that are triggered prior to initialize let root = find_root(None).and_then(|root| lsp::Url::from_file_path(root).ok()); diff --git a/helix-lsp/src/lib.rs b/helix-lsp/src/lib.rs index af8c9005..5c482774 100644 --- a/helix-lsp/src/lib.rs +++ b/helix-lsp/src/lib.rs @@ -1,28 +1,27 @@ mod client; mod transport; -pub use jsonrpc_core as jsonrpc; -pub use lsp_types as lsp; - pub use client::Client; +pub use futures_executor::block_on; +pub use jsonrpc::Call; +pub use jsonrpc_core as jsonrpc; pub use lsp::{Position, Url}; +pub use lsp_types as lsp; -pub type Result = core::result::Result; - +use futures_util::stream::select_all::SelectAll; use helix_core::syntax::LanguageConfiguration; -use thiserror::Error; - use std::{ collections::{hash_map::Entry, HashMap}, sync::Arc, }; use serde::{Deserialize, Serialize}; - +use thiserror::Error; use tokio_stream::wrappers::UnboundedReceiverStream; -pub use futures_executor::block_on; +pub type Result = core::result::Result; +type LanguageId = String; #[derive(Error, Debug)] pub enum Error { @@ -30,6 +29,8 @@ pub enum Error { Rpc(#[from] jsonrpc::Error), #[error("failed to parse: {0}")] Parse(#[from] serde_json::Error), + #[error("IO Error: {0}")] + IO(#[from] std::io::Error), #[error("request timed out")] Timeout, #[error("server closed the stream")] @@ -126,8 +127,6 @@ pub mod util { }), ) } - - // apply_insert_replace_edit } #[derive(Debug, PartialEq, Clone)] @@ -173,12 +172,6 @@ impl Notification { } } -pub use jsonrpc::Call; - -type LanguageId = String; - -use futures_util::stream::select_all::SelectAll; - pub struct Registry { inner: HashMap>, diff --git a/helix-lsp/src/transport.rs b/helix-lsp/src/transport.rs index dd42f7bb..c69c1ccf 100644 --- a/helix-lsp/src/transport.rs +++ b/helix-lsp/src/transport.rs @@ -1,15 +1,9 @@ -use std::collections::HashMap; -use std::io; - -use log::{error, info}; - -use crate::Error; - -type Result = core::result::Result; - +use crate::Result; use jsonrpc_core as jsonrpc; +use log::{error, info}; +use serde::{Deserialize, Serialize}; use serde_json::Value; - +use std::collections::HashMap; use tokio::{ io::{AsyncBufRead, AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}, process::{ChildStderr, ChildStdin, ChildStdout}, @@ -26,12 +20,11 @@ pub enum Payload { Response(jsonrpc::Output), } -use serde::{Deserialize, Serialize}; /// A type representing all possible values sent from the server to the client. #[derive(Debug, PartialEq, Clone, Deserialize, Serialize)] #[serde(deny_unknown_fields)] #[serde(untagged)] -enum Message { +enum ServerMessage { /// A regular JSON-RPC request output (single response). Output(jsonrpc::Output), /// A JSON-RPC request or notification. @@ -39,34 +32,32 @@ enum Message { } pub struct Transport { - incoming: UnboundedSender, - outgoing: UnboundedReceiver, + client_tx: UnboundedSender, + client_rx: UnboundedReceiver, pending_requests: HashMap>>, - headers: HashMap, - writer: BufWriter, - reader: BufReader, - stderr: BufReader, + server_stdin: BufWriter, + server_stdout: BufReader, + server_stderr: BufReader, } impl Transport { pub fn start( - reader: BufReader, - writer: BufWriter, - stderr: BufReader, + server_stdout: BufReader, + server_stdin: BufWriter, + server_stderr: BufReader, ) -> (UnboundedReceiver, UnboundedSender) { - let (incoming, rx) = unbounded_channel(); - let (tx, outgoing) = unbounded_channel(); + let (client_tx, rx) = unbounded_channel(); + let (tx, client_rx) = unbounded_channel(); let transport = Self { - reader, - writer, - stderr, - incoming, - outgoing, + server_stdout, + server_stdin, + server_stderr, + client_tx, + client_rx, pending_requests: HashMap::default(), - headers: HashMap::default(), }; tokio::spawn(transport.duplex()); @@ -74,105 +65,104 @@ impl Transport { (rx, tx) } - async fn recv( + async fn recv_server_message( reader: &mut (impl AsyncBufRead + Unpin + Send), - headers: &mut HashMap, - ) -> core::result::Result { - // read headers + buffer: &mut String, + ) -> Result { + let mut content_length = None; loop { - let mut header = String::new(); - // detect pipe closed if 0 - reader.read_line(&mut header).await?; - let header = header.trim(); + buffer.truncate(0); + reader.read_line(buffer).await?; + let header = buffer.trim(); if header.is_empty() { break; } - let parts: Vec<&str> = header.split(": ").collect(); - if parts.len() != 2 { - return Err(std::io::Error::new( - std::io::ErrorKind::Other, - "Failed to parse header", - )); + let mut parts = header.split(": "); + + match (parts.next(), parts.next(), parts.next()) { + (Some("Content-Length"), Some(value), None) => { + content_length = Some(value.parse().unwrap()); + } + (Some(_), Some(_), None) => {} + _ => { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Failed to parse header", + ) + .into()); + } } - headers.insert(parts[0].to_string(), parts[1].to_string()); } - // find content-length - let content_length = headers.get("Content-Length").unwrap().parse().unwrap(); + let content_length = content_length.unwrap(); + //TODO: reuse vector let mut content = vec![0; content_length]; reader.read_exact(&mut content).await?; let msg = String::from_utf8(content).unwrap(); - // read data - info!("<- {}", msg); // try parsing as output (server response) or call (server request) - let output: serde_json::Result = serde_json::from_str(&msg); + let output: serde_json::Result = serde_json::from_str(&msg); Ok(output?) } - async fn err( + async fn recv_server_error( err: &mut (impl AsyncBufRead + Unpin + Send), - ) -> core::result::Result<(), std::io::Error> { - let mut line = String::new(); - err.read_line(&mut line).await?; - error!("err <- {}", line); + buffer: &mut String, + ) -> Result<()> { + buffer.truncate(0); + err.read_line(buffer).await?; + error!("err <- {}", buffer); Ok(()) } - pub async fn send_payload(&mut self, payload: Payload) -> io::Result<()> { - match payload { + async fn send_payload_to_server(&mut self, payload: Payload) -> Result<()> { + //TODO: reuse string + let json = match payload { Payload::Request { chan, value } => { self.pending_requests.insert(value.id.clone(), chan); - - let json = serde_json::to_string(&value)?; - self.send(json).await - } - Payload::Notification(value) => { - let json = serde_json::to_string(&value)?; - self.send(json).await + serde_json::to_string(&value)? } - Payload::Response(error) => { - let json = serde_json::to_string(&error)?; - self.send(json).await - } - } + Payload::Notification(value) => serde_json::to_string(&value)?, + Payload::Response(error) => serde_json::to_string(&error)?, + }; + self.send_string_to_server(json).await } - pub async fn send(&mut self, request: String) -> io::Result<()> { + async fn send_string_to_server(&mut self, request: String) -> Result<()> { info!("-> {}", request); // send the headers - self.writer + self.server_stdin .write_all(format!("Content-Length: {}\r\n\r\n", request.len()).as_bytes()) .await?; // send the body - self.writer.write_all(request.as_bytes()).await?; + self.server_stdin.write_all(request.as_bytes()).await?; - self.writer.flush().await?; + self.server_stdin.flush().await?; Ok(()) } - async fn recv_msg(&mut self, msg: Message) -> anyhow::Result<()> { + async fn process_server_message(&mut self, msg: ServerMessage) -> Result<()> { match msg { - Message::Output(output) => self.recv_response(output).await?, - Message::Call(call) => { - self.incoming.send(call).unwrap(); + ServerMessage::Output(output) => self.process_request_response(output).await?, + ServerMessage::Call(call) => { + self.client_tx.send(call).unwrap(); // let notification = Notification::parse(&method, params); } }; Ok(()) } - async fn recv_response(&mut self, output: jsonrpc::Output) -> io::Result<()> { + async fn process_request_response(&mut self, output: jsonrpc::Output) -> Result<()> { let (id, result) = match output { jsonrpc::Output::Success(jsonrpc::Success { id, result, .. }) => { info!("<- {}", result); @@ -200,29 +190,33 @@ impl Transport { Ok(()) } - pub async fn duplex(mut self) { + async fn duplex(mut self) { + let mut recv_buffer = String::new(); + let mut err_buffer = String::new(); loop { tokio::select! { // client -> server - msg = self.outgoing.recv() => { - if msg.is_none() { - break; + msg = self.client_rx.recv() => { + match msg { + Some(msg) => { + self.send_payload_to_server(msg).await.unwrap() + }, + None => break } - let msg = msg.unwrap(); - - self.send_payload(msg).await.unwrap(); } - // server <- client - msg = Self::recv(&mut self.reader, &mut self.headers) => { - if msg.is_err() { - error!("err: <- {:?}", msg); - break; + // server -> client + msg = Self::recv_server_message(&mut self.server_stdout, &mut recv_buffer) => { + match msg { + Ok(msg) => { + self.process_server_message(msg).await.unwrap(); + } + Err(_) => { + error!("err: <- {:?}", msg); + break; + }, } - let msg = msg.unwrap(); - - self.recv_msg(msg).await.unwrap(); } - _msg = Self::err(&mut self.stderr) => {} + _msg = Self::recv_server_error(&mut self.server_stderr, &mut err_buffer) => {} } } } -- cgit v1.2.3-70-g09d2