Move ComputedRoomInfo creation to its own long-running task
This should remove another source of Futures churn. Plus, it makes the code simpler as computed_roominfo doesn't need to be an Option anymore in this new design.
This commit is contained in:
@ -482,12 +482,12 @@ impl SingleClientRoomBuffer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn back_pagination_worker(self) {
|
async fn back_pagination_worker(self) {
|
||||||
|
let room_id = self.room_id.clone();
|
||||||
let mut request_subscriber = self.back_pagination_request.subscribe();
|
let mut request_subscriber = self.back_pagination_request.subscribe();
|
||||||
let mut num_requested_items = request_subscriber.next_now();
|
let mut num_requested_items = request_subscriber.next_now();
|
||||||
loop {
|
loop {
|
||||||
if num_requested_items != 0 {
|
if num_requested_items != 0 {
|
||||||
let inner = self.inner.read().await;
|
let inner = self.inner.read().await;
|
||||||
let room_id = self.room_id.clone();
|
|
||||||
let timeline = inner.timeline.clone();
|
let timeline = inner.timeline.clone();
|
||||||
let back_pagination_status = timeline.back_pagination_status();
|
let back_pagination_status = timeline.back_pagination_status();
|
||||||
match back_pagination_status.get() {
|
match back_pagination_status.get() {
|
||||||
@ -532,14 +532,10 @@ pub struct RoomBuffer {
|
|||||||
|
|
||||||
room_id: Arc<OwnedRoomId>,
|
room_id: Arc<OwnedRoomId>,
|
||||||
|
|
||||||
computed_roominfo: Option<ComputedRoomInfo>,
|
computed_roominfo: ComputedRoomInfo,
|
||||||
|
|
||||||
poll_updates_rx: UnboundedReceiver<(SingleClientRoomBuffer, RoomInfo)>,
|
computed_roominfo_rx: UnboundedReceiver<ComputedRoomInfo>,
|
||||||
poll_updates_tx: UnboundedSender<(SingleClientRoomBuffer, RoomInfo)>,
|
roominfo_tx: UnboundedSender<(SingleClientRoomBuffer, RoomInfo)>,
|
||||||
|
|
||||||
/// Set while we are already computing roominfo, to block other updates to the room.
|
|
||||||
/// Holds the next `self.computed_roominfo`.
|
|
||||||
update_roominfo_rx: Option<oneshot::Receiver<ComputedRoomInfo>>,
|
|
||||||
|
|
||||||
// It's unlikely users will join the same room with more than one account;
|
// It's unlikely users will join the same room with more than one account;
|
||||||
// avoid a useless heap allocation for the usual case.
|
// avoid a useless heap allocation for the usual case.
|
||||||
@ -570,16 +566,26 @@ impl RoomBuffer {
|
|||||||
initial_client: Client,
|
initial_client: Client,
|
||||||
room_id: Arc<OwnedRoomId>,
|
room_id: Arc<OwnedRoomId>,
|
||||||
) -> Result<Self> {
|
) -> Result<Self> {
|
||||||
let (poll_updates_tx, poll_updates_rx) = unbounded_channel();
|
let (roominfo_tx, roominfo_rx) = unbounded_channel();
|
||||||
|
let (computed_roominfo_tx, computed_roominfo_rx) = unbounded_channel();
|
||||||
|
|
||||||
|
let room = initial_client
|
||||||
|
.get_room(&room_id)
|
||||||
|
.expect("client missing room");
|
||||||
|
let initial_roominfo_hash = hash_roominfo(room.clone_info());
|
||||||
|
let computed_roominfo = compute_room_info(room, initial_roominfo_hash).await;
|
||||||
|
|
||||||
|
tokio::task::spawn(update_roominfo_worker(room_id.clone(), initial_roominfo_hash, roominfo_rx, computed_roominfo_tx));
|
||||||
|
|
||||||
let mut self_ = RoomBuffer {
|
let mut self_ = RoomBuffer {
|
||||||
config,
|
config,
|
||||||
room_id,
|
room_id,
|
||||||
computed_roominfo: None,
|
computed_roominfo_rx,
|
||||||
poll_updates_tx,
|
roominfo_tx,
|
||||||
poll_updates_rx,
|
computed_roominfo,
|
||||||
update_roominfo_rx: None,
|
|
||||||
buffers: SmallVec::new(),
|
buffers: SmallVec::new(),
|
||||||
};
|
};
|
||||||
|
|
||||||
self_.add_client(initial_client.clone()).await?;
|
self_.add_client(initial_client.clone()).await?;
|
||||||
Ok(self_)
|
Ok(self_)
|
||||||
}
|
}
|
||||||
@ -630,7 +636,7 @@ impl RoomBuffer {
|
|||||||
};
|
};
|
||||||
self.buffers.push(buffer.clone());
|
self.buffers.push(buffer.clone());
|
||||||
tokio::task::spawn(buffer.clone().poll_updates(
|
tokio::task::spawn(buffer.clone().poll_updates(
|
||||||
self.poll_updates_tx.clone(),
|
self.roominfo_tx.clone(),
|
||||||
write_items,
|
write_items,
|
||||||
room.subscribe_info(),
|
room.subscribe_info(),
|
||||||
timeline_stream,
|
timeline_stream,
|
||||||
@ -646,8 +652,7 @@ impl Buffer for RoomBuffer {
|
|||||||
fn short_name(&self) -> String {
|
fn short_name(&self) -> String {
|
||||||
self
|
self
|
||||||
.computed_roominfo
|
.computed_roominfo
|
||||||
.as_ref()
|
.display_name.as_ref()
|
||||||
.and_then(|roominfo| roominfo.display_name.as_ref())
|
|
||||||
.map(|dn| dn.to_string())
|
.map(|dn| dn.to_string())
|
||||||
.unwrap_or_else(|| {
|
.unwrap_or_else(|| {
|
||||||
self
|
self
|
||||||
@ -670,15 +675,13 @@ impl Buffer for RoomBuffer {
|
|||||||
fn parent(&self) -> Option<BufferId> {
|
fn parent(&self) -> Option<BufferId> {
|
||||||
self
|
self
|
||||||
.computed_roominfo
|
.computed_roominfo
|
||||||
.as_ref()
|
.parent.as_ref()
|
||||||
.and_then(|roominfo| roominfo.parent.as_ref())
|
|
||||||
.map(|parent| BufferId::Room(parent.clone()))
|
.map(|parent| BufferId::Room(parent.clone()))
|
||||||
}
|
}
|
||||||
fn children(&self) -> Option<SortedVec<(BufferSortKey, BufferId)>> {
|
fn children(&self) -> Option<SortedVec<(BufferSortKey, BufferId)>> {
|
||||||
self
|
self
|
||||||
.computed_roominfo
|
.computed_roominfo
|
||||||
.as_ref()
|
.children.as_ref()
|
||||||
.and_then(|roominfo| roominfo.children.as_ref())
|
|
||||||
.map(|children: &SortedVec<_>| {
|
.map(|children: &SortedVec<_>| {
|
||||||
let children = children
|
let children = children
|
||||||
.iter()
|
.iter()
|
||||||
@ -688,35 +691,7 @@ impl Buffer for RoomBuffer {
|
|||||||
unsafe { SortedVec::from_sorted(children) }
|
unsafe { SortedVec::from_sorted(children) }
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn poll_updates_once(&mut self) {
|
async fn poll_updates_once(&mut self) {
|
||||||
if self.update_roominfo_rx.is_none() && self.computed_roominfo.is_none() {
|
|
||||||
// Initialize the computed_roominfo
|
|
||||||
let (tx, rx) = oneshot::channel();
|
|
||||||
self.update_roominfo_rx = Some(rx);
|
|
||||||
let room = self
|
|
||||||
.buffers
|
|
||||||
.first()
|
|
||||||
.expect("missing sub-buffer")
|
|
||||||
.client
|
|
||||||
.get_room(&self.room_id)
|
|
||||||
.expect("client missing room");
|
|
||||||
let roominfo_hash = hash_roominfo(room.clone_info());
|
|
||||||
tokio::spawn(compute_room_info(room, tx, roominfo_hash));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for result of processing new updates
|
|
||||||
let update_roominfo_rx = async {
|
|
||||||
match self.update_roominfo_rx.as_mut() {
|
|
||||||
Some(update_roominfo_rx) => Some(
|
|
||||||
update_roominfo_rx
|
|
||||||
.await
|
|
||||||
.unwrap_or_else(|_| panic!("update_roominfo_rx for {} was dropped", self.room_id)),
|
|
||||||
),
|
|
||||||
None => std::future::pending().await,
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut poll_buffers_timeline: FuturesUnordered<_> = self
|
let mut poll_buffers_timeline: FuturesUnordered<_> = self
|
||||||
.buffers
|
.buffers
|
||||||
.iter_mut()
|
.iter_mut()
|
||||||
@ -727,29 +702,10 @@ impl Buffer for RoomBuffer {
|
|||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
new_update = self.poll_updates_rx.recv() => {
|
computed_roominfo = self.computed_roominfo_rx.recv() => {
|
||||||
tracing::trace!("Got update for {}", self.room_id);
|
|
||||||
let (buf, roominfo) = new_update.expect("poll_updates_rx closed");
|
|
||||||
let roominfo_hash = hash_roominfo(roominfo);
|
|
||||||
if let Some(computed_roominfo) = self.computed_roominfo.as_ref() {
|
|
||||||
if computed_roominfo.roominfo_hash == roominfo_hash {
|
|
||||||
// No visible change, exit early
|
|
||||||
tracing::trace!("no visible change to {}", self.room_id);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
tracing::trace!("visible change to {}", self.room_id);
|
|
||||||
let (tx, rx) = oneshot::channel();
|
|
||||||
self.update_roominfo_rx = Some(rx); // Prevents spawning more update_room_info() calls while
|
|
||||||
// this one is still running
|
|
||||||
let room = buf.client.get_room(&self.room_id).expect("client missing room");
|
|
||||||
tokio::spawn(compute_room_info(room, tx, roominfo_hash));
|
|
||||||
},
|
|
||||||
computed_roominfo = update_roominfo_rx => {
|
|
||||||
// Received result of a computation
|
// Received result of a computation
|
||||||
tracing::trace!("Got computed_roominfo {}", self.room_id);
|
tracing::trace!("Got computed_roominfo {}", self.room_id);
|
||||||
self.computed_roominfo = computed_roominfo;
|
self.computed_roominfo = computed_roominfo.expect("reached end of computed_roominfo_rx");
|
||||||
self.update_roominfo_rx = None;
|
|
||||||
}
|
}
|
||||||
_ = poll_buffers_timeline.next() => {
|
_ = poll_buffers_timeline.next() => {
|
||||||
}
|
}
|
||||||
@ -819,8 +775,7 @@ impl Buffer for RoomBuffer {
|
|||||||
fn fully_read(&self) -> FullyReadStatus {
|
fn fully_read(&self) -> FullyReadStatus {
|
||||||
match self
|
match self
|
||||||
.computed_roominfo
|
.computed_roominfo
|
||||||
.as_ref()
|
.fully_read_at.as_ref()
|
||||||
.and_then(|ri| ri.fully_read_at.as_ref())
|
|
||||||
{
|
{
|
||||||
None => FullyReadStatus::All, // Unknown, assume it's read for now, we'll probably find out later
|
None => FullyReadStatus::All, // Unknown, assume it's read for now, we'll probably find out later
|
||||||
Some(fully_read_at) => {
|
Some(fully_read_at) => {
|
||||||
@ -891,11 +846,7 @@ impl Buffer for RoomBuffer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn compute_room_info(
|
async fn compute_room_info(room: Room, roominfo_hash: u64) -> ComputedRoomInfo {
|
||||||
room: Room,
|
|
||||||
done_tx: oneshot::Sender<ComputedRoomInfo>,
|
|
||||||
roominfo_hash: u64,
|
|
||||||
) {
|
|
||||||
let (parent, children, fully_read_at, display_name) = tokio::join!(
|
let (parent, children, fully_read_at, display_name) = tokio::join!(
|
||||||
async {
|
async {
|
||||||
// Get parent space
|
// Get parent space
|
||||||
@ -1014,16 +965,13 @@ async fn compute_room_info(
|
|||||||
} else {
|
} else {
|
||||||
tracing::debug!("{} has no parent", room.room_id());
|
tracing::debug!("{} has no parent", room.room_id());
|
||||||
}
|
}
|
||||||
let computed_roominfo = ComputedRoomInfo {
|
ComputedRoomInfo {
|
||||||
display_name,
|
display_name,
|
||||||
parent,
|
parent,
|
||||||
children,
|
children,
|
||||||
fully_read_at,
|
fully_read_at,
|
||||||
roominfo_hash,
|
roominfo_hash,
|
||||||
};
|
}
|
||||||
done_tx
|
|
||||||
.send(computed_roominfo)
|
|
||||||
.unwrap_or_else(|e| tracing::error!("update_room_info could not notify it was done: {:?}", e));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@ -1046,3 +994,31 @@ fn hash_roominfo(mut roominfo: RoomInfo) -> u64 {
|
|||||||
format!("{:?}", roominfo).hash(&mut roominfo_hasher);
|
format!("{:?}", roominfo).hash(&mut roominfo_hasher);
|
||||||
roominfo_hasher.finish()
|
roominfo_hasher.finish()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Gets updates from [`SingleClientRoomBuffer`], and aggregates them for a
|
||||||
|
/// [`RoomBuffer`]
|
||||||
|
async fn update_roominfo_worker(
|
||||||
|
room_id: Arc<OwnedRoomId>,
|
||||||
|
initial_roominfo_hash: u64,
|
||||||
|
mut roominfo_rx: UnboundedReceiver<(SingleClientRoomBuffer, RoomInfo)>,
|
||||||
|
computed_roominfo_tx: UnboundedSender<ComputedRoomInfo>,
|
||||||
|
) {
|
||||||
|
let mut last_roominfo_hash = initial_roominfo_hash;
|
||||||
|
loop {
|
||||||
|
tracing::trace!("Got update for {}", room_id);
|
||||||
|
let (buf, roominfo) = roominfo_rx.recv().await.expect("poll_updates_rx closed");
|
||||||
|
let roominfo_hash = hash_roominfo(roominfo);
|
||||||
|
if last_roominfo_hash == roominfo_hash {
|
||||||
|
// No visible change, exit early
|
||||||
|
tracing::trace!("no visible change to {}", room_id);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
last_roominfo_hash = roominfo_hash;
|
||||||
|
tracing::trace!("visible change to {}", room_id);
|
||||||
|
let room = buf
|
||||||
|
.client
|
||||||
|
.get_room(&room_id)
|
||||||
|
.expect("client missing room");
|
||||||
|
computed_roominfo_tx.send(compute_room_info(room, roominfo_hash).await).expect("failed to send to computed_roominfo_tx");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user