diff options
Diffstat (limited to 'helix-term/src/job.rs')
-rw-r--r-- | helix-term/src/job.rs | 55 |
1 files changed, 43 insertions, 12 deletions
diff --git a/helix-term/src/job.rs b/helix-term/src/job.rs index 19f2521a..72ed892d 100644 --- a/helix-term/src/job.rs +++ b/helix-term/src/job.rs @@ -1,13 +1,37 @@ +use helix_event::status::StatusMessage; +use helix_event::{runtime_local, send_blocking}; use helix_view::Editor; +use once_cell::sync::OnceCell; use crate::compositor::Compositor; use futures_util::future::{BoxFuture, Future, FutureExt}; use futures_util::stream::{FuturesUnordered, StreamExt}; +use tokio::sync::mpsc::{channel, Receiver, Sender}; pub type EditorCompositorCallback = Box<dyn FnOnce(&mut Editor, &mut Compositor) + Send>; pub type EditorCallback = Box<dyn FnOnce(&mut Editor) + Send>; +runtime_local! { + static JOB_QUEUE: OnceCell<Sender<Callback>> = OnceCell::new(); +} + +pub async fn dispatch_callback(job: Callback) { + let _ = JOB_QUEUE.wait().send(job).await; +} + +pub async fn dispatch(job: impl FnOnce(&mut Editor, &mut Compositor) + Send + 'static) { + let _ = JOB_QUEUE + .wait() + .send(Callback::EditorCompositor(Box::new(job))) + .await; +} + +pub fn dispatch_blocking(job: impl FnOnce(&mut Editor, &mut Compositor) + Send + 'static) { + let jobs = JOB_QUEUE.wait(); + send_blocking(jobs, Callback::EditorCompositor(Box::new(job))) +} + pub enum Callback { EditorCompositor(EditorCompositorCallback), Editor(EditorCallback), @@ -21,11 +45,11 @@ pub struct Job { pub wait: bool, } -#[derive(Default)] pub struct Jobs { - pub futures: FuturesUnordered<JobFuture>, - /// These are the ones that need to complete before we exit. + /// jobs that need to complete before we exit. pub wait_futures: FuturesUnordered<JobFuture>, + pub callbacks: Receiver<Callback>, + pub status_messages: Receiver<StatusMessage>, } impl Job { @@ -52,8 +76,16 @@ impl Job { } impl Jobs { + #[allow(clippy::new_without_default)] pub fn new() -> Self { - Self::default() + let (tx, rx) = channel(1024); + let _ = JOB_QUEUE.set(tx); + let status_messages = helix_event::status::setup(); + Self { + wait_futures: FuturesUnordered::new(), + callbacks: rx, + status_messages, + } } pub fn spawn<F: Future<Output = anyhow::Result<()>> + Send + 'static>(&mut self, f: F) { @@ -85,18 +117,17 @@ impl Jobs { } } - pub async fn next_job(&mut self) -> Option<anyhow::Result<Option<Callback>>> { - tokio::select! { - event = self.futures.next() => { event } - event = self.wait_futures.next() => { event } - } - } - pub fn add(&self, j: Job) { if j.wait { self.wait_futures.push(j.future); } else { - self.futures.push(j.future); + tokio::spawn(async move { + match j.future.await { + Ok(Some(cb)) => dispatch_callback(cb).await, + Ok(None) => (), + Err(err) => helix_event::status::report(err).await, + } + }); } } |