From 41f1e8e4fb4b3b387f34ef5d2e913e7ebc7fd888 Mon Sep 17 00:00:00 2001 From: Blaž Hrastnik Date: Tue, 31 Aug 2021 14:27:55 +0900 Subject: fix: lsp: Terminate transport on EOF If stdout/stderr is closed, read_line will return 0 indicating EOF. --- helix-lsp/src/transport.rs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) (limited to 'helix-lsp/src') diff --git a/helix-lsp/src/transport.rs b/helix-lsp/src/transport.rs index 67b7b48f..9353de20 100644 --- a/helix-lsp/src/transport.rs +++ b/helix-lsp/src/transport.rs @@ -1,4 +1,4 @@ -use crate::Result; +use crate::{Error, Result}; use anyhow::Context; use jsonrpc_core as jsonrpc; use log::{debug, error, info, warn}; @@ -76,14 +76,17 @@ impl Transport { let mut content_length = None; loop { buffer.truncate(0); - reader.read_line(buffer).await?; - let header = buffer.trim(); + if reader.read_line(buffer).await? == 0 { + return Err(Error::StreamClosed); + }; + + // debug!("<- header {:?}", buffer); if header.is_empty() { break; } - debug!("<- header {}", header); + let header = buffer.trim(); let parts = header.split_once(": "); @@ -121,8 +124,10 @@ impl Transport { buffer: &mut String, ) -> Result<()> { buffer.truncate(0); - err.read_line(buffer).await?; - error!("err <- {}", buffer); + if err.read_line(buffer).await? == 0 { + return Err(Error::StreamClosed); + }; + error!("err <- {:?}", buffer); Ok(()) } -- cgit v1.2.3-70-g09d2 From c3a58cdadd8be85b79d773122e807862a3da3a2f Mon Sep 17 00:00:00 2001 From: Blaž Hrastnik Date: Tue, 31 Aug 2021 16:03:06 +0900 Subject: lsp: Refactor capabilities as an async OnceCell First step in making LSP init asynchronous --- helix-lsp/src/client.rs | 35 ++++++++++++++--------------------- helix-lsp/src/lib.rs | 29 ++++++++++++++++++++++++++--- 2 files changed, 40 insertions(+), 24 deletions(-) (limited to 'helix-lsp/src') diff --git a/helix-lsp/src/client.rs b/helix-lsp/src/client.rs index d0a8183f..87078c69 100644 --- a/helix-lsp/src/client.rs +++ b/helix-lsp/src/client.rs @@ -13,7 +13,10 @@ use std::sync::atomic::{AtomicU64, Ordering}; use tokio::{ io::{BufReader, BufWriter}, process::{Child, Command}, - sync::mpsc::{channel, UnboundedReceiver, UnboundedSender}, + sync::{ + mpsc::{channel, UnboundedReceiver, UnboundedSender}, + OnceCell, + }, }; #[derive(Debug)] @@ -22,7 +25,7 @@ pub struct Client { _process: Child, server_tx: UnboundedSender, request_counter: AtomicU64, - capabilities: Option, + pub(crate) capabilities: OnceCell, offset_encoding: OffsetEncoding, config: Option, } @@ -57,14 +60,11 @@ impl Client { _process: process, server_tx, request_counter: AtomicU64::new(0), - capabilities: None, + capabilities: OnceCell::new(), offset_encoding: OffsetEncoding::Utf8, config, }; - // TODO: async client.initialize() - // maybe use an arc flag - Ok((client, server_rx)) } @@ -90,7 +90,7 @@ impl Client { pub fn capabilities(&self) -> &lsp::ServerCapabilities { self.capabilities - .as_ref() + .get() .expect("language server not yet initialized!") } @@ -151,7 +151,7 @@ impl Client { } /// Send a RPC notification to the language server. - fn notify( + pub fn notify( &self, params: R::Params, ) -> impl Future> @@ -213,7 +213,7 @@ impl Client { // General messages // ------------------------------------------------------------------------------------------- - pub(crate) async fn initialize(&mut self) -> Result<()> { + pub(crate) async fn initialize(&self) -> Result { // TODO: delay any requests that are triggered prior to initialize let root = find_root(None).and_then(|root| lsp::Url::from_file_path(root).ok()); @@ -281,14 +281,7 @@ impl Client { locale: None, // TODO }; - let response = self.request::(params).await?; - self.capabilities = Some(response.capabilities); - - // next up, notify - self.notify::(lsp::InitializedParams {}) - .await?; - - Ok(()) + self.request::(params).await } pub async fn shutdown(&self) -> Result<()> { @@ -445,7 +438,7 @@ impl Client { ) -> Option>> { // figure out what kind of sync the server supports - let capabilities = self.capabilities.as_ref().unwrap(); + let capabilities = self.capabilities.get().unwrap(); let sync_capabilities = match capabilities.text_document_sync { Some(lsp::TextDocumentSyncCapability::Kind(kind)) @@ -496,7 +489,7 @@ impl Client { text_document: lsp::TextDocumentIdentifier, text: &Rope, ) -> Result<()> { - let capabilities = self.capabilities.as_ref().unwrap(); + let capabilities = self.capabilities.get().unwrap(); let include_text = match &capabilities.text_document_sync { Some(lsp::TextDocumentSyncCapability::Options(lsp::TextDocumentSyncOptions { @@ -590,7 +583,7 @@ impl Client { options: lsp::FormattingOptions, work_done_token: Option, ) -> anyhow::Result> { - let capabilities = self.capabilities.as_ref().unwrap(); + let capabilities = self.capabilities.get().unwrap(); // check if we're able to format match capabilities.document_formatting_provider { @@ -618,7 +611,7 @@ impl Client { options: lsp::FormattingOptions, work_done_token: Option, ) -> anyhow::Result> { - let capabilities = self.capabilities.as_ref().unwrap(); + let capabilities = self.capabilities.get().unwrap(); // check if we're able to format match capabilities.document_range_formatting_provider { diff --git a/helix-lsp/src/lib.rs b/helix-lsp/src/lib.rs index 72606b70..a118239f 100644 --- a/helix-lsp/src/lib.rs +++ b/helix-lsp/src/lib.rs @@ -312,17 +312,40 @@ impl Registry { Entry::Vacant(entry) => { // initialize a new client let id = self.counter.fetch_add(1, Ordering::Relaxed); - let (mut client, incoming) = Client::start( + let (client, incoming) = Client::start( &config.command, &config.args, serde_json::from_str(language_config.config.as_deref().unwrap_or("")).ok(), id, )?; - // TODO: run this async without blocking - futures_executor::block_on(client.initialize())?; s_incoming.push(UnboundedReceiverStream::new(incoming)); let client = Arc::new(client); + let _client = client.clone(); + let initialize = tokio::spawn(async move { + use futures_util::TryFutureExt; + + let value = _client + .capabilities + .get_or_try_init(|| { + _client + .initialize() + .map_ok(|response| response.capabilities) + }) + .await; + + value.expect("failed to initialize capabilities"); + + // next up, notify + _client + .notify::(lsp::InitializedParams {}) + .await + .unwrap(); + }); + + // TODO: remove this block + futures_executor::block_on(initialize).map_err(|_| anyhow::anyhow!("bail"))?; + entry.insert((id, client.clone())); Ok(client) } -- cgit v1.2.3-70-g09d2 From 5a558e0d8e20eb5b5d474e0f27fd51f4c633dd80 Mon Sep 17 00:00:00 2001 From: Blaž Hrastnik Date: Tue, 31 Aug 2021 16:48:59 +0900 Subject: lsp: Delay requests & notifications until initialization is complete --- helix-lsp/src/client.rs | 15 +++++--- helix-lsp/src/lib.rs | 11 +++--- helix-lsp/src/transport.rs | 91 ++++++++++++++++++++++++++++++++++++++-------- 3 files changed, 91 insertions(+), 26 deletions(-) (limited to 'helix-lsp/src') diff --git a/helix-lsp/src/client.rs b/helix-lsp/src/client.rs index 87078c69..02cd5747 100644 --- a/helix-lsp/src/client.rs +++ b/helix-lsp/src/client.rs @@ -9,13 +9,16 @@ use lsp_types as lsp; use serde_json::Value; use std::future::Future; use std::process::Stdio; -use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, +}; use tokio::{ io::{BufReader, BufWriter}, process::{Child, Command}, sync::{ mpsc::{channel, UnboundedReceiver, UnboundedSender}, - OnceCell, + Notify, OnceCell, }, }; @@ -31,12 +34,13 @@ pub struct Client { } impl Client { + #[allow(clippy::type_complexity)] pub fn start( cmd: &str, args: &[String], config: Option, id: usize, - ) -> Result<(Self, UnboundedReceiver<(usize, Call)>)> { + ) -> Result<(Self, UnboundedReceiver<(usize, Call)>, Arc)> { let process = Command::new(cmd) .args(args) .stdin(Stdio::piped()) @@ -53,7 +57,8 @@ impl Client { let reader = BufReader::new(process.stdout.take().expect("Failed to open stdout")); let stderr = BufReader::new(process.stderr.take().expect("Failed to open stderr")); - let (server_rx, server_tx) = Transport::start(reader, writer, stderr, id); + let (server_rx, server_tx, initialize_notify) = + Transport::start(reader, writer, stderr, id); let client = Self { id, @@ -65,7 +70,7 @@ impl Client { config, }; - Ok((client, server_rx)) + Ok((client, server_rx, initialize_notify)) } pub fn id(&self) -> usize { diff --git a/helix-lsp/src/lib.rs b/helix-lsp/src/lib.rs index a118239f..3a761ad0 100644 --- a/helix-lsp/src/lib.rs +++ b/helix-lsp/src/lib.rs @@ -312,7 +312,7 @@ impl Registry { Entry::Vacant(entry) => { // initialize a new client let id = self.counter.fetch_add(1, Ordering::Relaxed); - let (client, incoming) = Client::start( + let (client, incoming, initialize_notify) = Client::start( &config.command, &config.args, serde_json::from_str(language_config.config.as_deref().unwrap_or("")).ok(), @@ -322,9 +322,9 @@ impl Registry { let client = Arc::new(client); let _client = client.clone(); - let initialize = tokio::spawn(async move { + // Initialize the client asynchronously + tokio::spawn(async move { use futures_util::TryFutureExt; - let value = _client .capabilities .get_or_try_init(|| { @@ -341,10 +341,9 @@ impl Registry { .notify::(lsp::InitializedParams {}) .await .unwrap(); - }); - // TODO: remove this block - futures_executor::block_on(initialize).map_err(|_| anyhow::anyhow!("bail"))?; + initialize_notify.notify_one(); + }); entry.insert((id, client.clone())); Ok(client) diff --git a/helix-lsp/src/transport.rs b/helix-lsp/src/transport.rs index 9353de20..071c5b93 100644 --- a/helix-lsp/src/transport.rs +++ b/helix-lsp/src/transport.rs @@ -1,7 +1,7 @@ use crate::{Error, Result}; use anyhow::Context; use jsonrpc_core as jsonrpc; -use log::{debug, error, info, warn}; +use log::{error, info}; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::collections::HashMap; @@ -11,7 +11,7 @@ use tokio::{ process::{ChildStderr, ChildStdin, ChildStdout}, sync::{ mpsc::{unbounded_channel, Sender, UnboundedReceiver, UnboundedSender}, - Mutex, + Mutex, Notify, }, }; @@ -51,9 +51,11 @@ impl Transport { ) -> ( UnboundedReceiver<(usize, jsonrpc::Call)>, UnboundedSender, + Arc, ) { let (client_tx, rx) = unbounded_channel(); let (tx, client_rx) = unbounded_channel(); + let notify = Arc::new(Notify::new()); let transport = Self { id, @@ -64,9 +66,14 @@ impl Transport { tokio::spawn(Self::recv(transport.clone(), server_stdout, client_tx)); tokio::spawn(Self::err(transport.clone(), server_stderr)); - tokio::spawn(Self::send(transport, server_stdin, client_rx)); - - (rx, tx) + tokio::spawn(Self::send( + transport, + server_stdin, + client_rx, + notify.clone(), + )); + + (rx, tx, notify) } async fn recv_server_message( @@ -82,7 +89,8 @@ impl Transport { // debug!("<- header {:?}", buffer); - if header.is_empty() { + if buffer == "\r\n" { + // look for an empty CRLF line break; } @@ -99,7 +107,8 @@ impl Transport { // Workaround: Some non-conformant language servers will output logging and other garbage // into the same stream as JSON-RPC messages. This can also happen from shell scripts that spawn // the server. Skip such lines and log a warning. - warn!("Failed to parse header: {:?}", header); + + // warn!("Failed to parse header: {:?}", header); } } } @@ -261,15 +270,67 @@ impl Transport { transport: Arc, mut server_stdin: BufWriter, mut client_rx: UnboundedReceiver, + initialize_notify: Arc, ) { - while let Some(msg) = client_rx.recv().await { - match transport - .send_payload_to_server(&mut server_stdin, msg) - .await - { - Ok(_) => {} - Err(err) => { - error!("err: <- {:?}", err); + let mut pending_messages: Vec = Vec::new(); + let mut is_pending = true; + + // Determine if a message is allowed to be sent early + fn is_initialize(payload: &Payload) -> bool { + use lsp_types::{ + notification::{Initialized, Notification}, + request::{Initialize, Request}, + }; + match payload { + Payload::Request { + value: jsonrpc::MethodCall { method, .. }, + .. + } if method == Initialize::METHOD => true, + Payload::Notification(jsonrpc::Notification { method, .. }) + if method == Initialized::METHOD => + { + true + } + _ => false, + } + } + + // TODO: events that use capabilities need to do the right thing + + loop { + tokio::select! { + biased; + _ = initialize_notify.notified() => { // TODO: notified is technically not cancellation safe + // server successfully initialized + is_pending = false; + // drain the pending queue and send payloads to server + for msg in pending_messages.drain(..) { + log::info!("Draining pending message {:?}", msg); + match transport.send_payload_to_server(&mut server_stdin, msg).await { + Ok(_) => {} + Err(err) => { + error!("err: <- {:?}", err); + } + } + } + } + msg = client_rx.recv() => { + if let Some(msg) = msg { + if is_pending && !is_initialize(&msg) { + log::info!("Language server not initialized, delaying request"); + pending_messages.push(msg); + } else { + match transport.send_payload_to_server(&mut server_stdin, msg).await { + Ok(_) => {} + Err(err) => { + error!("err: <- {:?}", err); + } + } + } + } else { + // channel closed + break; + } } } } -- cgit v1.2.3-70-g09d2 From 8744f367bdd3fce5e6cc6ee2b5198188598f5170 Mon Sep 17 00:00:00 2001 From: Blaž Hrastnik Date: Tue, 31 Aug 2021 18:12:45 +0900 Subject: wip --- helix-lsp/src/client.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'helix-lsp/src') diff --git a/helix-lsp/src/client.rs b/helix-lsp/src/client.rs index 02cd5747..ac6ae70a 100644 --- a/helix-lsp/src/client.rs +++ b/helix-lsp/src/client.rs @@ -148,7 +148,8 @@ impl Client { }) .map_err(|e| Error::Other(e.into()))?; - timeout(Duration::from_secs(2), rx.recv()) + // TODO: specifiable timeout, delay other calls until initialize success + timeout(Duration::from_secs(20), rx.recv()) .await .map_err(|_| Error::Timeout)? // return Timeout .ok_or(Error::StreamClosed)? -- cgit v1.2.3-70-g09d2 From 184637c55acca49380372ca118f13b3390bcb003 Mon Sep 17 00:00:00 2001 From: Blaž Hrastnik Date: Wed, 1 Sep 2021 14:49:48 +0900 Subject: lsp: refactor format so we stop cloning the language_server --- helix-lsp/src/client.rs | 14 +++++++++----- helix-view/src/document.rs | 23 +++++++++++++---------- 2 files changed, 22 insertions(+), 15 deletions(-) (limited to 'helix-lsp/src') diff --git a/helix-lsp/src/client.rs b/helix-lsp/src/client.rs index ac6ae70a..fdff553f 100644 --- a/helix-lsp/src/client.rs +++ b/helix-lsp/src/client.rs @@ -583,19 +583,19 @@ impl Client { // formatting - pub async fn text_document_formatting( + pub fn text_document_formatting( &self, text_document: lsp::TextDocumentIdentifier, options: lsp::FormattingOptions, work_done_token: Option, - ) -> anyhow::Result> { + ) -> Option>>> { let capabilities = self.capabilities.get().unwrap(); // check if we're able to format match capabilities.document_formatting_provider { Some(lsp::OneOf::Left(true)) | Some(lsp::OneOf::Right(_)) => (), // None | Some(false) - _ => return Ok(Vec::new()), + _ => return None, }; // TODO: return err::unavailable so we can fall back to tree sitter formatting @@ -605,9 +605,13 @@ impl Client { work_done_progress_params: lsp::WorkDoneProgressParams { work_done_token }, }; - let response = self.request::(params).await?; + let request = self.call::(params); - Ok(response.unwrap_or_default()) + Some(async move { + let json = request.await?; + let response: Vec = serde_json::from_value(json)?; + Ok(response) + }) } pub async fn text_document_range_formatting( diff --git a/helix-view/src/document.rs b/helix-view/src/document.rs index b2c02927..a27be8e6 100644 --- a/helix-view/src/document.rs +++ b/helix-view/src/document.rs @@ -386,21 +386,24 @@ impl Document { /// If supported, returns the changes that should be applied to this document in order /// to format it nicely. pub fn format(&self) -> Option + 'static> { - if let Some(language_server) = self.language_server.clone() { + if let Some(language_server) = self.language_server() { let text = self.text.clone(); - let id = self.identifier(); + let offset_encoding = language_server.offset_encoding(); + let request = language_server.text_document_formatting( + self.identifier(), + lsp::FormattingOptions::default(), + None, + )?; + let fut = async move { - let edits = language_server - .text_document_formatting(id, lsp::FormattingOptions::default(), None) - .await - .unwrap_or_else(|e| { - log::warn!("LSP formatting failed: {}", e); - Default::default() - }); + let edits = request.await.unwrap_or_else(|e| { + log::warn!("LSP formatting failed: {}", e); + Default::default() + }); LspFormatting { doc: text, edits, - offset_encoding: language_server.offset_encoding(), + offset_encoding, } }; Some(fut) -- cgit v1.2.3-70-g09d2 From 800d79b584cd09020488b8a614e5214b929d8f5d Mon Sep 17 00:00:00 2001 From: Blaž Hrastnik Date: Wed, 1 Sep 2021 14:54:11 +0900 Subject: ls: Refactor textDocument/didSave in a similar vein --- helix-lsp/src/client.rs | 19 ++++++++++--------- helix-view/src/document.rs | 8 ++++---- 2 files changed, 14 insertions(+), 13 deletions(-) (limited to 'helix-lsp/src') diff --git a/helix-lsp/src/client.rs b/helix-lsp/src/client.rs index fdff553f..b8fbfddb 100644 --- a/helix-lsp/src/client.rs +++ b/helix-lsp/src/client.rs @@ -490,11 +490,11 @@ impl Client { // will_save / will_save_wait_until - pub async fn text_document_did_save( + pub fn text_document_did_save( &self, text_document: lsp::TextDocumentIdentifier, text: &Rope, - ) -> Result<()> { + ) -> Option>> { let capabilities = self.capabilities.get().unwrap(); let include_text = match &capabilities.text_document_sync { @@ -507,17 +507,18 @@ impl Client { include_text, }) => include_text.unwrap_or(false), // Supported(false) - _ => return Ok(()), + _ => return None, }, // unsupported - _ => return Ok(()), + _ => return None, }; - self.notify::(lsp::DidSaveTextDocumentParams { - text_document, - text: include_text.then(|| text.into()), - }) - .await + Some(self.notify::( + lsp::DidSaveTextDocumentParams { + text_document, + text: include_text.then(|| text.into()), + }, + )) } pub fn completion( diff --git a/helix-view/src/document.rs b/helix-view/src/document.rs index a27be8e6..5677eb44 100644 --- a/helix-view/src/document.rs +++ b/helix-view/src/document.rs @@ -471,10 +471,10 @@ impl Document { let mut file = File::create(path).await?; to_writer(&mut file, encoding, &text).await?; - if let Some(language_server) = language_server { - language_server - .text_document_did_save(identifier, &text) - .await?; + if let Some(notification) = language_server.and_then(|language_server| { + language_server.text_document_did_save(identifier, &text) + }) { + notification.await?; } Ok(()) -- cgit v1.2.3-70-g09d2 From 48fd4843fc4a28bfd05ea01ef0d10f4ea816db20 Mon Sep 17 00:00:00 2001 From: Blaž Hrastnik Date: Thu, 2 Sep 2021 11:10:00 +0900 Subject: lsp: Outdated comment --- helix-lsp/src/lib.rs | 26 -------------------------- 1 file changed, 26 deletions(-) (limited to 'helix-lsp/src') diff --git a/helix-lsp/src/lib.rs b/helix-lsp/src/lib.rs index 3a761ad0..e10c107b 100644 --- a/helix-lsp/src/lib.rs +++ b/helix-lsp/src/lib.rs @@ -437,32 +437,6 @@ impl LspProgressMap { } } -// REGISTRY = HashMap>> -// spawn one server per language type, need to spawn one per workspace if server doesn't support -// workspaces -// -// could also be a client per root dir -// -// storing a copy of Option>> on Document would make the LSP client easily -// accessible during edit/save callbacks -// -// the event loop needs to process all incoming streams, maybe we can just have that be a separate -// task that's continually running and store the state on the client, then use read lock to -// retrieve data during render -// -> PROBLEM: how do you trigger an update on the editor side when data updates? -// -// -> The data updates should pull all events until we run out so we don't frequently re-render -// -// -// v2: -// -// there should be a registry of lsp clients, one per language type (or workspace). -// the clients should lazy init on first access -// the client.initialize() should be called async and we buffer any requests until that completes -// there needs to be a way to process incoming lsp messages from all clients. -// -> notifications need to be dispatched to wherever -// -> requests need to generate a reply and travel back to the same lsp! - #[cfg(test)] mod tests { use super::{lsp, util::*, OffsetEncoding}; -- cgit v1.2.3-70-g09d2 From 2793ff383228403b1ebaf2a29c870a13ee76075a Mon Sep 17 00:00:00 2001 From: Blaž Hrastnik Date: Thu, 2 Sep 2021 13:52:12 +0900 Subject: lsp: SyncKind::Full: we need to send the whole document on each change --- helix-lsp/src/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'helix-lsp/src') diff --git a/helix-lsp/src/client.rs b/helix-lsp/src/client.rs index b8fbfddb..27e4697c 100644 --- a/helix-lsp/src/client.rs +++ b/helix-lsp/src/client.rs @@ -462,7 +462,7 @@ impl Client { // range = None -> whole document range: None, //Some(Range) range_length: None, // u64 apparently deprecated - text: "".to_string(), + text: new_text.to_string(), }] } lsp::TextDocumentSyncKind::Incremental => { -- cgit v1.2.3-70-g09d2 From 46f3c69f06cc55f36bcc6244a9f96c2481836dea Mon Sep 17 00:00:00 2001 From: Blaž Hrastnik Date: Thu, 2 Sep 2021 13:55:08 +0900 Subject: lsp: Don't send notifications until initialize completes Then send open events for all documents with the LSP attached. --- helix-lsp/src/lib.rs | 98 +++++++++++++++++++++---------------------- helix-lsp/src/transport.rs | 29 ++++++++++++- helix-term/src/application.rs | 31 ++++++++++++++ helix-view/src/editor.rs | 5 ++- 4 files changed, 111 insertions(+), 52 deletions(-) (limited to 'helix-lsp/src') diff --git a/helix-lsp/src/lib.rs b/helix-lsp/src/lib.rs index e10c107b..7357c885 100644 --- a/helix-lsp/src/lib.rs +++ b/helix-lsp/src/lib.rs @@ -226,6 +226,8 @@ impl MethodCall { #[derive(Debug, PartialEq, Clone)] pub enum Notification { + // we inject this notification to signal the LSP is ready + Initialized, PublishDiagnostics(lsp::PublishDiagnosticsParams), ShowMessage(lsp::ShowMessageParams), LogMessage(lsp::LogMessageParams), @@ -237,6 +239,7 @@ impl Notification { use lsp::notification::Notification as _; let notification = match method { + lsp::notification::Initialized::METHOD => Self::Initialized, lsp::notification::PublishDiagnostics::METHOD => { let params: lsp::PublishDiagnosticsParams = params .parse() @@ -294,7 +297,7 @@ impl Registry { } } - pub fn get_by_id(&mut self, id: usize) -> Option<&Client> { + pub fn get_by_id(&self, id: usize) -> Option<&Client> { self.inner .values() .find(|(client_id, _)| client_id == &id) @@ -302,55 +305,52 @@ impl Registry { } pub fn get(&mut self, language_config: &LanguageConfiguration) -> Result> { - if let Some(config) = &language_config.language_server { - // avoid borrow issues - let inner = &mut self.inner; - let s_incoming = &mut self.incoming; - - match inner.entry(language_config.scope.clone()) { - Entry::Occupied(entry) => Ok(entry.get().1.clone()), - Entry::Vacant(entry) => { - // initialize a new client - let id = self.counter.fetch_add(1, Ordering::Relaxed); - let (client, incoming, initialize_notify) = Client::start( - &config.command, - &config.args, - serde_json::from_str(language_config.config.as_deref().unwrap_or("")).ok(), - id, - )?; - s_incoming.push(UnboundedReceiverStream::new(incoming)); - let client = Arc::new(client); - - let _client = client.clone(); - // Initialize the client asynchronously - tokio::spawn(async move { - use futures_util::TryFutureExt; - let value = _client - .capabilities - .get_or_try_init(|| { - _client - .initialize() - .map_ok(|response| response.capabilities) - }) - .await; - - value.expect("failed to initialize capabilities"); - - // next up, notify - _client - .notify::(lsp::InitializedParams {}) - .await - .unwrap(); - - initialize_notify.notify_one(); - }); - - entry.insert((id, client.clone())); - Ok(client) - } + let config = match &language_config.language_server { + Some(config) => config, + None => return Err(Error::LspNotDefined), + }; + + match self.inner.entry(language_config.scope.clone()) { + Entry::Occupied(entry) => Ok(entry.get().1.clone()), + Entry::Vacant(entry) => { + // initialize a new client + let id = self.counter.fetch_add(1, Ordering::Relaxed); + let (client, incoming, initialize_notify) = Client::start( + &config.command, + &config.args, + serde_json::from_str(language_config.config.as_deref().unwrap_or("")).ok(), + id, + )?; + self.incoming.push(UnboundedReceiverStream::new(incoming)); + let client = Arc::new(client); + + // Initialize the client asynchronously + let _client = client.clone(); + tokio::spawn(async move { + use futures_util::TryFutureExt; + let value = _client + .capabilities + .get_or_try_init(|| { + _client + .initialize() + .map_ok(|response| response.capabilities) + }) + .await; + + value.expect("failed to initialize capabilities"); + + // next up, notify + _client + .notify::(lsp::InitializedParams {}) + .await + .unwrap(); + + initialize_notify.notify_one(); + }); + + entry.insert((id, client.clone())); + Ok(client) } - } else { - Err(Error::LspNotDefined) } } diff --git a/helix-lsp/src/transport.rs b/helix-lsp/src/transport.rs index 071c5b93..cf7e66a8 100644 --- a/helix-lsp/src/transport.rs +++ b/helix-lsp/src/transport.rs @@ -64,11 +64,16 @@ impl Transport { let transport = Arc::new(transport); - tokio::spawn(Self::recv(transport.clone(), server_stdout, client_tx)); + tokio::spawn(Self::recv( + transport.clone(), + server_stdout, + client_tx.clone(), + )); tokio::spawn(Self::err(transport.clone(), server_stderr)); tokio::spawn(Self::send( transport, server_stdin, + client_tx, client_rx, notify.clone(), )); @@ -269,6 +274,7 @@ impl Transport { async fn send( transport: Arc, mut server_stdin: BufWriter, + mut client_tx: UnboundedSender<(usize, jsonrpc::Call)>, mut client_rx: UnboundedReceiver, initialize_notify: Arc, ) { @@ -303,6 +309,22 @@ impl Transport { _ = initialize_notify.notified() => { // TODO: notified is technically not cancellation safe // server successfully initialized is_pending = false; + + use lsp_types::notification::Notification; + // Hack: inject an initialized notification so we trigger code that needs to happen after init + let notification = ServerMessage::Call(jsonrpc::Call::Notification(jsonrpc::Notification { + jsonrpc: None, + + method: lsp_types::notification::Initialized::METHOD.to_string(), + params: jsonrpc::Params::None, + })); + match transport.process_server_message(&mut client_tx, notification).await { + Ok(_) => {} + Err(err) => { + error!("err: <- {:?}", err); + } + } + // drain the pending queue and send payloads to server for msg in pending_messages.drain(..) { log::info!("Draining pending message {:?}", msg); @@ -317,6 +339,11 @@ impl Transport { msg = client_rx.recv() => { if let Some(msg) = msg { if is_pending && !is_initialize(&msg) { + // ignore notifications + if let Payload::Notification(_) = msg { + continue; + } + log::info!("Language server not initialized, delaying request"); pending_messages.push(msg); } else { diff --git a/helix-term/src/application.rs b/helix-term/src/application.rs index d3b65a4f..e21c5504 100644 --- a/helix-term/src/application.rs +++ b/helix-term/src/application.rs @@ -275,6 +275,37 @@ impl Application { }; match notification { + Notification::Initialized => { + let language_server = + match self.editor.language_servers.get_by_id(server_id) { + Some(language_server) => language_server, + None => { + warn!("can't find language server with id `{}`", server_id); + return; + } + }; + + let docs = self.editor.documents().filter(|doc| { + doc.language_server().map(|server| server.id()) == Some(server_id) + }); + + // trigger textDocument/didOpen for docs that are already open + for doc in docs { + // TODO: extract and share with editor.open + let language_id = doc + .language() + .and_then(|s| s.split('.').last()) // source.rust + .map(ToOwned::to_owned) + .unwrap_or_default(); + + tokio::spawn(language_server.text_document_did_open( + doc.url().unwrap(), + doc.version(), + doc.text(), + language_id, + )); + } + } Notification::PublishDiagnostics(params) => { let path = params.uri.to_file_path().unwrap(); let doc = self.editor.document_by_path_mut(&path); diff --git a/helix-view/src/editor.rs b/helix-view/src/editor.rs index c8abd5b5..3d2d4a87 100644 --- a/helix-view/src/editor.rs +++ b/helix-view/src/editor.rs @@ -255,20 +255,21 @@ impl Editor { .and_then(|language| self.language_servers.get(language).ok()); if let Some(language_server) = language_server { - doc.set_language_server(Some(language_server.clone())); - let language_id = doc .language() .and_then(|s| s.split('.').last()) // source.rust .map(ToOwned::to_owned) .unwrap_or_default(); + // TODO: this now races with on_init code if the init happens too quickly tokio::spawn(language_server.text_document_did_open( doc.url().unwrap(), doc.version(), doc.text(), language_id, )); + + doc.set_language_server(Some(language_server)); } let id = self.documents.insert(doc); -- cgit v1.2.3-70-g09d2 From 37606bad47fe0e197cb34fc7d52856597c32ce33 Mon Sep 17 00:00:00 2001 From: Blaž Hrastnik Date: Thu, 2 Sep 2021 13:55:55 +0900 Subject: lsp: doc.language_server() is None until initialize completes --- helix-lsp/src/client.rs | 4 ++++ helix-view/src/document.rs | 13 +++++++++++-- 2 files changed, 15 insertions(+), 2 deletions(-) (limited to 'helix-lsp/src') diff --git a/helix-lsp/src/client.rs b/helix-lsp/src/client.rs index 27e4697c..f2bb0059 100644 --- a/helix-lsp/src/client.rs +++ b/helix-lsp/src/client.rs @@ -93,6 +93,10 @@ impl Client { } } + pub fn is_initialized(&self) -> bool { + self.capabilities.get().is_some() + } + pub fn capabilities(&self) -> &lsp::ServerCapabilities { self.capabilities .get() diff --git a/helix-view/src/document.rs b/helix-view/src/document.rs index 71f6680c..362a433e 100644 --- a/helix-view/src/document.rs +++ b/helix-view/src/document.rs @@ -798,9 +798,18 @@ impl Document { self.version } - #[inline] pub fn language_server(&self) -> Option<&helix_lsp::Client> { - self.language_server.as_deref() + let server = self.language_server.as_deref(); + let initialized = server + .map(|server| server.is_initialized()) + .unwrap_or(false); + + // only resolve language_server if it's initialized + if initialized { + server + } else { + None + } } #[inline] -- cgit v1.2.3-70-g09d2 From be81f40df8c901f506708a2ce4ff10632fa1d64c Mon Sep 17 00:00:00 2001 From: Blaž Hrastnik Date: Mon, 6 Sep 2021 11:32:50 +0900 Subject: lsp: This doesn't need to be a mutable reference --- helix-lsp/src/transport.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'helix-lsp/src') diff --git a/helix-lsp/src/transport.rs b/helix-lsp/src/transport.rs index cf7e66a8..6e28094d 100644 --- a/helix-lsp/src/transport.rs +++ b/helix-lsp/src/transport.rs @@ -274,7 +274,7 @@ impl Transport { async fn send( transport: Arc, mut server_stdin: BufWriter, - mut client_tx: UnboundedSender<(usize, jsonrpc::Call)>, + client_tx: UnboundedSender<(usize, jsonrpc::Call)>, mut client_rx: UnboundedReceiver, initialize_notify: Arc, ) { @@ -318,7 +318,7 @@ impl Transport { method: lsp_types::notification::Initialized::METHOD.to_string(), params: jsonrpc::Params::None, })); - match transport.process_server_message(&mut client_tx, notification).await { + match transport.process_server_message(&client_tx, notification).await { Ok(_) => {} Err(err) => { error!("err: <- {:?}", err); -- cgit v1.2.3-70-g09d2