182 lines
5.3 KiB
Rust
182 lines
5.3 KiB
Rust
/*
|
|
* Copyright (C) 2023 Valentin Lorentz
|
|
*
|
|
* This program is free software: you can redistribute it and/or modify
|
|
* it under the terms of the GNU Affero General Public License version 3,
|
|
* as published by the Free Software Foundation.
|
|
*
|
|
* This program is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* GNU Affero General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU Affero General Public License
|
|
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
*/
|
|
|
|
use std::sync::atomic::{AtomicU16, Ordering};
|
|
use std::sync::{Arc, OnceLock};
|
|
|
|
use color_eyre::eyre::{eyre, Result};
|
|
use eyeball_im::VectorDiff;
|
|
use futures::{FutureExt, Stream, StreamExt};
|
|
use itertools::Itertools;
|
|
use matrix_sdk::async_trait;
|
|
use matrix_sdk::ruma::OwnedRoomId;
|
|
use matrix_sdk::Client;
|
|
use matrix_sdk::Room;
|
|
use matrix_sdk_ui::timeline::{
|
|
BackPaginationStatus, PaginationOptions, RoomExt, Timeline, TimelineItem,
|
|
};
|
|
use ratatui::text::Text;
|
|
use smallvec::SmallVec;
|
|
|
|
use super::Buffer;
|
|
|
|
pub struct SingleClientRoomBuffer {
|
|
room_id: OwnedRoomId,
|
|
client: Client,
|
|
items: imbl::vector::Vector<String>,
|
|
// TODO: get rid of this trait object, we know it's matrix_sdk_ui::timeline::TimelineStream
|
|
stream: Box<dyn Stream<Item = Vec<VectorDiff<Arc<TimelineItem>>>> + Send + Sync + Unpin>,
|
|
timeline: Arc<Timeline>,
|
|
back_pagination_request: AtomicU16,
|
|
}
|
|
|
|
impl SingleClientRoomBuffer {
|
|
async fn poll_updates(&mut self) {
|
|
let back_pagination_request = self.back_pagination_request.swap(0, Ordering::Relaxed);
|
|
if back_pagination_request > 0 {
|
|
// TODO: run this concurrently with stream.next() below
|
|
self.spawn_back_pagination(back_pagination_request).await;
|
|
}
|
|
if let Some(changes) = self.stream.next().await {
|
|
for change in changes {
|
|
change.map(|timeline_item| format!("{:#?}", timeline_item)).apply(&mut self.items);
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn spawn_back_pagination(&self, num: u16) {
|
|
let room_id = self.room_id.clone();
|
|
let timeline = self.timeline.clone();
|
|
let mut back_pagination_status = timeline.back_pagination_status();
|
|
if back_pagination_status.get() == BackPaginationStatus::Paginating {
|
|
// We are already waiting for a backfill from the server
|
|
return;
|
|
}
|
|
tokio::spawn(async move {
|
|
tracing::info!("Starting pagination for {}", room_id);
|
|
timeline
|
|
.paginate_backwards(matrix_sdk_ui::timeline::PaginationOptions::until_num_items(
|
|
num, num,
|
|
))
|
|
.await
|
|
.unwrap_or_else(|e| tracing::error!("Failed to paginate {} backward: {}", room_id, e));
|
|
tracing::info!("Ended pagination for {}", room_id);
|
|
});
|
|
|
|
// Wait for the task we just spawned to change the status, so we don't risk starting
|
|
// a new one in the meantime
|
|
back_pagination_status.next().await;
|
|
}
|
|
}
|
|
|
|
pub struct RoomBuffer {
|
|
room_id: OwnedRoomId,
|
|
|
|
// It's unlikely users will join the same room with more than one account;
|
|
// avoid a useless heap allocation for the usual case.
|
|
buffers: SmallVec<[SingleClientRoomBuffer; 1]>,
|
|
}
|
|
|
|
fn f(b: RoomBuffer) {
|
|
g(b);
|
|
}
|
|
fn g<B: Send>(b: B) {}
|
|
|
|
impl RoomBuffer {
|
|
pub async fn new(initial_client: Client, room_id: OwnedRoomId) -> Result<Self> {
|
|
let mut self_ = RoomBuffer {
|
|
buffers: SmallVec::new(),
|
|
room_id,
|
|
};
|
|
self_.add_client(initial_client).await?;
|
|
Ok(self_)
|
|
}
|
|
|
|
pub async fn add_client(&mut self, client: Client) -> Result<()> {
|
|
let timeline = client
|
|
.get_room(&self.room_id)
|
|
.ok_or_else(|| {
|
|
tracing::error!(
|
|
"Adding {:?} for {:?}, but it does not know this room ({} other clients know this room)",
|
|
client,
|
|
self.room_id,
|
|
self.buffers.len()
|
|
);
|
|
eyre!("Unknown room {} for client {:?}", self.room_id, client)
|
|
})?
|
|
.timeline_builder()
|
|
.build()
|
|
.await;
|
|
let (items, stream) = timeline.subscribe_batched().await;
|
|
tracing::info!(
|
|
"Added client for {}, initial items: {:?}",
|
|
self.room_id,
|
|
items
|
|
);
|
|
self.buffers.push(SingleClientRoomBuffer {
|
|
room_id: self.room_id.clone(),
|
|
client,
|
|
timeline: Arc::new(timeline),
|
|
items: items // FIXME: it's always empty. why?
|
|
.into_iter()
|
|
.map(|item| format!("Initial item: {:#?}", item))
|
|
.collect(),
|
|
stream: Box::new(stream),
|
|
back_pagination_request: AtomicU16::new(0),
|
|
});
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
impl Buffer for RoomBuffer {
|
|
fn short_name(&self) -> String {
|
|
self.room_id.as_str().to_owned()
|
|
}
|
|
|
|
async fn poll_updates(&mut self) {
|
|
futures::future::join_all(
|
|
self
|
|
.buffers
|
|
.iter_mut()
|
|
.map(SingleClientRoomBuffer::poll_updates),
|
|
)
|
|
.await;
|
|
}
|
|
|
|
fn content(&self) -> Vec<Text> {
|
|
// TODO: merge buffers, etc.
|
|
self
|
|
.buffers
|
|
.first()
|
|
.unwrap_or_else(|| panic!("No sub-buffer for {}", self.room_id))
|
|
.items
|
|
.iter()
|
|
.map(|line| Text::raw(line))
|
|
.collect()
|
|
}
|
|
|
|
fn request_back_pagination(&self, num: u16) {
|
|
// TODO: pick a client at random instead of just the first one, etc.
|
|
self
|
|
.buffers
|
|
.first()
|
|
.unwrap_or_else(|| panic!("No sub-buffer for {}", self.room_id))
|
|
.back_pagination_request
|
|
.fetch_max(num, Ordering::Relaxed);
|
|
}
|
|
}
|