Only fetch display_name/parent/children if there was any update to the room state

Previously, fetching those pinned a whole CPU core, and panicked with
`called `Option::unwrap()` on a `None` value` here:
https://github.com/bikeshedder/deadpool/blob/deadpool-sync-v0.1.2/sync/src/lib.rs#L122
presumably because the high churn in created and dropped futures.

Technically we should re-fetch the parent if any space was updated (because
`Room::parent_spaces()` checks relationships are reciprocal), but
let's do this later
This commit is contained in:
2023-11-11 10:36:04 +01:00
parent 89e011dd1b
commit b1d98c0da0
2 changed files with 192 additions and 127 deletions

View File

@ -199,7 +199,7 @@ impl App {
sync_responses_tx
.send((client2, sync_response))
.expect("could not send sync response");
tracing::info!("Synced with {}", user_id);
tracing::info!("Got updates for {}", user_id);
if should_quit.load(Ordering::Acquire) {
Ok(matrix_sdk::LoopCtrl::Break)
} else {

View File

@ -15,6 +15,7 @@
*/
use std::collections::HashSet;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::{Arc, OnceLock};
@ -39,6 +40,7 @@ use matrix_sdk_ui::timeline::{
use memuse::DynamicUsage;
use ratatui::text::Text;
use smallvec::SmallVec;
use tokio::sync::oneshot;
use super::{Buffer, BufferId, BufferItem, BufferItemContent};
use crate::widgets::Prerender;
@ -333,10 +335,11 @@ impl SingleClientRoomBuffer {
pub struct RoomBuffer {
room_id: OwnedRoomId,
initialized_roominfo: bool,
parent: Option<OwnedRoomId>,
children: Option<Vec<OwnedRoomId>>,
display_name: Option<DisplayName>,
computed_roominfo: Option<ComputedRoomInfo>,
/// Set while we are already computing roominfo, to block other updates to the room.
/// Holds the next `self.computed_roominfo`.
update_roominfo_rx: Option<oneshot::Receiver<ComputedRoomInfo>>,
// It's unlikely users will join the same room with more than one account;
// avoid a useless heap allocation for the usual case.
@ -365,10 +368,8 @@ impl RoomBuffer {
pub async fn new(initial_client: Client, room_id: OwnedRoomId) -> Result<Self> {
let mut self_ = RoomBuffer {
room_id,
initialized_roominfo: false,
parent: None,
children: None,
display_name: None,
computed_roominfo: None,
update_roominfo_rx: None,
buffers: SmallVec::new(),
};
self_.add_client(initial_client.clone()).await?;
@ -406,94 +407,15 @@ impl RoomBuffer {
});
Ok(())
}
async fn update_room_info(&mut self, room: &Room) {
(self.parent, self.children, self.display_name) = tokio::join!(
async {
// Get parent space
match room.parent_spaces().await {
Ok(parents) => {
parents
.flat_map_unordered(None, |parent| {
futures::stream::iter(match parent {
ParentSpace::Reciprocal(space) | ParentSpace::WithPowerlevel(space) => {
Some(space.room_id().to_owned())
},
ParentSpace::Unverifiable(_) | ParentSpace::Illegitimate(_) => None,
})
})
.next() // Get the first one to be ready. TODO: take the canonical space
.await
},
Err(e) => {
tracing::error!("Failed to get parent spaces of {}: {:?}", self.room_id, e);
None
},
}
},
async {
// Get child rooms
match room
.get_state_events_static::<SpaceChildEventContent>()
.await
{
Ok(child_events) => {
Some(
child_events
.into_iter()
// Extract state key (ie. the child's id) and sender
.flat_map(|parent_event| {
match parent_event.deserialize() {
Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(e))) => {
Some(e.state_key.to_owned())
},
Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => None,
Ok(SyncOrStrippedState::Stripped(e)) => Some(e.state_key.to_owned()),
Err(_) => None, // Ignore deserialization errors
}
})
.collect(),
)
},
Err(e) => {
tracing::error!("Failed to get child rooms of {}: {:?}", self.room_id, e);
None
},
}
},
async {
match room.display_name().await {
Ok(dn) => {
tracing::debug!("Setting display name for {}: {}", self.room_id, dn);
Some(dn)
},
Err(e) => {
tracing::error!(
"Error while resolving display name for {}: {}",
self.room_id,
e
);
None
},
}
}
);
if let Some(parent) = self.parent.as_ref() {
tracing::debug!("{} has parent {}", self.room_id, parent);
} else {
tracing::debug!("{} has no parent", self.room_id);
}
self.initialized_roominfo = true;
}
}
#[async_trait]
impl Buffer for RoomBuffer {
fn short_name(&self) -> String {
self
.display_name
.computed_roominfo
.as_ref()
.and_then(|roominfo| roominfo.display_name.as_ref())
.map(|dn| dn.to_string())
.unwrap_or_else(|| {
self
@ -514,56 +436,89 @@ impl Buffer for RoomBuffer {
fn parent(&self) -> Option<BufferId> {
self
.parent
.computed_roominfo
.as_ref()
.and_then(|roominfo| roominfo.parent.as_ref())
.map(|parent| BufferId::Room(parent.to_owned()))
}
fn children(&self) -> Option<Vec<BufferId>> {
self.children.as_ref().map(|children| {
children
.iter()
.map(|child| BufferId::Room(child.to_owned()))
.collect()
})
self
.computed_roominfo
.as_ref()
.and_then(|roominfo| roominfo.children.as_ref())
.map(|children| {
children
.iter()
.map(|child| BufferId::Room(child.to_owned()))
.collect()
})
}
async fn poll_updates(&mut self) {
let room = if self.initialized_roominfo {
let mut roominfo_update = self
.buffers
.iter_mut()
.map(|buf| async {
let roominfo = buf.poll_updates().await;
(buf, roominfo)
})
.collect::<FuturesUnordered<_>>();
let Some((buf, roominfo)) = roominfo_update.next().await else {
return;
};
let Some(roominfo) = roominfo else {
return;
};
let Some(room) = buf.client.get_room(&self.room_id) else {
return;
};
room
} else {
self
if self.update_roominfo_rx.is_none() && self.computed_roominfo.is_none() {
// Initialize the computed_roominfo
let (tx, rx) = oneshot::channel();
self.update_roominfo_rx = Some(rx);
let room = self
.buffers
.first()
.unwrap_or_else(|| panic!("No sub-buffer for {}", self.room_id))
.expect("missing sub-buffer")
.client
.get_room(&self.room_id)
.expect("Room not found in first client")
.expect("client missing room");
let roominfo_hash = hash_roominfo(room.clone_info());
tokio::spawn(compute_room_info(room, tx, roominfo_hash));
}
let mut roominfo_update = self
.buffers
.iter_mut()
.map(|buf| async {
let roominfo = buf.poll_updates().await;
(buf, roominfo)
})
.collect::<FuturesUnordered<_>>();
let update_roominfo_rx = async {
match self.update_roominfo_rx.as_mut() {
Some(update_roominfo_rx) => Some(
update_roominfo_rx
.await
.unwrap_or_else(|_| panic!("update_roominfo_rx for {} was dropped", self.room_id)),
),
None => std::future::pending().await,
}
};
// This blocks any other update to the room while matrix-sdk computes the display name
// and parent space. Let's pretend it's a feature. (Although it's probably pretty bad when
// joined to the room with multiple clients and they all get the same update and
// have to resolve the name one by one...)
self.update_room_info(&room).await
tokio::select! {
new_update = roominfo_update.next() => {
tracing::trace!("Got update for {}", self.room_id);
let Some((buf, Some(roominfo))) = new_update else {
tracing::trace!("None change to {}", self.room_id);
return;
};
let roominfo_hash = hash_roominfo(roominfo);
if let Some(computed_roominfo) = self.computed_roominfo.as_ref() {
if computed_roominfo.roominfo_hash == roominfo_hash {
// No visible change, exit early
tracing::trace!("no visible change to {}", self.room_id);
return;
}
}
tracing::trace!("visible change to {}", self.room_id);
let (tx, rx) = oneshot::channel();
self.update_roominfo_rx = Some(rx); // Prevents spawning more update_room_info() calls while
// this one is still running
let room = buf.client.get_room(&self.room_id).expect("client missing room");
tokio::spawn(compute_room_info(room, tx, roominfo_hash));
},
computed_roominfo = update_roominfo_rx => {
// Received result of a computation
tracing::trace!("Got computed_roominfo {}", self.room_id);
self.computed_roominfo = computed_roominfo;
self.update_roominfo_rx = None;
}
}
}
fn content<'a>(&'a self) -> Box<dyn Iterator<Item = BufferItem<'a>> + 'a> {
@ -597,3 +552,113 @@ impl Buffer for RoomBuffer {
.fetch_max(num, Ordering::Relaxed);
}
}
async fn compute_room_info(
room: Room,
done_tx: oneshot::Sender<ComputedRoomInfo>,
roominfo_hash: u64,
) {
let (parent, children, display_name) = tokio::join!(
async {
// Get parent space
match room.parent_spaces().await {
Ok(parents) => {
parents
.flat_map_unordered(None, |parent| {
futures::stream::iter(match parent {
ParentSpace::Reciprocal(space) | ParentSpace::WithPowerlevel(space) => {
Some(space.room_id().to_owned())
},
ParentSpace::Unverifiable(_) | ParentSpace::Illegitimate(_) => None,
})
})
.next() // Get the first one to be ready. TODO: take the canonical space
.await
},
Err(e) => {
tracing::error!("Failed to get parent spaces of {}: {:?}", room.room_id(), e);
None
},
}
},
async {
// Get child rooms
match room
.get_state_events_static::<SpaceChildEventContent>()
.await
{
Ok(child_events) => {
Some(
child_events
.into_iter()
// Extract state key (ie. the child's id) and sender
.flat_map(|parent_event| {
match parent_event.deserialize() {
Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(e))) => {
Some(e.state_key.to_owned())
},
Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => None,
Ok(SyncOrStrippedState::Stripped(e)) => Some(e.state_key.to_owned()),
Err(_) => None, // Ignore deserialization errors
}
})
.collect(),
)
},
Err(e) => {
tracing::error!("Failed to get child rooms of {}: {:?}", room.room_id(), e);
None
},
}
},
async {
match room.display_name().await {
Ok(dn) => {
tracing::debug!("Setting display name for {}: {}", room.room_id(), dn);
Some(dn)
},
Err(e) => {
tracing::error!(
"Error while resolving display name for {}: {}",
room.room_id(),
e
);
None
},
}
}
);
if let Some(parent) = parent.as_ref() {
tracing::debug!("{} has parent {}", room.room_id(), parent);
} else {
tracing::debug!("{} has no parent", room.room_id());
}
let computed_roominfo = ComputedRoomInfo {
display_name,
parent,
children,
roominfo_hash,
};
done_tx
.send(computed_roominfo)
.unwrap_or_else(|e| tracing::error!("update_room_info could not notify it was done: {:?}", e));
}
#[derive(Debug)]
struct ComputedRoomInfo {
display_name: Option<DisplayName>,
parent: Option<OwnedRoomId>,
children: Option<Vec<OwnedRoomId>>,
roominfo_hash: u64,
}
fn hash_roominfo(mut roominfo: RoomInfo) -> u64 {
let mut roominfo_hasher = std::collections::hash_map::DefaultHasher::new();
roominfo.set_prev_batch(None); // Changes often and doesn't affect UI
// TODO: make matrix-sdk and ruma implement Hash on their structures instead
// of using this awful workaround
format!("{:?}", roominfo).hash(&mut roominfo_hasher);
roominfo_hasher.finish()
}