diff options
Diffstat (limited to 'helix-lsp/src')
-rw-r--r-- | helix-lsp/src/client.rs | 61 | ||||
-rw-r--r-- | helix-lsp/src/lib.rs | 27 | ||||
-rw-r--r-- | helix-lsp/src/transport.rs | 180 |
3 files changed, 117 insertions, 151 deletions
diff --git a/helix-lsp/src/client.rs b/helix-lsp/src/client.rs index 41c38fd9..9c85beef 100644 --- a/helix-lsp/src/client.rs +++ b/helix-lsp/src/client.rs @@ -4,30 +4,22 @@ use crate::{ }; use helix_core::{find_root, ChangeSet, Rope}; - -// use std::collections::HashMap; -use std::future::Future; -use std::sync::atomic::{AtomicU64, Ordering}; - use jsonrpc_core as jsonrpc; use lsp_types as lsp; use serde_json::Value; - +use std::future::Future; use std::process::Stdio; +use std::sync::atomic::{AtomicU64, Ordering}; use tokio::{ io::{BufReader, BufWriter}, - // prelude::*, process::{Child, Command}, sync::mpsc::{channel, UnboundedReceiver, UnboundedSender}, }; pub struct Client { _process: Child, - - outgoing: UnboundedSender<Payload>, - // pub incoming: Receiver<Call>, - pub request_counter: AtomicU64, - + server_tx: UnboundedSender<Payload>, + request_counter: AtomicU64, capabilities: Option<lsp::ServerCapabilities>, offset_encoding: OffsetEncoding, } @@ -43,40 +35,27 @@ impl Client { .kill_on_drop(true) .spawn(); - // use std::io::ErrorKind; - let mut process = match process { - Ok(process) => process, - Err(err) => match err.kind() { - // ErrorKind::NotFound | ErrorKind::PermissionDenied => { - // return Err(Error::Other(err.into())) - // } - _kind => return Err(Error::Other(err.into())), - }, - }; + let mut process = process?; // TODO: do we need bufreader/writer here? or do we use async wrappers on unblock? let writer = BufWriter::new(process.stdin.take().expect("Failed to open stdin")); let reader = BufReader::new(process.stdout.take().expect("Failed to open stdout")); let stderr = BufReader::new(process.stderr.take().expect("Failed to open stderr")); - let (incoming, outgoing) = Transport::start(reader, writer, stderr); + let (server_rx, server_tx) = Transport::start(reader, writer, stderr); let client = Self { _process: process, - - outgoing, - // incoming, + server_tx, request_counter: AtomicU64::new(0), - capabilities: None, - // diagnostics: HashMap::new(), offset_encoding: OffsetEncoding::Utf8, }; // TODO: async client.initialize() // maybe use an arc<atomic> flag - Ok((client, incoming)) + Ok((client, server_rx)) } fn next_request_id(&self) -> jsonrpc::Id { @@ -106,7 +85,7 @@ impl Client { } /// Execute a RPC request on the language server. - pub async fn request<R: lsp::request::Request>(&self, params: R::Params) -> Result<R::Result> + async fn request<R: lsp::request::Request>(&self, params: R::Params) -> Result<R::Result> where R::Params: serde::Serialize, R::Result: core::fmt::Debug, // TODO: temporary @@ -118,17 +97,20 @@ impl Client { } /// Execute a RPC request on the language server. - pub fn call<R: lsp::request::Request>( + fn call<R: lsp::request::Request>( &self, params: R::Params, ) -> impl Future<Output = Result<Value>> where R::Params: serde::Serialize, { - let outgoing = self.outgoing.clone(); + let server_tx = self.server_tx.clone(); let id = self.next_request_id(); async move { + use std::time::Duration; + use tokio::time::timeout; + let params = serde_json::to_value(params)?; let request = jsonrpc::MethodCall { @@ -140,16 +122,13 @@ impl Client { let (tx, mut rx) = channel::<Result<Value>>(1); - outgoing + server_tx .send(Payload::Request { chan: tx, value: request, }) .map_err(|e| Error::Other(e.into()))?; - use std::time::Duration; - use tokio::time::timeout; - timeout(Duration::from_secs(2), rx.recv()) .await .map_err(|_| Error::Timeout)? // return Timeout @@ -158,14 +137,14 @@ impl Client { } /// Send a RPC notification to the language server. - pub fn notify<R: lsp::notification::Notification>( + fn notify<R: lsp::notification::Notification>( &self, params: R::Params, ) -> impl Future<Output = Result<()>> where R::Params: serde::Serialize, { - let outgoing = self.outgoing.clone(); + let server_tx = self.server_tx.clone(); async move { let params = serde_json::to_value(params)?; @@ -176,7 +155,7 @@ impl Client { params: Self::value_into_params(params), }; - outgoing + server_tx .send(Payload::Notification(notification)) .map_err(|e| Error::Other(e.into()))?; @@ -205,7 +184,7 @@ impl Client { }), }; - self.outgoing + self.server_tx .send(Payload::Response(output)) .map_err(|e| Error::Other(e.into()))?; @@ -216,7 +195,7 @@ impl Client { // General messages // ------------------------------------------------------------------------------------------- - pub async fn initialize(&mut self) -> Result<()> { + pub(crate) async fn initialize(&mut self) -> Result<()> { // TODO: delay any requests that are triggered prior to initialize let root = find_root(None).and_then(|root| lsp::Url::from_file_path(root).ok()); diff --git a/helix-lsp/src/lib.rs b/helix-lsp/src/lib.rs index af8c9005..5c482774 100644 --- a/helix-lsp/src/lib.rs +++ b/helix-lsp/src/lib.rs @@ -1,28 +1,27 @@ mod client; mod transport; -pub use jsonrpc_core as jsonrpc; -pub use lsp_types as lsp; - pub use client::Client; +pub use futures_executor::block_on; +pub use jsonrpc::Call; +pub use jsonrpc_core as jsonrpc; pub use lsp::{Position, Url}; +pub use lsp_types as lsp; -pub type Result<T> = core::result::Result<T, Error>; - +use futures_util::stream::select_all::SelectAll; use helix_core::syntax::LanguageConfiguration; -use thiserror::Error; - use std::{ collections::{hash_map::Entry, HashMap}, sync::Arc, }; use serde::{Deserialize, Serialize}; - +use thiserror::Error; use tokio_stream::wrappers::UnboundedReceiverStream; -pub use futures_executor::block_on; +pub type Result<T> = core::result::Result<T, Error>; +type LanguageId = String; #[derive(Error, Debug)] pub enum Error { @@ -30,6 +29,8 @@ pub enum Error { Rpc(#[from] jsonrpc::Error), #[error("failed to parse: {0}")] Parse(#[from] serde_json::Error), + #[error("IO Error: {0}")] + IO(#[from] std::io::Error), #[error("request timed out")] Timeout, #[error("server closed the stream")] @@ -126,8 +127,6 @@ pub mod util { }), ) } - - // apply_insert_replace_edit } #[derive(Debug, PartialEq, Clone)] @@ -173,12 +172,6 @@ impl Notification { } } -pub use jsonrpc::Call; - -type LanguageId = String; - -use futures_util::stream::select_all::SelectAll; - pub struct Registry { inner: HashMap<LanguageId, Arc<Client>>, diff --git a/helix-lsp/src/transport.rs b/helix-lsp/src/transport.rs index dd42f7bb..c69c1ccf 100644 --- a/helix-lsp/src/transport.rs +++ b/helix-lsp/src/transport.rs @@ -1,15 +1,9 @@ -use std::collections::HashMap; -use std::io; - -use log::{error, info}; - -use crate::Error; - -type Result<T> = core::result::Result<T, Error>; - +use crate::Result; use jsonrpc_core as jsonrpc; +use log::{error, info}; +use serde::{Deserialize, Serialize}; use serde_json::Value; - +use std::collections::HashMap; use tokio::{ io::{AsyncBufRead, AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}, process::{ChildStderr, ChildStdin, ChildStdout}, @@ -26,12 +20,11 @@ pub enum Payload { Response(jsonrpc::Output), } -use serde::{Deserialize, Serialize}; /// A type representing all possible values sent from the server to the client. #[derive(Debug, PartialEq, Clone, Deserialize, Serialize)] #[serde(deny_unknown_fields)] #[serde(untagged)] -enum Message { +enum ServerMessage { /// A regular JSON-RPC request output (single response). Output(jsonrpc::Output), /// A JSON-RPC request or notification. @@ -39,34 +32,32 @@ enum Message { } pub struct Transport { - incoming: UnboundedSender<jsonrpc::Call>, - outgoing: UnboundedReceiver<Payload>, + client_tx: UnboundedSender<jsonrpc::Call>, + client_rx: UnboundedReceiver<Payload>, pending_requests: HashMap<jsonrpc::Id, Sender<Result<Value>>>, - headers: HashMap<String, String>, - writer: BufWriter<ChildStdin>, - reader: BufReader<ChildStdout>, - stderr: BufReader<ChildStderr>, + server_stdin: BufWriter<ChildStdin>, + server_stdout: BufReader<ChildStdout>, + server_stderr: BufReader<ChildStderr>, } impl Transport { pub fn start( - reader: BufReader<ChildStdout>, - writer: BufWriter<ChildStdin>, - stderr: BufReader<ChildStderr>, + server_stdout: BufReader<ChildStdout>, + server_stdin: BufWriter<ChildStdin>, + server_stderr: BufReader<ChildStderr>, ) -> (UnboundedReceiver<jsonrpc::Call>, UnboundedSender<Payload>) { - let (incoming, rx) = unbounded_channel(); - let (tx, outgoing) = unbounded_channel(); + let (client_tx, rx) = unbounded_channel(); + let (tx, client_rx) = unbounded_channel(); let transport = Self { - reader, - writer, - stderr, - incoming, - outgoing, + server_stdout, + server_stdin, + server_stderr, + client_tx, + client_rx, pending_requests: HashMap::default(), - headers: HashMap::default(), }; tokio::spawn(transport.duplex()); @@ -74,105 +65,104 @@ impl Transport { (rx, tx) } - async fn recv( + async fn recv_server_message( reader: &mut (impl AsyncBufRead + Unpin + Send), - headers: &mut HashMap<String, String>, - ) -> core::result::Result<Message, std::io::Error> { - // read headers + buffer: &mut String, + ) -> Result<ServerMessage> { + let mut content_length = None; loop { - let mut header = String::new(); - // detect pipe closed if 0 - reader.read_line(&mut header).await?; - let header = header.trim(); + buffer.truncate(0); + reader.read_line(buffer).await?; + let header = buffer.trim(); if header.is_empty() { break; } - let parts: Vec<&str> = header.split(": ").collect(); - if parts.len() != 2 { - return Err(std::io::Error::new( - std::io::ErrorKind::Other, - "Failed to parse header", - )); + let mut parts = header.split(": "); + + match (parts.next(), parts.next(), parts.next()) { + (Some("Content-Length"), Some(value), None) => { + content_length = Some(value.parse().unwrap()); + } + (Some(_), Some(_), None) => {} + _ => { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Failed to parse header", + ) + .into()); + } } - headers.insert(parts[0].to_string(), parts[1].to_string()); } - // find content-length - let content_length = headers.get("Content-Length").unwrap().parse().unwrap(); + let content_length = content_length.unwrap(); + //TODO: reuse vector let mut content = vec![0; content_length]; reader.read_exact(&mut content).await?; let msg = String::from_utf8(content).unwrap(); - // read data - info!("<- {}", msg); // try parsing as output (server response) or call (server request) - let output: serde_json::Result<Message> = serde_json::from_str(&msg); + let output: serde_json::Result<ServerMessage> = serde_json::from_str(&msg); Ok(output?) } - async fn err( + async fn recv_server_error( err: &mut (impl AsyncBufRead + Unpin + Send), - ) -> core::result::Result<(), std::io::Error> { - let mut line = String::new(); - err.read_line(&mut line).await?; - error!("err <- {}", line); + buffer: &mut String, + ) -> Result<()> { + buffer.truncate(0); + err.read_line(buffer).await?; + error!("err <- {}", buffer); Ok(()) } - pub async fn send_payload(&mut self, payload: Payload) -> io::Result<()> { - match payload { + async fn send_payload_to_server(&mut self, payload: Payload) -> Result<()> { + //TODO: reuse string + let json = match payload { Payload::Request { chan, value } => { self.pending_requests.insert(value.id.clone(), chan); - - let json = serde_json::to_string(&value)?; - self.send(json).await - } - Payload::Notification(value) => { - let json = serde_json::to_string(&value)?; - self.send(json).await + serde_json::to_string(&value)? } - Payload::Response(error) => { - let json = serde_json::to_string(&error)?; - self.send(json).await - } - } + Payload::Notification(value) => serde_json::to_string(&value)?, + Payload::Response(error) => serde_json::to_string(&error)?, + }; + self.send_string_to_server(json).await } - pub async fn send(&mut self, request: String) -> io::Result<()> { + async fn send_string_to_server(&mut self, request: String) -> Result<()> { info!("-> {}", request); // send the headers - self.writer + self.server_stdin .write_all(format!("Content-Length: {}\r\n\r\n", request.len()).as_bytes()) .await?; // send the body - self.writer.write_all(request.as_bytes()).await?; + self.server_stdin.write_all(request.as_bytes()).await?; - self.writer.flush().await?; + self.server_stdin.flush().await?; Ok(()) } - async fn recv_msg(&mut self, msg: Message) -> anyhow::Result<()> { + async fn process_server_message(&mut self, msg: ServerMessage) -> Result<()> { match msg { - Message::Output(output) => self.recv_response(output).await?, - Message::Call(call) => { - self.incoming.send(call).unwrap(); + ServerMessage::Output(output) => self.process_request_response(output).await?, + ServerMessage::Call(call) => { + self.client_tx.send(call).unwrap(); // let notification = Notification::parse(&method, params); } }; Ok(()) } - async fn recv_response(&mut self, output: jsonrpc::Output) -> io::Result<()> { + async fn process_request_response(&mut self, output: jsonrpc::Output) -> Result<()> { let (id, result) = match output { jsonrpc::Output::Success(jsonrpc::Success { id, result, .. }) => { info!("<- {}", result); @@ -200,29 +190,33 @@ impl Transport { Ok(()) } - pub async fn duplex(mut self) { + async fn duplex(mut self) { + let mut recv_buffer = String::new(); + let mut err_buffer = String::new(); loop { tokio::select! { // client -> server - msg = self.outgoing.recv() => { - if msg.is_none() { - break; + msg = self.client_rx.recv() => { + match msg { + Some(msg) => { + self.send_payload_to_server(msg).await.unwrap() + }, + None => break } - let msg = msg.unwrap(); - - self.send_payload(msg).await.unwrap(); } - // server <- client - msg = Self::recv(&mut self.reader, &mut self.headers) => { - if msg.is_err() { - error!("err: <- {:?}", msg); - break; + // server -> client + msg = Self::recv_server_message(&mut self.server_stdout, &mut recv_buffer) => { + match msg { + Ok(msg) => { + self.process_server_message(msg).await.unwrap(); + } + Err(_) => { + error!("err: <- {:?}", msg); + break; + }, } - let msg = msg.unwrap(); - - self.recv_msg(msg).await.unwrap(); } - _msg = Self::err(&mut self.stderr) => {} + _msg = Self::recv_server_error(&mut self.server_stderr, &mut err_buffer) => {} } } } |