aboutsummaryrefslogtreecommitdiff
path: root/helix-lsp/src/transport.rs
diff options
context:
space:
mode:
Diffstat (limited to 'helix-lsp/src/transport.rs')
-rw-r--r--helix-lsp/src/transport.rs34
1 files changed, 15 insertions, 19 deletions
diff --git a/helix-lsp/src/transport.rs b/helix-lsp/src/transport.rs
index d3e25b9c..4a5ae45a 100644
--- a/helix-lsp/src/transport.rs
+++ b/helix-lsp/src/transport.rs
@@ -10,15 +10,13 @@ type Result<T> = core::result::Result<T, Error>;
use jsonrpc_core as jsonrpc;
use serde_json::Value;
-use smol::prelude::*;
-
-use smol::{
- channel::{Receiver, Sender},
- io::{BufReader, BufWriter},
+use tokio::{
+ io::{AsyncBufRead, AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter},
process::{ChildStderr, ChildStdin, ChildStdout},
- Executor,
+ sync::mpsc::{unbounded_channel, Sender, UnboundedReceiver, UnboundedSender},
};
+#[derive(Debug)]
pub enum Payload {
Request {
chan: Sender<Result<Value>>,
@@ -41,8 +39,8 @@ enum Message {
}
pub struct Transport {
- incoming: Sender<jsonrpc::Call>,
- outgoing: Receiver<Payload>,
+ incoming: UnboundedSender<jsonrpc::Call>,
+ outgoing: UnboundedReceiver<Payload>,
pending_requests: HashMap<jsonrpc::Id, Sender<Result<Value>>>,
headers: HashMap<String, String>,
@@ -54,13 +52,12 @@ pub struct Transport {
impl Transport {
pub fn start(
- ex: &Executor,
reader: BufReader<ChildStdout>,
writer: BufWriter<ChildStdin>,
stderr: BufReader<ChildStderr>,
- ) -> (Receiver<jsonrpc::Call>, Sender<Payload>) {
- let (incoming, rx) = smol::channel::unbounded();
- let (tx, outgoing) = smol::channel::unbounded();
+ ) -> (UnboundedReceiver<jsonrpc::Call>, UnboundedSender<Payload>) {
+ let (incoming, rx) = unbounded_channel();
+ let (tx, outgoing) = unbounded_channel();
let transport = Self {
reader,
@@ -72,7 +69,7 @@ impl Transport {
headers: HashMap::default(),
};
- ex.spawn(transport.duplex()).detach();
+ tokio::spawn(transport.duplex());
(rx, tx)
}
@@ -168,7 +165,7 @@ impl Transport {
match msg {
Message::Output(output) => self.recv_response(output).await?,
Message::Call(call) => {
- self.incoming.send(call).await?;
+ self.incoming.send(call).unwrap();
// let notification = Notification::parse(&method, params);
}
};
@@ -204,11 +201,10 @@ impl Transport {
}
pub async fn duplex(mut self) {
- use futures_util::{select, FutureExt};
loop {
- select! {
+ tokio::select! {
// client -> server
- msg = self.outgoing.next().fuse() => {
+ msg = self.outgoing.recv() => {
if msg.is_none() {
break;
}
@@ -217,7 +213,7 @@ impl Transport {
self.send_payload(msg).await.unwrap();
}
// server <- client
- msg = Self::recv(&mut self.reader, &mut self.headers).fuse() => {
+ msg = Self::recv(&mut self.reader, &mut self.headers) => {
if msg.is_err() {
error!("err: <- {:?}", msg);
break;
@@ -226,7 +222,7 @@ impl Transport {
self.recv_msg(msg).await.unwrap();
}
- _msg = Self::err(&mut self.stderr).fuse() => {}
+ _msg = Self::err(&mut self.stderr) => {}
}
}
}