From fda09e01277c64d2dee2be2b178f6237ffc45f58 Mon Sep 17 00:00:00 2001 From: Val Lorentz Date: Thu, 2 Nov 2023 12:00:40 +0100 Subject: [PATCH] Poll room updates and display them --- Cargo.toml | 6 +++ src/app.rs | 31 ++++++++---- src/buffers/mod.rs | 9 +++- src/buffers/room.rs | 113 +++++++++++++++++++++++++++++++++++++++----- 4 files changed, 137 insertions(+), 22 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f4fc7ac..3bb8f06 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ human-panic = "1.2.0" # Internal inventory = "0.3" +itertools = "0.11.0" lazy_static = "1.4.0" libc = "0.2.148" log = "0.4.20" @@ -47,7 +48,12 @@ signal-hook = "0.3.17" smallvec = "1.11.1" # Matrix +eyeball-im = "0.4.1" # immutable data structures returned by matrix-sdk-ui +imbl = "2.0" # ditto matrix-sdk = { git = "https://github.com/matrix-org/matrix-rust-sdk.git", rev = "91e7f2f7224b8ada17ab639d60da10dad98aeaf9", features = ["eyre", "markdown"] } +matrix-sdk-ui = { git = "https://github.com/matrix-org/matrix-rust-sdk.git", rev = "91e7f2f7224b8ada17ab639d60da10dad98aeaf9" } +#matrix-sdk = { path = "../matrix-rust-sdk/crates/matrix-sdk", features = ["eyre", "markdown"] } +#matrix-sdk-ui = { path = "../matrix-rust-sdk/crates/matrix-sdk-ui" } # UI ansi-to-tui = "3.1.0" diff --git a/src/app.rs b/src/app.rs index f4c3864..6fb1276 100644 --- a/src/app.rs +++ b/src/app.rs @@ -22,7 +22,7 @@ use std::sync::RwLock; use color_eyre::eyre::{Result, WrapErr}; use crossterm::event::KeyEvent; use futures::stream::FuturesUnordered; -use futures::StreamExt; +use futures::{FutureExt, StreamExt}; use nonempty::NonEmpty; use ratatui::prelude::Rect; use serde::{Deserialize, Serialize}; @@ -155,14 +155,24 @@ impl App { component.init(tui.size()?)?; } - for client in &self.clients { - for room in client.joined_rooms() { - self.buffers.push(Box::new(RoomBuffer::new( - [client.clone()], - room.room_id().to_owned(), - ))); - } - } + futures::future::join_all( + self + .clients + .iter() + .flat_map(|client| { + client + .joined_rooms() + .into_iter() + .map(|room| (client.clone(), room)) + }) + .map(|(client, room)| RoomBuffer::new(client, room.room_id().to_owned())) + .map(|fut| fut.map(|res| res.expect("Failed to create RoomBuffer at startup"))), + ) + .await + .into_iter() + .map(|res| res) + .map(Box::new) + .for_each(|buffer| self.buffers.push(buffer)); let sync_clients = self.clients.clone(); let mut sync_results = FuturesUnordered::new(); @@ -211,6 +221,9 @@ impl App { let (client, sync_response) = sync_response.expect("sync_responses_rx unexpectedly closed"); self.handle_sync_response(&action_tx, client, sync_response).await.context("Error while handling sync response")?; } + poll_updates = futures::future::join_all( + self.buffers.iter_mut().map(|buf| buf.poll_updates()) + ) => {} sync_result = sync_results.next() => { if !self.should_quit.load(Ordering::Acquire) { panic!("Sync ended unexpected: {:?}", sync_result); diff --git a/src/buffers/mod.rs b/src/buffers/mod.rs index c7dada6..a0d5579 100644 --- a/src/buffers/mod.rs +++ b/src/buffers/mod.rs @@ -14,6 +14,7 @@ * along with this program. If not, see . */ +use matrix_sdk::async_trait; use nonempty::NonEmpty; mod log; @@ -21,9 +22,11 @@ pub use log::LogBuffer; mod room; pub use room::RoomBuffer; -pub trait Buffer { +#[async_trait] +pub trait Buffer: Send { /// A short human-readable name for the room, eg. to show in compact buflist fn short_name(&self) -> String; + async fn poll_updates(&mut self) {} fn content(&self) -> ratatui::text::Text; } @@ -44,6 +47,10 @@ impl Buffers { self.buffers.iter().map(|buffer_box| &**buffer_box) } + pub fn iter_mut(&mut self) -> impl Iterator> { + self.buffers.iter_mut() + } + pub fn push(&mut self, buffer: Box) { self.buffers.push(buffer) } diff --git a/src/buffers/room.rs b/src/buffers/room.rs index 679c3a2..5487b9c 100644 --- a/src/buffers/room.rs +++ b/src/buffers/room.rs @@ -14,35 +14,124 @@ * along with this program. If not, see . */ +use std::pin::Pin; +use std::sync::Arc; + +use color_eyre::eyre::{eyre, Result}; +use eyeball_im::VectorDiff; +use futures::{FutureExt, Stream, StreamExt}; +use itertools::Itertools; +use matrix_sdk::async_trait; use matrix_sdk::ruma::OwnedRoomId; use matrix_sdk::Client; use matrix_sdk::Room; +use matrix_sdk_ui::timeline::{RoomExt, Timeline, TimelineItem}; use smallvec::SmallVec; +use tokio::pin; use super::Buffer; -pub struct RoomBuffer { - // It's unlikely users will join the same room with more than one account; - // avoid a useless heap allocation for the usual case. - clients: SmallVec<[Client; 1]>, - room_id: OwnedRoomId, +pub struct SingleClientRoomBuffer { + client: Client, + items: imbl::vector::Vector, + // TODO: get rid of this trait object, we know it's matrix_sdk_ui::timeline::TimelineStream + stream: Box>>> + Send + Unpin>, } -impl RoomBuffer { - pub fn new>(clients: Clients, room_id: OwnedRoomId) -> Self { - RoomBuffer { - clients: clients.into_iter().collect(), - room_id, - } +impl SingleClientRoomBuffer { + async fn poll_updates(&mut self) { + self.items.extend( + self + .stream + .next() + .await + .map(|change| format!("New item: {:#?}", change)), + ); } } +pub struct RoomBuffer { + room_id: OwnedRoomId, + + // It's unlikely users will join the same room with more than one account; + // avoid a useless heap allocation for the usual case. + buffers: SmallVec<[SingleClientRoomBuffer; 1]>, +} + +fn f(b: RoomBuffer) { + g(b); +} +fn g(b: B) {} + +impl RoomBuffer { + pub async fn new(initial_client: Client, room_id: OwnedRoomId) -> Result { + let mut self_ = RoomBuffer { + buffers: SmallVec::new(), + room_id, + }; + self_.add_client(initial_client).await?; + Ok(self_) + } + + pub async fn add_client(&mut self, client: Client) -> Result<()> { + let (items, stream) = client + .get_room(&self.room_id) + .ok_or_else(|| { + tracing::error!( + "Adding {:?} for {:?}, but it does not know this room ({} other clients know this room)", + client, + self.room_id, + self.buffers.len() + ); + eyre!("Unknown room {} for client {:?}", self.room_id, client) + })? + .timeline_builder() + .build() + .await + .subscribe_batched() + .await; + tracing::info!( + "Added client for {}, initial items: {:?}", + self.room_id, + items + ); + self.buffers.push(SingleClientRoomBuffer { + client, + items: items // FIXME: it's always empty. why? + .into_iter() + .map(|item| format!("Initial item: {:#?}", item)) + .collect(), + stream: Box::new(stream), + }); + Ok(()) + } +} + +#[async_trait] impl Buffer for RoomBuffer { fn short_name(&self) -> String { self.room_id.as_str().to_owned() } + async fn poll_updates(&mut self) { + futures::future::join_all( + self + .buffers + .iter_mut() + .map(SingleClientRoomBuffer::poll_updates), + ) + .await; + } + fn content(&self) -> ratatui::text::Text { - format!("Timeline of {} goes here", self.room_id).into() + // TODO: merge buffers, etc. + self + .buffers + .first() + .unwrap_or_else(|| panic!("No sub-buffer for {}", self.room_id)) + .items + .iter() + .join("\n") + .into() } }