From 30c93994b50888aaeb32c65c90426e997800ccea Mon Sep 17 00:00:00 2001 From: Blaž Hrastnik Date: Fri, 14 Oct 2022 16:22:21 +0900 Subject: Use a single save_queue on the editor --- helix-term/src/application.rs | 20 +++++- helix-term/src/commands.rs | 3 +- helix-term/src/commands/typed.rs | 88 ++++++++++++++---------- helix-term/src/compositor.rs | 22 ++++-- helix-view/src/document.rs | 140 ++++++--------------------------------- helix-view/src/editor.rs | 79 +++++++++++++--------- 6 files changed, 160 insertions(+), 192 deletions(-) diff --git a/helix-term/src/application.rs b/helix-term/src/application.rs index 2e49e6d1..6010e745 100644 --- a/helix-term/src/application.rs +++ b/helix-term/src/application.rs @@ -1,5 +1,5 @@ use arc_swap::{access::Map, ArcSwap}; -use futures_util::Stream; +use futures_util::{Stream, StreamExt}; use helix_core::{ diagnostic::{DiagnosticTag, NumberOrString}, path::get_relative_path, @@ -968,6 +968,24 @@ impl Application { // errors along the way let mut errs = Vec::new(); + // TODO: deduplicate with ctx.block_try_flush_writes + tokio::task::block_in_place(|| { + helix_lsp::block_on(async { + while let Some(save_event) = self.editor.save_queue.next().await { + match &save_event { + Ok(event) => { + let doc = doc_mut!(self.editor, &event.doc_id); + doc.set_last_saved_revision(event.revision); + } + Err(err) => { + log::error!("error saving document: {}", err); + } + }; + // TODO: if is_err: break? + } + }) + }); + if let Err(err) = self .jobs .finish(&mut self.editor, Some(&mut self.compositor)) diff --git a/helix-term/src/commands.rs b/helix-term/src/commands.rs index f6d583f5..87bbd6c6 100644 --- a/helix-term/src/commands.rs +++ b/helix-term/src/commands.rs @@ -2541,7 +2541,8 @@ async fn make_format_callback( } if let Some((path, force)) = write { - if let Err(err) = doc.save(path, force) { + let id = doc.id(); + if let Err(err) = editor.save(id, path, force) { editor.set_error(format!("Error saving: {}", err)); } } diff --git a/helix-term/src/commands/typed.rs b/helix-term/src/commands/typed.rs index 070215cb..ef774256 100644 --- a/helix-term/src/commands/typed.rs +++ b/helix-term/src/commands/typed.rs @@ -79,12 +79,28 @@ fn buffer_close_by_ids_impl( doc_ids: &[DocumentId], force: bool, ) -> anyhow::Result<()> { + // TODO: deduplicate with ctx.block_try_flush_writes + tokio::task::block_in_place(|| { + helix_lsp::block_on(async { + while let Some(save_event) = editor.save_queue.next().await { + match &save_event { + Ok(event) => { + let doc = doc_mut!(editor, &event.doc_id); + doc.set_last_saved_revision(event.revision); + } + Err(err) => { + log::error!("error saving document: {}", err); + } + }; + // TODO: if is_err: break? + } + }) + }); + let (modified_ids, modified_names): (Vec<_>, Vec<_>) = doc_ids .iter() .filter_map(|&doc_id| { - if let Err(CloseError::BufferModified(name)) = tokio::task::block_in_place(|| { - helix_lsp::block_on(editor.close_document(doc_id, force)) - }) { + if let Err(CloseError::BufferModified(name)) = editor.close_document(doc_id, force) { Some((doc_id, name)) } else { None @@ -289,7 +305,8 @@ fn write_impl( }; if fmt.is_none() { - doc.save(path, force)?; + let id = doc.id(); + cx.editor.save(id, path, force)?; } Ok(()) @@ -569,40 +586,45 @@ fn write_all_impl( return Ok(()); } - let mut errors: Option = None; + let mut errors: Vec<&'static str> = Vec::new(); let auto_format = cx.editor.config().auto_format; let jobs = &mut cx.jobs; // save all documents - for doc in &mut cx.editor.documents.values_mut() { - if doc.path().is_none() { - errors = errors - .or_else(|| Some(String::new())) - .map(|mut errs: String| { - errs.push_str("cannot write a buffer without a filename\n"); - errs - }); - - continue; - } + let saves: Vec<_> = cx + .editor + .documents + .values() + .filter_map(|doc| { + if doc.path().is_none() { + errors.push("cannot write a buffer without a filename\n"); + return None; + } - if !doc.is_modified() { - continue; - } + if !doc.is_modified() { + return None; + } - let fmt = if auto_format { - doc.auto_format().map(|fmt| { - let callback = - make_format_callback(doc.id(), doc.version(), fmt, Some((None, force))); - jobs.add(Job::with_callback(callback).wait_before_exiting()); - }) - } else { + let fmt = if auto_format { + doc.auto_format().map(|fmt| { + let callback = + make_format_callback(doc.id(), doc.version(), fmt, Some((None, force))); + jobs.add(Job::with_callback(callback).wait_before_exiting()); + }) + } else { + None + }; + + if fmt.is_none() { + return Some(doc.id()); + } None - }; + }) + .collect(); - if fmt.is_none() { - doc.save::(None, force)?; - } + // manually call save for the rest of docs that don't have a formatter + for id in saves { + cx.editor.save::(id, None, force)?; } if quit { @@ -619,10 +641,8 @@ fn write_all_impl( } } - if let Some(errs) = errors { - if !force { - bail!(errs); - } + if !errors.is_empty() && !force { + bail!("{:?}", errors); } Ok(()) diff --git a/helix-term/src/compositor.rs b/helix-term/src/compositor.rs index 35b9d054..a4ffaff2 100644 --- a/helix-term/src/compositor.rs +++ b/helix-term/src/compositor.rs @@ -1,3 +1,4 @@ +use futures_util::StreamExt; // Each component declares it's own size constraints and gets fitted based on it's parent. // Q: how does this work with popups? // cursive does compositor.screen_mut().add_layer_at(pos::absolute(x, y), ) @@ -33,11 +34,22 @@ impl<'a> Context<'a> { pub fn block_try_flush_writes(&mut self) -> anyhow::Result<()> { tokio::task::block_in_place(|| helix_lsp::block_on(self.jobs.finish(self.editor, None)))?; - for doc in &mut self.editor.documents.values_mut() { - tokio::task::block_in_place(|| helix_lsp::block_on(doc.try_flush_saves())) - .map(|result| result.map(|_| ())) - .unwrap_or(Ok(()))?; - } + tokio::task::block_in_place(|| { + helix_lsp::block_on(async { + while let Some(save_event) = self.editor.save_queue.next().await { + match &save_event { + Ok(event) => { + let doc = doc_mut!(self.editor, &event.doc_id); + doc.set_last_saved_revision(event.revision); + } + Err(err) => { + log::error!("error saving document: {}", err); + } + }; + // TODO: if is_err: break? + } + }) + }); Ok(()) } diff --git a/helix-view/src/document.rs b/helix-view/src/document.rs index 0774e516..9fa1241e 100644 --- a/helix-view/src/document.rs +++ b/helix-view/src/document.rs @@ -13,10 +13,6 @@ use std::future::Future; use std::path::{Path, PathBuf}; use std::str::FromStr; use std::sync::Arc; -use tokio::sync::mpsc::error::TryRecvError; -use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; - -use tokio::sync::Mutex; use helix_core::{ encoding, @@ -134,9 +130,6 @@ pub struct Document { last_saved_revision: usize, version: i32, // should be usize? pub(crate) modified_since_accessed: bool, - save_sender: Option>, - save_receiver: Option>, - current_save: Arc>>, diagnostics: Vec, language_server: Option>, @@ -357,7 +350,6 @@ impl Document { let encoding = encoding.unwrap_or(encoding::UTF_8); let changes = ChangeSet::new(&text); let old_state = None; - let (save_sender, save_receiver) = tokio::sync::mpsc::unbounded_channel(); Self { id: DocumentId::default(), @@ -378,9 +370,6 @@ impl Document { savepoint: None, last_saved_revision: 0, modified_since_accessed: false, - save_sender: Some(save_sender), - save_receiver: Some(save_receiver), - current_save: Arc::new(Mutex::new(None)), language_server: None, } } @@ -519,21 +508,26 @@ impl Document { &mut self, path: Option

, force: bool, - ) -> Result<(), anyhow::Error> { - self.save_impl::, _>(path, force) + ) -> Result< + impl Future> + 'static + Send, + anyhow::Error, + > { + let path = path.map(|path| path.into()); + self.save_impl(path, force) + + // futures_util::future::Ready<_>, } /// The `Document`'s text is encoded according to its encoding and written to the file located /// at its `path()`. - fn save_impl(&mut self, path: Option

, force: bool) -> Result<(), anyhow::Error> - where - F: Future> + 'static + Send, - P: Into, - { - if self.save_sender.is_none() { - bail!("saves are closed for this document!"); - } - + fn save_impl( + &mut self, + path: Option, + force: bool, + ) -> Result< + impl Future> + 'static + Send, + anyhow::Error, + > { log::debug!( "submitting save of doc '{:?}'", self.path().map(|path| path.to_string_lossy()) @@ -544,7 +538,7 @@ impl Document { let text = self.text().clone(); let path = match path { - Some(path) => helix_core::path::get_canonicalized_path(&path.into())?, + Some(path) => helix_core::path::get_canonicalized_path(&path)?, None => { if self.path.is_none() { bail!("Can't save with no path set!"); @@ -564,7 +558,7 @@ impl Document { let encoding = self.encoding; // We encode the file according to the `Document`'s encoding. - let save_event = async move { + let future = async move { use tokio::fs::File; if let Some(parent) = path.parent() { // TODO: display a prompt asking the user if the directories should be created @@ -604,107 +598,15 @@ impl Document { Ok(event) }; - self.save_sender - .as_mut() - .unwrap() - .send(Box::pin(save_event)) - .map_err(|err| anyhow!("failed to send save event: {}", err)) - } - - pub async fn await_save(&mut self) -> Option { - self.await_save_impl(true).await - } - - async fn await_save_impl(&mut self, block: bool) -> Option { - let mut current_save = self.current_save.lock().await; - if let Some(ref mut save) = *current_save { - log::trace!("reawaiting save of '{:?}'", self.path()); - let result = save.await; - *current_save = None; - log::trace!("reawait save of '{:?}' result: {:?}", self.path(), result); - return Some(result); - } - - // return early if the receiver is closed - let rx = self.save_receiver.as_mut()?; - - let save_req = if block { - rx.recv().await - } else { - let msg = rx.try_recv(); - - if let Err(err) = msg { - match err { - TryRecvError::Empty => return None, - TryRecvError::Disconnected => None, - } - } else { - msg.ok() - } - }; - - let save = match save_req { - Some(save) => save, - None => { - self.save_receiver = None; - return None; - } - }; - - // save a handle to the future so that when a poll on this - // function gets cancelled, we don't lose it - *current_save = Some(save); - log::trace!("awaiting save of '{:?}'", self.path()); - - let result = (*current_save).as_mut().unwrap().await; - *current_save = None; - - log::trace!("save of '{:?}' result: {:?}", self.path(), result); - - Some(result) - } - - /// Flushes the queue of pending writes. If any fail, - /// it stops early before emptying the rest of the queue. - pub async fn try_flush_saves(&mut self) -> Option { - self.flush_saves_impl(false).await - } - - async fn flush_saves_impl(&mut self, block: bool) -> Option { - let mut final_result = None; - - while let Some(save_event) = self.await_save_impl(block).await { - let is_err = match &save_event { - Ok(event) => { - self.set_last_saved_revision(event.revision); - false - } - Err(err) => { - log::error!( - "error saving document {:?}: {}", - self.path().map(|path| path.to_string_lossy()), - err - ); - true - } - }; - - final_result = Some(save_event); - - if is_err { - break; - } - } - - final_result + Ok(future) } /// Prepares the Document for being closed by stopping any new writes /// and flushing through the queue of pending writes. If any fail, /// it stops early before emptying the rest of the queue. pub async fn close(&mut self) -> Option { - self.save_sender.take(); - self.flush_saves_impl(true).await + // TODO + None } /// Detect the programming language based on the file type. diff --git a/helix-view/src/editor.rs b/helix-view/src/editor.rs index fbd0b2b0..c4789ee2 100644 --- a/helix-view/src/editor.rs +++ b/helix-view/src/editor.rs @@ -1,6 +1,6 @@ use crate::{ clipboard::{get_clipboard_provider, ClipboardProvider}, - document::{DocumentSavedEventResult, Mode}, + document::{DocumentSavedEventFuture, DocumentSavedEventResult, Mode}, graphics::{CursorKind, Rect}, info::Info, input::KeyEvent, @@ -9,7 +9,7 @@ use crate::{ Document, DocumentId, View, ViewId, }; -use futures_util::stream::{select_all::SelectAll, FuturesUnordered}; +use futures_util::stream::select_all::SelectAll; use futures_util::{future, StreamExt}; use helix_lsp::Call; use tokio_stream::wrappers::UnboundedReceiverStream; @@ -29,7 +29,7 @@ use tokio::{ time::{sleep, Duration, Instant, Sleep}, }; -use anyhow::Error; +use anyhow::{anyhow, Error}; pub use helix_core::diagnostic::Severity; pub use helix_core::register::Registers; @@ -644,12 +644,20 @@ pub struct Breakpoint { pub log_message: Option, } +use futures_util::stream::{Flatten, Once}; + pub struct Editor { /// Current editing mode. pub mode: Mode, pub tree: Tree, pub next_document_id: DocumentId, pub documents: BTreeMap, + + // We Flatten<> to resolve the inner DocumentSavedEventFuture. For that we need a stream of streams, hence the Once<>. + // https://stackoverflow.com/a/66875668 + pub saves: HashMap>>, + pub save_queue: SelectAll>>>, + pub count: Option, pub selected_register: Option, pub registers: Registers, @@ -751,6 +759,8 @@ impl Editor { tree: Tree::new(area), next_document_id: DocumentId::default(), documents: BTreeMap::new(), + saves: HashMap::new(), + save_queue: SelectAll::new(), count: None, selected_register: None, macro_recording: None, @@ -1083,6 +1093,12 @@ impl Editor { self.new_document(doc) }; + let (save_sender, save_receiver) = tokio::sync::mpsc::unbounded_channel(); + self.saves.insert(id, save_sender); + + let stream = UnboundedReceiverStream::new(save_receiver).flatten(); + self.save_queue.push(stream); + self.switch(id, action); Ok(id) } @@ -1095,38 +1111,21 @@ impl Editor { self._refresh(); } - pub async fn close_document( - &mut self, - doc_id: DocumentId, - force: bool, - ) -> Result<(), CloseError> { + pub fn close_document(&mut self, doc_id: DocumentId, force: bool) -> Result<(), CloseError> { let doc = match self.documents.get_mut(&doc_id) { Some(doc) => doc, None => return Err(CloseError::DoesNotExist), }; - - // flush out any pending writes first to clear the modified status - if let Some(Err(err)) = doc.try_flush_saves().await { - return Err(CloseError::SaveError(err)); - } - if !force && doc.is_modified() { return Err(CloseError::BufferModified(doc.display_name().into_owned())); } - if let Some(Err(err)) = doc.close().await { - return Err(CloseError::SaveError(err)); - } + // This will also disallow any follow-up writes + self.saves.remove(&doc_id); - // Don't fail the whole write because the language server could not - // acknowledge the close if let Some(language_server) = doc.language_server() { - if let Err(err) = language_server - .text_document_did_close(doc.identifier()) - .await - { - log::error!("Error closing doc in language server: {}", err); - } + // TODO: track error + tokio::spawn(language_server.text_document_did_close(doc.identifier())); } enum Action { @@ -1188,6 +1187,28 @@ impl Editor { Ok(()) } + pub fn save>( + &mut self, + doc_id: DocumentId, + path: Option

, + force: bool, + ) -> anyhow::Result<()> { + // convert a channel of futures to pipe into main queue one by one + // via stream.then() ? then push into main future + + let path = path.map(|path| path.into()); + let doc = doc_mut!(self, &doc_id); + let future = doc.save(path, force)?; + // TODO: if no self.saves for that doc id then bail + // bail!("saves are closed for this document!"); + use futures_util::stream; + self.saves[&doc_id] + .send(stream::once(Box::pin(future))) + .map_err(|err| anyhow!("failed to send save event: {}", err))?; + + Ok(()) + } + pub fn resize(&mut self, area: Rect) { if self.tree.resize(area) { self._refresh(); @@ -1307,16 +1328,10 @@ impl Editor { } pub async fn wait_event(&mut self) -> EditorEvent { - let mut saves: FuturesUnordered<_> = self - .documents - .values_mut() - .map(Document::await_save) - .collect(); - tokio::select! { biased; - Some(Some(event)) = saves.next() => { + Some(event) = self.save_queue.next() => { EditorEvent::DocumentSaved(event) } Some(config_event) = self.config_events.1.recv() => { -- cgit v1.2.3-70-g09d2