From 91a1d9ced636f294679c6eb07ccbdc5c36da4688 Mon Sep 17 00:00:00 2001 From: Val Lorentz Date: Fri, 3 Nov 2023 11:35:22 +0100 Subject: [PATCH] Backfill room history on startup --- Cargo.toml | 3 +- src/buffers/mod.rs | 11 +++++++ src/buffers/room.rs | 63 +++++++++++++++++++++++++++++++++------ src/components/backlog.rs | 9 +++++- 4 files changed, 75 insertions(+), 11 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c7b2666..c6451ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,7 +50,8 @@ signal-hook = "0.3.17" smallvec = "1.11.1" # Matrix -eyeball-im = "0.4.1" # immutable data structures returned by matrix-sdk-ui +eyeball = "0.8.7" # data structures observer returned by matrix-sdk-ui +eyeball-im = "0.4.1" # immutable data structures observer returned by matrix-sdk-ui imbl = "2.0" # ditto matrix-sdk = { git = "https://github.com/matrix-org/matrix-rust-sdk.git", rev = "91e7f2f7224b8ada17ab639d60da10dad98aeaf9", features = ["eyre", "markdown"] } matrix-sdk-ui = { git = "https://github.com/matrix-org/matrix-rust-sdk.git", rev = "91e7f2f7224b8ada17ab639d60da10dad98aeaf9" } diff --git a/src/buffers/mod.rs b/src/buffers/mod.rs index 91d2e7a..6974a0c 100644 --- a/src/buffers/mod.rs +++ b/src/buffers/mod.rs @@ -28,6 +28,10 @@ pub trait Buffer: Send { fn short_name(&self) -> String; async fn poll_updates(&mut self) {} fn content(&self) -> Vec; // TODO: make this lazy, only the last few are used + /// Called when the user is being showned the oldest items this buffer returned. + /// + /// This should return immediately, not waiting for anything to be loaded. + fn request_back_pagination(&self, num: u16) {} } pub struct Buffers { @@ -69,4 +73,11 @@ impl Buffers { .get(self.active_index) .expect("Active buffer index does not exist") } + + pub fn active_buffer_mut(&mut self) -> &mut Box { + self + .buffers + .get_mut(self.active_index) + .expect("Active buffer index does not exist") + } } diff --git a/src/buffers/room.rs b/src/buffers/room.rs index 52a66e1..0c3cdfd 100644 --- a/src/buffers/room.rs +++ b/src/buffers/room.rs @@ -14,8 +14,8 @@ * along with this program. If not, see . */ -use std::pin::Pin; -use std::sync::Arc; +use std::sync::atomic::{AtomicU16, Ordering}; +use std::sync::{Arc, OnceLock}; use color_eyre::eyre::{eyre, Result}; use eyeball_im::VectorDiff; @@ -25,22 +25,31 @@ use matrix_sdk::async_trait; use matrix_sdk::ruma::OwnedRoomId; use matrix_sdk::Client; use matrix_sdk::Room; -use matrix_sdk_ui::timeline::{RoomExt, Timeline, TimelineItem}; +use matrix_sdk_ui::timeline::{ + BackPaginationStatus, PaginationOptions, RoomExt, Timeline, TimelineItem, +}; use ratatui::text::Text; use smallvec::SmallVec; -use tokio::pin; use super::Buffer; pub struct SingleClientRoomBuffer { + room_id: OwnedRoomId, client: Client, items: imbl::vector::Vector, // TODO: get rid of this trait object, we know it's matrix_sdk_ui::timeline::TimelineStream - stream: Box>>> + Send + Unpin>, + stream: Box>>> + Send + Sync + Unpin>, + timeline: Arc, + back_pagination_request: AtomicU16, } impl SingleClientRoomBuffer { async fn poll_updates(&mut self) { + let back_pagination_request = self.back_pagination_request.swap(0, Ordering::Relaxed); + if back_pagination_request > 0 { + // TODO: run this concurrently with stream.next() below + self.spawn_back_pagination(back_pagination_request).await; + } self.items.extend( self .stream @@ -49,6 +58,30 @@ impl SingleClientRoomBuffer { .map(|change| format!("New items: {:#?}", change)), ); } + + async fn spawn_back_pagination(&self, num: u16) { + let room_id = self.room_id.clone(); + let timeline = self.timeline.clone(); + let mut back_pagination_status = timeline.back_pagination_status(); + if back_pagination_status.get() == BackPaginationStatus::Paginating { + // We are already waiting for a backfill from the server + return; + } + tokio::spawn(async move { + tracing::info!("Starting pagination for {}", room_id); + timeline + .paginate_backwards(matrix_sdk_ui::timeline::PaginationOptions::until_num_items( + num, num, + )) + .await + .unwrap_or_else(|e| tracing::error!("Failed to paginate {} backward: {}", room_id, e)); + tracing::info!("Ended pagination for {}", room_id); + }); + + // Wait for the task we just spawned to change the status, so we don't risk starting + // a new one in the meantime + back_pagination_status.next().await; + } } pub struct RoomBuffer { @@ -75,7 +108,7 @@ impl RoomBuffer { } pub async fn add_client(&mut self, client: Client) -> Result<()> { - let (items, stream) = client + let timeline = client .get_room(&self.room_id) .ok_or_else(|| { tracing::error!( @@ -88,21 +121,23 @@ impl RoomBuffer { })? .timeline_builder() .build() - .await - .subscribe_batched() .await; + let (items, stream) = timeline.subscribe_batched().await; tracing::info!( "Added client for {}, initial items: {:?}", self.room_id, items ); self.buffers.push(SingleClientRoomBuffer { + room_id: self.room_id.clone(), client, + timeline: Arc::new(timeline), items: items // FIXME: it's always empty. why? .into_iter() .map(|item| format!("Initial item: {:#?}", item)) .collect(), stream: Box::new(stream), + back_pagination_request: AtomicU16::new(0), }); Ok(()) } @@ -132,7 +167,17 @@ impl Buffer for RoomBuffer { .unwrap_or_else(|| panic!("No sub-buffer for {}", self.room_id)) .items .iter() - .map(|line|Text::raw(line)) + .map(|line| Text::raw(line)) .collect() } + + fn request_back_pagination(&self, num: u16) { + // TODO: pick a client at random instead of just the first one, etc. + self + .buffers + .first() + .unwrap_or_else(|| panic!("No sub-buffer for {}", self.room_id)) + .back_pagination_request + .fetch_max(num, Ordering::Relaxed); + } } diff --git a/src/components/backlog.rs b/src/components/backlog.rs index 1ffdfda..d2f9fdd 100644 --- a/src/components/backlog.rs +++ b/src/components/backlog.rs @@ -34,7 +34,8 @@ impl Component for Backlog { let mut text_area = block.inner(area); block.render(area, frame.buffer_mut()); - let mut items = buffers.active_buffer().content(); + let active_buffer = buffers.active_buffer(); + let mut items = active_buffer.content(); items.reverse(); for item in items { let widget = BottomAlignedParagraph::new(item); @@ -42,6 +43,12 @@ impl Component for Backlog { assert!(area.height >= height, "{:?} {}", area, height); text_area.height -= height; // Remove lines at the bottom used by this paragraph } + + // If there is empty room on screen, ask the buffer to fetch more backlog if it can + if text_area.height > 0 { + active_buffer.request_back_pagination(100); + } + Ok(()) } }