diff options
author | Blaž Hrastnik | 2021-05-06 04:56:34 +0000 |
---|---|---|
committer | Blaž Hrastnik | 2021-05-06 04:56:34 +0000 |
commit | 355ad3cb8289611b06cd42fa62ddfe0a5c716e83 (patch) | |
tree | 7c94da6e122a9ecf542103b46a3ca9e80654a52e /helix-lsp/src | |
parent | 0e5308bce1a6e7d7d00854ae50902546cea9578d (diff) |
Tokio migration.
Diffstat (limited to 'helix-lsp/src')
-rw-r--r-- | helix-lsp/src/client.rs | 79 | ||||
-rw-r--r-- | helix-lsp/src/lib.rs | 18 | ||||
-rw-r--r-- | helix-lsp/src/select_all.rs | 2 | ||||
-rw-r--r-- | helix-lsp/src/transport.rs | 34 |
4 files changed, 66 insertions, 67 deletions
diff --git a/helix-lsp/src/client.rs b/helix-lsp/src/client.rs index c70e6e78..c3de4fd7 100644 --- a/helix-lsp/src/client.rs +++ b/helix-lsp/src/client.rs @@ -13,18 +13,18 @@ use jsonrpc_core as jsonrpc; use lsp_types as lsp; use serde_json::Value; -use smol::{ - channel::{Receiver, Sender}, +use std::process::Stdio; +use tokio::{ io::{BufReader, BufWriter}, // prelude::*, - process::{Child, Command, Stdio}, - Executor, + process::{Child, Command}, + sync::mpsc::{channel, UnboundedReceiver, UnboundedSender}, }; pub struct Client { _process: Child, - outgoing: Sender<Payload>, + outgoing: UnboundedSender<Payload>, // pub incoming: Receiver<Call>, pub request_counter: AtomicU64, @@ -33,13 +33,14 @@ pub struct Client { } impl Client { - pub fn start(ex: &Executor, cmd: &str, args: &[String]) -> Result<(Self, Receiver<Call>)> { - // smol makes sure the process is reaped on drop, but using kill_on_drop(true) maybe? + pub fn start(cmd: &str, args: &[String]) -> Result<(Self, UnboundedReceiver<Call>)> { let process = Command::new(cmd) .args(args) .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) + // make sure the process is reaped on drop + .kill_on_drop(true) .spawn(); // use std::io::ErrorKind; @@ -58,7 +59,7 @@ impl Client { let reader = BufReader::new(process.stdout.take().expect("Failed to open stdout")); let stderr = BufReader::new(process.stderr.take().expect("Failed to open stderr")); - let (incoming, outgoing) = Transport::start(ex, reader, writer, stderr); + let (incoming, outgoing) = Transport::start(reader, writer, stderr); let client = Client { _process: process, @@ -134,49 +135,53 @@ impl Client { params: Self::value_into_params(params), }; - let (tx, rx) = smol::channel::bounded::<Result<Value>>(1); + let (tx, mut rx) = channel::<Result<Value>>(1); self.outgoing .send(Payload::Request { chan: tx, value: request, }) - .await .map_err(|e| Error::Other(e.into()))?; - use smol_timeout::TimeoutExt; use std::time::Duration; + use tokio::time::timeout; let future = async move { - rx.recv() - .timeout(Duration::from_secs(2)) + timeout(Duration::from_secs(2), rx.recv()) .await - .ok_or(Error::Timeout)? // return Timeout - .map_err(|e| Error::Other(e.into()))? + .map_err(|e| Error::Timeout)? // return Timeout + .unwrap() // TODO: None if channel closed }; Ok(future) } /// Send a RPC notification to the language server. - pub async fn notify<R: lsp::notification::Notification>(&self, params: R::Params) -> Result<()> + pub fn notify<R: lsp::notification::Notification>( + &self, + params: R::Params, + ) -> impl Future<Output = Result<()>> where R::Params: serde::Serialize, { - let params = serde_json::to_value(params)?; + let outgoing = self.outgoing.clone(); - let notification = jsonrpc::Notification { - jsonrpc: Some(jsonrpc::Version::V2), - method: R::METHOD.to_string(), - params: Self::value_into_params(params), - }; + async move { + let params = serde_json::to_value(params)?; - self.outgoing - .send(Payload::Notification(notification)) - .await - .map_err(|e| Error::Other(e.into()))?; + let notification = jsonrpc::Notification { + jsonrpc: Some(jsonrpc::Version::V2), + method: R::METHOD.to_string(), + params: Self::value_into_params(params), + }; - Ok(()) + outgoing + .send(Payload::Notification(notification)) + .map_err(|e| Error::Other(e.into()))?; + + Ok(()) + } } /// Reply to a language server RPC call. @@ -202,7 +207,6 @@ impl Client { self.outgoing .send(Payload::Response(output)) - .await .map_err(|e| Error::Other(e.into()))?; Ok(()) @@ -387,13 +391,13 @@ impl Client { changes } - pub async fn text_document_did_change( + pub fn text_document_did_change( &self, text_document: lsp::VersionedTextDocumentIdentifier, old_text: &Rope, new_text: &Rope, changes: &ChangeSet, - ) -> Result<()> { + ) -> Option<impl Future<Output = Result<()>>> { // figure out what kind of sync the server supports let capabilities = self.capabilities.as_ref().unwrap(); @@ -405,7 +409,7 @@ impl Client { .. })) => kind, // None | SyncOptions { changes: None } - _ => return Ok(()), + _ => return None, }; let changes = match sync_capabilities { @@ -420,14 +424,15 @@ impl Client { lsp::TextDocumentSyncKind::Incremental => { Self::changeset_to_changes(old_text, new_text, changes, self.offset_encoding) } - lsp::TextDocumentSyncKind::None => return Ok(()), + lsp::TextDocumentSyncKind::None => return None, }; - self.notify::<lsp::notification::DidChangeTextDocument>(lsp::DidChangeTextDocumentParams { - text_document, - content_changes: changes, - }) - .await + Some(self.notify::<lsp::notification::DidChangeTextDocument>( + lsp::DidChangeTextDocumentParams { + text_document, + content_changes: changes, + }, + )) } pub async fn text_document_did_close( diff --git a/helix-lsp/src/lib.rs b/helix-lsp/src/lib.rs index 6dcc6605..fd7e6fd3 100644 --- a/helix-lsp/src/lib.rs +++ b/helix-lsp/src/lib.rs @@ -18,6 +18,8 @@ use std::{collections::HashMap, sync::Arc}; use serde::{Deserialize, Serialize}; +use tokio_stream::wrappers::UnboundedReceiverStream; + #[derive(Error, Debug)] pub enum Error { #[error("protocol error: {0}")] @@ -163,12 +165,11 @@ pub use jsonrpc::Call; type LanguageId = String; use crate::select_all::SelectAll; -use smol::channel::Receiver; pub struct Registry { inner: HashMap<LanguageId, Option<Arc<Client>>>, - pub incoming: SelectAll<Receiver<Call>>, + pub incoming: SelectAll<UnboundedReceiverStream<Call>>, } impl Default for Registry { @@ -185,11 +186,7 @@ impl Registry { } } - pub fn get( - &mut self, - language_config: &LanguageConfiguration, - ex: &smol::Executor, - ) -> Option<Arc<Client>> { + pub fn get(&mut self, language_config: &LanguageConfiguration) -> Option<Arc<Client>> { // TODO: propagate the error if let Some(config) = &language_config.language_server { // avoid borrow issues @@ -203,12 +200,13 @@ impl Registry { // initialize a new client let (mut client, incoming) = - Client::start(&ex, &config.command, &config.args).ok()?; + Client::start(&config.command, &config.args).ok()?; // TODO: run this async without blocking - smol::block_on(client.initialize()).unwrap(); + let rt = tokio::runtime::Handle::current(); + rt.block_on(client.initialize()).unwrap(); - s_incoming.push(incoming); + s_incoming.push(UnboundedReceiverStream::new(incoming)); Some(Arc::new(client)) }) diff --git a/helix-lsp/src/select_all.rs b/helix-lsp/src/select_all.rs index 4a7f0cbf..edfd1f38 100644 --- a/helix-lsp/src/select_all.rs +++ b/helix-lsp/src/select_all.rs @@ -6,10 +6,10 @@ use core::{ pin::Pin, }; -use smol::{ready, stream::Stream}; use std::task::{Context, Poll}; use futures_util::stream::{FusedStream, FuturesUnordered, StreamExt, StreamFuture}; +use futures_util::{ready, stream::Stream}; /// An unbounded set of streams /// diff --git a/helix-lsp/src/transport.rs b/helix-lsp/src/transport.rs index d3e25b9c..4a5ae45a 100644 --- a/helix-lsp/src/transport.rs +++ b/helix-lsp/src/transport.rs @@ -10,15 +10,13 @@ type Result<T> = core::result::Result<T, Error>; use jsonrpc_core as jsonrpc; use serde_json::Value; -use smol::prelude::*; - -use smol::{ - channel::{Receiver, Sender}, - io::{BufReader, BufWriter}, +use tokio::{ + io::{AsyncBufRead, AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}, process::{ChildStderr, ChildStdin, ChildStdout}, - Executor, + sync::mpsc::{unbounded_channel, Sender, UnboundedReceiver, UnboundedSender}, }; +#[derive(Debug)] pub enum Payload { Request { chan: Sender<Result<Value>>, @@ -41,8 +39,8 @@ enum Message { } pub struct Transport { - incoming: Sender<jsonrpc::Call>, - outgoing: Receiver<Payload>, + incoming: UnboundedSender<jsonrpc::Call>, + outgoing: UnboundedReceiver<Payload>, pending_requests: HashMap<jsonrpc::Id, Sender<Result<Value>>>, headers: HashMap<String, String>, @@ -54,13 +52,12 @@ pub struct Transport { impl Transport { pub fn start( - ex: &Executor, reader: BufReader<ChildStdout>, writer: BufWriter<ChildStdin>, stderr: BufReader<ChildStderr>, - ) -> (Receiver<jsonrpc::Call>, Sender<Payload>) { - let (incoming, rx) = smol::channel::unbounded(); - let (tx, outgoing) = smol::channel::unbounded(); + ) -> (UnboundedReceiver<jsonrpc::Call>, UnboundedSender<Payload>) { + let (incoming, rx) = unbounded_channel(); + let (tx, outgoing) = unbounded_channel(); let transport = Self { reader, @@ -72,7 +69,7 @@ impl Transport { headers: HashMap::default(), }; - ex.spawn(transport.duplex()).detach(); + tokio::spawn(transport.duplex()); (rx, tx) } @@ -168,7 +165,7 @@ impl Transport { match msg { Message::Output(output) => self.recv_response(output).await?, Message::Call(call) => { - self.incoming.send(call).await?; + self.incoming.send(call).unwrap(); // let notification = Notification::parse(&method, params); } }; @@ -204,11 +201,10 @@ impl Transport { } pub async fn duplex(mut self) { - use futures_util::{select, FutureExt}; loop { - select! { + tokio::select! { // client -> server - msg = self.outgoing.next().fuse() => { + msg = self.outgoing.recv() => { if msg.is_none() { break; } @@ -217,7 +213,7 @@ impl Transport { self.send_payload(msg).await.unwrap(); } // server <- client - msg = Self::recv(&mut self.reader, &mut self.headers).fuse() => { + msg = Self::recv(&mut self.reader, &mut self.headers) => { if msg.is_err() { error!("err: <- {:?}", msg); break; @@ -226,7 +222,7 @@ impl Transport { self.recv_msg(msg).await.unwrap(); } - _msg = Self::err(&mut self.stderr).fuse() => {} + _msg = Self::err(&mut self.stderr) => {} } } } |