Compare commits

...

2 Commits

Author SHA1 Message Date
Val Lorentz df4134829b Move ComputedRoomInfo creation to its own long-running task
Some checks failed
CI / lint (push) Failing after 36s
CI / Build and test (, 1.76.0) (push) Successful in 3m39s
CI / Build and test (, beta) (push) Successful in 3m35s
CI / Build and test (, nightly) (push) Failing after 14m10s
This should remove another source of Futures churn.

Plus, it makes the code simpler as computed_roominfo doesn't need to be
an Option anymore in this new design.
2024-02-11 10:24:34 +01:00
Val Lorentz e7f3eeb331 poll_updates still needs to return when there are changes in order to update the UI 2024-02-11 09:02:40 +01:00
3 changed files with 76 additions and 106 deletions

View File

@ -64,7 +64,7 @@ impl Buffer for LogBuffer {
BufferId::Log
}
async fn poll_updates(&mut self) {
async fn poll_updates_once(&mut self) {
let line = self
.receiver
.recv()

View File

@ -134,7 +134,7 @@ pub trait Buffer: Send + Sync + memuse::DynamicUsage {
None
}
/// Returns if there are any updates to apply.
async fn poll_updates(&mut self);
async fn poll_updates_once(&mut self);
fn content<'a>(&'a self) -> Box<dyn ExactSizeIterator<Item = BufferItem<'a>> + 'a>;
/// Called when the user is being showned the oldest items this buffer returned.
///
@ -201,7 +201,7 @@ impl Buffers {
let next_reorder = self.next_reorder;
let mut updates_future = self
.iter_mut()
.map(|buf| buf.poll_updates())
.map(|buf| buf.poll_updates_once())
.collect::<FuturesUnordered<_>>();
let reorder_future = async {
@ -213,7 +213,7 @@ impl Buffers {
select! {
res = updates_future.next() => {
res.expect("poll_updates reached the end of the never-ending stream");
res.expect("poll_updates_once reached the end of the never-ending stream");
false
},
_ = reorder_future => {

View File

@ -482,12 +482,12 @@ impl SingleClientRoomBuffer {
}
async fn back_pagination_worker(self) {
let room_id = self.room_id.clone();
let mut request_subscriber = self.back_pagination_request.subscribe();
let mut num_requested_items = request_subscriber.next_now();
loop {
if num_requested_items != 0 {
let inner = self.inner.read().await;
let room_id = self.room_id.clone();
let timeline = inner.timeline.clone();
let back_pagination_status = timeline.back_pagination_status();
match back_pagination_status.get() {
@ -532,14 +532,10 @@ pub struct RoomBuffer {
room_id: Arc<OwnedRoomId>,
computed_roominfo: Option<ComputedRoomInfo>,
computed_roominfo: ComputedRoomInfo,
poll_updates_rx: UnboundedReceiver<(SingleClientRoomBuffer, RoomInfo)>,
poll_updates_tx: UnboundedSender<(SingleClientRoomBuffer, RoomInfo)>,
/// Set while we are already computing roominfo, to block other updates to the room.
/// Holds the next `self.computed_roominfo`.
update_roominfo_rx: Option<oneshot::Receiver<ComputedRoomInfo>>,
computed_roominfo_rx: UnboundedReceiver<ComputedRoomInfo>,
roominfo_tx: UnboundedSender<(SingleClientRoomBuffer, RoomInfo)>,
// It's unlikely users will join the same room with more than one account;
// avoid a useless heap allocation for the usual case.
@ -570,16 +566,26 @@ impl RoomBuffer {
initial_client: Client,
room_id: Arc<OwnedRoomId>,
) -> Result<Self> {
let (poll_updates_tx, poll_updates_rx) = unbounded_channel();
let (roominfo_tx, roominfo_rx) = unbounded_channel();
let (computed_roominfo_tx, computed_roominfo_rx) = unbounded_channel();
let room = initial_client
.get_room(&room_id)
.expect("client missing room");
let initial_roominfo_hash = hash_roominfo(room.clone_info());
let computed_roominfo = compute_room_info(room, initial_roominfo_hash).await;
tokio::task::spawn(update_roominfo_worker(room_id.clone(), initial_roominfo_hash, roominfo_rx, computed_roominfo_tx));
let mut self_ = RoomBuffer {
config,
room_id,
computed_roominfo: None,
poll_updates_tx,
poll_updates_rx,
update_roominfo_rx: None,
computed_roominfo_rx,
roominfo_tx,
computed_roominfo,
buffers: SmallVec::new(),
};
self_.add_client(initial_client.clone()).await?;
Ok(self_)
}
@ -630,7 +636,7 @@ impl RoomBuffer {
};
self.buffers.push(buffer.clone());
tokio::task::spawn(buffer.clone().poll_updates(
self.poll_updates_tx.clone(),
self.roominfo_tx.clone(),
write_items,
room.subscribe_info(),
timeline_stream,
@ -639,73 +645,6 @@ impl RoomBuffer {
Ok(())
}
async fn poll_updates_once(&mut self) {
if self.update_roominfo_rx.is_none() && self.computed_roominfo.is_none() {
// Initialize the computed_roominfo
let (tx, rx) = oneshot::channel();
self.update_roominfo_rx = Some(rx);
let room = self
.buffers
.first()
.expect("missing sub-buffer")
.client
.get_room(&self.room_id)
.expect("client missing room");
let roominfo_hash = hash_roominfo(room.clone_info());
tokio::spawn(compute_room_info(room, tx, roominfo_hash));
}
// Wait for result of processing new updates
let update_roominfo_rx = async {
match self.update_roominfo_rx.as_mut() {
Some(update_roominfo_rx) => Some(
update_roominfo_rx
.await
.unwrap_or_else(|_| panic!("update_roominfo_rx for {} was dropped", self.room_id)),
),
None => std::future::pending().await,
}
};
let mut poll_buffers_timeline: FuturesUnordered<_> = self
.buffers
.iter_mut()
.map(|buf| async {
buf.items_rx.changed().await.unwrap();
buf.items = buf.items_rx.borrow().clone(); // O(1) clone because imbl::Vector
})
.collect();
tokio::select! {
new_update = self.poll_updates_rx.recv() => {
tracing::trace!("Got update for {}", self.room_id);
let (buf, roominfo) = new_update.expect("poll_updates_rx closed");
let roominfo_hash = hash_roominfo(roominfo);
if let Some(computed_roominfo) = self.computed_roominfo.as_ref() {
if computed_roominfo.roominfo_hash == roominfo_hash {
// No visible change, exit early
tracing::trace!("no visible change to {}", self.room_id);
return;
}
}
tracing::trace!("visible change to {}", self.room_id);
let (tx, rx) = oneshot::channel();
self.update_roominfo_rx = Some(rx); // Prevents spawning more update_room_info() calls while
// this one is still running
let room = buf.client.get_room(&self.room_id).expect("client missing room");
tokio::spawn(compute_room_info(room, tx, roominfo_hash));
},
computed_roominfo = update_roominfo_rx => {
// Received result of a computation
tracing::trace!("Got computed_roominfo {}", self.room_id);
self.computed_roominfo = computed_roominfo;
self.update_roominfo_rx = None;
}
_ = poll_buffers_timeline.next() => {
}
}
}
}
#[async_trait]
@ -713,8 +652,7 @@ impl Buffer for RoomBuffer {
fn short_name(&self) -> String {
self
.computed_roominfo
.as_ref()
.and_then(|roominfo| roominfo.display_name.as_ref())
.display_name.as_ref()
.map(|dn| dn.to_string())
.unwrap_or_else(|| {
self
@ -737,15 +675,13 @@ impl Buffer for RoomBuffer {
fn parent(&self) -> Option<BufferId> {
self
.computed_roominfo
.as_ref()
.and_then(|roominfo| roominfo.parent.as_ref())
.parent.as_ref()
.map(|parent| BufferId::Room(parent.clone()))
}
fn children(&self) -> Option<SortedVec<(BufferSortKey, BufferId)>> {
self
.computed_roominfo
.as_ref()
.and_then(|roominfo| roominfo.children.as_ref())
.children.as_ref()
.map(|children: &SortedVec<_>| {
let children = children
.iter()
@ -755,10 +691,24 @@ impl Buffer for RoomBuffer {
unsafe { SortedVec::from_sorted(children) }
})
}
async fn poll_updates_once(&mut self) {
let mut poll_buffers_timeline: FuturesUnordered<_> = self
.buffers
.iter_mut()
.map(|buf| async {
buf.items_rx.changed().await.unwrap();
buf.items = buf.items_rx.borrow().clone(); // O(1) clone because imbl::Vector
})
.collect();
async fn poll_updates(&mut self) {
while !self.buffers.is_empty() {
self.poll_updates_once().await;
tokio::select! {
computed_roominfo = self.computed_roominfo_rx.recv() => {
// Received result of a computation
tracing::trace!("Got computed_roominfo {}", self.room_id);
self.computed_roominfo = computed_roominfo.expect("reached end of computed_roominfo_rx");
}
_ = poll_buffers_timeline.next() => {
}
}
}
@ -825,8 +775,7 @@ impl Buffer for RoomBuffer {
fn fully_read(&self) -> FullyReadStatus {
match self
.computed_roominfo
.as_ref()
.and_then(|ri| ri.fully_read_at.as_ref())
.fully_read_at.as_ref()
{
None => FullyReadStatus::All, // Unknown, assume it's read for now, we'll probably find out later
Some(fully_read_at) => {
@ -897,11 +846,7 @@ impl Buffer for RoomBuffer {
}
}
async fn compute_room_info(
room: Room,
done_tx: oneshot::Sender<ComputedRoomInfo>,
roominfo_hash: u64,
) {
async fn compute_room_info(room: Room, roominfo_hash: u64) -> ComputedRoomInfo {
let (parent, children, fully_read_at, display_name) = tokio::join!(
async {
// Get parent space
@ -1020,16 +965,13 @@ async fn compute_room_info(
} else {
tracing::debug!("{} has no parent", room.room_id());
}
let computed_roominfo = ComputedRoomInfo {
ComputedRoomInfo {
display_name,
parent,
children,
fully_read_at,
roominfo_hash,
};
done_tx
.send(computed_roominfo)
.unwrap_or_else(|e| tracing::error!("update_room_info could not notify it was done: {:?}", e));
}
}
#[derive(Debug)]
@ -1052,3 +994,31 @@ fn hash_roominfo(mut roominfo: RoomInfo) -> u64 {
format!("{:?}", roominfo).hash(&mut roominfo_hasher);
roominfo_hasher.finish()
}
/// Gets updates from [`SingleClientRoomBuffer`], and aggregates them for a
/// [`RoomBuffer`]
async fn update_roominfo_worker(
room_id: Arc<OwnedRoomId>,
initial_roominfo_hash: u64,
mut roominfo_rx: UnboundedReceiver<(SingleClientRoomBuffer, RoomInfo)>,
computed_roominfo_tx: UnboundedSender<ComputedRoomInfo>,
) {
let mut last_roominfo_hash = initial_roominfo_hash;
loop {
tracing::trace!("Got update for {}", room_id);
let (buf, roominfo) = roominfo_rx.recv().await.expect("poll_updates_rx closed");
let roominfo_hash = hash_roominfo(roominfo);
if last_roominfo_hash == roominfo_hash {
// No visible change, exit early
tracing::trace!("no visible change to {}", room_id);
continue;
}
last_roominfo_hash = roominfo_hash;
tracing::trace!("visible change to {}", room_id);
let room = buf
.client
.get_room(&room_id)
.expect("client missing room");
computed_roominfo_tx.send(compute_room_info(room, roominfo_hash).await).expect("failed to send to computed_roominfo_tx");
}
}