From a938f5a87a2b67dd8667337cd569cc6627d0f666 Mon Sep 17 00:00:00 2001 From: Dmitry Sharshakov Date: Sat, 21 Aug 2021 17:21:35 +0300 Subject: refactor: handle DAP events in editor main loop --- helix-dap/src/client.rs | 74 ++++++++++++++++--------------------------------- 1 file changed, 24 insertions(+), 50 deletions(-) (limited to 'helix-dap/src') diff --git a/helix-dap/src/client.rs b/helix-dap/src/client.rs index f2d5b865..bc7a93d8 100644 --- a/helix-dap/src/client.rs +++ b/helix-dap/src/client.rs @@ -1,5 +1,5 @@ use crate::{ - transport::{Event, Payload, Request, Transport}, + transport::{Payload, Request, Transport}, types::*, Result, }; @@ -9,19 +9,13 @@ use std::{ net::{IpAddr, Ipv4Addr, SocketAddr}, path::PathBuf, process::Stdio, - sync::{ - atomic::{AtomicU64, Ordering}, - Arc, - }, + sync::atomic::{AtomicU64, Ordering}, }; use tokio::{ io::{AsyncBufRead, AsyncWrite, BufReader, BufWriter}, net::TcpStream, process::{Child, Command}, - sync::{ - mpsc::{channel, Receiver, Sender, UnboundedReceiver, UnboundedSender}, - Mutex, - }, + sync::mpsc::{channel, unbounded_channel, UnboundedReceiver, UnboundedSender}, time, }; @@ -32,8 +26,6 @@ pub struct Client { server_tx: UnboundedSender, request_counter: AtomicU64, capabilities: Option, - awaited_events: Arc>>>, - // pub breakpoints: HashMap>, // TODO: multiple threads support @@ -46,8 +38,9 @@ impl Client { tx: Box, id: usize, process: Option, - ) -> Result { + ) -> Result<(Self, UnboundedReceiver)> { let (server_rx, server_tx) = Transport::start(rx, tx, id); + let (client_rx, client_tx) = unbounded_channel(); let client = Self { id, @@ -55,24 +48,30 @@ impl Client { server_tx, request_counter: AtomicU64::new(0), capabilities: None, - awaited_events: Arc::new(Mutex::new(HashMap::default())), // breakpoints: HashMap::new(), stack_pointer: None, }; - tokio::spawn(Self::recv(Arc::clone(&client.awaited_events), server_rx)); + tokio::spawn(Self::recv(server_rx, client_rx)); - Ok(client) + Ok((client, client_tx)) } - pub async fn tcp(addr: std::net::SocketAddr, id: usize) -> Result { + pub async fn tcp( + addr: std::net::SocketAddr, + id: usize, + ) -> Result<(Self, UnboundedReceiver)> { let stream = TcpStream::connect(addr).await?; let (rx, tx) = stream.into_split(); Self::streams(Box::new(BufReader::new(rx)), Box::new(tx), id, None) } - pub fn stdio(cmd: &str, args: Vec<&str>, id: usize) -> Result { + pub fn stdio( + cmd: &str, + args: Vec<&str>, + id: usize, + ) -> Result<(Self, UnboundedReceiver)> { let process = Command::new(cmd) .args(args) .stdin(Stdio::piped()) @@ -114,7 +113,7 @@ impl Client { args: Vec<&str>, port_format: &str, id: usize, - ) -> Result { + ) -> Result<(Self, UnboundedReceiver)> { let port = Self::get_port().await.unwrap(); let process = Command::new(cmd) @@ -145,43 +144,22 @@ impl Client { ) } - async fn recv( - awaited_events: Arc>>>, - mut server_rx: UnboundedReceiver, - ) { + async fn recv(mut server_rx: UnboundedReceiver, client_tx: UnboundedSender) { while let Some(msg) = server_rx.recv().await { match msg { Payload::Event(ev) => { - let name = ev.event.clone(); - let hashmap = awaited_events.lock().await; - let tx = hashmap.get(&name); - - match tx { - Some(tx) => match tx.send(ev).await { - Ok(_) => (), - Err(_) => error!( - "Tried sending event into a closed channel (name={:?})", - name - ), - }, - None => { - info!("unhandled event {}", name); - // client_tx.send(Payload::Event(ev)).expect("Failed to send"); - } - } + client_tx.send(Payload::Event(ev)).expect("Failed to send"); } Payload::Response(_) => unreachable!(), - Payload::Request(_) => todo!(), + Payload::Request(req) => { + client_tx + .send(Payload::Request(req)) + .expect("Failed to send"); + } } } } - pub async fn listen_for_event(&self, name: String) -> Receiver { - let (rx, tx) = channel(1); - self.awaited_events.lock().await.insert(name.clone(), rx); - tx - } - pub fn id(&self) -> usize { self.id } @@ -248,8 +226,6 @@ impl Client { } pub async fn launch(&mut self, args: serde_json::Value) -> Result<()> { - // TODO: buffer these until initialized arrives - let response = self.request::(args).await?; log::error!("launch response {}", response); @@ -257,8 +233,6 @@ impl Client { } pub async fn attach(&mut self, args: serde_json::Value) -> Result<()> { - // TODO: buffer these until initialized arrives - let response = self.request::(args).await?; log::error!("attach response {}", response); -- cgit v1.2.3-70-g09d2