aboutsummaryrefslogtreecommitdiff
path: root/helix-lsp
diff options
context:
space:
mode:
Diffstat (limited to 'helix-lsp')
-rw-r--r--helix-lsp/src/client.rs12
-rw-r--r--helix-lsp/src/lib.rs204
-rw-r--r--helix-lsp/src/transport.rs63
3 files changed, 163 insertions, 116 deletions
diff --git a/helix-lsp/src/client.rs b/helix-lsp/src/client.rs
index 840e7382..c0f3adb8 100644
--- a/helix-lsp/src/client.rs
+++ b/helix-lsp/src/client.rs
@@ -44,6 +44,7 @@ fn workspace_for_uri(uri: lsp::Url) -> WorkspaceFolder {
#[derive(Debug)]
pub struct Client {
id: usize,
+ name: String,
_process: Child,
server_tx: UnboundedSender<Payload>,
request_counter: AtomicU64,
@@ -166,8 +167,7 @@ impl Client {
tokio::spawn(self.did_change_workspace(vec![workspace_for_uri(root_uri)], Vec::new()));
}
- #[allow(clippy::type_complexity)]
- #[allow(clippy::too_many_arguments)]
+ #[allow(clippy::type_complexity, clippy::too_many_arguments)]
pub fn start(
cmd: &str,
args: &[String],
@@ -176,6 +176,7 @@ impl Client {
root_markers: &[String],
manual_roots: &[PathBuf],
id: usize,
+ name: String,
req_timeout: u64,
doc_path: Option<&std::path::PathBuf>,
) -> Result<(Self, UnboundedReceiver<(usize, Call)>, Arc<Notify>)> {
@@ -200,7 +201,7 @@ impl Client {
let stderr = BufReader::new(process.stderr.take().expect("Failed to open stderr"));
let (server_rx, server_tx, initialize_notify) =
- Transport::start(reader, writer, stderr, id);
+ Transport::start(reader, writer, stderr, id, name.clone());
let (workspace, workspace_is_cwd) = find_workspace();
let workspace = path::get_normalized_path(&workspace);
let root = find_lsp_workspace(
@@ -225,6 +226,7 @@ impl Client {
let client = Self {
id,
+ name,
_process: process,
server_tx,
request_counter: AtomicU64::new(0),
@@ -240,6 +242,10 @@ impl Client {
Ok((client, server_rx, initialize_notify))
}
+ pub fn name(&self) -> &String {
+ &self.name
+ }
+
pub fn id(&self) -> usize {
self.id
}
diff --git a/helix-lsp/src/lib.rs b/helix-lsp/src/lib.rs
index 31ee1d75..12e63255 100644
--- a/helix-lsp/src/lib.rs
+++ b/helix-lsp/src/lib.rs
@@ -17,19 +17,16 @@ use helix_core::{
use tokio::sync::mpsc::UnboundedReceiver;
use std::{
- collections::{hash_map::Entry, HashMap},
+ collections::HashMap,
path::{Path, PathBuf},
- sync::{
- atomic::{AtomicUsize, Ordering},
- Arc,
- },
+ sync::Arc,
};
use thiserror::Error;
use tokio_stream::wrappers::UnboundedReceiverStream;
pub type Result<T> = core::result::Result<T, Error>;
-type LanguageId = String;
+type LanguageServerName = String;
#[derive(Error, Debug)]
pub enum Error {
@@ -49,7 +46,7 @@ pub enum Error {
Other(#[from] anyhow::Error),
}
-#[derive(Clone, Copy, Debug, Default)]
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum OffsetEncoding {
/// UTF-8 code units aka bytes
Utf8,
@@ -624,23 +621,18 @@ impl Notification {
#[derive(Debug)]
pub struct Registry {
- inner: HashMap<LanguageId, Vec<(usize, Arc<Client>)>>,
-
- counter: AtomicUsize,
+ inner: HashMap<LanguageServerName, Vec<Arc<Client>>>,
+ syn_loader: Arc<helix_core::syntax::Loader>,
+ counter: usize,
pub incoming: SelectAll<UnboundedReceiverStream<(usize, Call)>>,
}
-impl Default for Registry {
- fn default() -> Self {
- Self::new()
- }
-}
-
impl Registry {
- pub fn new() -> Self {
+ pub fn new(syn_loader: Arc<helix_core::syntax::Loader>) -> Self {
Self {
inner: HashMap::new(),
- counter: AtomicUsize::new(0),
+ syn_loader,
+ counter: 0,
incoming: SelectAll::new(),
}
}
@@ -649,15 +641,43 @@ impl Registry {
self.inner
.values()
.flatten()
- .find(|(client_id, _)| client_id == &id)
- .map(|(_, client)| client.as_ref())
+ .find(|client| client.id() == id)
+ .map(|client| &**client)
}
pub fn remove_by_id(&mut self, id: usize) {
- self.inner.retain(|_, clients| {
- clients.retain(|&(client_id, _)| client_id != id);
- !clients.is_empty()
- })
+ self.inner.retain(|_, language_servers| {
+ language_servers.retain(|ls| id != ls.id());
+ !language_servers.is_empty()
+ });
+ }
+
+ fn start_client(
+ &mut self,
+ name: String,
+ ls_config: &LanguageConfiguration,
+ doc_path: Option<&std::path::PathBuf>,
+ root_dirs: &[PathBuf],
+ enable_snippets: bool,
+ ) -> Result<Arc<Client>> {
+ let config = self
+ .syn_loader
+ .language_server_configs()
+ .get(&name)
+ .ok_or_else(|| anyhow::anyhow!("Language server '{name}' not defined"))?;
+ self.counter += 1;
+ let id = self.counter;
+ let NewClient(client, incoming) = start_client(
+ id,
+ name,
+ ls_config,
+ config,
+ doc_path,
+ root_dirs,
+ enable_snippets,
+ )?;
+ self.incoming.push(UnboundedReceiverStream::new(incoming));
+ Ok(client)
}
pub fn restart(
@@ -666,48 +686,46 @@ impl Registry {
doc_path: Option<&std::path::PathBuf>,
root_dirs: &[PathBuf],
enable_snippets: bool,
- ) -> Result<Option<Arc<Client>>> {
- let config = match &language_config.language_server {
- Some(config) => config,
- None => return Ok(None),
- };
-
- let scope = language_config.scope.clone();
-
- match self.inner.entry(scope) {
- Entry::Vacant(_) => Ok(None),
- Entry::Occupied(mut entry) => {
- // initialize a new client
- let id = self.counter.fetch_add(1, Ordering::Relaxed);
-
- let NewClientResult(client, incoming) = start_client(
- id,
- language_config,
- config,
- doc_path,
- root_dirs,
- enable_snippets,
- )?;
- self.incoming.push(UnboundedReceiverStream::new(incoming));
+ ) -> Result<Vec<Arc<Client>>> {
+ language_config
+ .language_servers
+ .iter()
+ .filter_map(|config| {
+ let name = config.name().clone();
+
+ #[allow(clippy::map_entry)]
+ if self.inner.contains_key(&name) {
+ let client = match self.start_client(
+ name.clone(),
+ language_config,
+ doc_path,
+ root_dirs,
+ enable_snippets,
+ ) {
+ Ok(client) => client,
+ error => return Some(error),
+ };
+ let old_clients = self.inner.insert(name, vec![client.clone()]).unwrap();
- let old_clients = entry.insert(vec![(id, client.clone())]);
+ // TODO what if there are different language servers for different workspaces,
+ // I think the language servers will be stopped without being restarted, which is not intended
+ for old_client in old_clients {
+ tokio::spawn(async move {
+ let _ = old_client.force_shutdown().await;
+ });
+ }
- for (_, old_client) in old_clients {
- tokio::spawn(async move {
- let _ = old_client.force_shutdown().await;
- });
+ Some(Ok(client))
+ } else {
+ None
}
-
- Ok(Some(client))
- }
- }
+ })
+ .collect()
}
- pub fn stop(&mut self, language_config: &LanguageConfiguration) {
- let scope = language_config.scope.clone();
-
- if let Some(clients) = self.inner.remove(&scope) {
- for (_, client) in clients {
+ pub fn stop(&mut self, name: &str) {
+ if let Some(clients) = self.inner.remove(name) {
+ for client in clients {
tokio::spawn(async move {
let _ = client.force_shutdown().await;
});
@@ -721,37 +739,35 @@ impl Registry {
doc_path: Option<&std::path::PathBuf>,
root_dirs: &[PathBuf],
enable_snippets: bool,
- ) -> Result<Option<Arc<Client>>> {
- let config = match &language_config.language_server {
- Some(config) => config,
- None => return Ok(None),
- };
-
- let clients = self.inner.entry(language_config.scope.clone()).or_default();
- // check if we already have a client for this documents root that we can reuse
- if let Some((_, client)) = clients.iter_mut().enumerate().find(|(i, (_, client))| {
- client.try_add_doc(&language_config.roots, root_dirs, doc_path, *i == 0)
- }) {
- return Ok(Some(client.1.clone()));
- }
- // initialize a new client
- let id = self.counter.fetch_add(1, Ordering::Relaxed);
-
- let NewClientResult(client, incoming) = start_client(
- id,
- language_config,
- config,
- doc_path,
- root_dirs,
- enable_snippets,
- )?;
- clients.push((id, client.clone()));
- self.incoming.push(UnboundedReceiverStream::new(incoming));
- Ok(Some(client))
+ ) -> Result<Vec<Arc<Client>>> {
+ language_config
+ .language_servers
+ .iter()
+ .map(|features| {
+ let name = features.name();
+ if let Some(clients) = self.inner.get_mut(name) {
+ if let Some((_, client)) = clients.iter_mut().enumerate().find(|(i, client)| {
+ client.try_add_doc(&language_config.roots, root_dirs, doc_path, *i == 0)
+ }) {
+ return Ok(client.clone());
+ }
+ }
+ let client = self.start_client(
+ name.clone(),
+ language_config,
+ doc_path,
+ root_dirs,
+ enable_snippets,
+ )?;
+ let clients = self.inner.entry(features.name().clone()).or_default();
+ clients.push(client.clone());
+ Ok(client)
+ })
+ .collect()
}
pub fn iter_clients(&self) -> impl Iterator<Item = &Arc<Client>> {
- self.inner.values().flatten().map(|(_, client)| client)
+ self.inner.values().flatten()
}
}
@@ -833,26 +849,28 @@ impl LspProgressMap {
}
}
-struct NewClientResult(Arc<Client>, UnboundedReceiver<(usize, Call)>);
+struct NewClient(Arc<Client>, UnboundedReceiver<(usize, Call)>);
/// start_client takes both a LanguageConfiguration and a LanguageServerConfiguration to ensure that
/// it is only called when it makes sense.
fn start_client(
id: usize,
+ name: String,
config: &LanguageConfiguration,
ls_config: &LanguageServerConfiguration,
doc_path: Option<&std::path::PathBuf>,
root_dirs: &[PathBuf],
enable_snippets: bool,
-) -> Result<NewClientResult> {
+) -> Result<NewClient> {
let (client, incoming, initialize_notify) = Client::start(
&ls_config.command,
&ls_config.args,
- config.config.clone(),
+ ls_config.config.clone(),
ls_config.environment.clone(),
&config.roots,
config.workspace_lsp_roots.as_deref().unwrap_or(root_dirs),
id,
+ name,
ls_config.timeout,
doc_path,
)?;
@@ -886,7 +904,7 @@ fn start_client(
initialize_notify.notify_one();
});
- Ok(NewClientResult(client, incoming))
+ Ok(NewClient(client, incoming))
}
/// Find an LSP workspace of a file using the following mechanism:
diff --git a/helix-lsp/src/transport.rs b/helix-lsp/src/transport.rs
index 3e3e06ee..8c38c177 100644
--- a/helix-lsp/src/transport.rs
+++ b/helix-lsp/src/transport.rs
@@ -38,6 +38,7 @@ enum ServerMessage {
#[derive(Debug)]
pub struct Transport {
id: usize,
+ name: String,
pending_requests: Mutex<HashMap<jsonrpc::Id, Sender<Result<Value>>>>,
}
@@ -47,6 +48,7 @@ impl Transport {
server_stdin: BufWriter<ChildStdin>,
server_stderr: BufReader<ChildStderr>,
id: usize,
+ name: String,
) -> (
UnboundedReceiver<(usize, jsonrpc::Call)>,
UnboundedSender<Payload>,
@@ -58,6 +60,7 @@ impl Transport {
let transport = Self {
id,
+ name,
pending_requests: Mutex::new(HashMap::default()),
};
@@ -83,6 +86,7 @@ impl Transport {
async fn recv_server_message(
reader: &mut (impl AsyncBufRead + Unpin + Send),
buffer: &mut String,
+ language_server_name: &str,
) -> Result<ServerMessage> {
let mut content_length = None;
loop {
@@ -124,7 +128,7 @@ impl Transport {
reader.read_exact(&mut content).await?;
let msg = std::str::from_utf8(&content).context("invalid utf8 from server")?;
- info!("<- {}", msg);
+ info!("{language_server_name} <- {msg}");
// try parsing as output (server response) or call (server request)
let output: serde_json::Result<ServerMessage> = serde_json::from_str(msg);
@@ -135,12 +139,13 @@ impl Transport {
async fn recv_server_error(
err: &mut (impl AsyncBufRead + Unpin + Send),
buffer: &mut String,
+ language_server_name: &str,
) -> Result<()> {
buffer.truncate(0);
if err.read_line(buffer).await? == 0 {
return Err(Error::StreamClosed);
};
- error!("err <- {:?}", buffer);
+ error!("{language_server_name} err <- {buffer:?}");
Ok(())
}
@@ -162,15 +167,17 @@ impl Transport {
Payload::Notification(value) => serde_json::to_string(&value)?,
Payload::Response(error) => serde_json::to_string(&error)?,
};
- self.send_string_to_server(server_stdin, json).await
+ self.send_string_to_server(server_stdin, json, &self.name)
+ .await
}
async fn send_string_to_server(
&self,
server_stdin: &mut BufWriter<ChildStdin>,
request: String,
+ language_server_name: &str,
) -> Result<()> {
- info!("-> {}", request);
+ info!("{language_server_name} -> {request}");
// send the headers
server_stdin
@@ -189,9 +196,13 @@ impl Transport {
&self,
client_tx: &UnboundedSender<(usize, jsonrpc::Call)>,
msg: ServerMessage,
+ language_server_name: &str,
) -> Result<()> {
match msg {
- ServerMessage::Output(output) => self.process_request_response(output).await?,
+ ServerMessage::Output(output) => {
+ self.process_request_response(output, language_server_name)
+ .await?
+ }
ServerMessage::Call(call) => {
client_tx
.send((self.id, call))
@@ -202,14 +213,18 @@ impl Transport {
Ok(())
}
- async fn process_request_response(&self, output: jsonrpc::Output) -> Result<()> {
+ async fn process_request_response(
+ &self,
+ output: jsonrpc::Output,
+ language_server_name: &str,
+ ) -> Result<()> {
let (id, result) = match output {
jsonrpc::Output::Success(jsonrpc::Success { id, result, .. }) => {
- info!("<- {}", result);
+ info!("{language_server_name} <- {}", result);
(id, Ok(result))
}
jsonrpc::Output::Failure(jsonrpc::Failure { id, error, .. }) => {
- error!("<- {}", error);
+ error!("{language_server_name} <- {error}");
(id, Err(error.into()))
}
};
@@ -240,12 +255,17 @@ impl Transport {
) {
let mut recv_buffer = String::new();
loop {
- match Self::recv_server_message(&mut server_stdout, &mut recv_buffer).await {
+ match Self::recv_server_message(&mut server_stdout, &mut recv_buffer, &transport.name)
+ .await
+ {
Ok(msg) => {
- match transport.process_server_message(&client_tx, msg).await {
+ match transport
+ .process_server_message(&client_tx, msg, &transport.name)
+ .await
+ {
Ok(_) => {}
Err(err) => {
- error!("err: <- {:?}", err);
+ error!("{} err: <- {err:?}", transport.name);
break;
}
};
@@ -270,7 +290,7 @@ impl Transport {
params: jsonrpc::Params::None,
}));
match transport
- .process_server_message(&client_tx, notification)
+ .process_server_message(&client_tx, notification, &transport.name)
.await
{
Ok(_) => {}
@@ -281,20 +301,22 @@ impl Transport {
break;
}
Err(err) => {
- error!("err: <- {:?}", err);
+ error!("{} err: <- {err:?}", transport.name);
break;
}
}
}
}
- async fn err(_transport: Arc<Self>, mut server_stderr: BufReader<ChildStderr>) {
+ async fn err(transport: Arc<Self>, mut server_stderr: BufReader<ChildStderr>) {
let mut recv_buffer = String::new();
loop {
- match Self::recv_server_error(&mut server_stderr, &mut recv_buffer).await {
+ match Self::recv_server_error(&mut server_stderr, &mut recv_buffer, &transport.name)
+ .await
+ {
Ok(_) => {}
Err(err) => {
- error!("err: <- {:?}", err);
+ error!("{} err: <- {err:?}", transport.name);
break;
}
}
@@ -348,10 +370,11 @@ impl Transport {
method: lsp_types::notification::Initialized::METHOD.to_string(),
params: jsonrpc::Params::None,
}));
- match transport.process_server_message(&client_tx, notification).await {
+ let language_server_name = &transport.name;
+ match transport.process_server_message(&client_tx, notification, language_server_name).await {
Ok(_) => {}
Err(err) => {
- error!("err: <- {:?}", err);
+ error!("{language_server_name} err: <- {err:?}");
}
}
@@ -361,7 +384,7 @@ impl Transport {
match transport.send_payload_to_server(&mut server_stdin, msg).await {
Ok(_) => {}
Err(err) => {
- error!("err: <- {:?}", err);
+ error!("{language_server_name} err: <- {err:?}");
}
}
}
@@ -380,7 +403,7 @@ impl Transport {
match transport.send_payload_to_server(&mut server_stdin, msg).await {
Ok(_) => {}
Err(err) => {
- error!("err: <- {:?}", err);
+ error!("{} err: <- {err:?}", transport.name);
}
}
}