aboutsummaryrefslogtreecommitdiff
path: root/helix-view
diff options
context:
space:
mode:
authorBlaž Hrastnik2022-10-14 07:22:21 +0000
committerSkyler Hawthorne2022-10-19 02:31:39 +0000
commit30c93994b50888aaeb32c65c90426e997800ccea (patch)
treeb1c62f58120f58b3a12679e59e4afa1508aa8152 /helix-view
parentbeb3427bfbaa88bec8b4c683e342f85eb53ad77d (diff)
Use a single save_queue on the editor
Diffstat (limited to 'helix-view')
-rw-r--r--helix-view/src/document.rs140
-rw-r--r--helix-view/src/editor.rs79
2 files changed, 68 insertions, 151 deletions
diff --git a/helix-view/src/document.rs b/helix-view/src/document.rs
index 0774e516..9fa1241e 100644
--- a/helix-view/src/document.rs
+++ b/helix-view/src/document.rs
@@ -13,10 +13,6 @@ use std::future::Future;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::Arc;
-use tokio::sync::mpsc::error::TryRecvError;
-use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
-
-use tokio::sync::Mutex;
use helix_core::{
encoding,
@@ -134,9 +130,6 @@ pub struct Document {
last_saved_revision: usize,
version: i32, // should be usize?
pub(crate) modified_since_accessed: bool,
- save_sender: Option<UnboundedSender<DocumentSavedEventFuture>>,
- save_receiver: Option<UnboundedReceiver<DocumentSavedEventFuture>>,
- current_save: Arc<Mutex<Option<DocumentSavedEventFuture>>>,
diagnostics: Vec<Diagnostic>,
language_server: Option<Arc<helix_lsp::Client>>,
@@ -357,7 +350,6 @@ impl Document {
let encoding = encoding.unwrap_or(encoding::UTF_8);
let changes = ChangeSet::new(&text);
let old_state = None;
- let (save_sender, save_receiver) = tokio::sync::mpsc::unbounded_channel();
Self {
id: DocumentId::default(),
@@ -378,9 +370,6 @@ impl Document {
savepoint: None,
last_saved_revision: 0,
modified_since_accessed: false,
- save_sender: Some(save_sender),
- save_receiver: Some(save_receiver),
- current_save: Arc::new(Mutex::new(None)),
language_server: None,
}
}
@@ -519,21 +508,26 @@ impl Document {
&mut self,
path: Option<P>,
force: bool,
- ) -> Result<(), anyhow::Error> {
- self.save_impl::<futures_util::future::Ready<_>, _>(path, force)
+ ) -> Result<
+ impl Future<Output = Result<DocumentSavedEvent, anyhow::Error>> + 'static + Send,
+ anyhow::Error,
+ > {
+ let path = path.map(|path| path.into());
+ self.save_impl(path, force)
+
+ // futures_util::future::Ready<_>,
}
/// The `Document`'s text is encoded according to its encoding and written to the file located
/// at its `path()`.
- fn save_impl<F, P>(&mut self, path: Option<P>, force: bool) -> Result<(), anyhow::Error>
- where
- F: Future<Output = Result<Transaction, FormatterError>> + 'static + Send,
- P: Into<PathBuf>,
- {
- if self.save_sender.is_none() {
- bail!("saves are closed for this document!");
- }
-
+ fn save_impl(
+ &mut self,
+ path: Option<PathBuf>,
+ force: bool,
+ ) -> Result<
+ impl Future<Output = Result<DocumentSavedEvent, anyhow::Error>> + 'static + Send,
+ anyhow::Error,
+ > {
log::debug!(
"submitting save of doc '{:?}'",
self.path().map(|path| path.to_string_lossy())
@@ -544,7 +538,7 @@ impl Document {
let text = self.text().clone();
let path = match path {
- Some(path) => helix_core::path::get_canonicalized_path(&path.into())?,
+ Some(path) => helix_core::path::get_canonicalized_path(&path)?,
None => {
if self.path.is_none() {
bail!("Can't save with no path set!");
@@ -564,7 +558,7 @@ impl Document {
let encoding = self.encoding;
// We encode the file according to the `Document`'s encoding.
- let save_event = async move {
+ let future = async move {
use tokio::fs::File;
if let Some(parent) = path.parent() {
// TODO: display a prompt asking the user if the directories should be created
@@ -604,107 +598,15 @@ impl Document {
Ok(event)
};
- self.save_sender
- .as_mut()
- .unwrap()
- .send(Box::pin(save_event))
- .map_err(|err| anyhow!("failed to send save event: {}", err))
- }
-
- pub async fn await_save(&mut self) -> Option<DocumentSavedEventResult> {
- self.await_save_impl(true).await
- }
-
- async fn await_save_impl(&mut self, block: bool) -> Option<DocumentSavedEventResult> {
- let mut current_save = self.current_save.lock().await;
- if let Some(ref mut save) = *current_save {
- log::trace!("reawaiting save of '{:?}'", self.path());
- let result = save.await;
- *current_save = None;
- log::trace!("reawait save of '{:?}' result: {:?}", self.path(), result);
- return Some(result);
- }
-
- // return early if the receiver is closed
- let rx = self.save_receiver.as_mut()?;
-
- let save_req = if block {
- rx.recv().await
- } else {
- let msg = rx.try_recv();
-
- if let Err(err) = msg {
- match err {
- TryRecvError::Empty => return None,
- TryRecvError::Disconnected => None,
- }
- } else {
- msg.ok()
- }
- };
-
- let save = match save_req {
- Some(save) => save,
- None => {
- self.save_receiver = None;
- return None;
- }
- };
-
- // save a handle to the future so that when a poll on this
- // function gets cancelled, we don't lose it
- *current_save = Some(save);
- log::trace!("awaiting save of '{:?}'", self.path());
-
- let result = (*current_save).as_mut().unwrap().await;
- *current_save = None;
-
- log::trace!("save of '{:?}' result: {:?}", self.path(), result);
-
- Some(result)
- }
-
- /// Flushes the queue of pending writes. If any fail,
- /// it stops early before emptying the rest of the queue.
- pub async fn try_flush_saves(&mut self) -> Option<DocumentSavedEventResult> {
- self.flush_saves_impl(false).await
- }
-
- async fn flush_saves_impl(&mut self, block: bool) -> Option<DocumentSavedEventResult> {
- let mut final_result = None;
-
- while let Some(save_event) = self.await_save_impl(block).await {
- let is_err = match &save_event {
- Ok(event) => {
- self.set_last_saved_revision(event.revision);
- false
- }
- Err(err) => {
- log::error!(
- "error saving document {:?}: {}",
- self.path().map(|path| path.to_string_lossy()),
- err
- );
- true
- }
- };
-
- final_result = Some(save_event);
-
- if is_err {
- break;
- }
- }
-
- final_result
+ Ok(future)
}
/// Prepares the Document for being closed by stopping any new writes
/// and flushing through the queue of pending writes. If any fail,
/// it stops early before emptying the rest of the queue.
pub async fn close(&mut self) -> Option<DocumentSavedEventResult> {
- self.save_sender.take();
- self.flush_saves_impl(true).await
+ // TODO
+ None
}
/// Detect the programming language based on the file type.
diff --git a/helix-view/src/editor.rs b/helix-view/src/editor.rs
index fbd0b2b0..c4789ee2 100644
--- a/helix-view/src/editor.rs
+++ b/helix-view/src/editor.rs
@@ -1,6 +1,6 @@
use crate::{
clipboard::{get_clipboard_provider, ClipboardProvider},
- document::{DocumentSavedEventResult, Mode},
+ document::{DocumentSavedEventFuture, DocumentSavedEventResult, Mode},
graphics::{CursorKind, Rect},
info::Info,
input::KeyEvent,
@@ -9,7 +9,7 @@ use crate::{
Document, DocumentId, View, ViewId,
};
-use futures_util::stream::{select_all::SelectAll, FuturesUnordered};
+use futures_util::stream::select_all::SelectAll;
use futures_util::{future, StreamExt};
use helix_lsp::Call;
use tokio_stream::wrappers::UnboundedReceiverStream;
@@ -29,7 +29,7 @@ use tokio::{
time::{sleep, Duration, Instant, Sleep},
};
-use anyhow::Error;
+use anyhow::{anyhow, Error};
pub use helix_core::diagnostic::Severity;
pub use helix_core::register::Registers;
@@ -644,12 +644,20 @@ pub struct Breakpoint {
pub log_message: Option<String>,
}
+use futures_util::stream::{Flatten, Once};
+
pub struct Editor {
/// Current editing mode.
pub mode: Mode,
pub tree: Tree,
pub next_document_id: DocumentId,
pub documents: BTreeMap<DocumentId, Document>,
+
+ // We Flatten<> to resolve the inner DocumentSavedEventFuture. For that we need a stream of streams, hence the Once<>.
+ // https://stackoverflow.com/a/66875668
+ pub saves: HashMap<DocumentId, UnboundedSender<Once<DocumentSavedEventFuture>>>,
+ pub save_queue: SelectAll<Flatten<UnboundedReceiverStream<Once<DocumentSavedEventFuture>>>>,
+
pub count: Option<std::num::NonZeroUsize>,
pub selected_register: Option<char>,
pub registers: Registers,
@@ -751,6 +759,8 @@ impl Editor {
tree: Tree::new(area),
next_document_id: DocumentId::default(),
documents: BTreeMap::new(),
+ saves: HashMap::new(),
+ save_queue: SelectAll::new(),
count: None,
selected_register: None,
macro_recording: None,
@@ -1083,6 +1093,12 @@ impl Editor {
self.new_document(doc)
};
+ let (save_sender, save_receiver) = tokio::sync::mpsc::unbounded_channel();
+ self.saves.insert(id, save_sender);
+
+ let stream = UnboundedReceiverStream::new(save_receiver).flatten();
+ self.save_queue.push(stream);
+
self.switch(id, action);
Ok(id)
}
@@ -1095,38 +1111,21 @@ impl Editor {
self._refresh();
}
- pub async fn close_document(
- &mut self,
- doc_id: DocumentId,
- force: bool,
- ) -> Result<(), CloseError> {
+ pub fn close_document(&mut self, doc_id: DocumentId, force: bool) -> Result<(), CloseError> {
let doc = match self.documents.get_mut(&doc_id) {
Some(doc) => doc,
None => return Err(CloseError::DoesNotExist),
};
-
- // flush out any pending writes first to clear the modified status
- if let Some(Err(err)) = doc.try_flush_saves().await {
- return Err(CloseError::SaveError(err));
- }
-
if !force && doc.is_modified() {
return Err(CloseError::BufferModified(doc.display_name().into_owned()));
}
- if let Some(Err(err)) = doc.close().await {
- return Err(CloseError::SaveError(err));
- }
+ // This will also disallow any follow-up writes
+ self.saves.remove(&doc_id);
- // Don't fail the whole write because the language server could not
- // acknowledge the close
if let Some(language_server) = doc.language_server() {
- if let Err(err) = language_server
- .text_document_did_close(doc.identifier())
- .await
- {
- log::error!("Error closing doc in language server: {}", err);
- }
+ // TODO: track error
+ tokio::spawn(language_server.text_document_did_close(doc.identifier()));
}
enum Action {
@@ -1188,6 +1187,28 @@ impl Editor {
Ok(())
}
+ pub fn save<P: Into<PathBuf>>(
+ &mut self,
+ doc_id: DocumentId,
+ path: Option<P>,
+ force: bool,
+ ) -> anyhow::Result<()> {
+ // convert a channel of futures to pipe into main queue one by one
+ // via stream.then() ? then push into main future
+
+ let path = path.map(|path| path.into());
+ let doc = doc_mut!(self, &doc_id);
+ let future = doc.save(path, force)?;
+ // TODO: if no self.saves for that doc id then bail
+ // bail!("saves are closed for this document!");
+ use futures_util::stream;
+ self.saves[&doc_id]
+ .send(stream::once(Box::pin(future)))
+ .map_err(|err| anyhow!("failed to send save event: {}", err))?;
+
+ Ok(())
+ }
+
pub fn resize(&mut self, area: Rect) {
if self.tree.resize(area) {
self._refresh();
@@ -1307,16 +1328,10 @@ impl Editor {
}
pub async fn wait_event(&mut self) -> EditorEvent {
- let mut saves: FuturesUnordered<_> = self
- .documents
- .values_mut()
- .map(Document::await_save)
- .collect();
-
tokio::select! {
biased;
- Some(Some(event)) = saves.next() => {
+ Some(event) = self.save_queue.next() => {
EditorEvent::DocumentSaved(event)
}
Some(config_event) = self.config_events.1.recv() => {