diff options
author | Blaž Hrastnik | 2021-05-06 04:56:34 +0000 |
---|---|---|
committer | Blaž Hrastnik | 2021-05-06 04:56:34 +0000 |
commit | 355ad3cb8289611b06cd42fa62ddfe0a5c716e83 (patch) | |
tree | 7c94da6e122a9ecf542103b46a3ca9e80654a52e /helix-lsp/src/client.rs | |
parent | 0e5308bce1a6e7d7d00854ae50902546cea9578d (diff) |
Tokio migration.
Diffstat (limited to 'helix-lsp/src/client.rs')
-rw-r--r-- | helix-lsp/src/client.rs | 79 |
1 files changed, 42 insertions, 37 deletions
diff --git a/helix-lsp/src/client.rs b/helix-lsp/src/client.rs index c70e6e78..c3de4fd7 100644 --- a/helix-lsp/src/client.rs +++ b/helix-lsp/src/client.rs @@ -13,18 +13,18 @@ use jsonrpc_core as jsonrpc; use lsp_types as lsp; use serde_json::Value; -use smol::{ - channel::{Receiver, Sender}, +use std::process::Stdio; +use tokio::{ io::{BufReader, BufWriter}, // prelude::*, - process::{Child, Command, Stdio}, - Executor, + process::{Child, Command}, + sync::mpsc::{channel, UnboundedReceiver, UnboundedSender}, }; pub struct Client { _process: Child, - outgoing: Sender<Payload>, + outgoing: UnboundedSender<Payload>, // pub incoming: Receiver<Call>, pub request_counter: AtomicU64, @@ -33,13 +33,14 @@ pub struct Client { } impl Client { - pub fn start(ex: &Executor, cmd: &str, args: &[String]) -> Result<(Self, Receiver<Call>)> { - // smol makes sure the process is reaped on drop, but using kill_on_drop(true) maybe? + pub fn start(cmd: &str, args: &[String]) -> Result<(Self, UnboundedReceiver<Call>)> { let process = Command::new(cmd) .args(args) .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) + // make sure the process is reaped on drop + .kill_on_drop(true) .spawn(); // use std::io::ErrorKind; @@ -58,7 +59,7 @@ impl Client { let reader = BufReader::new(process.stdout.take().expect("Failed to open stdout")); let stderr = BufReader::new(process.stderr.take().expect("Failed to open stderr")); - let (incoming, outgoing) = Transport::start(ex, reader, writer, stderr); + let (incoming, outgoing) = Transport::start(reader, writer, stderr); let client = Client { _process: process, @@ -134,49 +135,53 @@ impl Client { params: Self::value_into_params(params), }; - let (tx, rx) = smol::channel::bounded::<Result<Value>>(1); + let (tx, mut rx) = channel::<Result<Value>>(1); self.outgoing .send(Payload::Request { chan: tx, value: request, }) - .await .map_err(|e| Error::Other(e.into()))?; - use smol_timeout::TimeoutExt; use std::time::Duration; + use tokio::time::timeout; let future = async move { - rx.recv() - .timeout(Duration::from_secs(2)) + timeout(Duration::from_secs(2), rx.recv()) .await - .ok_or(Error::Timeout)? // return Timeout - .map_err(|e| Error::Other(e.into()))? + .map_err(|e| Error::Timeout)? // return Timeout + .unwrap() // TODO: None if channel closed }; Ok(future) } /// Send a RPC notification to the language server. - pub async fn notify<R: lsp::notification::Notification>(&self, params: R::Params) -> Result<()> + pub fn notify<R: lsp::notification::Notification>( + &self, + params: R::Params, + ) -> impl Future<Output = Result<()>> where R::Params: serde::Serialize, { - let params = serde_json::to_value(params)?; + let outgoing = self.outgoing.clone(); - let notification = jsonrpc::Notification { - jsonrpc: Some(jsonrpc::Version::V2), - method: R::METHOD.to_string(), - params: Self::value_into_params(params), - }; + async move { + let params = serde_json::to_value(params)?; - self.outgoing - .send(Payload::Notification(notification)) - .await - .map_err(|e| Error::Other(e.into()))?; + let notification = jsonrpc::Notification { + jsonrpc: Some(jsonrpc::Version::V2), + method: R::METHOD.to_string(), + params: Self::value_into_params(params), + }; - Ok(()) + outgoing + .send(Payload::Notification(notification)) + .map_err(|e| Error::Other(e.into()))?; + + Ok(()) + } } /// Reply to a language server RPC call. @@ -202,7 +207,6 @@ impl Client { self.outgoing .send(Payload::Response(output)) - .await .map_err(|e| Error::Other(e.into()))?; Ok(()) @@ -387,13 +391,13 @@ impl Client { changes } - pub async fn text_document_did_change( + pub fn text_document_did_change( &self, text_document: lsp::VersionedTextDocumentIdentifier, old_text: &Rope, new_text: &Rope, changes: &ChangeSet, - ) -> Result<()> { + ) -> Option<impl Future<Output = Result<()>>> { // figure out what kind of sync the server supports let capabilities = self.capabilities.as_ref().unwrap(); @@ -405,7 +409,7 @@ impl Client { .. })) => kind, // None | SyncOptions { changes: None } - _ => return Ok(()), + _ => return None, }; let changes = match sync_capabilities { @@ -420,14 +424,15 @@ impl Client { lsp::TextDocumentSyncKind::Incremental => { Self::changeset_to_changes(old_text, new_text, changes, self.offset_encoding) } - lsp::TextDocumentSyncKind::None => return Ok(()), + lsp::TextDocumentSyncKind::None => return None, }; - self.notify::<lsp::notification::DidChangeTextDocument>(lsp::DidChangeTextDocumentParams { - text_document, - content_changes: changes, - }) - .await + Some(self.notify::<lsp::notification::DidChangeTextDocument>( + lsp::DidChangeTextDocumentParams { + text_document, + content_changes: changes, + }, + )) } pub async fn text_document_did_close( |