From 955cb81687db139422875d02b534d753dac5603f Mon Sep 17 00:00:00 2001 From: Blaž Hrastnik Date: Wed, 23 Dec 2020 15:50:16 +0900 Subject: Init lsp through the registry. --- helix-lsp/src/select_all.rs | 143 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 143 insertions(+) create mode 100644 helix-lsp/src/select_all.rs (limited to 'helix-lsp/src/select_all.rs') diff --git a/helix-lsp/src/select_all.rs b/helix-lsp/src/select_all.rs new file mode 100644 index 00000000..987f2a10 --- /dev/null +++ b/helix-lsp/src/select_all.rs @@ -0,0 +1,143 @@ +//! An unbounded set of streams + +use core::fmt::{self, Debug}; +use core::iter::FromIterator; +use core::pin::Pin; + +use smol::ready; +use smol::stream::Stream; +use std::task::{Context, Poll}; + +use futures_util::stream::FusedStream; +use futures_util::stream::{FuturesUnordered, StreamExt, StreamFuture}; + +/// An unbounded set of streams +/// +/// This "combinator" provides the ability to maintain a set of streams +/// and drive them all to completion. +/// +/// Streams are pushed into this set and their realized values are +/// yielded as they become ready. Streams will only be polled when they +/// generate notifications. This allows to coordinate a large number of streams. +/// +/// Note that you can create a ready-made `SelectAll` via the +/// `select_all` function in the `stream` module, or you can start with an +/// empty set with the `SelectAll::new` constructor. +#[must_use = "streams do nothing unless polled"] +pub struct SelectAll { + inner: FuturesUnordered>, +} + +impl Debug for SelectAll { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "SelectAll {{ ... }}") + } +} + +impl SelectAll { + /// Constructs a new, empty `SelectAll` + /// + /// The returned `SelectAll` does not contain any streams and, in this + /// state, `SelectAll::poll` will return `Poll::Ready(None)`. + pub fn new() -> Self { + Self { + inner: FuturesUnordered::new(), + } + } + + /// Returns the number of streams contained in the set. + /// + /// This represents the total number of in-flight streams. + pub fn len(&self) -> usize { + self.inner.len() + } + + /// Returns `true` if the set contains no streams + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } + + /// Push a stream into the set. + /// + /// This function submits the given stream to the set for managing. This + /// function will not call `poll` on the submitted stream. The caller must + /// ensure that `SelectAll::poll` is called in order to receive task + /// notifications. + pub fn push(&self, stream: St) { + self.inner.push(stream.into_future()); + } +} + +impl Default for SelectAll { + fn default() -> Self { + Self::new() + } +} + +impl Stream for SelectAll { + type Item = St::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + match ready!(self.inner.poll_next_unpin(cx)) { + Some((Some(item), remaining)) => { + self.push(remaining); + return Poll::Ready(Some(item)); + } + Some((None, _)) => { + // `FuturesUnordered` thinks it isn't terminated + // because it yielded a Some. + // We do not return, but poll `FuturesUnordered` + // in the next loop iteration. + } + None => return Poll::Ready(None), + } + } + } +} + +impl FusedStream for SelectAll { + fn is_terminated(&self) -> bool { + self.inner.is_terminated() + } +} + +/// Convert a list of streams into a `Stream` of results from the streams. +/// +/// This essentially takes a list of streams (e.g. a vector, an iterator, etc.) +/// and bundles them together into a single stream. +/// The stream will yield items as they become available on the underlying +/// streams internally, in the order they become available. +/// +/// Note that the returned set can also be used to dynamically push more +/// futures into the set as they become available. +/// +/// This function is only available when the `std` or `alloc` feature of this +/// library is activated, and it is activated by default. +pub fn select_all(streams: I) -> SelectAll +where + I: IntoIterator, + I::Item: Stream + Unpin, +{ + let mut set = SelectAll::new(); + + for stream in streams { + set.push(stream); + } + + set +} + +impl FromIterator for SelectAll { + fn from_iter>(iter: T) -> Self { + select_all(iter) + } +} + +impl Extend for SelectAll { + fn extend>(&mut self, iter: T) { + for st in iter { + self.push(st) + } + } +} -- cgit v1.2.3-70-g09d2