diff --git a/src/buffers/room.rs b/src/buffers/room.rs index ff1b666..4391558 100644 --- a/src/buffers/room.rs +++ b/src/buffers/room.rs @@ -20,12 +20,13 @@ use std::sync::{Arc, OnceLock}; use chrono::{offset::Local, DateTime}; use color_eyre::eyre::{eyre, Result}; use eyeball_im::VectorDiff; +use futures::future::OptionFuture; +use futures::stream::FuturesUnordered; use futures::{FutureExt, Stream, StreamExt}; use itertools::Itertools; use matrix_sdk::async_trait; use matrix_sdk::ruma::{OwnedRoomId, RoomId}; -use matrix_sdk::Client; -use matrix_sdk::Room; +use matrix_sdk::{Client, DisplayName, Room, RoomInfo}; use matrix_sdk_ui::timeline::{ BackPaginationStatus, PaginationOptions, RoomExt, Timeline, TimelineItem, TimelineItemKind, VirtualTimelineItem, @@ -80,11 +81,14 @@ macro_rules! text { pub struct SingleClientRoomBuffer { room_id: OwnedRoomId, client: Client, + items: imbl::vector::Vector<(OwnedBufferItemContent, Prerender)>, // TODO: get rid of this trait object, we know it's matrix_sdk_ui::timeline::TimelineStream - stream: Box>>> + Send + Sync + Unpin>, + timeline_stream: Box>>> + Send + Sync + Unpin>, timeline: Arc, back_pagination_request: AtomicU16, + + roominfo_subscriber: eyeball::Subscriber, } impl DynamicUsage for SingleClientRoomBuffer { @@ -111,17 +115,26 @@ impl DynamicUsage for SingleClientRoomBuffer { } impl SingleClientRoomBuffer { - async fn poll_updates(&mut self) { + /// If the room info was updated, returns it + async fn poll_updates(&mut self) -> Option { let back_pagination_request = self.back_pagination_request.swap(0, Ordering::Relaxed); if back_pagination_request > 0 { - // TODO: run this concurrently with stream.next() below + // TODO: run this concurrently with timeline_stream.next() below self.spawn_back_pagination(back_pagination_request).await; } - if let Some(changes) = self.stream.next().await { - for change in changes { - change - .map(|item| (self.format_timeline_item(item), Prerender::new())) - .apply(&mut self.items); + tokio::select! { + biased; + roominfo = self.roominfo_subscriber.next() => { + Some(roominfo.expect("reached end of roominfo_subscriber")) + }, + changes = self.timeline_stream.next() => { + let Some(changes) = changes else { return None; }; + for change in changes { + change + .map(|item| (self.format_timeline_item(item), Prerender::new())) + .apply(&mut self.items); + } + None } } } @@ -315,6 +328,9 @@ impl SingleClientRoomBuffer { pub struct RoomBuffer { room_id: OwnedRoomId, + initialized_roominfo: bool, + display_name: Option, + // It's unlikely users will join the same room with more than one account; // avoid a useless heap allocation for the usual case. buffers: SmallVec<[SingleClientRoomBuffer; 1]>, @@ -341,29 +357,27 @@ impl DynamicUsage for RoomBuffer { impl RoomBuffer { pub async fn new(initial_client: Client, room_id: OwnedRoomId) -> Result { let mut self_ = RoomBuffer { - buffers: SmallVec::new(), room_id, + initialized_roominfo: false, + display_name: None, + buffers: SmallVec::new(), }; - self_.add_client(initial_client).await?; + self_.add_client(initial_client.clone()).await?; Ok(self_) } pub async fn add_client(&mut self, client: Client) -> Result<()> { - let timeline = client - .get_room(&self.room_id) - .ok_or_else(|| { - tracing::error!( - "Adding {:?} for {:?}, but it does not know this room ({} other clients know this room)", - client, - self.room_id, - self.buffers.len() - ); - eyre!("Unknown room {} for client {:?}", self.room_id, client) - })? - .timeline_builder() - .build() - .await; - let (items, stream) = timeline.subscribe_batched().await; + let room = client.get_room(&self.room_id).ok_or_else(|| { + tracing::error!( + "Adding {:?} for {:?}, but it does not know this room ({} other clients know this room)", + client, + self.room_id, + self.buffers.len() + ); + eyre!("Unknown room {} for client {:?}", self.room_id, client) + })?; + let timeline = room.timeline_builder().build().await; + let (items, timeline_stream) = timeline.subscribe_batched().await; tracing::info!( "Added client for {}, initial items: {:?}", self.room_id, @@ -377,8 +391,9 @@ impl RoomBuffer { .into_iter() .map(|item| (text!("Initial item: {:#?}", item), Prerender::new())) .collect(), - stream: Box::new(stream), + timeline_stream: Box::new(timeline_stream), back_pagination_request: AtomicU16::new(0), + roominfo_subscriber: room.subscribe_info(), }); Ok(()) } @@ -388,13 +403,20 @@ impl RoomBuffer { impl Buffer for RoomBuffer { fn short_name(&self) -> String { self - .buffers - .iter() - .flat_map(|buf| buf.client.get_room(&self.room_id)) - .flat_map(|room| room.canonical_alias()) // TODO: .display_name() is better, but async :( - .map(|alias| alias.as_str().to_owned()) - .next() - .unwrap_or(self.room_id.as_str().to_owned()) + .display_name + .as_ref() + .map(|dn| dn.to_string()) + .unwrap_or_else(|| { + self + .buffers + .iter() + .flat_map(|buf| buf.client.get_room(&self.room_id)) + .flat_map(|room| room.canonical_alias()) + .map(|alias| alias.as_str().to_owned()) + .next() + .unwrap_or(self.room_id.as_str().to_owned()) + .clone() + }) } fn room_id(&self) -> Option<&RoomId> { @@ -402,13 +424,86 @@ impl Buffer for RoomBuffer { } async fn poll_updates(&mut self) { - futures::future::join_all( - self - .buffers - .iter_mut() - .map(SingleClientRoomBuffer::poll_updates), - ) - .await; + let room = if self.initialized_roominfo { + None + } else { + Some( + self + .buffers + .first() + .unwrap_or_else(|| panic!("No sub-buffer for {}", self.room_id)) + .client + .get_room(&self.room_id) + .unwrap_or_else(|| panic!("Room {} disappeared", self.room_id)), + ) + }; + let mut roominfo_update = self + .buffers + .iter_mut() + .map(|buf| async { + let roominfo = buf.poll_updates().await; + (buf, roominfo) + }) + .collect::>(); + + let res = if self.initialized_roominfo { + roominfo_update.next().await + } else { + let room = room.unwrap(); // Set above iff !initialized_roominfo + + // Poll both roominfo_update and the display name, so we start getting new messages + // early if the user is in a hurry + tokio::select! { + biased; + dn = room.display_name() => { + match dn { + Ok(dn) => { + tracing::debug!("Initialized display name for {}: {}", self.room_id, dn); + self.display_name = Some(dn); + }, + Err(e) => { + tracing::error!( + "Error while resolving initial display name for {}: {}", + self.room_id, + e + ); + }, + }; + self.initialized_roominfo = true; + None + } + res = roominfo_update.next() => { res } + } + }; + let Some((buf, roominfo)) = res else { + return; + }; + let Some(roominfo) = roominfo else { + return; + }; + let Some(room) = buf.client.get_room(&self.room_id) else { + return; + }; + + // This blocks any other update to the room while matrix-sdk computes the display + // name. Let's pretend it's a feature. (Although it's probably pretty bad when + // joined to the room with multiple clients and they all get the same update and + // have to resolve the name one by one...) + self.display_name = match room.display_name().await { + Ok(dn) => { + tracing::debug!("Setting display name for {}: {}", self.room_id, dn); + Some(dn) + }, + Err(e) => { + tracing::error!( + "Error while resolving display name for {}: {}", + self.room_id, + e + ); + None + }, + }; + self.initialized_roominfo = true; } fn content<'a>(&'a self) -> Box> + 'a> {