diff options
Diffstat (limited to 'helix-lsp/src')
-rw-r--r-- | helix-lsp/src/client.rs | 34 | ||||
-rw-r--r-- | helix-lsp/src/lib.rs | 33 | ||||
-rw-r--r-- | helix-lsp/src/select_all.rs | 143 |
3 files changed, 191 insertions, 19 deletions
diff --git a/helix-lsp/src/client.rs b/helix-lsp/src/client.rs index 1f07cf89..160dd93b 100644 --- a/helix-lsp/src/client.rs +++ b/helix-lsp/src/client.rs @@ -23,13 +23,16 @@ use smol::{ Executor, }; +fn text_document_identifier(doc: &Document) -> lsp::TextDocumentIdentifier { + lsp::TextDocumentIdentifier::new(lsp::Url::from_file_path(doc.path().unwrap()).unwrap()) +} + pub struct Client { _process: Child, stderr: BufReader<ChildStderr>, outgoing: Sender<Payload>, - pub incoming: Receiver<Call>, - + // pub incoming: Receiver<Call>, pub request_counter: AtomicU64, capabilities: Option<lsp::ServerCapabilities>, @@ -38,7 +41,7 @@ pub struct Client { } impl Client { - pub fn start(ex: &Executor, cmd: &str, args: &[String]) -> Self { + pub fn start(ex: &Executor, cmd: &str, args: &[String]) -> (Self, Receiver<Call>) { let mut process = Command::new(cmd) .args(args) .stdin(Stdio::piped()) @@ -55,18 +58,22 @@ impl Client { let (incoming, outgoing) = Transport::start(ex, reader, writer); - Client { + let client = Client { _process: process, stderr, outgoing, - incoming, - + // incoming, request_counter: AtomicU64::new(0), capabilities: None, // diagnostics: HashMap::new(), - } + }; + + // TODO: async client.initialize() + // maybe use an arc<atomic> flag + + (client, incoming) } fn next_request_id(&self) -> jsonrpc::Id { @@ -219,7 +226,7 @@ impl Client { // Text document // ------------------------------------------------------------------------------------------- - pub async fn text_document_did_open(&mut self, doc: &Document) -> Result<()> { + pub async fn text_document_did_open(&self, doc: &Document) -> Result<()> { self.notify::<lsp::notification::DidOpenTextDocument>(lsp::DidOpenTextDocumentParams { text_document: lsp::TextDocumentItem { uri: lsp::Url::from_file_path(doc.path().unwrap()).unwrap(), @@ -295,7 +302,7 @@ impl Client { // TODO: trigger any time history.commit_revision happens pub async fn text_document_did_change( - &mut self, + &self, doc: &Document, transaction: &Transaction, ) -> Result<()> { @@ -328,6 +335,7 @@ impl Client { self.notify::<lsp::notification::DidChangeTextDocument>(lsp::DidChangeTextDocumentParams { text_document: lsp::VersionedTextDocumentIdentifier::new( + // TODO: doc.into() Url lsp::Url::from_file_path(doc.path().unwrap()).unwrap(), doc.version, ), @@ -338,18 +346,16 @@ impl Client { // TODO: impl into() TextDocumentIdentifier / VersionedTextDocumentIdentifier for Document. - pub async fn text_document_did_close(&mut self, doc: &Document) -> Result<()> { + pub async fn text_document_did_close(&self, doc: &Document) -> Result<()> { self.notify::<lsp::notification::DidCloseTextDocument>(lsp::DidCloseTextDocumentParams { - text_document: lsp::TextDocumentIdentifier::new( - lsp::Url::from_file_path(doc.path().unwrap()).unwrap(), - ), + text_document: text_document_identifier(doc), }) .await } // will_save / will_save_wait_until - pub async fn text_document_did_save(&mut self) -> anyhow::Result<()> { + pub async fn text_document_did_save(&self) -> anyhow::Result<()> { unimplemented!() } } diff --git a/helix-lsp/src/lib.rs b/helix-lsp/src/lib.rs index eae6fa86..c56721a5 100644 --- a/helix-lsp/src/lib.rs +++ b/helix-lsp/src/lib.rs @@ -1,4 +1,5 @@ mod client; +mod select_all; mod transport; pub use jsonrpc_core as jsonrpc; @@ -69,16 +70,24 @@ pub use jsonrpc::Call; type LanguageId = String; -pub static REGISTRY: Lazy<Registry> = Lazy::new(Registry::init); +use crate::select_all::SelectAll; +use smol::channel::Receiver; pub struct Registry { inner: HashMap<LanguageId, OnceCell<Arc<Client>>>, + + pub incoming: SelectAll<Receiver<Call>>, } impl Registry { - pub fn init() -> Self { + pub fn new() -> Self { + let mut inner = HashMap::new(); + + inner.insert("rust".to_string(), OnceCell::new()); + Self { - inner: HashMap::new(), + inner, + incoming: SelectAll::new(), } } @@ -91,8 +100,12 @@ impl Registry { // TODO: lookup defaults for id (name, args) // initialize a new client - let client = Client::start(&ex, "rust-analyzer", &[]); - // TODO: also call initialize().await() + let (mut client, incoming) = Client::start(&ex, "rust-analyzer", &[]); + // TODO: run this async without blocking + smol::block_on(client.initialize()).unwrap(); + + self.incoming.push(incoming); + Arc::new(client) }) }) @@ -115,3 +128,13 @@ impl Registry { // -> PROBLEM: how do you trigger an update on the editor side when data updates? // // -> The data updates should pull all events until we run out so we don't frequently re-render +// +// +// v2: +// +// there should be a registry of lsp clients, one per language type (or workspace). +// the clients should lazy init on first access +// the client.initialize() should be called async and we buffer any requests until that completes +// there needs to be a way to process incoming lsp messages from all clients. +// -> notifications need to be dispatched to wherever +// -> requests need to generate a reply and travel back to the same lsp! diff --git a/helix-lsp/src/select_all.rs b/helix-lsp/src/select_all.rs new file mode 100644 index 00000000..987f2a10 --- /dev/null +++ b/helix-lsp/src/select_all.rs @@ -0,0 +1,143 @@ +//! An unbounded set of streams + +use core::fmt::{self, Debug}; +use core::iter::FromIterator; +use core::pin::Pin; + +use smol::ready; +use smol::stream::Stream; +use std::task::{Context, Poll}; + +use futures_util::stream::FusedStream; +use futures_util::stream::{FuturesUnordered, StreamExt, StreamFuture}; + +/// An unbounded set of streams +/// +/// This "combinator" provides the ability to maintain a set of streams +/// and drive them all to completion. +/// +/// Streams are pushed into this set and their realized values are +/// yielded as they become ready. Streams will only be polled when they +/// generate notifications. This allows to coordinate a large number of streams. +/// +/// Note that you can create a ready-made `SelectAll` via the +/// `select_all` function in the `stream` module, or you can start with an +/// empty set with the `SelectAll::new` constructor. +#[must_use = "streams do nothing unless polled"] +pub struct SelectAll<St> { + inner: FuturesUnordered<StreamFuture<St>>, +} + +impl<St: Debug> Debug for SelectAll<St> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "SelectAll {{ ... }}") + } +} + +impl<St: Stream + Unpin> SelectAll<St> { + /// Constructs a new, empty `SelectAll` + /// + /// The returned `SelectAll` does not contain any streams and, in this + /// state, `SelectAll::poll` will return `Poll::Ready(None)`. + pub fn new() -> Self { + Self { + inner: FuturesUnordered::new(), + } + } + + /// Returns the number of streams contained in the set. + /// + /// This represents the total number of in-flight streams. + pub fn len(&self) -> usize { + self.inner.len() + } + + /// Returns `true` if the set contains no streams + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } + + /// Push a stream into the set. + /// + /// This function submits the given stream to the set for managing. This + /// function will not call `poll` on the submitted stream. The caller must + /// ensure that `SelectAll::poll` is called in order to receive task + /// notifications. + pub fn push(&self, stream: St) { + self.inner.push(stream.into_future()); + } +} + +impl<St: Stream + Unpin> Default for SelectAll<St> { + fn default() -> Self { + Self::new() + } +} + +impl<St: Stream + Unpin> Stream for SelectAll<St> { + type Item = St::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + loop { + match ready!(self.inner.poll_next_unpin(cx)) { + Some((Some(item), remaining)) => { + self.push(remaining); + return Poll::Ready(Some(item)); + } + Some((None, _)) => { + // `FuturesUnordered` thinks it isn't terminated + // because it yielded a Some. + // We do not return, but poll `FuturesUnordered` + // in the next loop iteration. + } + None => return Poll::Ready(None), + } + } + } +} + +impl<St: Stream + Unpin> FusedStream for SelectAll<St> { + fn is_terminated(&self) -> bool { + self.inner.is_terminated() + } +} + +/// Convert a list of streams into a `Stream` of results from the streams. +/// +/// This essentially takes a list of streams (e.g. a vector, an iterator, etc.) +/// and bundles them together into a single stream. +/// The stream will yield items as they become available on the underlying +/// streams internally, in the order they become available. +/// +/// Note that the returned set can also be used to dynamically push more +/// futures into the set as they become available. +/// +/// This function is only available when the `std` or `alloc` feature of this +/// library is activated, and it is activated by default. +pub fn select_all<I>(streams: I) -> SelectAll<I::Item> +where + I: IntoIterator, + I::Item: Stream + Unpin, +{ + let mut set = SelectAll::new(); + + for stream in streams { + set.push(stream); + } + + set +} + +impl<St: Stream + Unpin> FromIterator<St> for SelectAll<St> { + fn from_iter<T: IntoIterator<Item = St>>(iter: T) -> Self { + select_all(iter) + } +} + +impl<St: Stream + Unpin> Extend<St> for SelectAll<St> { + fn extend<T: IntoIterator<Item = St>>(&mut self, iter: T) { + for st in iter { + self.push(st) + } + } +} |