aboutsummaryrefslogtreecommitdiff
path: root/helix-lsp
diff options
context:
space:
mode:
authorBlaž Hrastnik2021-05-06 04:56:34 +0000
committerBlaž Hrastnik2021-05-06 04:56:34 +0000
commit355ad3cb8289611b06cd42fa62ddfe0a5c716e83 (patch)
tree7c94da6e122a9ecf542103b46a3ca9e80654a52e /helix-lsp
parent0e5308bce1a6e7d7d00854ae50902546cea9578d (diff)
Tokio migration.
Diffstat (limited to 'helix-lsp')
-rw-r--r--helix-lsp/Cargo.toml4
-rw-r--r--helix-lsp/src/client.rs79
-rw-r--r--helix-lsp/src/lib.rs18
-rw-r--r--helix-lsp/src/select_all.rs2
-rw-r--r--helix-lsp/src/transport.rs34
5 files changed, 68 insertions, 69 deletions
diff --git a/helix-lsp/Cargo.toml b/helix-lsp/Cargo.toml
index 549e098e..24bb2080 100644
--- a/helix-lsp/Cargo.toml
+++ b/helix-lsp/Cargo.toml
@@ -12,8 +12,8 @@ helix-core = { path = "../helix-core" }
once_cell = "1.4"
lsp-types = { version = "0.89", features = ["proposed"] }
-smol = "1.2"
-smol-timeout = "0.6"
+tokio = { version = "1", features = ["full"] }
+tokio-stream = "0.1.5"
url = "2"
pathdiff = "0.2"
shellexpand = "2.0"
diff --git a/helix-lsp/src/client.rs b/helix-lsp/src/client.rs
index c70e6e78..c3de4fd7 100644
--- a/helix-lsp/src/client.rs
+++ b/helix-lsp/src/client.rs
@@ -13,18 +13,18 @@ use jsonrpc_core as jsonrpc;
use lsp_types as lsp;
use serde_json::Value;
-use smol::{
- channel::{Receiver, Sender},
+use std::process::Stdio;
+use tokio::{
io::{BufReader, BufWriter},
// prelude::*,
- process::{Child, Command, Stdio},
- Executor,
+ process::{Child, Command},
+ sync::mpsc::{channel, UnboundedReceiver, UnboundedSender},
};
pub struct Client {
_process: Child,
- outgoing: Sender<Payload>,
+ outgoing: UnboundedSender<Payload>,
// pub incoming: Receiver<Call>,
pub request_counter: AtomicU64,
@@ -33,13 +33,14 @@ pub struct Client {
}
impl Client {
- pub fn start(ex: &Executor, cmd: &str, args: &[String]) -> Result<(Self, Receiver<Call>)> {
- // smol makes sure the process is reaped on drop, but using kill_on_drop(true) maybe?
+ pub fn start(cmd: &str, args: &[String]) -> Result<(Self, UnboundedReceiver<Call>)> {
let process = Command::new(cmd)
.args(args)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
+ // make sure the process is reaped on drop
+ .kill_on_drop(true)
.spawn();
// use std::io::ErrorKind;
@@ -58,7 +59,7 @@ impl Client {
let reader = BufReader::new(process.stdout.take().expect("Failed to open stdout"));
let stderr = BufReader::new(process.stderr.take().expect("Failed to open stderr"));
- let (incoming, outgoing) = Transport::start(ex, reader, writer, stderr);
+ let (incoming, outgoing) = Transport::start(reader, writer, stderr);
let client = Client {
_process: process,
@@ -134,49 +135,53 @@ impl Client {
params: Self::value_into_params(params),
};
- let (tx, rx) = smol::channel::bounded::<Result<Value>>(1);
+ let (tx, mut rx) = channel::<Result<Value>>(1);
self.outgoing
.send(Payload::Request {
chan: tx,
value: request,
})
- .await
.map_err(|e| Error::Other(e.into()))?;
- use smol_timeout::TimeoutExt;
use std::time::Duration;
+ use tokio::time::timeout;
let future = async move {
- rx.recv()
- .timeout(Duration::from_secs(2))
+ timeout(Duration::from_secs(2), rx.recv())
.await
- .ok_or(Error::Timeout)? // return Timeout
- .map_err(|e| Error::Other(e.into()))?
+ .map_err(|e| Error::Timeout)? // return Timeout
+ .unwrap() // TODO: None if channel closed
};
Ok(future)
}
/// Send a RPC notification to the language server.
- pub async fn notify<R: lsp::notification::Notification>(&self, params: R::Params) -> Result<()>
+ pub fn notify<R: lsp::notification::Notification>(
+ &self,
+ params: R::Params,
+ ) -> impl Future<Output = Result<()>>
where
R::Params: serde::Serialize,
{
- let params = serde_json::to_value(params)?;
+ let outgoing = self.outgoing.clone();
- let notification = jsonrpc::Notification {
- jsonrpc: Some(jsonrpc::Version::V2),
- method: R::METHOD.to_string(),
- params: Self::value_into_params(params),
- };
+ async move {
+ let params = serde_json::to_value(params)?;
- self.outgoing
- .send(Payload::Notification(notification))
- .await
- .map_err(|e| Error::Other(e.into()))?;
+ let notification = jsonrpc::Notification {
+ jsonrpc: Some(jsonrpc::Version::V2),
+ method: R::METHOD.to_string(),
+ params: Self::value_into_params(params),
+ };
- Ok(())
+ outgoing
+ .send(Payload::Notification(notification))
+ .map_err(|e| Error::Other(e.into()))?;
+
+ Ok(())
+ }
}
/// Reply to a language server RPC call.
@@ -202,7 +207,6 @@ impl Client {
self.outgoing
.send(Payload::Response(output))
- .await
.map_err(|e| Error::Other(e.into()))?;
Ok(())
@@ -387,13 +391,13 @@ impl Client {
changes
}
- pub async fn text_document_did_change(
+ pub fn text_document_did_change(
&self,
text_document: lsp::VersionedTextDocumentIdentifier,
old_text: &Rope,
new_text: &Rope,
changes: &ChangeSet,
- ) -> Result<()> {
+ ) -> Option<impl Future<Output = Result<()>>> {
// figure out what kind of sync the server supports
let capabilities = self.capabilities.as_ref().unwrap();
@@ -405,7 +409,7 @@ impl Client {
..
})) => kind,
// None | SyncOptions { changes: None }
- _ => return Ok(()),
+ _ => return None,
};
let changes = match sync_capabilities {
@@ -420,14 +424,15 @@ impl Client {
lsp::TextDocumentSyncKind::Incremental => {
Self::changeset_to_changes(old_text, new_text, changes, self.offset_encoding)
}
- lsp::TextDocumentSyncKind::None => return Ok(()),
+ lsp::TextDocumentSyncKind::None => return None,
};
- self.notify::<lsp::notification::DidChangeTextDocument>(lsp::DidChangeTextDocumentParams {
- text_document,
- content_changes: changes,
- })
- .await
+ Some(self.notify::<lsp::notification::DidChangeTextDocument>(
+ lsp::DidChangeTextDocumentParams {
+ text_document,
+ content_changes: changes,
+ },
+ ))
}
pub async fn text_document_did_close(
diff --git a/helix-lsp/src/lib.rs b/helix-lsp/src/lib.rs
index 6dcc6605..fd7e6fd3 100644
--- a/helix-lsp/src/lib.rs
+++ b/helix-lsp/src/lib.rs
@@ -18,6 +18,8 @@ use std::{collections::HashMap, sync::Arc};
use serde::{Deserialize, Serialize};
+use tokio_stream::wrappers::UnboundedReceiverStream;
+
#[derive(Error, Debug)]
pub enum Error {
#[error("protocol error: {0}")]
@@ -163,12 +165,11 @@ pub use jsonrpc::Call;
type LanguageId = String;
use crate::select_all::SelectAll;
-use smol::channel::Receiver;
pub struct Registry {
inner: HashMap<LanguageId, Option<Arc<Client>>>,
- pub incoming: SelectAll<Receiver<Call>>,
+ pub incoming: SelectAll<UnboundedReceiverStream<Call>>,
}
impl Default for Registry {
@@ -185,11 +186,7 @@ impl Registry {
}
}
- pub fn get(
- &mut self,
- language_config: &LanguageConfiguration,
- ex: &smol::Executor,
- ) -> Option<Arc<Client>> {
+ pub fn get(&mut self, language_config: &LanguageConfiguration) -> Option<Arc<Client>> {
// TODO: propagate the error
if let Some(config) = &language_config.language_server {
// avoid borrow issues
@@ -203,12 +200,13 @@ impl Registry {
// initialize a new client
let (mut client, incoming) =
- Client::start(&ex, &config.command, &config.args).ok()?;
+ Client::start(&config.command, &config.args).ok()?;
// TODO: run this async without blocking
- smol::block_on(client.initialize()).unwrap();
+ let rt = tokio::runtime::Handle::current();
+ rt.block_on(client.initialize()).unwrap();
- s_incoming.push(incoming);
+ s_incoming.push(UnboundedReceiverStream::new(incoming));
Some(Arc::new(client))
})
diff --git a/helix-lsp/src/select_all.rs b/helix-lsp/src/select_all.rs
index 4a7f0cbf..edfd1f38 100644
--- a/helix-lsp/src/select_all.rs
+++ b/helix-lsp/src/select_all.rs
@@ -6,10 +6,10 @@ use core::{
pin::Pin,
};
-use smol::{ready, stream::Stream};
use std::task::{Context, Poll};
use futures_util::stream::{FusedStream, FuturesUnordered, StreamExt, StreamFuture};
+use futures_util::{ready, stream::Stream};
/// An unbounded set of streams
///
diff --git a/helix-lsp/src/transport.rs b/helix-lsp/src/transport.rs
index d3e25b9c..4a5ae45a 100644
--- a/helix-lsp/src/transport.rs
+++ b/helix-lsp/src/transport.rs
@@ -10,15 +10,13 @@ type Result<T> = core::result::Result<T, Error>;
use jsonrpc_core as jsonrpc;
use serde_json::Value;
-use smol::prelude::*;
-
-use smol::{
- channel::{Receiver, Sender},
- io::{BufReader, BufWriter},
+use tokio::{
+ io::{AsyncBufRead, AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter},
process::{ChildStderr, ChildStdin, ChildStdout},
- Executor,
+ sync::mpsc::{unbounded_channel, Sender, UnboundedReceiver, UnboundedSender},
};
+#[derive(Debug)]
pub enum Payload {
Request {
chan: Sender<Result<Value>>,
@@ -41,8 +39,8 @@ enum Message {
}
pub struct Transport {
- incoming: Sender<jsonrpc::Call>,
- outgoing: Receiver<Payload>,
+ incoming: UnboundedSender<jsonrpc::Call>,
+ outgoing: UnboundedReceiver<Payload>,
pending_requests: HashMap<jsonrpc::Id, Sender<Result<Value>>>,
headers: HashMap<String, String>,
@@ -54,13 +52,12 @@ pub struct Transport {
impl Transport {
pub fn start(
- ex: &Executor,
reader: BufReader<ChildStdout>,
writer: BufWriter<ChildStdin>,
stderr: BufReader<ChildStderr>,
- ) -> (Receiver<jsonrpc::Call>, Sender<Payload>) {
- let (incoming, rx) = smol::channel::unbounded();
- let (tx, outgoing) = smol::channel::unbounded();
+ ) -> (UnboundedReceiver<jsonrpc::Call>, UnboundedSender<Payload>) {
+ let (incoming, rx) = unbounded_channel();
+ let (tx, outgoing) = unbounded_channel();
let transport = Self {
reader,
@@ -72,7 +69,7 @@ impl Transport {
headers: HashMap::default(),
};
- ex.spawn(transport.duplex()).detach();
+ tokio::spawn(transport.duplex());
(rx, tx)
}
@@ -168,7 +165,7 @@ impl Transport {
match msg {
Message::Output(output) => self.recv_response(output).await?,
Message::Call(call) => {
- self.incoming.send(call).await?;
+ self.incoming.send(call).unwrap();
// let notification = Notification::parse(&method, params);
}
};
@@ -204,11 +201,10 @@ impl Transport {
}
pub async fn duplex(mut self) {
- use futures_util::{select, FutureExt};
loop {
- select! {
+ tokio::select! {
// client -> server
- msg = self.outgoing.next().fuse() => {
+ msg = self.outgoing.recv() => {
if msg.is_none() {
break;
}
@@ -217,7 +213,7 @@ impl Transport {
self.send_payload(msg).await.unwrap();
}
// server <- client
- msg = Self::recv(&mut self.reader, &mut self.headers).fuse() => {
+ msg = Self::recv(&mut self.reader, &mut self.headers) => {
if msg.is_err() {
error!("err: <- {:?}", msg);
break;
@@ -226,7 +222,7 @@ impl Transport {
self.recv_msg(msg).await.unwrap();
}
- _msg = Self::err(&mut self.stderr).fuse() => {}
+ _msg = Self::err(&mut self.stderr) => {}
}
}
}