diff options
Diffstat (limited to 'helix-lsp/src/transport.rs')
-rw-r--r-- | helix-lsp/src/transport.rs | 137 |
1 files changed, 115 insertions, 22 deletions
diff --git a/helix-lsp/src/transport.rs b/helix-lsp/src/transport.rs index 67b7b48f..6e28094d 100644 --- a/helix-lsp/src/transport.rs +++ b/helix-lsp/src/transport.rs @@ -1,7 +1,7 @@ -use crate::Result; +use crate::{Error, Result}; use anyhow::Context; use jsonrpc_core as jsonrpc; -use log::{debug, error, info, warn}; +use log::{error, info}; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::collections::HashMap; @@ -11,7 +11,7 @@ use tokio::{ process::{ChildStderr, ChildStdin, ChildStdout}, sync::{ mpsc::{unbounded_channel, Sender, UnboundedReceiver, UnboundedSender}, - Mutex, + Mutex, Notify, }, }; @@ -51,9 +51,11 @@ impl Transport { ) -> ( UnboundedReceiver<(usize, jsonrpc::Call)>, UnboundedSender<Payload>, + Arc<Notify>, ) { let (client_tx, rx) = unbounded_channel(); let (tx, client_rx) = unbounded_channel(); + let notify = Arc::new(Notify::new()); let transport = Self { id, @@ -62,11 +64,21 @@ impl Transport { let transport = Arc::new(transport); - tokio::spawn(Self::recv(transport.clone(), server_stdout, client_tx)); + tokio::spawn(Self::recv( + transport.clone(), + server_stdout, + client_tx.clone(), + )); tokio::spawn(Self::err(transport.clone(), server_stderr)); - tokio::spawn(Self::send(transport, server_stdin, client_rx)); - - (rx, tx) + tokio::spawn(Self::send( + transport, + server_stdin, + client_tx, + client_rx, + notify.clone(), + )); + + (rx, tx, notify) } async fn recv_server_message( @@ -76,14 +88,18 @@ impl Transport { let mut content_length = None; loop { buffer.truncate(0); - reader.read_line(buffer).await?; - let header = buffer.trim(); + if reader.read_line(buffer).await? == 0 { + return Err(Error::StreamClosed); + }; + + // debug!("<- header {:?}", buffer); - if header.is_empty() { + if buffer == "\r\n" { + // look for an empty CRLF line break; } - debug!("<- header {}", header); + let header = buffer.trim(); let parts = header.split_once(": "); @@ -96,7 +112,8 @@ impl Transport { // Workaround: Some non-conformant language servers will output logging and other garbage // into the same stream as JSON-RPC messages. This can also happen from shell scripts that spawn // the server. Skip such lines and log a warning. - warn!("Failed to parse header: {:?}", header); + + // warn!("Failed to parse header: {:?}", header); } } } @@ -121,8 +138,10 @@ impl Transport { buffer: &mut String, ) -> Result<()> { buffer.truncate(0); - err.read_line(buffer).await?; - error!("err <- {}", buffer); + if err.read_line(buffer).await? == 0 { + return Err(Error::StreamClosed); + }; + error!("err <- {:?}", buffer); Ok(()) } @@ -255,16 +274,90 @@ impl Transport { async fn send( transport: Arc<Self>, mut server_stdin: BufWriter<ChildStdin>, + client_tx: UnboundedSender<(usize, jsonrpc::Call)>, mut client_rx: UnboundedReceiver<Payload>, + initialize_notify: Arc<Notify>, ) { - while let Some(msg) = client_rx.recv().await { - match transport - .send_payload_to_server(&mut server_stdin, msg) - .await - { - Ok(_) => {} - Err(err) => { - error!("err: <- {:?}", err); + let mut pending_messages: Vec<Payload> = Vec::new(); + let mut is_pending = true; + + // Determine if a message is allowed to be sent early + fn is_initialize(payload: &Payload) -> bool { + use lsp_types::{ + notification::{Initialized, Notification}, + request::{Initialize, Request}, + }; + match payload { + Payload::Request { + value: jsonrpc::MethodCall { method, .. }, + .. + } if method == Initialize::METHOD => true, + Payload::Notification(jsonrpc::Notification { method, .. }) + if method == Initialized::METHOD => + { + true + } + _ => false, + } + } + + // TODO: events that use capabilities need to do the right thing + + loop { + tokio::select! { + biased; + _ = initialize_notify.notified() => { // TODO: notified is technically not cancellation safe + // server successfully initialized + is_pending = false; + + use lsp_types::notification::Notification; + // Hack: inject an initialized notification so we trigger code that needs to happen after init + let notification = ServerMessage::Call(jsonrpc::Call::Notification(jsonrpc::Notification { + jsonrpc: None, + + method: lsp_types::notification::Initialized::METHOD.to_string(), + params: jsonrpc::Params::None, + })); + match transport.process_server_message(&client_tx, notification).await { + Ok(_) => {} + Err(err) => { + error!("err: <- {:?}", err); + } + } + + // drain the pending queue and send payloads to server + for msg in pending_messages.drain(..) { + log::info!("Draining pending message {:?}", msg); + match transport.send_payload_to_server(&mut server_stdin, msg).await { + Ok(_) => {} + Err(err) => { + error!("err: <- {:?}", err); + } + } + } + } + msg = client_rx.recv() => { + if let Some(msg) = msg { + if is_pending && !is_initialize(&msg) { + // ignore notifications + if let Payload::Notification(_) = msg { + continue; + } + + log::info!("Language server not initialized, delaying request"); + pending_messages.push(msg); + } else { + match transport.send_payload_to_server(&mut server_stdin, msg).await { + Ok(_) => {} + Err(err) => { + error!("err: <- {:?}", err); + } + } + } + } else { + // channel closed + break; + } } } } |