aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDmitry Sharshakov2021-08-14 05:42:06 +0000
committerBlaž Hrastnik2021-08-20 04:43:54 +0000
commitd6de5408b71f23032c57fbb7f122d3295605dd33 (patch)
treeea28145e537680d445c712ef35efd7290e344c56
parent59d6b92e5b05a829dd2aeb0994afc51dec0da87f (diff)
dispatch events in client
-rw-r--r--helix-dap/examples/dap-basic.rs6
-rw-r--r--helix-dap/src/client.rs108
2 files changed, 65 insertions, 49 deletions
diff --git a/helix-dap/examples/dap-basic.rs b/helix-dap/examples/dap-basic.rs
index 82e14702..76fc0dd3 100644
--- a/helix-dap/examples/dap-basic.rs
+++ b/helix-dap/examples/dap-basic.rs
@@ -56,8 +56,12 @@ pub async fn main() -> Result<()> {
.read_line(&mut _in)
.expect("Failed to read line");
+ let mut stopped_event = client.listen_for_event("stopped".to_owned()).await;
+
println!("configurationDone: {:?}", client.configuration_done().await);
- println!("stopped: {:?}", client.wait_for_stopped().await);
+
+ println!("stopped: {:?}", stopped_event.recv().await);
+
println!("threads: {:#?}", client.threads().await);
let bt = client.stack_trace(1).await.expect("expected stack trace");
println!("stack trace: {:#?}", bt);
diff --git a/helix-dap/src/client.rs b/helix-dap/src/client.rs
index 03e58757..f648b248 100644
--- a/helix-dap/src/client.rs
+++ b/helix-dap/src/client.rs
@@ -2,15 +2,22 @@ use crate::{
transport::{Event, Payload, Request, Response, Transport},
Result,
};
+use log::{error, info};
use serde::{Deserialize, Serialize};
use serde_json::{from_value, to_value, Value};
-use std::process::Stdio;
-use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::{
+ atomic::{AtomicU64, Ordering},
+ Arc,
+};
+use std::{collections::HashMap, process::Stdio};
use tokio::{
io::{AsyncBufRead, AsyncWrite, BufReader, BufWriter},
net::TcpStream,
process::{Child, Command},
- sync::mpsc::{channel, UnboundedReceiver, UnboundedSender},
+ sync::{
+ mpsc::{channel, Receiver, Sender, UnboundedReceiver, UnboundedSender},
+ Mutex,
+ },
};
#[derive(Debug, PartialEq, Clone, Deserialize, Serialize)]
@@ -252,9 +259,9 @@ pub struct Client {
id: usize,
_process: Option<Child>,
server_tx: UnboundedSender<Request>,
- server_rx: UnboundedReceiver<Payload>,
request_counter: AtomicU64,
capabilities: Option<DebuggerCapabilities>,
+ awaited_events: Arc<Mutex<HashMap<String, Sender<Event>>>>,
}
impl Client {
@@ -270,14 +277,52 @@ impl Client {
id,
_process: process,
server_tx,
- server_rx,
request_counter: AtomicU64::new(0),
capabilities: None,
+ awaited_events: Arc::new(Mutex::new(HashMap::default())),
};
+ tokio::spawn(Self::recv(Arc::clone(&client.awaited_events), server_rx));
+
Ok(client)
}
+ async fn recv(
+ awaited_events: Arc<Mutex<HashMap<String, Sender<Event>>>>,
+ mut server_rx: UnboundedReceiver<Payload>,
+ ) {
+ while let Some(msg) = server_rx.recv().await {
+ match msg {
+ Payload::Event(ev) => {
+ let name = ev.event.clone();
+ let tx = awaited_events.lock().await.remove(&name);
+
+ match tx {
+ Some(tx) => match tx.send(ev).await {
+ Ok(_) => (),
+ Err(_) => error!(
+ "Tried sending event into a closed channel (name={:?})",
+ name
+ ),
+ },
+ None => {
+ info!("unhandled event");
+ // client_tx.send(Payload::Event(ev)).expect("Failed to send");
+ }
+ }
+ }
+ Payload::Response(_) => unreachable!(),
+ Payload::Request(_) => todo!(),
+ }
+ }
+ }
+
+ pub async fn listen_for_event(&self, name: String) -> Receiver<Event> {
+ let (rx, tx) = channel(1);
+ self.awaited_events.lock().await.insert(name.clone(), rx);
+ tx
+ }
+
pub async fn tcp(addr: std::net::SocketAddr, id: usize) -> Result<Self> {
let stream = TcpStream::connect(addr).await?;
let (rx, tx) = stream.into_split();
@@ -373,45 +418,25 @@ impl Client {
}
pub async fn launch(&mut self, args: impl Serialize) -> Result<()> {
+ let mut initialized = self.listen_for_event("initialized".to_owned()).await;
+
self.request("launch".to_owned(), to_value(args).ok())
.await?;
- match self
- .server_rx
- .recv()
- .await
- .expect("Expected initialized event")
- {
- Payload::Event(Event { event, .. }) => {
- if event == *"initialized" {
- Ok(())
- } else {
- unreachable!()
- }
- }
- _ => unreachable!(),
- }
+ initialized.recv().await;
+
+ Ok(())
}
pub async fn attach(&mut self, args: impl Serialize) -> Result<()> {
+ let mut initialized = self.listen_for_event("initialized".to_owned()).await;
+
self.request("attach".to_owned(), to_value(args).ok())
.await?;
- match self
- .server_rx
- .recv()
- .await
- .expect("Expected initialized event")
- {
- Payload::Event(Event { event, .. }) => {
- if event == *"initialized" {
- Ok(())
- } else {
- unreachable!()
- }
- }
- _ => unreachable!(),
- }
+ initialized.recv().await;
+
+ Ok(())
}
pub async fn set_breakpoints(
@@ -447,19 +472,6 @@ impl Client {
Ok(())
}
- pub async fn wait_for_stopped(&mut self) -> Result<()> {
- match self.server_rx.recv().await.expect("Expected stopped event") {
- Payload::Event(Event { event, .. }) => {
- if event == *"stopped" {
- Ok(())
- } else {
- unreachable!()
- }
- }
- _ => unreachable!(),
- }
- }
-
pub async fn continue_thread(&mut self, thread_id: usize) -> Result<Option<bool>> {
let args = ContinueArguments { thread_id };