use helix_view::Editor;
use crate::compositor::Compositor;
use futures_util::future::{BoxFuture, Future, FutureExt};
use futures_util::stream::{FuturesUnordered, StreamExt};
/// A deferred action that a finished job can hand back to be run against the
/// editor state.
///
/// Each variant boxes a one-shot (`FnOnce`) closure that is `Send`.
pub enum Callback {
/// Closure that needs mutable access to both the `Editor` and the `Compositor`.
EditorCompositor(Box<dyn FnOnce(&mut Editor, &mut Compositor) + Send>),
/// Closure that only needs mutable access to the `Editor`.
Editor(Box<dyn FnOnce(&mut Editor) + Send>),
}
/// A boxed `'static` future whose output is either an error, `Ok(None)`, or
/// `Ok(Some(callback))` carrying a [`Callback`].
pub type JobFuture = BoxFuture<'static, anyhow::Result<Option<Callback>>>;
/// A unit of asynchronous work plus a flag saying whether it must finish
/// before exiting.
pub struct Job {
/// The boxed future to drive; on success it may yield a [`Callback`].
pub future: BoxFuture<'static, anyhow::Result<Option<Callback>>>,
/// Do we need to wait for this job to finish before exiting?
pub wait: bool,
}
/// The set of in-flight jobs, kept in two queues depending on each job's
/// `wait` flag.
#[derive(Default)]
pub struct Jobs {
/// Jobs that were added with `wait == false`.
pub futures: FuturesUnordered<JobFuture>,
/// These are the ones that need to complete before we exit.
pub wait_futures: FuturesUnordered<JobFuture>,
}
impl Job {
    /// Wraps a future that produces no callback into a [`Job`].
    ///
    /// A successful `()` output is turned into `Ok(None)`; an error is passed
    /// through untouched. The resulting job has `wait` set to `false`.
    pub fn new<F>(f: F) -> Self
    where
        F: Future<Output = anyhow::Result<()>> + Send + 'static,
    {
        let future = f.map(|outcome| outcome.map(|()| None)).boxed();
        Self {
            future,
            wait: false,
        }
    }

    /// Wraps a future that resolves to a [`Callback`] into a [`Job`].
    ///
    /// A successful output is wrapped in `Some`; an error is passed through
    /// untouched. The resulting job has `wait` set to `false`.
    pub fn with_callback<F>(f: F) -> Self
    where
        F: Future<Output = anyhow::Result<Callback>> + Send + 'static,
    {
        let future = f.map(|outcome| outcome.map(Some)).boxed();
        Self {
            future,
            wait: false,
        }
    }

    /// Builder-style setter: returns the same job with `wait` set to `true`.
    pub fn wait_before_exiting(self) -> Self {
        Self { wait: true, ..self }
    }
}
impl Jobs {
pub fn new() -> Self {
Self::default()
}
pub fn spawn<F: Future<Output = anyhow::Result<()>> + Send + 'static>(&mut self, f: F) {
self.add(Job::new(f));
}
pub fn callback<F: Future<Output = anyhow::Result<Callback>> + Send + 'static>(
&mut self,
f: F,
) {
self.add(Job::with_callback(f));
}
pub fn handle_callback(
&self,
editor: &mut Editor,
compositor: &mut Compositor,
call: anyhow::Result<Option<Callback>>,
) {
match call {
Ok(None) => {}
Ok(Some(call)) => match call {
Callback::EditorCompositor(call) => call(editor, compositor),
Callback::Editor(call) => call(editor),
},
Err(e) => {
editor.set_error(format!("Async job failed: {}", e));
}
}
}
pub async fn next_job(&mut self) -> Option<anyhow::Result<Option<Callback>>> {
tokio::select! {
event = self.futures.next() => { event }
event = self.wait_futures.next() => { event }
}
}
pub fn add(&self, j: Job) {
if j.wait {
self.wait_futures.push(j.future);
} else {
self.futures.push(j.future);
}
}
/// Blocks until all the jobs that need to be waited on are done.
pub async fn finish(
&mut self,
editor: &mut Editor,
mut compositor: Option<&mut Compositor>,
) -> anyhow::Result<()> {
log::debug!("waiting on jobs...");
let mut wait_futures = std::mem::take(&mut self.wait_futures);
while let (Some(job), tail) = wait_futures.into_future().await {
match job {
Ok(callback) => {
wait_futures = tail;
if let Some(callback) = callback {
// clippy doesn't realize this is an error without the derefs
#[allow(clippy::needless_option_as_deref)]
match callback {
Callback::EditorCompositor(call) if compositor.is_some() => {
call(editor, compositor.as_deref_mut().unwrap())
}
Callback::Editor(call) => call(editor),
// skip callbacks for which we don't have the necessary references
_ => (),
}
}
}
Err(e) => {
self.wait_futures = tail;
return Err(e);
}
}
}
Ok(())
}
}