diff options
Diffstat (limited to 'helix-lsp/src')
-rw-r--r-- | helix-lsp/src/client.rs | 12 | ||||
-rw-r--r-- | helix-lsp/src/lib.rs | 204 | ||||
-rw-r--r-- | helix-lsp/src/transport.rs | 63 |
3 files changed, 163 insertions, 116 deletions
diff --git a/helix-lsp/src/client.rs b/helix-lsp/src/client.rs index 840e7382..c0f3adb8 100644 --- a/helix-lsp/src/client.rs +++ b/helix-lsp/src/client.rs @@ -44,6 +44,7 @@ fn workspace_for_uri(uri: lsp::Url) -> WorkspaceFolder { #[derive(Debug)] pub struct Client { id: usize, + name: String, _process: Child, server_tx: UnboundedSender<Payload>, request_counter: AtomicU64, @@ -166,8 +167,7 @@ impl Client { tokio::spawn(self.did_change_workspace(vec![workspace_for_uri(root_uri)], Vec::new())); } - #[allow(clippy::type_complexity)] - #[allow(clippy::too_many_arguments)] + #[allow(clippy::type_complexity, clippy::too_many_arguments)] pub fn start( cmd: &str, args: &[String], @@ -176,6 +176,7 @@ impl Client { root_markers: &[String], manual_roots: &[PathBuf], id: usize, + name: String, req_timeout: u64, doc_path: Option<&std::path::PathBuf>, ) -> Result<(Self, UnboundedReceiver<(usize, Call)>, Arc<Notify>)> { @@ -200,7 +201,7 @@ impl Client { let stderr = BufReader::new(process.stderr.take().expect("Failed to open stderr")); let (server_rx, server_tx, initialize_notify) = - Transport::start(reader, writer, stderr, id); + Transport::start(reader, writer, stderr, id, name.clone()); let (workspace, workspace_is_cwd) = find_workspace(); let workspace = path::get_normalized_path(&workspace); let root = find_lsp_workspace( @@ -225,6 +226,7 @@ impl Client { let client = Self { id, + name, _process: process, server_tx, request_counter: AtomicU64::new(0), @@ -240,6 +242,10 @@ impl Client { Ok((client, server_rx, initialize_notify)) } + pub fn name(&self) -> &String { + &self.name + } + pub fn id(&self) -> usize { self.id } diff --git a/helix-lsp/src/lib.rs b/helix-lsp/src/lib.rs index 31ee1d75..12e63255 100644 --- a/helix-lsp/src/lib.rs +++ b/helix-lsp/src/lib.rs @@ -17,19 +17,16 @@ use helix_core::{ use tokio::sync::mpsc::UnboundedReceiver; use std::{ - collections::{hash_map::Entry, HashMap}, + collections::HashMap, path::{Path, PathBuf}, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, + sync::Arc, }; use thiserror::Error; use tokio_stream::wrappers::UnboundedReceiverStream; pub type Result<T> = core::result::Result<T, Error>; -type LanguageId = String; +type LanguageServerName = String; #[derive(Error, Debug)] pub enum Error { @@ -49,7 +46,7 @@ pub enum Error { Other(#[from] anyhow::Error), } -#[derive(Clone, Copy, Debug, Default)] +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] pub enum OffsetEncoding { /// UTF-8 code units aka bytes Utf8, @@ -624,23 +621,18 @@ impl Notification { #[derive(Debug)] pub struct Registry { - inner: HashMap<LanguageId, Vec<(usize, Arc<Client>)>>, - - counter: AtomicUsize, + inner: HashMap<LanguageServerName, Vec<Arc<Client>>>, + syn_loader: Arc<helix_core::syntax::Loader>, + counter: usize, pub incoming: SelectAll<UnboundedReceiverStream<(usize, Call)>>, } -impl Default for Registry { - fn default() -> Self { - Self::new() - } -} - impl Registry { - pub fn new() -> Self { + pub fn new(syn_loader: Arc<helix_core::syntax::Loader>) -> Self { Self { inner: HashMap::new(), - counter: AtomicUsize::new(0), + syn_loader, + counter: 0, incoming: SelectAll::new(), } } @@ -649,15 +641,43 @@ impl Registry { self.inner .values() .flatten() - .find(|(client_id, _)| client_id == &id) - .map(|(_, client)| client.as_ref()) + .find(|client| client.id() == id) + .map(|client| &**client) } pub fn remove_by_id(&mut self, id: usize) { - self.inner.retain(|_, clients| { - clients.retain(|&(client_id, _)| client_id != id); - !clients.is_empty() - }) + self.inner.retain(|_, language_servers| { + language_servers.retain(|ls| id != ls.id()); + !language_servers.is_empty() + }); + } + + fn start_client( + &mut self, + name: String, + ls_config: &LanguageConfiguration, + doc_path: Option<&std::path::PathBuf>, + root_dirs: &[PathBuf], + enable_snippets: bool, + ) -> Result<Arc<Client>> { + let config = self + .syn_loader + .language_server_configs() + .get(&name) + .ok_or_else(|| anyhow::anyhow!("Language server '{name}' not defined"))?; + self.counter += 1; + let id = self.counter; + let NewClient(client, incoming) = start_client( + id, + name, + ls_config, + config, + doc_path, + root_dirs, + enable_snippets, + )?; + self.incoming.push(UnboundedReceiverStream::new(incoming)); + Ok(client) } pub fn restart( @@ -666,48 +686,46 @@ impl Registry { doc_path: Option<&std::path::PathBuf>, root_dirs: &[PathBuf], enable_snippets: bool, - ) -> Result<Option<Arc<Client>>> { - let config = match &language_config.language_server { - Some(config) => config, - None => return Ok(None), - }; - - let scope = language_config.scope.clone(); - - match self.inner.entry(scope) { - Entry::Vacant(_) => Ok(None), - Entry::Occupied(mut entry) => { - // initialize a new client - let id = self.counter.fetch_add(1, Ordering::Relaxed); - - let NewClientResult(client, incoming) = start_client( - id, - language_config, - config, - doc_path, - root_dirs, - enable_snippets, - )?; - self.incoming.push(UnboundedReceiverStream::new(incoming)); + ) -> Result<Vec<Arc<Client>>> { + language_config + .language_servers + .iter() + .filter_map(|config| { + let name = config.name().clone(); + + #[allow(clippy::map_entry)] + if self.inner.contains_key(&name) { + let client = match self.start_client( + name.clone(), + language_config, + doc_path, + root_dirs, + enable_snippets, + ) { + Ok(client) => client, + error => return Some(error), + }; + let old_clients = self.inner.insert(name, vec![client.clone()]).unwrap(); - let old_clients = entry.insert(vec![(id, client.clone())]); + // TODO what if there are different language servers for different workspaces, + // I think the language servers will be stopped without being restarted, which is not intended + for old_client in old_clients { + tokio::spawn(async move { + let _ = old_client.force_shutdown().await; + }); + } - for (_, old_client) in old_clients { - tokio::spawn(async move { - let _ = old_client.force_shutdown().await; - }); + Some(Ok(client)) + } else { + None } - - Ok(Some(client)) - } - } + }) + .collect() } - pub fn stop(&mut self, language_config: &LanguageConfiguration) { - let scope = language_config.scope.clone(); - - if let Some(clients) = self.inner.remove(&scope) { - for (_, client) in clients { + pub fn stop(&mut self, name: &str) { + if let Some(clients) = self.inner.remove(name) { + for client in clients { tokio::spawn(async move { let _ = client.force_shutdown().await; }); @@ -721,37 +739,35 @@ impl Registry { doc_path: Option<&std::path::PathBuf>, root_dirs: &[PathBuf], enable_snippets: bool, - ) -> Result<Option<Arc<Client>>> { - let config = match &language_config.language_server { - Some(config) => config, - None => return Ok(None), - }; - - let clients = self.inner.entry(language_config.scope.clone()).or_default(); - // check if we already have a client for this documents root that we can reuse - if let Some((_, client)) = clients.iter_mut().enumerate().find(|(i, (_, client))| { - client.try_add_doc(&language_config.roots, root_dirs, doc_path, *i == 0) - }) { - return Ok(Some(client.1.clone())); - } - // initialize a new client - let id = self.counter.fetch_add(1, Ordering::Relaxed); - - let NewClientResult(client, incoming) = start_client( - id, - language_config, - config, - doc_path, - root_dirs, - enable_snippets, - )?; - clients.push((id, client.clone())); - self.incoming.push(UnboundedReceiverStream::new(incoming)); - Ok(Some(client)) + ) -> Result<Vec<Arc<Client>>> { + language_config + .language_servers + .iter() + .map(|features| { + let name = features.name(); + if let Some(clients) = self.inner.get_mut(name) { + if let Some((_, client)) = clients.iter_mut().enumerate().find(|(i, client)| { + client.try_add_doc(&language_config.roots, root_dirs, doc_path, *i == 0) + }) { + return Ok(client.clone()); + } + } + let client = self.start_client( + name.clone(), + language_config, + doc_path, + root_dirs, + enable_snippets, + )?; + let clients = self.inner.entry(features.name().clone()).or_default(); + clients.push(client.clone()); + Ok(client) + }) + .collect() } pub fn iter_clients(&self) -> impl Iterator<Item = &Arc<Client>> { - self.inner.values().flatten().map(|(_, client)| client) + self.inner.values().flatten() } } @@ -833,26 +849,28 @@ impl LspProgressMap { } } -struct NewClientResult(Arc<Client>, UnboundedReceiver<(usize, Call)>); +struct NewClient(Arc<Client>, UnboundedReceiver<(usize, Call)>); /// start_client takes both a LanguageConfiguration and a LanguageServerConfiguration to ensure that /// it is only called when it makes sense. fn start_client( id: usize, + name: String, config: &LanguageConfiguration, ls_config: &LanguageServerConfiguration, doc_path: Option<&std::path::PathBuf>, root_dirs: &[PathBuf], enable_snippets: bool, -) -> Result<NewClientResult> { +) -> Result<NewClient> { let (client, incoming, initialize_notify) = Client::start( &ls_config.command, &ls_config.args, - config.config.clone(), + ls_config.config.clone(), ls_config.environment.clone(), &config.roots, config.workspace_lsp_roots.as_deref().unwrap_or(root_dirs), id, + name, ls_config.timeout, doc_path, )?; @@ -886,7 +904,7 @@ fn start_client( initialize_notify.notify_one(); }); - Ok(NewClientResult(client, incoming)) + Ok(NewClient(client, incoming)) } /// Find an LSP workspace of a file using the following mechanism: diff --git a/helix-lsp/src/transport.rs b/helix-lsp/src/transport.rs index 3e3e06ee..8c38c177 100644 --- a/helix-lsp/src/transport.rs +++ b/helix-lsp/src/transport.rs @@ -38,6 +38,7 @@ enum ServerMessage { #[derive(Debug)] pub struct Transport { id: usize, + name: String, pending_requests: Mutex<HashMap<jsonrpc::Id, Sender<Result<Value>>>>, } @@ -47,6 +48,7 @@ impl Transport { server_stdin: BufWriter<ChildStdin>, server_stderr: BufReader<ChildStderr>, id: usize, + name: String, ) -> ( UnboundedReceiver<(usize, jsonrpc::Call)>, UnboundedSender<Payload>, @@ -58,6 +60,7 @@ impl Transport { let transport = Self { id, + name, pending_requests: Mutex::new(HashMap::default()), }; @@ -83,6 +86,7 @@ impl Transport { async fn recv_server_message( reader: &mut (impl AsyncBufRead + Unpin + Send), buffer: &mut String, + language_server_name: &str, ) -> Result<ServerMessage> { let mut content_length = None; loop { @@ -124,7 +128,7 @@ impl Transport { reader.read_exact(&mut content).await?; let msg = std::str::from_utf8(&content).context("invalid utf8 from server")?; - info!("<- {}", msg); + info!("{language_server_name} <- {msg}"); // try parsing as output (server response) or call (server request) let output: serde_json::Result<ServerMessage> = serde_json::from_str(msg); @@ -135,12 +139,13 @@ impl Transport { async fn recv_server_error( err: &mut (impl AsyncBufRead + Unpin + Send), buffer: &mut String, + language_server_name: &str, ) -> Result<()> { buffer.truncate(0); if err.read_line(buffer).await? == 0 { return Err(Error::StreamClosed); }; - error!("err <- {:?}", buffer); + error!("{language_server_name} err <- {buffer:?}"); Ok(()) } @@ -162,15 +167,17 @@ impl Transport { Payload::Notification(value) => serde_json::to_string(&value)?, Payload::Response(error) => serde_json::to_string(&error)?, }; - self.send_string_to_server(server_stdin, json).await + self.send_string_to_server(server_stdin, json, &self.name) + .await } async fn send_string_to_server( &self, server_stdin: &mut BufWriter<ChildStdin>, request: String, + language_server_name: &str, ) -> Result<()> { - info!("-> {}", request); + info!("{language_server_name} -> {request}"); // send the headers server_stdin @@ -189,9 +196,13 @@ impl Transport { &self, client_tx: &UnboundedSender<(usize, jsonrpc::Call)>, msg: ServerMessage, + language_server_name: &str, ) -> Result<()> { match msg { - ServerMessage::Output(output) => self.process_request_response(output).await?, + ServerMessage::Output(output) => { + self.process_request_response(output, language_server_name) + .await? + } ServerMessage::Call(call) => { client_tx .send((self.id, call)) @@ -202,14 +213,18 @@ impl Transport { Ok(()) } - async fn process_request_response(&self, output: jsonrpc::Output) -> Result<()> { + async fn process_request_response( + &self, + output: jsonrpc::Output, + language_server_name: &str, + ) -> Result<()> { let (id, result) = match output { jsonrpc::Output::Success(jsonrpc::Success { id, result, .. }) => { - info!("<- {}", result); + info!("{language_server_name} <- {}", result); (id, Ok(result)) } jsonrpc::Output::Failure(jsonrpc::Failure { id, error, .. }) => { - error!("<- {}", error); + error!("{language_server_name} <- {error}"); (id, Err(error.into())) } }; @@ -240,12 +255,17 @@ impl Transport { ) { let mut recv_buffer = String::new(); loop { - match Self::recv_server_message(&mut server_stdout, &mut recv_buffer).await { + match Self::recv_server_message(&mut server_stdout, &mut recv_buffer, &transport.name) + .await + { Ok(msg) => { - match transport.process_server_message(&client_tx, msg).await { + match transport + .process_server_message(&client_tx, msg, &transport.name) + .await + { Ok(_) => {} Err(err) => { - error!("err: <- {:?}", err); + error!("{} err: <- {err:?}", transport.name); break; } }; @@ -270,7 +290,7 @@ impl Transport { params: jsonrpc::Params::None, })); match transport - .process_server_message(&client_tx, notification) + .process_server_message(&client_tx, notification, &transport.name) .await { Ok(_) => {} @@ -281,20 +301,22 @@ impl Transport { break; } Err(err) => { - error!("err: <- {:?}", err); + error!("{} err: <- {err:?}", transport.name); break; } } } } - async fn err(_transport: Arc<Self>, mut server_stderr: BufReader<ChildStderr>) { + async fn err(transport: Arc<Self>, mut server_stderr: BufReader<ChildStderr>) { let mut recv_buffer = String::new(); loop { - match Self::recv_server_error(&mut server_stderr, &mut recv_buffer).await { + match Self::recv_server_error(&mut server_stderr, &mut recv_buffer, &transport.name) + .await + { Ok(_) => {} Err(err) => { - error!("err: <- {:?}", err); + error!("{} err: <- {err:?}", transport.name); break; } } @@ -348,10 +370,11 @@ impl Transport { method: lsp_types::notification::Initialized::METHOD.to_string(), params: jsonrpc::Params::None, })); - match transport.process_server_message(&client_tx, notification).await { + let language_server_name = &transport.name; + match transport.process_server_message(&client_tx, notification, language_server_name).await { Ok(_) => {} Err(err) => { - error!("err: <- {:?}", err); + error!("{language_server_name} err: <- {err:?}"); } } @@ -361,7 +384,7 @@ impl Transport { match transport.send_payload_to_server(&mut server_stdin, msg).await { Ok(_) => {} Err(err) => { - error!("err: <- {:?}", err); + error!("{language_server_name} err: <- {err:?}"); } } } @@ -380,7 +403,7 @@ impl Transport { match transport.send_payload_to_server(&mut server_stdin, msg).await { Ok(_) => {} Err(err) => { - error!("err: <- {:?}", err); + error!("{} err: <- {err:?}", transport.name); } } } |