aboutsummaryrefslogtreecommitdiff
path: root/helix-lsp/src/client.rs
diff options
context:
space:
mode:
Diffstat (limited to 'helix-lsp/src/client.rs')
-rw-r--r--helix-lsp/src/client.rs79
1 files changed, 42 insertions, 37 deletions
diff --git a/helix-lsp/src/client.rs b/helix-lsp/src/client.rs
index c70e6e78..c3de4fd7 100644
--- a/helix-lsp/src/client.rs
+++ b/helix-lsp/src/client.rs
@@ -13,18 +13,18 @@ use jsonrpc_core as jsonrpc;
use lsp_types as lsp;
use serde_json::Value;
-use smol::{
- channel::{Receiver, Sender},
+use std::process::Stdio;
+use tokio::{
io::{BufReader, BufWriter},
// prelude::*,
- process::{Child, Command, Stdio},
- Executor,
+ process::{Child, Command},
+ sync::mpsc::{channel, UnboundedReceiver, UnboundedSender},
};
pub struct Client {
_process: Child,
- outgoing: Sender<Payload>,
+ outgoing: UnboundedSender<Payload>,
// pub incoming: Receiver<Call>,
pub request_counter: AtomicU64,
@@ -33,13 +33,14 @@ pub struct Client {
}
impl Client {
- pub fn start(ex: &Executor, cmd: &str, args: &[String]) -> Result<(Self, Receiver<Call>)> {
- // smol makes sure the process is reaped on drop, but using kill_on_drop(true) maybe?
+ pub fn start(cmd: &str, args: &[String]) -> Result<(Self, UnboundedReceiver<Call>)> {
let process = Command::new(cmd)
.args(args)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
+ // make sure the process is reaped on drop
+ .kill_on_drop(true)
.spawn();
// use std::io::ErrorKind;
@@ -58,7 +59,7 @@ impl Client {
let reader = BufReader::new(process.stdout.take().expect("Failed to open stdout"));
let stderr = BufReader::new(process.stderr.take().expect("Failed to open stderr"));
- let (incoming, outgoing) = Transport::start(ex, reader, writer, stderr);
+ let (incoming, outgoing) = Transport::start(reader, writer, stderr);
let client = Client {
_process: process,
@@ -134,49 +135,53 @@ impl Client {
params: Self::value_into_params(params),
};
- let (tx, rx) = smol::channel::bounded::<Result<Value>>(1);
+ let (tx, mut rx) = channel::<Result<Value>>(1);
self.outgoing
.send(Payload::Request {
chan: tx,
value: request,
})
- .await
.map_err(|e| Error::Other(e.into()))?;
- use smol_timeout::TimeoutExt;
use std::time::Duration;
+ use tokio::time::timeout;
let future = async move {
- rx.recv()
- .timeout(Duration::from_secs(2))
+ timeout(Duration::from_secs(2), rx.recv())
.await
- .ok_or(Error::Timeout)? // return Timeout
- .map_err(|e| Error::Other(e.into()))?
+ .map_err(|e| Error::Timeout)? // return Timeout
+ .unwrap() // TODO: None if channel closed
};
Ok(future)
}
/// Send a RPC notification to the language server.
- pub async fn notify<R: lsp::notification::Notification>(&self, params: R::Params) -> Result<()>
+ pub fn notify<R: lsp::notification::Notification>(
+ &self,
+ params: R::Params,
+ ) -> impl Future<Output = Result<()>>
where
R::Params: serde::Serialize,
{
- let params = serde_json::to_value(params)?;
+ let outgoing = self.outgoing.clone();
- let notification = jsonrpc::Notification {
- jsonrpc: Some(jsonrpc::Version::V2),
- method: R::METHOD.to_string(),
- params: Self::value_into_params(params),
- };
+ async move {
+ let params = serde_json::to_value(params)?;
- self.outgoing
- .send(Payload::Notification(notification))
- .await
- .map_err(|e| Error::Other(e.into()))?;
+ let notification = jsonrpc::Notification {
+ jsonrpc: Some(jsonrpc::Version::V2),
+ method: R::METHOD.to_string(),
+ params: Self::value_into_params(params),
+ };
- Ok(())
+ outgoing
+ .send(Payload::Notification(notification))
+ .map_err(|e| Error::Other(e.into()))?;
+
+ Ok(())
+ }
}
/// Reply to a language server RPC call.
@@ -202,7 +207,6 @@ impl Client {
self.outgoing
.send(Payload::Response(output))
- .await
.map_err(|e| Error::Other(e.into()))?;
Ok(())
@@ -387,13 +391,13 @@ impl Client {
changes
}
- pub async fn text_document_did_change(
+ pub fn text_document_did_change(
&self,
text_document: lsp::VersionedTextDocumentIdentifier,
old_text: &Rope,
new_text: &Rope,
changes: &ChangeSet,
- ) -> Result<()> {
+ ) -> Option<impl Future<Output = Result<()>>> {
// figure out what kind of sync the server supports
let capabilities = self.capabilities.as_ref().unwrap();
@@ -405,7 +409,7 @@ impl Client {
..
})) => kind,
// None | SyncOptions { changes: None }
- _ => return Ok(()),
+ _ => return None,
};
let changes = match sync_capabilities {
@@ -420,14 +424,15 @@ impl Client {
lsp::TextDocumentSyncKind::Incremental => {
Self::changeset_to_changes(old_text, new_text, changes, self.offset_encoding)
}
- lsp::TextDocumentSyncKind::None => return Ok(()),
+ lsp::TextDocumentSyncKind::None => return None,
};
- self.notify::<lsp::notification::DidChangeTextDocument>(lsp::DidChangeTextDocumentParams {
- text_document,
- content_changes: changes,
- })
- .await
+ Some(self.notify::<lsp::notification::DidChangeTextDocument>(
+ lsp::DidChangeTextDocumentParams {
+ text_document,
+ content_changes: changes,
+ },
+ ))
}
pub async fn text_document_did_close(