Backfill room history on startup

This commit is contained in:
2023-11-03 11:35:22 +01:00
parent 347c183a8c
commit 91a1d9ced6
4 changed files with 75 additions and 11 deletions

View File

@ -50,7 +50,8 @@ signal-hook = "0.3.17"
smallvec = "1.11.1"
# Matrix
eyeball-im = "0.4.1" # immutable data structures returned by matrix-sdk-ui
eyeball = "0.8.7" # data structures observer returned by matrix-sdk-ui
eyeball-im = "0.4.1" # immutable data structures observer returned by matrix-sdk-ui
imbl = "2.0" # ditto
matrix-sdk = { git = "https://github.com/matrix-org/matrix-rust-sdk.git", rev = "91e7f2f7224b8ada17ab639d60da10dad98aeaf9", features = ["eyre", "markdown"] }
matrix-sdk-ui = { git = "https://github.com/matrix-org/matrix-rust-sdk.git", rev = "91e7f2f7224b8ada17ab639d60da10dad98aeaf9" }

View File

@ -28,6 +28,10 @@ pub trait Buffer: Send {
fn short_name(&self) -> String;
async fn poll_updates(&mut self) {}
fn content(&self) -> Vec<ratatui::text::Text>; // TODO: make this lazy, only the last few are used
/// Called when the user is being showned the oldest items this buffer returned.
///
/// This should return immediately, not waiting for anything to be loaded.
fn request_back_pagination(&self, num: u16) {}
}
pub struct Buffers {
@ -69,4 +73,11 @@ impl Buffers {
.get(self.active_index)
.expect("Active buffer index does not exist")
}
pub fn active_buffer_mut(&mut self) -> &mut Box<dyn Buffer> {
self
.buffers
.get_mut(self.active_index)
.expect("Active buffer index does not exist")
}
}

View File

@ -14,8 +14,8 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::{Arc, OnceLock};
use color_eyre::eyre::{eyre, Result};
use eyeball_im::VectorDiff;
@ -25,22 +25,31 @@ use matrix_sdk::async_trait;
use matrix_sdk::ruma::OwnedRoomId;
use matrix_sdk::Client;
use matrix_sdk::Room;
use matrix_sdk_ui::timeline::{RoomExt, Timeline, TimelineItem};
use matrix_sdk_ui::timeline::{
BackPaginationStatus, PaginationOptions, RoomExt, Timeline, TimelineItem,
};
use ratatui::text::Text;
use smallvec::SmallVec;
use tokio::pin;
use super::Buffer;
pub struct SingleClientRoomBuffer {
room_id: OwnedRoomId,
client: Client,
items: imbl::vector::Vector<String>,
// TODO: get rid of this trait object, we know it's matrix_sdk_ui::timeline::TimelineStream
stream: Box<dyn Stream<Item = Vec<VectorDiff<Arc<TimelineItem>>>> + Send + Unpin>,
stream: Box<dyn Stream<Item = Vec<VectorDiff<Arc<TimelineItem>>>> + Send + Sync + Unpin>,
timeline: Arc<Timeline>,
back_pagination_request: AtomicU16,
}
impl SingleClientRoomBuffer {
async fn poll_updates(&mut self) {
let back_pagination_request = self.back_pagination_request.swap(0, Ordering::Relaxed);
if back_pagination_request > 0 {
// TODO: run this concurrently with stream.next() below
self.spawn_back_pagination(back_pagination_request).await;
}
self.items.extend(
self
.stream
@ -49,6 +58,30 @@ impl SingleClientRoomBuffer {
.map(|change| format!("New items: {:#?}", change)),
);
}
async fn spawn_back_pagination(&self, num: u16) {
let room_id = self.room_id.clone();
let timeline = self.timeline.clone();
let mut back_pagination_status = timeline.back_pagination_status();
if back_pagination_status.get() == BackPaginationStatus::Paginating {
// We are already waiting for a backfill from the server
return;
}
tokio::spawn(async move {
tracing::info!("Starting pagination for {}", room_id);
timeline
.paginate_backwards(matrix_sdk_ui::timeline::PaginationOptions::until_num_items(
num, num,
))
.await
.unwrap_or_else(|e| tracing::error!("Failed to paginate {} backward: {}", room_id, e));
tracing::info!("Ended pagination for {}", room_id);
});
// Wait for the task we just spawned to change the status, so we don't risk starting
// a new one in the meantime
back_pagination_status.next().await;
}
}
pub struct RoomBuffer {
@ -75,7 +108,7 @@ impl RoomBuffer {
}
pub async fn add_client(&mut self, client: Client) -> Result<()> {
let (items, stream) = client
let timeline = client
.get_room(&self.room_id)
.ok_or_else(|| {
tracing::error!(
@ -88,21 +121,23 @@ impl RoomBuffer {
})?
.timeline_builder()
.build()
.await
.subscribe_batched()
.await;
let (items, stream) = timeline.subscribe_batched().await;
tracing::info!(
"Added client for {}, initial items: {:?}",
self.room_id,
items
);
self.buffers.push(SingleClientRoomBuffer {
room_id: self.room_id.clone(),
client,
timeline: Arc::new(timeline),
items: items // FIXME: it's always empty. why?
.into_iter()
.map(|item| format!("Initial item: {:#?}", item))
.collect(),
stream: Box::new(stream),
back_pagination_request: AtomicU16::new(0),
});
Ok(())
}
@ -132,7 +167,17 @@ impl Buffer for RoomBuffer {
.unwrap_or_else(|| panic!("No sub-buffer for {}", self.room_id))
.items
.iter()
.map(|line|Text::raw(line))
.map(|line| Text::raw(line))
.collect()
}
fn request_back_pagination(&self, num: u16) {
// TODO: pick a client at random instead of just the first one, etc.
self
.buffers
.first()
.unwrap_or_else(|| panic!("No sub-buffer for {}", self.room_id))
.back_pagination_request
.fetch_max(num, Ordering::Relaxed);
}
}

View File

@ -34,7 +34,8 @@ impl Component for Backlog {
let mut text_area = block.inner(area);
block.render(area, frame.buffer_mut());
let mut items = buffers.active_buffer().content();
let active_buffer = buffers.active_buffer();
let mut items = active_buffer.content();
items.reverse();
for item in items {
let widget = BottomAlignedParagraph::new(item);
@ -42,6 +43,12 @@ impl Component for Backlog {
assert!(area.height >= height, "{:?} {}", area, height);
text_area.height -= height; // Remove lines at the bottom used by this paragraph
}
// If there is empty room on screen, ask the buffer to fetch more backlog if it can
if text_area.height > 0 {
active_buffer.request_back_pagination(100);
}
Ok(())
}
}