@@ -21,6 +21,7 @@ use std::sync::{Arc, OnceLock};
 use chrono::{offset::Local, DateTime};
 use color_eyre::eyre::{eyre, Result};
+use eyeball::SharedObservable;
 use eyeball_im::VectorDiff;
 use futures::future::OptionFuture;
 use futures::stream::FuturesUnordered;
@@ -48,7 +49,9 @@ use memuse::DynamicUsage;
 use ratatui::text::Text;
 use smallvec::SmallVec;
 use sorted_vec::SortedVec;
+use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
 use tokio::sync::oneshot;
+use tokio::sync::RwLock;

 use super::{Buffer, BufferId, BufferItem, BufferItemContent, BufferSortKey, FullyReadStatus};
 use crate::config::Config;
@@ -116,6 +119,25 @@ impl DynamicUsage for OwnedBufferItemContent {
   }
 }

+pub struct SingleClientRoomBufferInner {
+  latest_event_id: Option<OwnedEventId>,
+  timeline: Arc<Timeline>,
+}
+
+impl DynamicUsage for SingleClientRoomBufferInner {
+  fn dynamic_usage(&self) -> usize {
+    std::mem::size_of::<Self>()
+  }
+
+  fn dynamic_usage_bounds(&self) -> (usize, Option<usize>) {
+    (
+      std::mem::size_of::<Self>(),
+      Some(std::mem::size_of::<Self>()),
+    )
+  }
+}
+
 // FIXME: use a single Arc for all the values inside
+#[derive(Clone)]
 pub struct SingleClientRoomBuffer {
   config: Arc<Config>,
   room_id: Arc<OwnedRoomId>,
@@ -123,31 +145,32 @@ pub struct SingleClientRoomBuffer {

   /// Newest first
   items: imbl::vector::Vector<(Option<u64>, OwnedBufferItemContent, Prerender)>,
-  latest_event_id: Option<OwnedEventId>,
-  // TODO: get rid of this trait object, we know it's matrix_sdk_ui::timeline::TimelineStream
-  timeline_stream: Box<dyn Stream<Item = Vec<VectorDiff<Arc<TimelineItem>>>> + Send + Sync + Unpin>,
-  timeline: Arc<Timeline>,
-  back_pagination_request: AtomicU16,
-
-  roominfo_subscriber: eyeball::Subscriber<RoomInfo>,
+  items_rx: tokio::sync::watch::Receiver<
+    imbl::vector::Vector<(Option<u64>, OwnedBufferItemContent, Prerender)>,
+  >,
+  back_pagination_request: SharedObservable<u16>,

+  inner: Arc<RwLock<SingleClientRoomBufferInner>>,
 }
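The reshaped struct splits state by owner: what the background task mutates lives behind `Arc<RwLock<SingleClientRoomBufferInner>>`, while rendered items reach readers through a Tokio watch channel carrying whole `imbl` vectors, which clone in O(1) thanks to structural sharing. A minimal sketch of that snapshot-publishing pattern, assuming only the tokio and imbl crates (the `u32` payload is a placeholder for the real item tuple):

```rust
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // The writer half lives in the background task; each reader keeps a Receiver.
    let (tx, mut rx) = watch::channel(imbl::Vector::<u32>::new());

    // send_modify() mutates the stored vector in place and wakes receivers.
    tx.send_modify(|items| items.push_back(42));

    // changed() resolves because an unseen snapshot is pending; borrow().clone()
    // is cheap since imbl::Vector shares structure between clones.
    rx.changed().await.unwrap();
    let snapshot = rx.borrow().clone();
    assert_eq!(snapshot.len(), 1);
}
```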

 impl DynamicUsage for SingleClientRoomBuffer {
   fn dynamic_usage(&self) -> usize {
-    std::mem::size_of::<Self>()
+    self.inner.blocking_read().dynamic_usage()
       + self
         .items
         .iter()
-        .map(|(_, content, prerender)| {
+        .map(|item| {
+          let (_, content, prerender) = item;
           std::mem::size_of::<Option<u64>>() + content.dynamic_usage() + prerender.dynamic_usage()
         })
         .sum::<usize>()
   }

   fn dynamic_usage_bounds(&self) -> (usize, Option<usize>) {
     let self_usage = std::mem::size_of::<Self>();
+    let (min, max) = self.inner.blocking_read().dynamic_usage_bounds();
     crate::utils::sum_memory_bounds(
       self_usage,
       Some(self_usage),
+      min,
+      max,
       self
         .items
         .iter()
@@ -162,27 +185,42 @@ impl DynamicUsage for SingleClientRoomBuffer {
   }
 }

 impl SingleClientRoomBuffer {
-  /// If the room info was updated, returns it
-  async fn poll_updates(&mut self) -> Option<RoomInfo> {
-    let back_pagination_request = self.back_pagination_request.swap(0, Ordering::Relaxed);
-    if back_pagination_request > 0 {
-      // TODO: run this concurrently with timeline_stream.next() below
-      self.spawn_back_pagination(back_pagination_request).await;
-    }
-    tokio::select! {
-      biased;
-      roominfo = self.roominfo_subscriber.next() => {
-        Some(roominfo.expect("reached end of roominfo_subscriber"))
-      },
-      changes = self.timeline_stream.next() => {
-        let Some(changes) = changes else { return None; };
-        for change in changes {
-          change
-            .map(|item| (Some(item.unique_id()), self.format_timeline_item(item), Prerender::new()))
-            .apply(&mut self.items);
+  async fn poll_updates(
+    self,
+    updates_tx: UnboundedSender<(SingleClientRoomBuffer, RoomInfo)>,
+    write_items: tokio::sync::watch::Sender<
+      imbl::vector::Vector<(Option<u64>, OwnedBufferItemContent, Prerender)>,
+    >,
+    mut roominfo_subscriber: eyeball::Subscriber<RoomInfo>,
+    mut timeline_stream: impl Stream<Item = Vec<VectorDiff<Arc<TimelineItem>>>> + Send + Sync + Unpin,
+  ) {
+    loop {
+      tokio::select! {
+        biased;
+        roominfo = roominfo_subscriber.next() => {
+          match roominfo {
+            Some(roominfo) => {
+              updates_tx.send((self.clone(), roominfo)).expect("updates_tx closed");
+            }
+            None => {
+              log::info!("Closing {:?} client for {}", self.client, self.room_id);
+              break;
+            }
+          }
+        },
+        changes = timeline_stream.next() => {
+          if let Some(changes) = changes {
+            write_items.send_modify(|items| {
+              for change in changes {
+                change
+                  .map(|item| (Some(item.unique_id()), self.format_timeline_item(item), Prerender::new()))
+                  .apply(items);
+              }
+            });
+            let mut inner = self.inner.write().await;
+            inner.latest_event_id = inner.timeline.latest_event().await.and_then(|e| e.event_id().map(ToOwned::to_owned));
+          }
         }
-        self.latest_event_id = self.timeline.latest_event().await.and_then(|e| e.event_id().map(ToOwned::to_owned));
-        None
       }
     }
   }
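With `poll_updates` now an owned background task, each `SingleClientRoomBuffer` handle it holds is cheap to clone (the fields are `Arc`s and channel handles), and every per-client task reports through a single unbounded mpsc channel that `RoomBuffer` drains from one place. A runnable sketch of that fan-in shape, assuming only the tokio crate (`worker` and the `(u32, u32)` payload are placeholders):

```rust
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};

// Stand-in worker: pushes a few updates, stops when the receiver is gone.
async fn worker(id: u32, updates_tx: UnboundedSender<(u32, u32)>) {
    for seq in 0..3 {
        if updates_tx.send((id, seq)).is_err() {
            break; // receiver dropped: shut down cleanly
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = unbounded_channel();
    for id in 0..2 {
        tokio::spawn(worker(id, tx.clone()));
    }
    drop(tx); // once every worker's clone is gone, recv() yields None
    while let Some((id, seq)) = rx.recv().await {
        println!("update {seq} from worker {id}");
    }
}
```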
@@ -443,31 +481,49 @@ impl SingleClientRoomBuffer {
     }
   }

-  async fn spawn_back_pagination(&self, num: u16) {
-    let room_id = self.room_id.clone();
-    let timeline = self.timeline.clone();
-    let mut back_pagination_status = timeline.back_pagination_status();
-    match back_pagination_status.get() {
-      BackPaginationStatus::Paginating | BackPaginationStatus::TimelineStartReached => {
-        // We are already waiting for a backfill from the server
-      },
-      BackPaginationStatus::Idle => {
-        tokio::spawn(async move {
-          tracing::debug!("Starting pagination for {}", room_id);
-          timeline
-            .paginate_backwards(matrix_sdk_ui::timeline::PaginationOptions::until_num_items(
-              num, num,
-            ))
-            .await
-            .unwrap_or_else(|e| tracing::error!("Failed to paginate {} backward: {}", room_id, e));
-          tracing::debug!("Ended pagination for {}", room_id);
-        });
-
-        // Wait for the task we just spawned to change the status, so we don't risk starting
-        // a new one in the meantime
-        back_pagination_status.next().await;
-      },
-    }
-  }
+  async fn back_pagination_worker(self) {
+    let mut request_subscriber = self.back_pagination_request.subscribe();
+    let mut num_requested_items = request_subscriber.next_now();
+    loop {
+      if num_requested_items != 0 {
+        let inner = self.inner.read().await;
+        let room_id = self.room_id.clone();
+        let timeline = inner.timeline.clone();
+        let back_pagination_status = timeline.back_pagination_status();
+        match back_pagination_status.get() {
+          BackPaginationStatus::Paginating | BackPaginationStatus::TimelineStartReached => {
+            // We are already waiting for a backfill from the server
+            tracing::debug!("Pending pagination for {}", room_id);
+          },
+          BackPaginationStatus::Idle => {
+            tracing::debug!("Starting pagination for {}", room_id);
+            timeline
+              .paginate_backwards(matrix_sdk_ui::timeline::PaginationOptions::until_num_items(
+                num_requested_items,
+                num_requested_items,
+              ))
+              .await
+              .unwrap_or_else(|e| {
+                tracing::error!("Failed to paginate {} backward: {}", room_id, e)
+              });
+            tracing::debug!("Ended pagination for {}", room_id);
+            self.back_pagination_request.set(0);
+          },
+        }
+      }
+      num_requested_items = match request_subscriber.next().await {
+        Some(num_requested_items) => num_requested_items,
+        None => break,
+      };
+      if num_requested_items != 0 {
+        log::debug!(
+          "got back_pagination_request for {}: {}",
+          self.room_id,
+          num_requested_items
+        );
+      }
+    }
+    log::debug!("end of back_pagination_worker for {}", self.room_id);
+  }
 }
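`back_pagination_worker` replaces the old spawn-per-request approach with one long-lived task that sleeps on an eyeball subscriber instead of re-checking an atomic. A reduced sketch of that request loop, mirroring the calls used above (`subscribe()`, `next_now()`, and an async `next()` that yields `None` once the observable is dropped); the `println!` stands in for the actual pagination:

```rust
use eyeball::SharedObservable;
use futures::StreamExt as _; // Subscriber also implements Stream

#[tokio::main]
async fn main() {
    // Writers share one observable; the worker holds a Subscriber.
    let requests = SharedObservable::new(0u16);
    let mut sub = requests.subscribe();

    let worker = tokio::spawn(async move {
        // next_now() returns the current value without waiting; next().await
        // sleeps until the next write.
        let mut num = sub.next_now();
        loop {
            if num != 0 {
                println!("would paginate {num} items here");
            }
            num = match sub.next().await {
                Some(n) => n,
                None => break, // observable dropped: stop the worker
            };
        }
    });

    requests.set(20);
    drop(requests); // ends the subscriber stream, stopping the worker
    worker.await.unwrap();
}
```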
@@ -478,6 +534,9 @@ pub struct RoomBuffer {

   computed_roominfo: Option<ComputedRoomInfo>,

+  poll_updates_rx: UnboundedReceiver<(SingleClientRoomBuffer, RoomInfo)>,
+  poll_updates_tx: UnboundedSender<(SingleClientRoomBuffer, RoomInfo)>,
+
   /// Set while we are already computing roominfo, to block other updates to the room.
   /// Holds the next `self.computed_roominfo`.
   update_roominfo_rx: Option<oneshot::Receiver<ComputedRoomInfo>>,
@@ -511,10 +570,13 @@ impl RoomBuffer {
     initial_client: Client,
     room_id: Arc<OwnedRoomId>,
   ) -> Result<Self> {
+    let (poll_updates_tx, poll_updates_rx) = unbounded_channel();
     let mut self_ = RoomBuffer {
       config,
       room_id,
       computed_roominfo: None,
+      poll_updates_tx,
+      poll_updates_rx,
       update_roominfo_rx: None,
       buffers: SmallVec::new(),
     };
@@ -539,32 +601,111 @@ impl RoomBuffer {
       self.room_id,
       items
     );
-    self.buffers.push(SingleClientRoomBuffer {
+    let items: imbl::Vector<_> = items // FIXME: it's always empty. why?
+      .into_iter()
+      .map(|item| {
+        (
+          None,
+          OwnedBufferItemContent::SimpleText {
+            event_id: None,
+            is_message: false,
+            text: Text::raw(format!("Initial item: {:#?}", item)),
+          },
+          Prerender::new(),
+        )
+      })
+      .collect();
+    let (write_items, items_rx) = tokio::sync::watch::channel(items.clone());
+    let buffer = SingleClientRoomBuffer {
+      inner: Arc::new(RwLock::new(SingleClientRoomBufferInner {
+        timeline: Arc::new(timeline),
+        latest_event_id: None,
+      })),
       config: self.config.clone(),
       room_id: self.room_id.clone(),
       client,
-      timeline: Arc::new(timeline),
-      items: items // FIXME: it's always empty. why?
-        .into_iter()
-        .map(|item| {
-          (
-            None,
-            OwnedBufferItemContent::SimpleText {
-              event_id: None,
-              is_message: false,
-              text: Text::raw(format!("Initial item: {:#?}", item)),
-            },
-            Prerender::new(),
-          )
-        })
-        .collect(),
-      latest_event_id: None,
-      timeline_stream: Box::new(timeline_stream),
-      back_pagination_request: AtomicU16::new(0),
-      roominfo_subscriber: room.subscribe_info(),
-    });
+      back_pagination_request: SharedObservable::new(0),
+      items,
+      items_rx,
+    };
+    self.buffers.push(buffer.clone());
+    tokio::task::spawn(buffer.clone().poll_updates(
+      self.poll_updates_tx.clone(),
+      write_items,
+      room.subscribe_info(),
+      timeline_stream,
+    ));
+    tokio::task::spawn(buffer.back_pagination_worker());

     Ok(())
   }
+
+  async fn poll_updates_once(&mut self) {
+    if self.update_roominfo_rx.is_none() && self.computed_roominfo.is_none() {
+      // Initialize the computed_roominfo
+      let (tx, rx) = oneshot::channel();
+      self.update_roominfo_rx = Some(rx);
+      let room = self
+        .buffers
+        .first()
+        .expect("missing sub-buffer")
+        .client
+        .get_room(&self.room_id)
+        .expect("client missing room");
+      let roominfo_hash = hash_roominfo(room.clone_info());
+      tokio::spawn(compute_room_info(room, tx, roominfo_hash));
+    }
+
+    // Wait for result of processing new updates
+    let update_roominfo_rx = async {
+      match self.update_roominfo_rx.as_mut() {
+        Some(update_roominfo_rx) => Some(
+          update_roominfo_rx
+            .await
+            .unwrap_or_else(|_| panic!("update_roominfo_rx for {} was dropped", self.room_id)),
+        ),
+        None => std::future::pending().await,
+      }
+    };
+
+    let mut poll_buffers_timeline: FuturesUnordered<_> = self
+      .buffers
+      .iter_mut()
+      .map(|buf| async {
+        buf.items_rx.changed().await.unwrap();
+        buf.items = buf.items_rx.borrow().clone(); // O(1) clone because imbl::Vector
+      })
+      .collect();
+
+    tokio::select! {
+      new_update = self.poll_updates_rx.recv() => {
+        tracing::trace!("Got update for {}", self.room_id);
+        let (buf, roominfo) = new_update.expect("poll_updates_rx closed");
+        let roominfo_hash = hash_roominfo(roominfo);
+        if let Some(computed_roominfo) = self.computed_roominfo.as_ref() {
+          if computed_roominfo.roominfo_hash == roominfo_hash {
+            // No visible change, exit early
+            tracing::trace!("no visible change to {}", self.room_id);
+            return;
+          }
+        }
+        tracing::trace!("visible change to {}", self.room_id);
+        let (tx, rx) = oneshot::channel();
+        self.update_roominfo_rx = Some(rx); // Prevents spawning more update_room_info() calls while
+                                            // this one is still running
+        let room = buf.client.get_room(&self.room_id).expect("client missing room");
+        tokio::spawn(compute_room_info(room, tx, roominfo_hash));
+      },
+      computed_roominfo = update_roominfo_rx => {
+        // Received result of a computation
+        tracing::trace!("Got computed_roominfo {}", self.room_id);
+        self.computed_roominfo = computed_roominfo;
+        self.update_roominfo_rx = None;
+      }
+      _ = poll_buffers_timeline.next() => {
+      }
+    }
+  }
 }
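`poll_updates_once` keeps the expensive `compute_room_info` call off the main loop: it spawns the computation and parks the pending oneshot receiver in `update_roominfo_rx`, which doubles as a "computation in flight" flag that blocks further spawns. A reduced sketch of that offload pattern, assuming only the tokio crate (`expensive_compute` stands in for `compute_room_info`):

```rust
use tokio::sync::oneshot;

// Stand-in for compute_room_info(): do the work in a spawned task,
// then hand the result back through the oneshot channel.
async fn expensive_compute(tx: oneshot::Sender<u64>) {
    let _ = tx.send(42);
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();
    tokio::spawn(expensive_compute(tx));
    // The receiver doubles as a guard: while it is pending,
    // no second computation is started.
    let result = rx.await.expect("computation task dropped the sender");
    assert_eq!(result, 42);
}
```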

 #[async_trait]
@@ -616,71 +757,8 @@ impl Buffer for RoomBuffer {
   }

   async fn poll_updates(&mut self) {
-    if self.update_roominfo_rx.is_none() && self.computed_roominfo.is_none() {
-      // Initialize the computed_roominfo
-      let (tx, rx) = oneshot::channel();
-      self.update_roominfo_rx = Some(rx);
-      let room = self
-        .buffers
-        .first()
-        .expect("missing sub-buffer")
-        .client
-        .get_room(&self.room_id)
-        .expect("client missing room");
-      let roominfo_hash = hash_roominfo(room.clone_info());
-      tokio::spawn(compute_room_info(room, tx, roominfo_hash));
-    }
-
-    // Poll for new updates from the server
-    let mut roominfo_update = self
-      .buffers
-      .iter_mut()
-      .map(|buf| async {
-        let roominfo = buf.poll_updates().await;
-        (buf, roominfo)
-      })
-      .collect::<FuturesUnordered<_>>();
-
-    // Wait for result of processing new updates
-    let update_roominfo_rx = async {
-      match self.update_roominfo_rx.as_mut() {
-        Some(update_roominfo_rx) => Some(
-          update_roominfo_rx
-            .await
-            .unwrap_or_else(|_| panic!("update_roominfo_rx for {} was dropped", self.room_id)),
-        ),
-        None => std::future::pending().await,
-      }
-    };
-
-    tokio::select! {
-      new_update = roominfo_update.next() => {
-        tracing::trace!("Got update for {}", self.room_id);
-        let Some((buf, Some(roominfo))) = new_update else {
-          tracing::trace!("None change to {}", self.room_id);
-          return;
-        };
-        let roominfo_hash = hash_roominfo(roominfo);
-        if let Some(computed_roominfo) = self.computed_roominfo.as_ref() {
-          if computed_roominfo.roominfo_hash == roominfo_hash {
-            // No visible change, exit early
-            tracing::trace!("no visible change to {}", self.room_id);
-            return;
-          }
-        }
-        tracing::trace!("visible change to {}", self.room_id);
-        let (tx, rx) = oneshot::channel();
-        self.update_roominfo_rx = Some(rx); // Prevents spawning more update_room_info() calls while
-                                            // this one is still running
-        let room = buf.client.get_room(&self.room_id).expect("client missing room");
-        tokio::spawn(compute_room_info(room, tx, roominfo_hash));
-      },
-      computed_roominfo = update_roominfo_rx => {
-        // Received result of a computation
-        tracing::trace!("Got computed_roominfo {}", self.room_id);
-        self.computed_roominfo = computed_roominfo;
-        self.update_roominfo_rx = None;
-      }
-    }
+    while !self.buffers.is_empty() {
+      self.poll_updates_once().await;
+    }
   }

@@ -716,7 +794,16 @@ impl Buffer for RoomBuffer {
       .first()
       .unwrap_or_else(|| panic!("No sub-buffer for {}", self.room_id))
       .back_pagination_request
-      .fetch_max(num, Ordering::Relaxed);
+      .update_if(|old| {
+        if *old < num {
+          log::info!("updating back_pagination_request for {}", self.room_id);
+          *old = num;
+          true
+        } else {
+          log::info!("NOT updating back_pagination_request for {}", self.room_id);
+          false
+        }
+      })
   }

   fn unread_notification_counts(&self) -> UnreadNotificationsCount {
@@ -792,9 +879,14 @@ impl Buffer for RoomBuffer {
       if buf.items.len() < self.config.history.max_prefetch_unread
         && self.config.history.prefetch_unread > 0
       {
-        buf
-          .back_pagination_request
-          .fetch_max(self.config.history.prefetch_unread, Ordering::Relaxed);
+        buf.back_pagination_request.update_if(|old| {
+          if *old < self.config.history.prefetch_unread {
+            *old = self.config.history.prefetch_unread;
+            true
+          } else {
+            false
+          }
+        })
       }
     }
   }
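Both `update_if` call sites above preserve the old `fetch_max` semantics (the pending request count only ever grows), and as used here the closure's return value reports whether the stored value actually changed, so the pagination worker is only woken when a request raises it. A tiny sketch of that contract, with a hypothetical `raise_to` helper:

```rust
/// Same contract as the closures above, factored out for clarity:
/// raise `current` to `requested` and report whether anything changed.
fn raise_to(current: &mut u16, requested: u16) -> bool {
    if *current < requested {
        *current = requested;
        true
    } else {
        false
    }
}

fn main() {
    let mut pending = 0u16;
    assert!(raise_to(&mut pending, 20)); // grew: subscribers would be notified
    assert!(!raise_to(&mut pending, 10)); // no-op: nothing to notify
    assert_eq!(pending, 20);
}
```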