From e7f3eeb33120b4642c857d1b029a61e0071b48b3 Mon Sep 17 00:00:00 2001 From: Val Lorentz Date: Sun, 11 Feb 2024 09:02:40 +0100 Subject: [PATCH 1/2] poll_updates still needs to return when there are changes in order to update the UI --- src/buffers/log.rs | 2 +- src/buffers/mod.rs | 6 +-- src/buffers/room.rs | 104 +++++++++++++++++++++----------------------- 3 files changed, 53 insertions(+), 59 deletions(-) diff --git a/src/buffers/log.rs b/src/buffers/log.rs index 729955c..4ce2617 100644 --- a/src/buffers/log.rs +++ b/src/buffers/log.rs @@ -64,7 +64,7 @@ impl Buffer for LogBuffer { BufferId::Log } - async fn poll_updates(&mut self) { + async fn poll_updates_once(&mut self) { let line = self .receiver .recv() diff --git a/src/buffers/mod.rs b/src/buffers/mod.rs index 42dee1f..7c8e523 100644 --- a/src/buffers/mod.rs +++ b/src/buffers/mod.rs @@ -134,7 +134,7 @@ pub trait Buffer: Send + Sync + memuse::DynamicUsage { None } /// Returns if there are any updates to apply. - async fn poll_updates(&mut self); + async fn poll_updates_once(&mut self); fn content<'a>(&'a self) -> Box> + 'a>; /// Called when the user is being showned the oldest items this buffer returned. /// @@ -201,7 +201,7 @@ impl Buffers { let next_reorder = self.next_reorder; let mut updates_future = self .iter_mut() - .map(|buf| buf.poll_updates()) + .map(|buf| buf.poll_updates_once()) .collect::>(); let reorder_future = async { @@ -213,7 +213,7 @@ impl Buffers { select! { res = updates_future.next() => { - res.expect("poll_updates reached the end of the never-ending stream"); + res.expect("poll_updates_once reached the end of the never-ending stream"); false }, _ = reorder_future => { diff --git a/src/buffers/room.rs b/src/buffers/room.rs index 1fc5d54..93f0d01 100644 --- a/src/buffers/room.rs +++ b/src/buffers/room.rs @@ -639,6 +639,55 @@ impl RoomBuffer { Ok(()) } +} + +#[async_trait] +impl Buffer for RoomBuffer { + fn short_name(&self) -> String { + self + .computed_roominfo + .as_ref() + .and_then(|roominfo| roominfo.display_name.as_ref()) + .map(|dn| dn.to_string()) + .unwrap_or_else(|| { + self + .buffers + .iter() + .flat_map(|buf| buf.client.get_room(&self.room_id)) + .flat_map(|room| room.canonical_alias()) + .map(|alias| alias.as_str().to_owned()) + .next() + .unwrap_or(self.room_id.as_str().to_owned()) + .clone() + }) + } + + #[inline] + fn id(&self) -> BufferId { + BufferId::Room(self.room_id.clone()) + } + + fn parent(&self) -> Option { + self + .computed_roominfo + .as_ref() + .and_then(|roominfo| roominfo.parent.as_ref()) + .map(|parent| BufferId::Room(parent.clone())) + } + fn children(&self) -> Option> { + self + .computed_roominfo + .as_ref() + .and_then(|roominfo| roominfo.children.as_ref()) + .map(|children: &SortedVec<_>| { + let children = children + .iter() + .map(|(sort_key, room_id)| (sort_key.clone(), BufferId::Room(room_id.clone()))) + .collect(); + // This is safe because the map above preserves order + unsafe { SortedVec::from_sorted(children) } + }) + } async fn poll_updates_once(&mut self) { if self.update_roominfo_rx.is_none() && self.computed_roominfo.is_none() { @@ -706,61 +755,6 @@ impl RoomBuffer { } } } -} - -#[async_trait] -impl Buffer for RoomBuffer { - fn short_name(&self) -> String { - self - .computed_roominfo - .as_ref() - .and_then(|roominfo| roominfo.display_name.as_ref()) - .map(|dn| dn.to_string()) - .unwrap_or_else(|| { - self - .buffers - .iter() - .flat_map(|buf| buf.client.get_room(&self.room_id)) - .flat_map(|room| room.canonical_alias()) - .map(|alias| alias.as_str().to_owned()) - .next() - .unwrap_or(self.room_id.as_str().to_owned()) - .clone() - }) - } - - #[inline] - fn id(&self) -> BufferId { - BufferId::Room(self.room_id.clone()) - } - - fn parent(&self) -> Option { - self - .computed_roominfo - .as_ref() - .and_then(|roominfo| roominfo.parent.as_ref()) - .map(|parent| BufferId::Room(parent.clone())) - } - fn children(&self) -> Option> { - self - .computed_roominfo - .as_ref() - .and_then(|roominfo| roominfo.children.as_ref()) - .map(|children: &SortedVec<_>| { - let children = children - .iter() - .map(|(sort_key, room_id)| (sort_key.clone(), BufferId::Room(room_id.clone()))) - .collect(); - // This is safe because the map above preserves order - unsafe { SortedVec::from_sorted(children) } - }) - } - - async fn poll_updates(&mut self) { - while !self.buffers.is_empty() { - self.poll_updates_once().await; - } - } fn content<'a>(&'a self) -> Box> + 'a> { // TODO: merge buffers, etc. From df4134829b73ea1e344609601b64ee3546046576 Mon Sep 17 00:00:00 2001 From: Val Lorentz Date: Sun, 11 Feb 2024 10:24:01 +0100 Subject: [PATCH 2/2] Move ComputedRoomInfo creation to its own long-running task This should remove another source of Futures churn. Plus, it makes the code simpler as computed_roominfo doesn't need to be an Option anymore in this new design. --- src/buffers/room.rs | 138 ++++++++++++++++++-------------------------- 1 file changed, 57 insertions(+), 81 deletions(-) diff --git a/src/buffers/room.rs b/src/buffers/room.rs index 93f0d01..3061165 100644 --- a/src/buffers/room.rs +++ b/src/buffers/room.rs @@ -482,12 +482,12 @@ impl SingleClientRoomBuffer { } async fn back_pagination_worker(self) { + let room_id = self.room_id.clone(); let mut request_subscriber = self.back_pagination_request.subscribe(); let mut num_requested_items = request_subscriber.next_now(); loop { if num_requested_items != 0 { let inner = self.inner.read().await; - let room_id = self.room_id.clone(); let timeline = inner.timeline.clone(); let back_pagination_status = timeline.back_pagination_status(); match back_pagination_status.get() { @@ -532,14 +532,10 @@ pub struct RoomBuffer { room_id: Arc, - computed_roominfo: Option, + computed_roominfo: ComputedRoomInfo, - poll_updates_rx: UnboundedReceiver<(SingleClientRoomBuffer, RoomInfo)>, - poll_updates_tx: UnboundedSender<(SingleClientRoomBuffer, RoomInfo)>, - - /// Set while we are already computing roominfo, to block other updates to the room. - /// Holds the next `self.computed_roominfo`. - update_roominfo_rx: Option>, + computed_roominfo_rx: UnboundedReceiver, + roominfo_tx: UnboundedSender<(SingleClientRoomBuffer, RoomInfo)>, // It's unlikely users will join the same room with more than one account; // avoid a useless heap allocation for the usual case. @@ -570,16 +566,26 @@ impl RoomBuffer { initial_client: Client, room_id: Arc, ) -> Result { - let (poll_updates_tx, poll_updates_rx) = unbounded_channel(); + let (roominfo_tx, roominfo_rx) = unbounded_channel(); + let (computed_roominfo_tx, computed_roominfo_rx) = unbounded_channel(); + + let room = initial_client + .get_room(&room_id) + .expect("client missing room"); + let initial_roominfo_hash = hash_roominfo(room.clone_info()); + let computed_roominfo = compute_room_info(room, initial_roominfo_hash).await; + + tokio::task::spawn(update_roominfo_worker(room_id.clone(), initial_roominfo_hash, roominfo_rx, computed_roominfo_tx)); + let mut self_ = RoomBuffer { config, room_id, - computed_roominfo: None, - poll_updates_tx, - poll_updates_rx, - update_roominfo_rx: None, + computed_roominfo_rx, + roominfo_tx, + computed_roominfo, buffers: SmallVec::new(), }; + self_.add_client(initial_client.clone()).await?; Ok(self_) } @@ -630,7 +636,7 @@ impl RoomBuffer { }; self.buffers.push(buffer.clone()); tokio::task::spawn(buffer.clone().poll_updates( - self.poll_updates_tx.clone(), + self.roominfo_tx.clone(), write_items, room.subscribe_info(), timeline_stream, @@ -646,8 +652,7 @@ impl Buffer for RoomBuffer { fn short_name(&self) -> String { self .computed_roominfo - .as_ref() - .and_then(|roominfo| roominfo.display_name.as_ref()) + .display_name.as_ref() .map(|dn| dn.to_string()) .unwrap_or_else(|| { self @@ -670,15 +675,13 @@ impl Buffer for RoomBuffer { fn parent(&self) -> Option { self .computed_roominfo - .as_ref() - .and_then(|roominfo| roominfo.parent.as_ref()) + .parent.as_ref() .map(|parent| BufferId::Room(parent.clone())) } fn children(&self) -> Option> { self .computed_roominfo - .as_ref() - .and_then(|roominfo| roominfo.children.as_ref()) + .children.as_ref() .map(|children: &SortedVec<_>| { let children = children .iter() @@ -688,35 +691,7 @@ impl Buffer for RoomBuffer { unsafe { SortedVec::from_sorted(children) } }) } - async fn poll_updates_once(&mut self) { - if self.update_roominfo_rx.is_none() && self.computed_roominfo.is_none() { - // Initialize the computed_roominfo - let (tx, rx) = oneshot::channel(); - self.update_roominfo_rx = Some(rx); - let room = self - .buffers - .first() - .expect("missing sub-buffer") - .client - .get_room(&self.room_id) - .expect("client missing room"); - let roominfo_hash = hash_roominfo(room.clone_info()); - tokio::spawn(compute_room_info(room, tx, roominfo_hash)); - } - - // Wait for result of processing new updates - let update_roominfo_rx = async { - match self.update_roominfo_rx.as_mut() { - Some(update_roominfo_rx) => Some( - update_roominfo_rx - .await - .unwrap_or_else(|_| panic!("update_roominfo_rx for {} was dropped", self.room_id)), - ), - None => std::future::pending().await, - } - }; - let mut poll_buffers_timeline: FuturesUnordered<_> = self .buffers .iter_mut() @@ -727,29 +702,10 @@ impl Buffer for RoomBuffer { .collect(); tokio::select! { - new_update = self.poll_updates_rx.recv() => { - tracing::trace!("Got update for {}", self.room_id); - let (buf, roominfo) = new_update.expect("poll_updates_rx closed"); - let roominfo_hash = hash_roominfo(roominfo); - if let Some(computed_roominfo) = self.computed_roominfo.as_ref() { - if computed_roominfo.roominfo_hash == roominfo_hash { - // No visible change, exit early - tracing::trace!("no visible change to {}", self.room_id); - return; - } - } - tracing::trace!("visible change to {}", self.room_id); - let (tx, rx) = oneshot::channel(); - self.update_roominfo_rx = Some(rx); // Prevents spawning more update_room_info() calls while - // this one is still running - let room = buf.client.get_room(&self.room_id).expect("client missing room"); - tokio::spawn(compute_room_info(room, tx, roominfo_hash)); - }, - computed_roominfo = update_roominfo_rx => { + computed_roominfo = self.computed_roominfo_rx.recv() => { // Received result of a computation tracing::trace!("Got computed_roominfo {}", self.room_id); - self.computed_roominfo = computed_roominfo; - self.update_roominfo_rx = None; + self.computed_roominfo = computed_roominfo.expect("reached end of computed_roominfo_rx"); } _ = poll_buffers_timeline.next() => { } @@ -819,8 +775,7 @@ impl Buffer for RoomBuffer { fn fully_read(&self) -> FullyReadStatus { match self .computed_roominfo - .as_ref() - .and_then(|ri| ri.fully_read_at.as_ref()) + .fully_read_at.as_ref() { None => FullyReadStatus::All, // Unknown, assume it's read for now, we'll probably find out later Some(fully_read_at) => { @@ -891,11 +846,7 @@ impl Buffer for RoomBuffer { } } -async fn compute_room_info( - room: Room, - done_tx: oneshot::Sender, - roominfo_hash: u64, -) { +async fn compute_room_info(room: Room, roominfo_hash: u64) -> ComputedRoomInfo { let (parent, children, fully_read_at, display_name) = tokio::join!( async { // Get parent space @@ -1014,16 +965,13 @@ async fn compute_room_info( } else { tracing::debug!("{} has no parent", room.room_id()); } - let computed_roominfo = ComputedRoomInfo { + ComputedRoomInfo { display_name, parent, children, fully_read_at, roominfo_hash, - }; - done_tx - .send(computed_roominfo) - .unwrap_or_else(|e| tracing::error!("update_room_info could not notify it was done: {:?}", e)); + } } #[derive(Debug)] @@ -1046,3 +994,31 @@ fn hash_roominfo(mut roominfo: RoomInfo) -> u64 { format!("{:?}", roominfo).hash(&mut roominfo_hasher); roominfo_hasher.finish() } + +/// Gets updates from [`SingleClientRoomBuffer`], and aggregates them for a +/// [`RoomBuffer`] +async fn update_roominfo_worker( + room_id: Arc, + initial_roominfo_hash: u64, + mut roominfo_rx: UnboundedReceiver<(SingleClientRoomBuffer, RoomInfo)>, + computed_roominfo_tx: UnboundedSender, +) { + let mut last_roominfo_hash = initial_roominfo_hash; + loop { + tracing::trace!("Got update for {}", room_id); + let (buf, roominfo) = roominfo_rx.recv().await.expect("poll_updates_rx closed"); + let roominfo_hash = hash_roominfo(roominfo); + if last_roominfo_hash == roominfo_hash { + // No visible change, exit early + tracing::trace!("no visible change to {}", room_id); + continue; + } + last_roominfo_hash = roominfo_hash; + tracing::trace!("visible change to {}", room_id); + let room = buf + .client + .get_room(&room_id) + .expect("client missing room"); + computed_roominfo_tx.send(compute_room_info(room, roominfo_hash).await).expect("failed to send to computed_roominfo_tx"); + } +}