Merge branch 'main' into images

This commit is contained in:
2024-02-11 10:56:19 +01:00
3 changed files with 76 additions and 106 deletions

View File

@ -64,7 +64,7 @@ impl Buffer for LogBuffer {
BufferId::Log BufferId::Log
} }
async fn poll_updates(&mut self) { async fn poll_updates_once(&mut self) {
let line = self let line = self
.receiver .receiver
.recv() .recv()

View File

@ -138,7 +138,7 @@ pub trait Buffer: Send + Sync + memuse::DynamicUsage {
None None
} }
/// Returns if there are any updates to apply. /// Returns if there are any updates to apply.
async fn poll_updates(&mut self); async fn poll_updates_once(&mut self);
fn content<'a>(&'a self) -> Box<dyn ExactSizeIterator<Item = BufferItem<'a>> + 'a>; fn content<'a>(&'a self) -> Box<dyn ExactSizeIterator<Item = BufferItem<'a>> + 'a>;
/// Called when the user is being showned the oldest items this buffer returned. /// Called when the user is being showned the oldest items this buffer returned.
/// ///
@ -205,7 +205,7 @@ impl Buffers {
let next_reorder = self.next_reorder; let next_reorder = self.next_reorder;
let mut updates_future = self let mut updates_future = self
.iter_mut() .iter_mut()
.map(|buf| buf.poll_updates()) .map(|buf| buf.poll_updates_once())
.collect::<FuturesUnordered<_>>(); .collect::<FuturesUnordered<_>>();
let reorder_future = async { let reorder_future = async {
@ -217,7 +217,7 @@ impl Buffers {
select! { select! {
res = updates_future.next() => { res = updates_future.next() => {
res.expect("poll_updates reached the end of the never-ending stream"); res.expect("poll_updates_once reached the end of the never-ending stream");
false false
}, },
_ = reorder_future => { _ = reorder_future => {

View File

@ -516,12 +516,12 @@ impl SingleClientRoomBuffer {
} }
async fn back_pagination_worker(self) { async fn back_pagination_worker(self) {
let room_id = self.room_id.clone();
let mut request_subscriber = self.back_pagination_request.subscribe(); let mut request_subscriber = self.back_pagination_request.subscribe();
let mut num_requested_items = request_subscriber.next_now(); let mut num_requested_items = request_subscriber.next_now();
loop { loop {
if num_requested_items != 0 { if num_requested_items != 0 {
let inner = self.inner.read().await; let inner = self.inner.read().await;
let room_id = self.room_id.clone();
let timeline = inner.timeline.clone(); let timeline = inner.timeline.clone();
let back_pagination_status = timeline.back_pagination_status(); let back_pagination_status = timeline.back_pagination_status();
match back_pagination_status.get() { match back_pagination_status.get() {
@ -566,14 +566,10 @@ pub struct RoomBuffer {
room_id: Arc<OwnedRoomId>, room_id: Arc<OwnedRoomId>,
computed_roominfo: Option<ComputedRoomInfo>, computed_roominfo: ComputedRoomInfo,
poll_updates_rx: UnboundedReceiver<(SingleClientRoomBuffer, RoomInfo)>, computed_roominfo_rx: UnboundedReceiver<ComputedRoomInfo>,
poll_updates_tx: UnboundedSender<(SingleClientRoomBuffer, RoomInfo)>, roominfo_tx: UnboundedSender<(SingleClientRoomBuffer, RoomInfo)>,
/// Set while we are already computing roominfo, to block other updates to the room.
/// Holds the next `self.computed_roominfo`.
update_roominfo_rx: Option<oneshot::Receiver<ComputedRoomInfo>>,
// It's unlikely users will join the same room with more than one account; // It's unlikely users will join the same room with more than one account;
// avoid a useless heap allocation for the usual case. // avoid a useless heap allocation for the usual case.
@ -604,16 +600,26 @@ impl RoomBuffer {
initial_client: Client, initial_client: Client,
room_id: Arc<OwnedRoomId>, room_id: Arc<OwnedRoomId>,
) -> Result<Self> { ) -> Result<Self> {
let (poll_updates_tx, poll_updates_rx) = unbounded_channel(); let (roominfo_tx, roominfo_rx) = unbounded_channel();
let (computed_roominfo_tx, computed_roominfo_rx) = unbounded_channel();
let room = initial_client
.get_room(&room_id)
.expect("client missing room");
let initial_roominfo_hash = hash_roominfo(room.clone_info());
let computed_roominfo = compute_room_info(room, initial_roominfo_hash).await;
tokio::task::spawn(update_roominfo_worker(room_id.clone(), initial_roominfo_hash, roominfo_rx, computed_roominfo_tx));
let mut self_ = RoomBuffer { let mut self_ = RoomBuffer {
config, config,
room_id, room_id,
computed_roominfo: None, computed_roominfo_rx,
poll_updates_tx, roominfo_tx,
poll_updates_rx, computed_roominfo,
update_roominfo_rx: None,
buffers: SmallVec::new(), buffers: SmallVec::new(),
}; };
self_.add_client(initial_client.clone()).await?; self_.add_client(initial_client.clone()).await?;
Ok(self_) Ok(self_)
} }
@ -669,7 +675,7 @@ impl RoomBuffer {
}; };
self.buffers.push(buffer.clone()); self.buffers.push(buffer.clone());
tokio::task::spawn(buffer.clone().poll_updates( tokio::task::spawn(buffer.clone().poll_updates(
self.poll_updates_tx.clone(), self.roominfo_tx.clone(),
write_items, write_items,
room.subscribe_info(), room.subscribe_info(),
timeline_stream, timeline_stream,
@ -678,73 +684,6 @@ impl RoomBuffer {
Ok(()) Ok(())
} }
async fn poll_updates_once(&mut self) {
if self.update_roominfo_rx.is_none() && self.computed_roominfo.is_none() {
// Initialize the computed_roominfo
let (tx, rx) = oneshot::channel();
self.update_roominfo_rx = Some(rx);
let room = self
.buffers
.first()
.expect("missing sub-buffer")
.client
.get_room(&self.room_id)
.expect("client missing room");
let roominfo_hash = hash_roominfo(room.clone_info());
tokio::spawn(compute_room_info(room, tx, roominfo_hash));
}
// Wait for result of processing new updates
let update_roominfo_rx = async {
match self.update_roominfo_rx.as_mut() {
Some(update_roominfo_rx) => Some(
update_roominfo_rx
.await
.unwrap_or_else(|_| panic!("update_roominfo_rx for {} was dropped", self.room_id)),
),
None => std::future::pending().await,
}
};
let mut poll_buffers_timeline: FuturesUnordered<_> = self
.buffers
.iter_mut()
.map(|buf| async {
buf.items_rx.changed().await.unwrap();
buf.items = buf.items_rx.borrow().clone(); // O(1) clone because imbl::Vector
})
.collect();
tokio::select! {
new_update = self.poll_updates_rx.recv() => {
tracing::trace!("Got update for {}", self.room_id);
let (buf, roominfo) = new_update.expect("poll_updates_rx closed");
let roominfo_hash = hash_roominfo(roominfo);
if let Some(computed_roominfo) = self.computed_roominfo.as_ref() {
if computed_roominfo.roominfo_hash == roominfo_hash {
// No visible change, exit early
tracing::trace!("no visible change to {}", self.room_id);
return;
}
}
tracing::trace!("visible change to {}", self.room_id);
let (tx, rx) = oneshot::channel();
self.update_roominfo_rx = Some(rx); // Prevents spawning more update_room_info() calls while
// this one is still running
let room = buf.client.get_room(&self.room_id).expect("client missing room");
tokio::spawn(compute_room_info(room, tx, roominfo_hash));
},
computed_roominfo = update_roominfo_rx => {
// Received result of a computation
tracing::trace!("Got computed_roominfo {}", self.room_id);
self.computed_roominfo = computed_roominfo;
self.update_roominfo_rx = None;
}
_ = poll_buffers_timeline.next() => {
}
}
}
} }
#[async_trait] #[async_trait]
@ -752,8 +691,7 @@ impl Buffer for RoomBuffer {
fn short_name(&self) -> String { fn short_name(&self) -> String {
self self
.computed_roominfo .computed_roominfo
.as_ref() .display_name.as_ref()
.and_then(|roominfo| roominfo.display_name.as_ref())
.map(|dn| dn.to_string()) .map(|dn| dn.to_string())
.unwrap_or_else(|| { .unwrap_or_else(|| {
self self
@ -776,15 +714,13 @@ impl Buffer for RoomBuffer {
fn parent(&self) -> Option<BufferId> { fn parent(&self) -> Option<BufferId> {
self self
.computed_roominfo .computed_roominfo
.as_ref() .parent.as_ref()
.and_then(|roominfo| roominfo.parent.as_ref())
.map(|parent| BufferId::Room(parent.clone())) .map(|parent| BufferId::Room(parent.clone()))
} }
fn children(&self) -> Option<SortedVec<(BufferSortKey, BufferId)>> { fn children(&self) -> Option<SortedVec<(BufferSortKey, BufferId)>> {
self self
.computed_roominfo .computed_roominfo
.as_ref() .children.as_ref()
.and_then(|roominfo| roominfo.children.as_ref())
.map(|children: &SortedVec<_>| { .map(|children: &SortedVec<_>| {
let children = children let children = children
.iter() .iter()
@ -794,10 +730,24 @@ impl Buffer for RoomBuffer {
unsafe { SortedVec::from_sorted(children) } unsafe { SortedVec::from_sorted(children) }
}) })
} }
async fn poll_updates_once(&mut self) {
let mut poll_buffers_timeline: FuturesUnordered<_> = self
.buffers
.iter_mut()
.map(|buf| async {
buf.items_rx.changed().await.unwrap();
buf.items = buf.items_rx.borrow().clone(); // O(1) clone because imbl::Vector
})
.collect();
async fn poll_updates(&mut self) { tokio::select! {
while !self.buffers.is_empty() { computed_roominfo = self.computed_roominfo_rx.recv() => {
self.poll_updates_once().await; // Received result of a computation
tracing::trace!("Got computed_roominfo {}", self.room_id);
self.computed_roominfo = computed_roominfo.expect("reached end of computed_roominfo_rx");
}
_ = poll_buffers_timeline.next() => {
}
} }
} }
@ -865,8 +815,7 @@ impl Buffer for RoomBuffer {
fn fully_read(&self) -> FullyReadStatus { fn fully_read(&self) -> FullyReadStatus {
match self match self
.computed_roominfo .computed_roominfo
.as_ref() .fully_read_at.as_ref()
.and_then(|ri| ri.fully_read_at.as_ref())
{ {
None => FullyReadStatus::All, // Unknown, assume it's read for now, we'll probably find out later None => FullyReadStatus::All, // Unknown, assume it's read for now, we'll probably find out later
Some(fully_read_at) => { Some(fully_read_at) => {
@ -937,11 +886,7 @@ impl Buffer for RoomBuffer {
} }
} }
async fn compute_room_info( async fn compute_room_info(room: Room, roominfo_hash: u64) -> ComputedRoomInfo {
room: Room,
done_tx: oneshot::Sender<ComputedRoomInfo>,
roominfo_hash: u64,
) {
let (parent, children, fully_read_at, display_name) = tokio::join!( let (parent, children, fully_read_at, display_name) = tokio::join!(
async { async {
// Get parent space // Get parent space
@ -1060,16 +1005,13 @@ async fn compute_room_info(
} else { } else {
tracing::debug!("{} has no parent", room.room_id()); tracing::debug!("{} has no parent", room.room_id());
} }
let computed_roominfo = ComputedRoomInfo { ComputedRoomInfo {
display_name, display_name,
parent, parent,
children, children,
fully_read_at, fully_read_at,
roominfo_hash, roominfo_hash,
}; }
done_tx
.send(computed_roominfo)
.unwrap_or_else(|e| tracing::error!("update_room_info could not notify it was done: {:?}", e));
} }
#[derive(Debug)] #[derive(Debug)]
@ -1092,3 +1034,31 @@ fn hash_roominfo(mut roominfo: RoomInfo) -> u64 {
format!("{:?}", roominfo).hash(&mut roominfo_hasher); format!("{:?}", roominfo).hash(&mut roominfo_hasher);
roominfo_hasher.finish() roominfo_hasher.finish()
} }
/// Gets updates from [`SingleClientRoomBuffer`], and aggregates them for a
/// [`RoomBuffer`]
async fn update_roominfo_worker(
room_id: Arc<OwnedRoomId>,
initial_roominfo_hash: u64,
mut roominfo_rx: UnboundedReceiver<(SingleClientRoomBuffer, RoomInfo)>,
computed_roominfo_tx: UnboundedSender<ComputedRoomInfo>,
) {
let mut last_roominfo_hash = initial_roominfo_hash;
loop {
tracing::trace!("Got update for {}", room_id);
let (buf, roominfo) = roominfo_rx.recv().await.expect("poll_updates_rx closed");
let roominfo_hash = hash_roominfo(roominfo);
if last_roominfo_hash == roominfo_hash {
// No visible change, exit early
tracing::trace!("no visible change to {}", room_id);
continue;
}
last_roominfo_hash = roominfo_hash;
tracing::trace!("visible change to {}", room_id);
let room = buf
.client
.get_room(&room_id)
.expect("client missing room");
computed_roominfo_tx.send(compute_room_info(room, roominfo_hash).await).expect("failed to send to computed_roominfo_tx");
}
}