aboutsummaryrefslogtreecommitdiff
path: root/helix-term/src/job.rs
diff options
context:
space:
mode:
Diffstat (limited to 'helix-term/src/job.rs')
-rw-r--r--helix-term/src/job.rs55
1 files changed, 43 insertions, 12 deletions
diff --git a/helix-term/src/job.rs b/helix-term/src/job.rs
index 19f2521a..72ed892d 100644
--- a/helix-term/src/job.rs
+++ b/helix-term/src/job.rs
@@ -1,13 +1,37 @@
+use helix_event::status::StatusMessage;
+use helix_event::{runtime_local, send_blocking};
use helix_view::Editor;
+use once_cell::sync::OnceCell;
use crate::compositor::Compositor;
use futures_util::future::{BoxFuture, Future, FutureExt};
use futures_util::stream::{FuturesUnordered, StreamExt};
+use tokio::sync::mpsc::{channel, Receiver, Sender};
pub type EditorCompositorCallback = Box<dyn FnOnce(&mut Editor, &mut Compositor) + Send>;
pub type EditorCallback = Box<dyn FnOnce(&mut Editor) + Send>;
+runtime_local! {
+ static JOB_QUEUE: OnceCell<Sender<Callback>> = OnceCell::new();
+}
+
+pub async fn dispatch_callback(job: Callback) {
+ let _ = JOB_QUEUE.wait().send(job).await;
+}
+
+pub async fn dispatch(job: impl FnOnce(&mut Editor, &mut Compositor) + Send + 'static) {
+ let _ = JOB_QUEUE
+ .wait()
+ .send(Callback::EditorCompositor(Box::new(job)))
+ .await;
+}
+
+pub fn dispatch_blocking(job: impl FnOnce(&mut Editor, &mut Compositor) + Send + 'static) {
+ let jobs = JOB_QUEUE.wait();
+ send_blocking(jobs, Callback::EditorCompositor(Box::new(job)))
+}
+
pub enum Callback {
EditorCompositor(EditorCompositorCallback),
Editor(EditorCallback),
@@ -21,11 +45,11 @@ pub struct Job {
pub wait: bool,
}
-#[derive(Default)]
pub struct Jobs {
- pub futures: FuturesUnordered<JobFuture>,
- /// These are the ones that need to complete before we exit.
+ /// jobs that need to complete before we exit.
pub wait_futures: FuturesUnordered<JobFuture>,
+ pub callbacks: Receiver<Callback>,
+ pub status_messages: Receiver<StatusMessage>,
}
impl Job {
@@ -52,8 +76,16 @@ impl Job {
}
impl Jobs {
+ #[allow(clippy::new_without_default)]
pub fn new() -> Self {
- Self::default()
+ let (tx, rx) = channel(1024);
+ let _ = JOB_QUEUE.set(tx);
+ let status_messages = helix_event::status::setup();
+ Self {
+ wait_futures: FuturesUnordered::new(),
+ callbacks: rx,
+ status_messages,
+ }
}
pub fn spawn<F: Future<Output = anyhow::Result<()>> + Send + 'static>(&mut self, f: F) {
@@ -85,18 +117,17 @@ impl Jobs {
}
}
- pub async fn next_job(&mut self) -> Option<anyhow::Result<Option<Callback>>> {
- tokio::select! {
- event = self.futures.next() => { event }
- event = self.wait_futures.next() => { event }
- }
- }
-
pub fn add(&self, j: Job) {
if j.wait {
self.wait_futures.push(j.future);
} else {
- self.futures.push(j.future);
+ tokio::spawn(async move {
+ match j.future.await {
+ Ok(Some(cb)) => dispatch_callback(cb).await,
+ Ok(None) => (),
+ Err(err) => helix_event::status::report(err).await,
+ }
+ });
}
}