961 lines
34 KiB
Rust
961 lines
34 KiB
Rust
/*
|
|
* Copyright (C) 2023 Valentin Lorentz
|
|
*
|
|
* This program is free software: you can redistribute it and/or modify
|
|
* it under the terms of the GNU Affero General Public License version 3,
|
|
* as published by the Free Software Foundation.
|
|
*
|
|
* This program is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* GNU Affero General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU Affero General Public License
|
|
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
*/
|
|
|
|
use std::collections::HashSet;
|
|
use std::hash::{Hash, Hasher};
|
|
use std::sync::atomic::{AtomicU16, Ordering};
|
|
use std::sync::{Arc, OnceLock};
|
|
|
|
use chrono::{offset::Local, DateTime};
|
|
use color_eyre::eyre::{eyre, Result};
|
|
use eyeball_im::VectorDiff;
|
|
use futures::future::OptionFuture;
|
|
use futures::stream::FuturesUnordered;
|
|
use futures::{FutureExt, Stream, StreamExt};
|
|
use itertools::Itertools;
|
|
use matrix_sdk::async_trait;
|
|
use matrix_sdk::deserialized_responses::SyncOrStrippedState;
|
|
use matrix_sdk::room::ParentSpace;
|
|
use matrix_sdk::ruma::events::fully_read::FullyReadEventContent;
|
|
use matrix_sdk::ruma::events::room::message::{
|
|
FormattedBody, MessageFormat, MessageType, TextMessageEventContent,
|
|
};
|
|
use matrix_sdk::ruma::events::space::child::SpaceChildEventContent;
|
|
use matrix_sdk::ruma::events::RoomAccountDataEvent;
|
|
use matrix_sdk::ruma::events::SyncStateEvent;
|
|
use matrix_sdk::ruma::html::{HtmlSanitizerMode, RemoveReplyFallback};
|
|
use matrix_sdk::ruma::{OwnedEventId, OwnedRoomId, RoomId};
|
|
use matrix_sdk::sync::UnreadNotificationsCount;
|
|
use matrix_sdk::{Client, DisplayName, Room, RoomInfo};
|
|
use matrix_sdk_ui::timeline::{
|
|
BackPaginationStatus, PaginationOptions, RoomExt, Timeline, TimelineItem, TimelineItemKind,
|
|
VirtualTimelineItem,
|
|
};
|
|
use memuse::DynamicUsage;
|
|
use ratatui::text::Text;
|
|
use smallvec::SmallVec;
|
|
use sorted_vec::SortedVec;
|
|
use tokio::sync::oneshot;
|
|
|
|
use super::{Buffer, BufferId, BufferItem, BufferItemContent, BufferSortKey, FullyReadStatus};
|
|
use crate::config::Config;
|
|
use crate::html::{escape_html, format_html, markup_colored_by_mxid};
|
|
use crate::widgets::Prerender;
|
|
|
|
/// Like [`BufferItemContent`] but owned.
|
|
#[derive(Debug, Clone)]
|
|
pub enum OwnedBufferItemContent {
|
|
SimpleText {
|
|
event_id: Option<OwnedEventId>,
|
|
is_message: bool,
|
|
text: Text<'static>,
|
|
},
|
|
Text {
|
|
event_id: Option<OwnedEventId>,
|
|
is_message: bool,
|
|
/// `(padding, content)` pairs
|
|
text: Vec<(String, Text<'static>)>,
|
|
},
|
|
Divider(String),
|
|
Empty,
|
|
}
|
|
|
|
impl DynamicUsage for OwnedBufferItemContent {
    /// Estimates the memory used by this item: the size of the enum itself plus a rough,
    /// area-based guess for the text it holds.
    fn dynamic_usage(&self) -> usize {
        let payload: usize = match self {
            // FIXME: rough approx
            Self::SimpleText { text, .. } => text.width() * text.height() * 4,
            // FIXME: rough approx
            Self::Text { text, .. } => text
                .iter()
                .map(|(_padding, content)| content.width() * content.height() * 4)
                .sum(),
            Self::Divider(label) => label.dynamic_usage(),
            Self::Empty => 0,
        };
        std::mem::size_of::<Self>() + payload
    }

    /// Returns `(lower, upper)` bounds of the estimate; both include the size of the enum
    /// itself.
    fn dynamic_usage_bounds(&self) -> (usize, Option<usize>) {
        let own_size = std::mem::size_of::<Self>();
        let (lower, upper) = match self {
            Self::SimpleText { text, .. } => {
                let cells = text.width() * text.height();
                (cells, Some(cells * 12)) // FIXME: rough approx
            }
            Self::Text { text, .. } => {
                let cells: usize = text
                    .iter()
                    .map(|(_padding, content)| content.width() * content.height())
                    .sum();
                (cells, Some(cells * 12)) // FIXME: rough approx
            }
            Self::Divider(label) => label.dynamic_usage_bounds(),
            Self::Empty => (0, Some(0)),
        };
        // The upper bound stays unknown (`None`) if the payload's upper bound is unknown.
        (lower + own_size, upper.map(|upper| upper + own_size))
    }
}
|
|
|
|
pub struct SingleClientRoomBuffer {
|
|
config: Arc<Config>,
|
|
room_id: OwnedRoomId,
|
|
client: Client,
|
|
|
|
/// Newest first
|
|
items: imbl::vector::Vector<(Option<u64>, OwnedBufferItemContent, Prerender)>,
|
|
latest_event_id: Option<OwnedEventId>,
|
|
// TODO: get rid of this trait object, we know it's matrix_sdk_ui::timeline::TimelineStream
|
|
timeline_stream: Box<dyn Stream<Item = Vec<VectorDiff<Arc<TimelineItem>>>> + Send + Sync + Unpin>,
|
|
timeline: Arc<Timeline>,
|
|
back_pagination_request: AtomicU16,
|
|
|
|
roominfo_subscriber: eyeball::Subscriber<RoomInfo>,
|
|
}
|
|
|
|
impl DynamicUsage for SingleClientRoomBuffer {
|
|
fn dynamic_usage(&self) -> usize {
|
|
std::mem::size_of::<Self>()
|
|
+ self
|
|
.items
|
|
.iter()
|
|
.map(|(_, content, prerender)| {
|
|
std::mem::size_of::<Option<u64>>() + content.dynamic_usage() + prerender.dynamic_usage()
|
|
})
|
|
.sum::<usize>()
|
|
}
|
|
fn dynamic_usage_bounds(&self) -> (usize, Option<usize>) {
|
|
let self_usage = std::mem::size_of::<Self>();
|
|
crate::utils::sum_memory_bounds(
|
|
self_usage,
|
|
Some(self_usage),
|
|
self
|
|
.items
|
|
.iter()
|
|
.map(|item| item.0.dynamic_usage_bounds())
|
|
.chain(self.items.iter().map(|item| item.1.dynamic_usage_bounds()))
|
|
.chain([(
|
|
std::mem::size_of::<Option<u64>>() * self.items.len(),
|
|
Some(std::mem::size_of::<Option<u64>>() * self.items.len()),
|
|
)]),
|
|
)
|
|
}
|
|
}
|
|
|
|
impl SingleClientRoomBuffer {
|
|
/// If the room info was updated, returns it
|
|
async fn poll_updates(&mut self) -> Option<RoomInfo> {
|
|
let back_pagination_request = self.back_pagination_request.swap(0, Ordering::Relaxed);
|
|
if back_pagination_request > 0 {
|
|
// TODO: run this concurrently with timeline_stream.next() below
|
|
self.spawn_back_pagination(back_pagination_request).await;
|
|
}
|
|
tokio::select! {
|
|
biased;
|
|
roominfo = self.roominfo_subscriber.next() => {
|
|
Some(roominfo.expect("reached end of roominfo_subscriber"))
|
|
},
|
|
changes = self.timeline_stream.next() => {
|
|
let Some(changes) = changes else { return None; };
|
|
for change in changes {
|
|
change
|
|
.map(|item| (Some(item.unique_id()), self.format_timeline_item(item), Prerender::new()))
|
|
.apply(&mut self.items);
|
|
}
|
|
self.latest_event_id = self.timeline.latest_event().await.and_then(|e| e.event_id().map(ToOwned::to_owned));
|
|
None
|
|
}
|
|
}
|
|
}
|
|
|
|
fn format_timeline_item(&self, tl_item: impl AsRef<TimelineItem>) -> OwnedBufferItemContent {
|
|
match tl_item.as_ref().kind() {
|
|
TimelineItemKind::Event(event) => {
|
|
use matrix_sdk_ui::timeline::TimelineItemContent::*;
|
|
|
|
// Like `format!()` but returns OwnedBufferItemContent::Text, with is_message=false
|
|
macro_rules! text {
|
|
($prefix: expr, $($tokens:tt)*) => {
|
|
OwnedBufferItemContent::Text {
|
|
event_id: event.event_id().map(ToOwned::to_owned),
|
|
is_message: false,
|
|
text: format_html(&self.config, $prefix, &format!($($tokens)*))
|
|
}
|
|
}
|
|
}
|
|
|
|
// Like `format!()` but returns OwnedBufferItemContent::Text, with is_message=true
|
|
macro_rules! msg {
|
|
($prefix: expr, $($tokens:tt)*) => {
|
|
OwnedBufferItemContent::Text {
|
|
event_id: event.event_id().map(ToOwned::to_owned),
|
|
is_message: true,
|
|
text: format_html(&self.config, $prefix, &format!($($tokens)*))
|
|
}
|
|
}
|
|
}
|
|
|
|
let sender = escape_html(
|
|
event
|
|
.sender()
|
|
.as_str()
|
|
.strip_prefix('@')
|
|
.expect("missing @ prefix"),
|
|
);
|
|
let sender = markup_colored_by_mxid(&sender, &sender);
|
|
match event.content() {
|
|
Message(message) => match message.msgtype() {
|
|
MessageType::Text(TextMessageEventContent {
|
|
formatted: Some(formatted),
|
|
..
|
|
}) => {
|
|
let mut formatted = formatted.to_owned();
|
|
formatted.sanitize_html(HtmlSanitizerMode::Strict, RemoveReplyFallback::No);
|
|
assert_eq!(
|
|
formatted.format,
|
|
MessageFormat::Html,
|
|
"FormattedBody::sanitize_html set type to {:?} instead of Html",
|
|
formatted.format
|
|
);
|
|
msg!(" ", "<{}> {}", sender, formatted.body)
|
|
},
|
|
MessageType::Text(TextMessageEventContent { body, .. }) => {
|
|
msg!(" ", "<{}> {}", sender, escape_html(body))
|
|
},
|
|
_ =>
|
|
// Fallback to text body
|
|
{
|
|
msg!(
|
|
" ",
|
|
"<{}> {}",
|
|
sender,
|
|
escape_html(&message.body().replace('\n', "\n "))
|
|
)
|
|
},
|
|
},
|
|
RedactedMessage => msg!("xx ", "<{}> [redacted]", sender),
|
|
Sticker(sticker) => msg!(
|
|
"st ",
|
|
"<{}> {}",
|
|
sender,
|
|
escape_html(&sticker.content().body)
|
|
),
|
|
UnableToDecrypt(_) => text!("xx ", "<{}> [unable to decrypt]", sender),
|
|
MembershipChange(change) => {
|
|
use matrix_sdk_ui::timeline::MembershipChange::*;
|
|
if change.user_id() == event.sender() {
|
|
let Some(change_kind) = change.change() else {
|
|
return text!(
|
|
"--- ",
|
|
"{} made incomprehensible changes to themselves",
|
|
sender
|
|
);
|
|
};
|
|
match change_kind {
|
|
None => text!(
|
|
"--- ",
|
|
"{} made no discernable changes to themselves",
|
|
sender
|
|
),
|
|
Error => text!(
|
|
"xxx ",
|
|
"{} made a change to themselves that made matrix-sdk-ui error",
|
|
sender
|
|
),
|
|
Joined => text!("--> ", "{} joined", sender),
|
|
Left => text!("<-- ", "{} left", sender),
|
|
Banned => text!("-x- ", "{} banned themselves", sender),
|
|
Unbanned => text!("-x- ", "{} unbanned themselves", sender),
|
|
Kicked => text!("<!- ", "{} kicked themselves", sender),
|
|
Invited => text!("-o- ", "{} invited themselves", sender),
|
|
KickedAndBanned => text!("<!x ", "{} kicked and banned themselves", sender),
|
|
InvitationAccepted => text!("-o> ", "{} accepted an invite", sender),
|
|
InvitationRejected => text!("-ox ", "{} rejected an invite", sender),
|
|
InvitationRevoked => text!("--x ", "{} revoked an invite", sender),
|
|
Knocked => text!("-?> ", "{} knocked", sender),
|
|
KnockAccepted => text!("-?o ", "{} accepted a knock", sender),
|
|
KnockRetracted => text!("-?x ", "{} retracted a knock", sender),
|
|
KnockDenied => text!("-?x ", "{} denied a knock", sender),
|
|
NotImplemented => text!(
|
|
"xxx ",
|
|
"{} made a change matrix-sdk-ui does not support yet",
|
|
sender
|
|
),
|
|
}
|
|
} else if change.user_id() == "" {
|
|
let Some(change_kind) = change.change() else {
|
|
return text!("--- ", "{} made incomprehensible changes", sender);
|
|
};
|
|
match change_kind {
|
|
None => text!("--- ", "{} made no discernable changes", sender),
|
|
Error => text!(
|
|
"xxx ",
|
|
"{} made a change that made matrix-sdk-ui error",
|
|
sender
|
|
),
|
|
Joined | Left | Banned | Unbanned | Kicked | Invited | KickedAndBanned
|
|
| InvitationAccepted | InvitationRejected | InvitationRevoked | Knocked
|
|
| KnockAccepted | KnockRetracted | KnockDenied => {
|
|
text!(
|
|
"--> ",
|
|
"{} made a non-sensical change: {:?}",
|
|
sender,
|
|
change
|
|
)
|
|
},
|
|
NotImplemented => text!(
|
|
"xxx ",
|
|
"{} made a change matrix-sdk-ui does not support yet",
|
|
sender
|
|
),
|
|
}
|
|
} else {
|
|
let target = escape_html(
|
|
change
|
|
.user_id()
|
|
.as_str()
|
|
.strip_prefix('@')
|
|
.expect("missing @ prefix"),
|
|
);
|
|
let target = markup_colored_by_mxid(&target, &target);
|
|
let Some(change_kind) = change.change() else {
|
|
return text!(
|
|
"--- ",
|
|
"{} made incomprehensible changes to {}",
|
|
sender,
|
|
target
|
|
);
|
|
};
|
|
match change_kind {
|
|
None => text!(
|
|
"--- ",
|
|
"{} made no discernable changes to {}",
|
|
sender,
|
|
target
|
|
),
|
|
Error => text!(
|
|
"xxx ",
|
|
"{} made a change to {} that made matrix-sdk-ui error",
|
|
sender,
|
|
target
|
|
),
|
|
Joined | Left => text!(
|
|
"--> ",
|
|
"{} made a non-sensical change to {}: {:?}",
|
|
sender,
|
|
target,
|
|
change
|
|
),
|
|
Banned => text!("-x- ", "{} banned {}", sender, target),
|
|
Unbanned => text!("-x- ", "{} unbanned {}", sender, target),
|
|
Kicked => text!("<!- ", "{} kicked {}", sender, target),
|
|
Invited => text!("-o- ", "{} invited {}", sender, target),
|
|
KickedAndBanned => text!("<!x ", "{} kicked and banned {}", sender, target),
|
|
InvitationAccepted => text!("-o> ", "{} accepted an invite to {}", sender, target),
|
|
InvitationRejected => text!("-ox ", "{} rejected an invite to {}", sender, target),
|
|
InvitationRevoked => text!("--x ", "{} revoked an invite to {}", sender, target),
|
|
Knocked => text!("-?> ", "{} made {} knock", sender, target),
|
|
KnockAccepted => text!("-?o ", "{} accepted {}'s knock", sender, target),
|
|
KnockRetracted => text!("-?x ", "{} retracted {}'s knock", sender, target),
|
|
KnockDenied => text!("-?x ", "{} denied {}'s knock", sender, target),
|
|
NotImplemented => text!(
|
|
"xxx ",
|
|
"{} made a change to {} that matrix-sdk-ui does not support yet",
|
|
sender,
|
|
target
|
|
),
|
|
}
|
|
}
|
|
},
|
|
|
|
ProfileChange(_) => text!("--- ", "{} updated their profile", sender),
|
|
OtherState(state) => {
|
|
if state.state_key() == "" {
|
|
text!("--- ", "{} changed the room: {:?}", sender, state.content())
|
|
} else {
|
|
text!(
|
|
"--- ",
|
|
"{} changed {}: {:?}",
|
|
sender,
|
|
escape_html(state.state_key()),
|
|
state.content() // TODO: escape html
|
|
)
|
|
}
|
|
},
|
|
FailedToParseMessageLike { event_type, error } => text!(
|
|
"xxx ",
|
|
"{} sent a {} message that made matrix-sdk-ui error: {:?}",
|
|
sender,
|
|
escape_html(&event_type.to_string()),
|
|
error // TODO: escape html
|
|
),
|
|
FailedToParseState {
|
|
event_type,
|
|
state_key,
|
|
error,
|
|
} => {
|
|
text!(
|
|
"xxx ",
|
|
"{} made a {} change to {} that made matrix-sdk-ui error: {:?}",
|
|
sender,
|
|
escape_html(&event_type.to_string()),
|
|
escape_html(state_key),
|
|
error // TODO: escape html
|
|
)
|
|
},
|
|
Poll(_) => text!("-?- ", "{} acted on a poll", sender),
|
|
}
|
|
},
|
|
TimelineItemKind::Virtual(VirtualTimelineItem::ReadMarker) => {
|
|
OwnedBufferItemContent::Divider("read marker".to_string())
|
|
},
|
|
TimelineItemKind::Virtual(VirtualTimelineItem::DayDivider(day_divider)) => {
|
|
match day_divider.to_system_time() {
|
|
Some(system_time) => {
|
|
let datetime: DateTime<Local> = system_time.into();
|
|
OwnedBufferItemContent::Divider(format!("{}", datetime.format("%Y-%m-%d")))
|
|
},
|
|
None => {
|
|
tracing::warn!("Could not convert {:?} to system time", day_divider);
|
|
OwnedBufferItemContent::Empty
|
|
},
|
|
}
|
|
},
|
|
}
|
|
}
|
|
|
|
async fn spawn_back_pagination(&self, num: u16) {
|
|
let room_id = self.room_id.clone();
|
|
let timeline = self.timeline.clone();
|
|
let mut back_pagination_status = timeline.back_pagination_status();
|
|
match back_pagination_status.get() {
|
|
BackPaginationStatus::Paginating | BackPaginationStatus::TimelineStartReached => {
|
|
// We are already waiting for a backfill from the server
|
|
},
|
|
BackPaginationStatus::Idle => {
|
|
tokio::spawn(async move {
|
|
tracing::debug!("Starting pagination for {}", room_id);
|
|
timeline
|
|
.paginate_backwards(matrix_sdk_ui::timeline::PaginationOptions::until_num_items(
|
|
num, num,
|
|
))
|
|
.await
|
|
.unwrap_or_else(|e| tracing::error!("Failed to paginate {} backward: {}", room_id, e));
|
|
tracing::debug!("Ended pagination for {}", room_id);
|
|
});
|
|
|
|
// Wait for the task we just spawned to change the status, so we don't risk starting
|
|
// a new one in the meantime
|
|
back_pagination_status.next().await;
|
|
},
|
|
}
|
|
}
|
|
}
|
|
|
|
pub struct RoomBuffer {
|
|
config: Arc<Config>,
|
|
|
|
room_id: OwnedRoomId,
|
|
|
|
computed_roominfo: Option<ComputedRoomInfo>,
|
|
|
|
/// Set while we are already computing roominfo, to block other updates to the room.
|
|
/// Holds the next `self.computed_roominfo`.
|
|
update_roominfo_rx: Option<oneshot::Receiver<ComputedRoomInfo>>,
|
|
|
|
// It's unlikely users will join the same room with more than one account;
|
|
// avoid a useless heap allocation for the usual case.
|
|
buffers: SmallVec<[SingleClientRoomBuffer; 1]>,
|
|
}
|
|
|
|
impl DynamicUsage for RoomBuffer {
|
|
fn dynamic_usage(&self) -> usize {
|
|
std::mem::size_of::<Self>()
|
|
+ self
|
|
.buffers
|
|
.iter()
|
|
.map(|buf| buf.dynamic_usage())
|
|
.sum::<usize>()
|
|
}
|
|
fn dynamic_usage_bounds(&self) -> (usize, Option<usize>) {
|
|
crate::utils::sum_memory_bounds(
|
|
std::mem::size_of::<Self>(),
|
|
Some(std::mem::size_of::<Self>()),
|
|
self.buffers.iter().map(|buf| buf.dynamic_usage_bounds()),
|
|
)
|
|
}
|
|
}
|
|
|
|
impl RoomBuffer {
|
|
pub async fn new(
|
|
config: Arc<Config>,
|
|
initial_client: Client,
|
|
room_id: OwnedRoomId,
|
|
) -> Result<Self> {
|
|
let mut self_ = RoomBuffer {
|
|
config,
|
|
room_id,
|
|
computed_roominfo: None,
|
|
update_roominfo_rx: None,
|
|
buffers: SmallVec::new(),
|
|
};
|
|
self_.add_client(initial_client.clone()).await?;
|
|
Ok(self_)
|
|
}
|
|
|
|
pub async fn add_client(&mut self, client: Client) -> Result<()> {
|
|
let room = client.get_room(&self.room_id).ok_or_else(|| {
|
|
tracing::error!(
|
|
"Adding {:?} for {:?}, but it does not know this room ({} other clients know this room)",
|
|
client,
|
|
self.room_id,
|
|
self.buffers.len()
|
|
);
|
|
eyre!("Unknown room {} for client {:?}", self.room_id, client)
|
|
})?;
|
|
let timeline = room.timeline_builder().build().await;
|
|
let (items, timeline_stream) = timeline.subscribe_batched().await;
|
|
tracing::info!(
|
|
"Added client for {}, initial items: {:?}",
|
|
self.room_id,
|
|
items
|
|
);
|
|
self.buffers.push(SingleClientRoomBuffer {
|
|
config: self.config.clone(),
|
|
room_id: self.room_id.clone(),
|
|
client,
|
|
timeline: Arc::new(timeline),
|
|
items: items // FIXME: it's always empty. why?
|
|
.into_iter()
|
|
.map(|item| {
|
|
(
|
|
None,
|
|
OwnedBufferItemContent::SimpleText {
|
|
event_id: None,
|
|
is_message: false,
|
|
text: Text::raw(format!("Initial item: {:#?}", item)),
|
|
},
|
|
Prerender::new(),
|
|
)
|
|
})
|
|
.collect(),
|
|
latest_event_id: None,
|
|
timeline_stream: Box::new(timeline_stream),
|
|
back_pagination_request: AtomicU16::new(0),
|
|
roominfo_subscriber: room.subscribe_info(),
|
|
});
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
impl Buffer for RoomBuffer {
|
|
fn short_name(&self) -> String {
|
|
self
|
|
.computed_roominfo
|
|
.as_ref()
|
|
.and_then(|roominfo| roominfo.display_name.as_ref())
|
|
.map(|dn| dn.to_string())
|
|
.unwrap_or_else(|| {
|
|
self
|
|
.buffers
|
|
.iter()
|
|
.flat_map(|buf| buf.client.get_room(&self.room_id))
|
|
.flat_map(|room| room.canonical_alias())
|
|
.map(|alias| alias.as_str().to_owned())
|
|
.next()
|
|
.unwrap_or(self.room_id.as_str().to_owned())
|
|
.clone()
|
|
})
|
|
}
|
|
|
|
fn id(&self) -> BufferId {
|
|
BufferId::Room(self.room_id.to_owned())
|
|
}
|
|
|
|
fn parent(&self) -> Option<BufferId> {
|
|
self
|
|
.computed_roominfo
|
|
.as_ref()
|
|
.and_then(|roominfo| roominfo.parent.as_ref())
|
|
.map(|parent| BufferId::Room(parent.to_owned()))
|
|
}
|
|
fn children(&self) -> Option<SortedVec<(BufferSortKey, BufferId)>> {
|
|
self
|
|
.computed_roominfo
|
|
.as_ref()
|
|
.and_then(|roominfo| roominfo.children.as_ref())
|
|
.map(|children: &SortedVec<_>| {
|
|
let children = children
|
|
.iter()
|
|
.map(|(sort_key, room_id)| (sort_key.clone(), BufferId::Room(room_id.to_owned())))
|
|
.collect();
|
|
// This is safe because the map above preserves order
|
|
unsafe { SortedVec::from_sorted(children) }
|
|
})
|
|
}
|
|
|
|
async fn poll_updates(&mut self) {
|
|
if self.update_roominfo_rx.is_none() && self.computed_roominfo.is_none() {
|
|
// Initialize the computed_roominfo
|
|
let (tx, rx) = oneshot::channel();
|
|
self.update_roominfo_rx = Some(rx);
|
|
let room = self
|
|
.buffers
|
|
.first()
|
|
.expect("missing sub-buffer")
|
|
.client
|
|
.get_room(&self.room_id)
|
|
.expect("client missing room");
|
|
let roominfo_hash = hash_roominfo(room.clone_info());
|
|
tokio::spawn(compute_room_info(room, tx, roominfo_hash));
|
|
}
|
|
|
|
// Poll for new updates from the server
|
|
let mut roominfo_update = self
|
|
.buffers
|
|
.iter_mut()
|
|
.map(|buf| async {
|
|
let roominfo = buf.poll_updates().await;
|
|
(buf, roominfo)
|
|
})
|
|
.collect::<FuturesUnordered<_>>();
|
|
|
|
// Wait for result of processing new updates
|
|
let update_roominfo_rx = async {
|
|
match self.update_roominfo_rx.as_mut() {
|
|
Some(update_roominfo_rx) => Some(
|
|
update_roominfo_rx
|
|
.await
|
|
.unwrap_or_else(|_| panic!("update_roominfo_rx for {} was dropped", self.room_id)),
|
|
),
|
|
None => std::future::pending().await,
|
|
}
|
|
};
|
|
|
|
tokio::select! {
|
|
new_update = roominfo_update.next() => {
|
|
tracing::trace!("Got update for {}", self.room_id);
|
|
let Some((buf, Some(roominfo))) = new_update else {
|
|
tracing::trace!("None change to {}", self.room_id);
|
|
return;
|
|
};
|
|
let roominfo_hash = hash_roominfo(roominfo);
|
|
if let Some(computed_roominfo) = self.computed_roominfo.as_ref() {
|
|
if computed_roominfo.roominfo_hash == roominfo_hash {
|
|
// No visible change, exit early
|
|
tracing::trace!("no visible change to {}", self.room_id);
|
|
return;
|
|
}
|
|
}
|
|
tracing::trace!("visible change to {}", self.room_id);
|
|
let (tx, rx) = oneshot::channel();
|
|
self.update_roominfo_rx = Some(rx); // Prevents spawning more update_room_info() calls while
|
|
// this one is still running
|
|
let room = buf.client.get_room(&self.room_id).expect("client missing room");
|
|
tokio::spawn(compute_room_info(room, tx, roominfo_hash));
|
|
},
|
|
computed_roominfo = update_roominfo_rx => {
|
|
// Received result of a computation
|
|
tracing::trace!("Got computed_roominfo {}", self.room_id);
|
|
self.computed_roominfo = computed_roominfo;
|
|
self.update_roominfo_rx = None;
|
|
}
|
|
}
|
|
}
|
|
|
|
fn content<'a>(&'a self) -> Box<dyn ExactSizeIterator<Item = BufferItem<'a>> + 'a> {
|
|
// TODO: merge buffers, etc.
|
|
Box::new(
|
|
self
|
|
.buffers
|
|
.first()
|
|
.unwrap_or_else(|| panic!("No sub-buffer for {}", self.room_id))
|
|
.items
|
|
.iter()
|
|
.rev()
|
|
.map(|(line_id, line, prerender)| BufferItem {
|
|
content: match line {
|
|
OwnedBufferItemContent::SimpleText { text, .. } => {
|
|
BufferItemContent::SimpleText(text.clone())
|
|
},
|
|
OwnedBufferItemContent::Text { text, .. } => BufferItemContent::Text(text.clone()),
|
|
OwnedBufferItemContent::Divider(text) => BufferItemContent::Divider(Text::raw(text)),
|
|
OwnedBufferItemContent::Empty => BufferItemContent::Empty,
|
|
},
|
|
prerender,
|
|
unique_id: *line_id,
|
|
}),
|
|
)
|
|
}
|
|
|
|
fn request_back_pagination(&self, num: u16) {
|
|
// TODO: pick a client at random instead of just the first one, etc.
|
|
self
|
|
.buffers
|
|
.first()
|
|
.unwrap_or_else(|| panic!("No sub-buffer for {}", self.room_id))
|
|
.back_pagination_request
|
|
.fetch_max(num, Ordering::Relaxed);
|
|
}
|
|
|
|
fn unread_notification_counts(&self) -> UnreadNotificationsCount {
|
|
let mut acc = UnreadNotificationsCount::default();
|
|
for buf in &self.buffers {
|
|
let Some(room) = buf.client.get_room(&self.room_id) else {
|
|
continue;
|
|
};
|
|
let UnreadNotificationsCount {
|
|
highlight_count,
|
|
notification_count,
|
|
} = room.unread_notification_counts();
|
|
acc.highlight_count += highlight_count;
|
|
acc.notification_count += notification_count;
|
|
}
|
|
acc
|
|
}
|
|
|
|
fn fully_read(&self) -> FullyReadStatus {
|
|
match self
|
|
.computed_roominfo
|
|
.as_ref()
|
|
.and_then(|ri| ri.fully_read_at.as_ref())
|
|
{
|
|
None => FullyReadStatus::All, // Unknown, assume it's read for now, we'll probably find out later
|
|
Some(fully_read_at) => {
|
|
// Iterate through all buffers, and if any buffer's last event is not the one where
|
|
// the m.fully_read marker is at, assume the buffer is not read.
|
|
//
|
|
// Note that, if we joined the room with more than one account and two accounts'
|
|
// timelines aren't perfectly in sync, then the marker is guaranteed not to match
|
|
// the last event of at least one.
|
|
// Technically, this is a bug. However this shouldn't be a big deal in practice as
|
|
// such a desync usually means the message is very recent, hence unread. (Or it's
|
|
// a federation desync, and we should probably tell the user about it somehow. But
|
|
// this is out of scope for this function.)
|
|
let mut status = FullyReadStatus::All;
|
|
for buf in &self.buffers {
|
|
let mut reached_fully_read = false;
|
|
for item in buf.items.iter().rev() {
|
|
#[allow(clippy::single_match)]
|
|
match item {
|
|
(
|
|
_id,
|
|
OwnedBufferItemContent::Text {
|
|
event_id: Some(event_id),
|
|
is_message,
|
|
..
|
|
},
|
|
_prerender,
|
|
) => {
|
|
if Some(event_id) == fully_read_at.as_ref() {
|
|
reached_fully_read = true;
|
|
break;
|
|
}
|
|
if *is_message {
|
|
status = FullyReadStatus::min(status, FullyReadStatus::Not);
|
|
// We found an unread message, no point in looking further
|
|
break;
|
|
} else {
|
|
status = FullyReadStatus::min(status, FullyReadStatus::OnlyMessages);
|
|
}
|
|
},
|
|
_ => {}, // Ignore everything else, they can't be marked read or unread
|
|
}
|
|
}
|
|
if !reached_fully_read && status != FullyReadStatus::Not {
|
|
// The latest read event is not in the buffer, request a small backfill
|
|
// to try to reach it, unless:
|
|
// * we already backfilled a lot so we don't exhaust the available memory
|
|
// * we already reached an unread message, in which case it's pointless to
|
|
// backfill as we already know there are unread messages.
|
|
if buf.items.len() < self.config.history.max_prefetch_unread
|
|
&& self.config.history.prefetch_unread > 0
|
|
{
|
|
buf
|
|
.back_pagination_request
|
|
.fetch_max(self.config.history.prefetch_unread, Ordering::Relaxed);
|
|
}
|
|
}
|
|
}
|
|
|
|
status
|
|
},
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn compute_room_info(
|
|
room: Room,
|
|
done_tx: oneshot::Sender<ComputedRoomInfo>,
|
|
roominfo_hash: u64,
|
|
) {
|
|
let (parent, children, fully_read_at, display_name) = tokio::join!(
|
|
async {
|
|
// Get parent space
|
|
match room.parent_spaces().await {
|
|
Ok(parents) => {
|
|
parents
|
|
.flat_map_unordered(None, |parent| {
|
|
futures::stream::iter(match parent {
|
|
Ok(ParentSpace::Reciprocal(space)) | Ok(ParentSpace::WithPowerlevel(space)) => {
|
|
Some(space.room_id().to_owned())
|
|
},
|
|
Ok(ParentSpace::Unverifiable(_)) | Ok(ParentSpace::Illegitimate(_)) => None,
|
|
Err(e) => {
|
|
tracing::error!(
|
|
"Failed to confirm parent space of {}: {:?}",
|
|
room.room_id(),
|
|
e
|
|
);
|
|
None
|
|
},
|
|
})
|
|
})
|
|
.next() // Get the first one to be ready. TODO: take the canonical space
|
|
.await
|
|
},
|
|
Err(e) => {
|
|
tracing::error!("Failed to get parent spaces of {}: {:?}", room.room_id(), e);
|
|
None
|
|
},
|
|
}
|
|
},
|
|
async {
|
|
// Get child rooms
|
|
match room
|
|
.get_state_events_static::<SpaceChildEventContent>()
|
|
.await
|
|
{
|
|
Ok(child_events) => {
|
|
Some(SortedVec::from_unsorted(
|
|
child_events
|
|
.into_iter()
|
|
// Extract state key (ie. the child's id) and sender
|
|
.flat_map(|parent_event| {
|
|
match parent_event.deserialize() {
|
|
Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(e))) => Some((
|
|
BufferSortKey {
|
|
explicit_order: e.content.order,
|
|
origin_server_ts: Some(e.origin_server_ts),
|
|
},
|
|
e.state_key.to_owned(),
|
|
)),
|
|
Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => None,
|
|
Ok(SyncOrStrippedState::Stripped(e)) => Some((
|
|
BufferSortKey {
|
|
explicit_order: None,
|
|
origin_server_ts: None, // Why don't stripped events have origin_server_ts!?
|
|
},
|
|
e.state_key.to_owned(),
|
|
)),
|
|
|
|
Err(_) => None, // Ignore deserialization errors
|
|
}
|
|
})
|
|
.collect(),
|
|
))
|
|
},
|
|
Err(e) => {
|
|
tracing::error!("Failed to get child rooms of {}: {:?}", room.room_id(), e);
|
|
None
|
|
},
|
|
}
|
|
},
|
|
async {
|
|
// Get m.fully_read
|
|
match room.account_data_static::<FullyReadEventContent>().await {
|
|
Ok(Some(event)) => match event.deserialize() {
|
|
Ok(RoomAccountDataEvent {
|
|
content: FullyReadEventContent { event_id, .. },
|
|
}) => Some(Some(event_id)),
|
|
Err(e) => {
|
|
tracing::error!(
|
|
"Failed to deserialize m.fully_read for {}: {:?}",
|
|
room.room_id(),
|
|
e
|
|
);
|
|
None
|
|
},
|
|
},
|
|
Ok(None) => Some(None), // Nothing in the room is read
|
|
Err(e) => {
|
|
tracing::error!("Failed to get m.fully_read for {}: {:?}", room.room_id(), e);
|
|
None
|
|
},
|
|
}
|
|
},
|
|
async {
|
|
match room.display_name().await {
|
|
Ok(dn) => {
|
|
tracing::debug!("Setting display name for {}: {}", room.room_id(), dn);
|
|
Some(dn)
|
|
},
|
|
Err(e) => {
|
|
tracing::error!(
|
|
"Error while resolving display name for {}: {}",
|
|
room.room_id(),
|
|
e
|
|
);
|
|
None
|
|
},
|
|
}
|
|
}
|
|
);
|
|
if let Some(parent) = parent.as_ref() {
|
|
tracing::debug!("{} has parent {}", room.room_id(), parent);
|
|
} else {
|
|
tracing::debug!("{} has no parent", room.room_id());
|
|
}
|
|
let computed_roominfo = ComputedRoomInfo {
|
|
display_name,
|
|
parent,
|
|
children,
|
|
fully_read_at,
|
|
roominfo_hash,
|
|
};
|
|
done_tx
|
|
.send(computed_roominfo)
|
|
.unwrap_or_else(|e| tracing::error!("update_room_info could not notify it was done: {:?}", e));
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
struct ComputedRoomInfo {
|
|
display_name: Option<DisplayName>,
|
|
parent: Option<OwnedRoomId>,
|
|
children: Option<SortedVec<(BufferSortKey, OwnedRoomId)>>,
|
|
/// `None` if unknown, `Some(None)` if nothing is read, `Some(Some(last_fully_read_event_id))`
|
|
/// otherwise
|
|
fully_read_at: Option<Option<OwnedEventId>>,
|
|
roominfo_hash: u64,
|
|
}
|
|
|
|
fn hash_roominfo(mut roominfo: RoomInfo) -> u64 {
|
|
let mut roominfo_hasher = std::collections::hash_map::DefaultHasher::new();
|
|
roominfo.set_prev_batch(None); // Changes often and doesn't affect UI
|
|
|
|
// TODO: make matrix-sdk and ruma implement Hash on their structures instead
|
|
// of using this awful workaround
|
|
format!("{:?}", roominfo).hash(&mut roominfo_hasher);
|
|
roominfo_hasher.finish()
|
|
}
|