Compare commits
4 Commits
f3dbd43783
...
79635e1e4e
Author | SHA1 | Date | |
---|---|---|---|
Val Lorentz | 79635e1e4e | ||
Val Lorentz | 11bf79e95e | ||
Val Lorentz | e627e3b9a2 | ||
Val Lorentz | fa6c171916 |
|
@ -30,7 +30,7 @@ jobs:
|
|||
- uses: actions-rs/toolchain@v1
|
||||
with:
|
||||
profile: minimal
|
||||
toolchain: '1.73.0'
|
||||
toolchain: '1.76.0'
|
||||
components: rustfmt
|
||||
override: true
|
||||
|
||||
|
@ -55,7 +55,7 @@ jobs:
|
|||
rust:
|
||||
- nightly
|
||||
- beta
|
||||
- '1.73.0'
|
||||
- '1.76.0'
|
||||
|
||||
features:
|
||||
- ''
|
||||
|
|
21
Cargo.toml
21
Cargo.toml
|
@ -8,11 +8,19 @@ repository = "https://git.tf/val/ratatrix"
|
|||
authors = ["Val Lorentz"]
|
||||
license = "AGPL-3.0-only AND MIT"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
[features]
|
||||
default = [
|
||||
"images",
|
||||
]
|
||||
images = [
|
||||
"dep:image",
|
||||
"dep:ratatui-image",
|
||||
]
|
||||
|
||||
[dependencies]
|
||||
|
||||
# Async
|
||||
async-std = { version = "1.12.0", features = ["tokio1", "attributes"] }
|
||||
futures = "0.3.28"
|
||||
tokio = { version = "1.32.0", features = ["full"] }
|
||||
tokio-util = "0.7.9"
|
||||
|
@ -64,8 +72,8 @@ sorted-vec = "0.8.3"
|
|||
eyeball = "0.8.7" # data structures observer returned by matrix-sdk-ui
|
||||
eyeball-im = "0.4.2" # immutable data structures observer returned by matrix-sdk-ui
|
||||
imbl = "2.0" # ditto
|
||||
matrix-sdk = { git = "https://github.com/matrix-org/matrix-rust-sdk.git", rev = "5c37acb81ce624d83be54b5140cd60399b556fb2", features = ["eyre", "markdown"] }
|
||||
matrix-sdk-ui = { git = "https://github.com/matrix-org/matrix-rust-sdk.git", rev = "5c37acb81ce624d83be54b5140cd60399b556fb2" }
|
||||
matrix-sdk = { git = "https://github.com/matrix-org/matrix-rust-sdk.git", rev = "7bbd07cc7703051e5276b87f26bf88b6239d64a4", features = ["eyre", "markdown"] }
|
||||
matrix-sdk-ui = { git = "https://github.com/matrix-org/matrix-rust-sdk.git", rev = "7bbd07cc7703051e5276b87f26bf88b6239d64a4" }
|
||||
#matrix-sdk = { path = "../matrix-rust-sdk/crates/matrix-sdk", features = ["eyre", "markdown"] }
|
||||
#matrix-sdk-ui = { path = "../matrix-rust-sdk/crates/matrix-sdk-ui" }
|
||||
|
||||
|
@ -73,13 +81,18 @@ matrix-sdk-ui = { git = "https://github.com/matrix-org/matrix-rust-sdk.git", rev
|
|||
ansi-to-tui = "3.1.0"
|
||||
chrono = "0.4.31"
|
||||
crossterm = { version = "0.27.0", features = ["serde", "event-stream"] }
|
||||
ratatui = { version = "0.24.0", features = ["serde", "macros"] }
|
||||
ratatui = { version = "0.26.0", features = ["serde", "macros"] }
|
||||
strip-ansi-escapes = "0.2.0"
|
||||
tui-textarea = "0.3.0"
|
||||
unicode-width = "0.1"
|
||||
|
||||
# UI (images)
|
||||
ratatui-image = { version = "0.8.0", optional = true }
|
||||
image = { version = "0.24.8", optional = true }
|
||||
|
||||
[patch.crates-io]
|
||||
#ratatui = { path = "../ratatui", features = ["serde", "macros"] }
|
||||
eyeball-im = { path = "../eyeball/eyeball-im" }
|
||||
|
||||
[dev-dependencies]
|
||||
pretty_assertions = "1.4.0"
|
||||
|
|
|
@ -1,2 +1,2 @@
|
|||
[toolchain]
|
||||
channel = "1.73.0"
|
||||
channel = "1.76.0"
|
||||
|
|
|
@ -214,7 +214,7 @@ impl App {
|
|||
let (client, sync_response) = sync_response.expect("sync_responses_rx unexpectedly closed");
|
||||
self.handle_sync_response(&action_tx, client, sync_response).await.context("Error while handling sync response")?;
|
||||
}
|
||||
poll_updates = self.buffers.poll_updates() => {
|
||||
poll_updates = self.buffers.poll_updates_once() => {
|
||||
changes_since_last_render = true;
|
||||
}
|
||||
sync_result = sync_results.next() => {
|
||||
|
|
|
@ -32,7 +32,7 @@ use crate::widgets::Prerender;
|
|||
|
||||
mod log;
|
||||
pub use log::LogBuffer;
|
||||
mod room;
|
||||
pub(crate) mod room;
|
||||
pub use room::RoomBuffer;
|
||||
|
||||
/// Maximum time before reordering the buffer list based on parent/child relationships.
|
||||
|
@ -105,6 +105,10 @@ pub enum BufferItemContent<'buf> {
|
|||
SimpleText(Text<'buf>),
|
||||
/// Pairs of `(padding, content)`
|
||||
Text(Vec<(String, Text<'buf>)>),
|
||||
#[cfg(feature = "images")]
|
||||
Image {
|
||||
image: &'buf image::DynamicImage,
|
||||
},
|
||||
Divider(Text<'buf>),
|
||||
Empty,
|
||||
}
|
||||
|
@ -196,7 +200,7 @@ impl Buffers {
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn poll_updates(&mut self) {
|
||||
pub async fn poll_updates_once(&mut self) {
|
||||
let reorder_now = {
|
||||
let next_reorder = self.next_reorder;
|
||||
let mut updates_future = self
|
||||
|
|
|
@ -20,18 +20,21 @@ use std::sync::atomic::{AtomicU16, Ordering};
|
|||
use std::sync::{Arc, OnceLock};
|
||||
|
||||
use chrono::{offset::Local, DateTime};
|
||||
use color_eyre::eyre::{eyre, Result};
|
||||
use color_eyre::eyre::{eyre, Result, WrapErr};
|
||||
use eyeball::SharedObservable;
|
||||
use eyeball_im::VectorDiff;
|
||||
use futures::future::join_all;
|
||||
use futures::future::OptionFuture;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::{FutureExt, Stream, StreamExt};
|
||||
use image::{DynamicImage, ImageResult};
|
||||
use itertools::Itertools;
|
||||
use matrix_sdk::async_trait;
|
||||
use matrix_sdk::deserialized_responses::SyncOrStrippedState;
|
||||
use matrix_sdk::room::ParentSpace;
|
||||
use matrix_sdk::ruma::events::fully_read::FullyReadEventContent;
|
||||
use matrix_sdk::ruma::events::room::message::{
|
||||
FormattedBody, MessageFormat, MessageType, TextMessageEventContent,
|
||||
FormattedBody, ImageMessageEventContent, MessageFormat, MessageType, TextMessageEventContent,
|
||||
};
|
||||
use matrix_sdk::ruma::events::space::child::SpaceChildEventContent;
|
||||
use matrix_sdk::ruma::events::RoomAccountDataEvent;
|
||||
|
@ -48,7 +51,9 @@ use memuse::DynamicUsage;
|
|||
use ratatui::text::Text;
|
||||
use smallvec::SmallVec;
|
||||
use sorted_vec::SortedVec;
|
||||
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use super::{Buffer, BufferId, BufferItem, BufferItemContent, BufferSortKey, FullyReadStatus};
|
||||
use crate::config::Config;
|
||||
|
@ -69,6 +74,12 @@ pub enum OwnedBufferItemContent {
|
|||
/// `(padding, content)` pairs
|
||||
text: Vec<(String, Text<'static>)>,
|
||||
},
|
||||
#[cfg(feature = "images")]
|
||||
Image {
|
||||
event_id: Option<OwnedEventId>,
|
||||
is_message: bool,
|
||||
image: DynamicImage,
|
||||
},
|
||||
Divider(String),
|
||||
Empty,
|
||||
}
|
||||
|
@ -86,6 +97,7 @@ impl DynamicUsage for OwnedBufferItemContent {
|
|||
.map(|item| item.1.width() * item.1.height() * 4)
|
||||
.sum() // FIXME: rough approx
|
||||
},
|
||||
OwnedBufferItemContent::Image { .. } => 10 * 1024 * 1024, // FIXME: that's just the maximum size
|
||||
OwnedBufferItemContent::Divider(s) => s.dynamic_usage(),
|
||||
OwnedBufferItemContent::Empty => 0,
|
||||
}
|
||||
|
@ -103,6 +115,7 @@ impl DynamicUsage for OwnedBufferItemContent {
|
|||
.sum();
|
||||
(area, Some(area * 12)) // FIXME: rough approx
|
||||
},
|
||||
OwnedBufferItemContent::Image { .. } => (0, Some(10 * 1024 * 1024)), // FIXME: that's just the bounds
|
||||
OwnedBufferItemContent::Divider(s) => s.dynamic_usage_bounds(),
|
||||
OwnedBufferItemContent::Empty => (0, Some(0)),
|
||||
};
|
||||
|
@ -116,6 +129,25 @@ impl DynamicUsage for OwnedBufferItemContent {
|
|||
}
|
||||
}
|
||||
|
||||
pub struct SingleClientRoomBufferInner {
|
||||
latest_event_id: Option<OwnedEventId>,
|
||||
timeline: Arc<Timeline>,
|
||||
}
|
||||
|
||||
impl DynamicUsage for SingleClientRoomBufferInner {
|
||||
fn dynamic_usage(&self) -> usize {
|
||||
std::mem::size_of::<Self>()
|
||||
}
|
||||
fn dynamic_usage_bounds(&self) -> (usize, Option<usize>) {
|
||||
(
|
||||
std::mem::size_of::<Self>(),
|
||||
Some(std::mem::size_of::<Self>()),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME: use a single Arc for all the values inside
|
||||
#[derive(Clone)]
|
||||
pub struct SingleClientRoomBuffer {
|
||||
config: Arc<Config>,
|
||||
room_id: Arc<OwnedRoomId>,
|
||||
|
@ -123,31 +155,32 @@ pub struct SingleClientRoomBuffer {
|
|||
|
||||
/// Newest first
|
||||
items: imbl::vector::Vector<(Option<u64>, OwnedBufferItemContent, Prerender)>,
|
||||
latest_event_id: Option<OwnedEventId>,
|
||||
// TODO: get rid of this trait object, we know it's matrix_sdk_ui::timeline::TimelineStream
|
||||
timeline_stream: Box<dyn Stream<Item = Vec<VectorDiff<Arc<TimelineItem>>>> + Send + Sync + Unpin>,
|
||||
timeline: Arc<Timeline>,
|
||||
back_pagination_request: AtomicU16,
|
||||
|
||||
roominfo_subscriber: eyeball::Subscriber<RoomInfo>,
|
||||
items_rx: tokio::sync::watch::Receiver<
|
||||
imbl::vector::Vector<(Option<u64>, OwnedBufferItemContent, Prerender)>,
|
||||
>,
|
||||
back_pagination_request: SharedObservable<u16>,
|
||||
|
||||
inner: Arc<RwLock<SingleClientRoomBufferInner>>,
|
||||
}
|
||||
|
||||
impl DynamicUsage for SingleClientRoomBuffer {
|
||||
fn dynamic_usage(&self) -> usize {
|
||||
std::mem::size_of::<Self>()
|
||||
self.inner.blocking_read().dynamic_usage()
|
||||
+ self
|
||||
.items
|
||||
.iter()
|
||||
.map(|(_, content, prerender)| {
|
||||
.map(|item| {
|
||||
let (_, content, prerender) = item;
|
||||
std::mem::size_of::<Option<u64>>() + content.dynamic_usage() + prerender.dynamic_usage()
|
||||
})
|
||||
.sum::<usize>()
|
||||
}
|
||||
fn dynamic_usage_bounds(&self) -> (usize, Option<usize>) {
|
||||
let self_usage = std::mem::size_of::<Self>();
|
||||
let (min, max) = self.inner.blocking_read().dynamic_usage_bounds();
|
||||
crate::utils::sum_memory_bounds(
|
||||
self_usage,
|
||||
Some(self_usage),
|
||||
min,
|
||||
max,
|
||||
self
|
||||
.items
|
||||
.iter()
|
||||
|
@ -162,32 +195,60 @@ impl DynamicUsage for SingleClientRoomBuffer {
|
|||
}
|
||||
|
||||
impl SingleClientRoomBuffer {
|
||||
/// If the room info was updated, returns it
|
||||
async fn poll_updates(&mut self) -> Option<RoomInfo> {
|
||||
let back_pagination_request = self.back_pagination_request.swap(0, Ordering::Relaxed);
|
||||
if back_pagination_request > 0 {
|
||||
// TODO: run this concurrently with timeline_stream.next() below
|
||||
self.spawn_back_pagination(back_pagination_request).await;
|
||||
}
|
||||
tokio::select! {
|
||||
biased;
|
||||
roominfo = self.roominfo_subscriber.next() => {
|
||||
Some(roominfo.expect("reached end of roominfo_subscriber"))
|
||||
},
|
||||
changes = self.timeline_stream.next() => {
|
||||
let Some(changes) = changes else { return None; };
|
||||
for change in changes {
|
||||
change
|
||||
.map(|item| (Some(item.unique_id()), self.format_timeline_item(item), Prerender::new()))
|
||||
.apply(&mut self.items);
|
||||
async fn poll_updates(
|
||||
self,
|
||||
updates_tx: UnboundedSender<(SingleClientRoomBuffer, RoomInfo)>,
|
||||
write_items: tokio::sync::watch::Sender<
|
||||
imbl::vector::Vector<(Option<u64>, OwnedBufferItemContent, Prerender)>,
|
||||
>,
|
||||
mut roominfo_subscriber: eyeball::Subscriber<RoomInfo>,
|
||||
mut timeline_stream: impl Stream<Item = Vec<VectorDiff<Arc<TimelineItem>>>> + Send + Sync + Unpin,
|
||||
) {
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
roominfo = roominfo_subscriber.next() => {
|
||||
match roominfo {
|
||||
Some(roominfo) => {
|
||||
updates_tx.send((self.clone(), roominfo)).expect("updates_tx closed");
|
||||
}
|
||||
None => {
|
||||
log::info!("Closing {:?} client for {}", self.client, self.room_id);
|
||||
break;
|
||||
}
|
||||
}
|
||||
},
|
||||
changes = timeline_stream.next() => {
|
||||
if let Some(changes) = changes {
|
||||
let changes = join_all(changes.into_iter().map(|change|
|
||||
change.async_map(|tl_item| self.make_timeline_item(tl_item))
|
||||
)).await;
|
||||
write_items.send_modify(|items| for change in changes {
|
||||
change.apply(items)
|
||||
});
|
||||
let mut inner = self.inner.write().await;
|
||||
inner.latest_event_id = inner.timeline.latest_event().await.and_then(|e| e.event_id().map(ToOwned::to_owned));
|
||||
}
|
||||
}
|
||||
self.latest_event_id = self.timeline.latest_event().await.and_then(|e| e.event_id().map(ToOwned::to_owned));
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn format_timeline_item(&self, tl_item: impl AsRef<TimelineItem>) -> OwnedBufferItemContent {
|
||||
async fn make_timeline_item(
|
||||
&self,
|
||||
tl_item: impl AsRef<TimelineItem>,
|
||||
) -> (Option<u64>, OwnedBufferItemContent, Prerender) {
|
||||
(
|
||||
Some(tl_item.as_ref().unique_id()),
|
||||
self.format_timeline_item(tl_item).await,
|
||||
Prerender::new(),
|
||||
)
|
||||
}
|
||||
|
||||
async fn format_timeline_item(
|
||||
&self,
|
||||
tl_item: impl AsRef<TimelineItem>,
|
||||
) -> OwnedBufferItemContent {
|
||||
match tl_item.as_ref().kind() {
|
||||
TimelineItemKind::Event(event) => {
|
||||
use matrix_sdk_ui::timeline::TimelineItemContent::*;
|
||||
|
@ -241,6 +302,17 @@ impl SingleClientRoomBuffer {
|
|||
MessageType::Text(TextMessageEventContent { body, .. }) => {
|
||||
msg!(" ", "<{}> {}", sender, escape_html(body))
|
||||
},
|
||||
#[cfg(feature = "images")]
|
||||
MessageType::Image(content) => {
|
||||
crate::images::decode_image(
|
||||
&self.client,
|
||||
&self.config,
|
||||
event.event_id().map(ToOwned::to_owned),
|
||||
&sender,
|
||||
content,
|
||||
)
|
||||
.await
|
||||
},
|
||||
_ =>
|
||||
// Fallback to text body
|
||||
{
|
||||
|
@ -443,31 +515,49 @@ impl SingleClientRoomBuffer {
|
|||
}
|
||||
}
|
||||
|
||||
async fn spawn_back_pagination(&self, num: u16) {
|
||||
let room_id = self.room_id.clone();
|
||||
let timeline = self.timeline.clone();
|
||||
let mut back_pagination_status = timeline.back_pagination_status();
|
||||
match back_pagination_status.get() {
|
||||
BackPaginationStatus::Paginating | BackPaginationStatus::TimelineStartReached => {
|
||||
// We are already waiting for a backfill from the server
|
||||
},
|
||||
BackPaginationStatus::Idle => {
|
||||
tokio::spawn(async move {
|
||||
tracing::debug!("Starting pagination for {}", room_id);
|
||||
timeline
|
||||
.paginate_backwards(matrix_sdk_ui::timeline::PaginationOptions::until_num_items(
|
||||
num, num,
|
||||
))
|
||||
.await
|
||||
.unwrap_or_else(|e| tracing::error!("Failed to paginate {} backward: {}", room_id, e));
|
||||
tracing::debug!("Ended pagination for {}", room_id);
|
||||
});
|
||||
|
||||
// Wait for the task we just spawned to change the status, so we don't risk starting
|
||||
// a new one in the meantime
|
||||
back_pagination_status.next().await;
|
||||
},
|
||||
async fn back_pagination_worker(self) {
|
||||
let mut request_subscriber = self.back_pagination_request.subscribe();
|
||||
let mut num_requested_items = request_subscriber.next_now();
|
||||
loop {
|
||||
if num_requested_items != 0 {
|
||||
let inner = self.inner.read().await;
|
||||
let room_id = self.room_id.clone();
|
||||
let timeline = inner.timeline.clone();
|
||||
let back_pagination_status = timeline.back_pagination_status();
|
||||
match back_pagination_status.get() {
|
||||
BackPaginationStatus::Paginating | BackPaginationStatus::TimelineStartReached => {
|
||||
// We are already waiting for a backfill from the server
|
||||
tracing::debug!("Pending pagination for {}", room_id);
|
||||
},
|
||||
BackPaginationStatus::Idle => {
|
||||
tracing::debug!("Starting pagination for {}", room_id);
|
||||
timeline
|
||||
.paginate_backwards(matrix_sdk_ui::timeline::PaginationOptions::until_num_items(
|
||||
num_requested_items,
|
||||
num_requested_items,
|
||||
))
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
tracing::error!("Failed to paginate {} backward: {}", room_id, e)
|
||||
});
|
||||
tracing::debug!("Ended pagination for {}", room_id);
|
||||
self.back_pagination_request.set(0);
|
||||
},
|
||||
}
|
||||
}
|
||||
num_requested_items = match request_subscriber.next().await {
|
||||
Some(num_requested_items) => num_requested_items,
|
||||
None => break,
|
||||
};
|
||||
if num_requested_items != 0 {
|
||||
log::debug!(
|
||||
"got back_pagination_request for {}: {}",
|
||||
self.room_id,
|
||||
num_requested_items
|
||||
);
|
||||
}
|
||||
}
|
||||
log::debug!("end of back_pagination_worker for {}", self.room_id);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -478,6 +568,9 @@ pub struct RoomBuffer {
|
|||
|
||||
computed_roominfo: Option<ComputedRoomInfo>,
|
||||
|
||||
poll_updates_rx: UnboundedReceiver<(SingleClientRoomBuffer, RoomInfo)>,
|
||||
poll_updates_tx: UnboundedSender<(SingleClientRoomBuffer, RoomInfo)>,
|
||||
|
||||
/// Set while we are already computing roominfo, to block other updates to the room.
|
||||
/// Holds the next `self.computed_roominfo`.
|
||||
update_roominfo_rx: Option<oneshot::Receiver<ComputedRoomInfo>>,
|
||||
|
@ -511,10 +604,13 @@ impl RoomBuffer {
|
|||
initial_client: Client,
|
||||
room_id: Arc<OwnedRoomId>,
|
||||
) -> Result<Self> {
|
||||
let (poll_updates_tx, poll_updates_rx) = unbounded_channel();
|
||||
let mut self_ = RoomBuffer {
|
||||
config,
|
||||
room_id,
|
||||
computed_roominfo: None,
|
||||
poll_updates_tx,
|
||||
poll_updates_rx,
|
||||
update_roominfo_rx: None,
|
||||
buffers: SmallVec::new(),
|
||||
};
|
||||
|
@ -532,39 +628,123 @@ impl RoomBuffer {
|
|||
);
|
||||
eyre!("Unknown room {} for client {:?}", self.room_id, client)
|
||||
})?;
|
||||
let timeline = room.timeline_builder().build().await;
|
||||
let timeline = room.timeline_builder().build().await.with_context(|| {
|
||||
format!(
|
||||
"Could not get timeline for {:?} from {:?}",
|
||||
self.room_id, client
|
||||
)
|
||||
})?;
|
||||
let (items, timeline_stream) = timeline.subscribe_batched().await;
|
||||
tracing::info!(
|
||||
"Added client for {}, initial items: {:?}",
|
||||
self.room_id,
|
||||
items
|
||||
);
|
||||
self.buffers.push(SingleClientRoomBuffer {
|
||||
let items: imbl::Vector<_> = items // FIXME: it's always empty. why?
|
||||
.into_iter()
|
||||
.map(|item| {
|
||||
(
|
||||
None,
|
||||
OwnedBufferItemContent::SimpleText {
|
||||
event_id: None,
|
||||
is_message: false,
|
||||
text: Text::raw(format!("Initial item: {:#?}", item)),
|
||||
},
|
||||
Prerender::new(),
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
let (write_items, items_rx) = tokio::sync::watch::channel(items.clone());
|
||||
let buffer = SingleClientRoomBuffer {
|
||||
inner: Arc::new(RwLock::new(SingleClientRoomBufferInner {
|
||||
timeline: Arc::new(timeline),
|
||||
latest_event_id: None,
|
||||
})),
|
||||
config: self.config.clone(),
|
||||
room_id: self.room_id.clone(),
|
||||
client,
|
||||
timeline: Arc::new(timeline),
|
||||
items: items // FIXME: it's always empty. why?
|
||||
.into_iter()
|
||||
.map(|item| {
|
||||
(
|
||||
None,
|
||||
OwnedBufferItemContent::SimpleText {
|
||||
event_id: None,
|
||||
is_message: false,
|
||||
text: Text::raw(format!("Initial item: {:#?}", item)),
|
||||
},
|
||||
Prerender::new(),
|
||||
)
|
||||
})
|
||||
.collect(),
|
||||
latest_event_id: None,
|
||||
timeline_stream: Box::new(timeline_stream),
|
||||
back_pagination_request: AtomicU16::new(0),
|
||||
roominfo_subscriber: room.subscribe_info(),
|
||||
});
|
||||
back_pagination_request: SharedObservable::new(0),
|
||||
items,
|
||||
items_rx,
|
||||
};
|
||||
self.buffers.push(buffer.clone());
|
||||
tokio::task::spawn(buffer.clone().poll_updates(
|
||||
self.poll_updates_tx.clone(),
|
||||
write_items,
|
||||
room.subscribe_info(),
|
||||
timeline_stream,
|
||||
));
|
||||
tokio::task::spawn(buffer.back_pagination_worker());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn poll_updates_once(&mut self) {
|
||||
if self.update_roominfo_rx.is_none() && self.computed_roominfo.is_none() {
|
||||
// Initialize the computed_roominfo
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.update_roominfo_rx = Some(rx);
|
||||
let room = self
|
||||
.buffers
|
||||
.first()
|
||||
.expect("missing sub-buffer")
|
||||
.client
|
||||
.get_room(&self.room_id)
|
||||
.expect("client missing room");
|
||||
let roominfo_hash = hash_roominfo(room.clone_info());
|
||||
tokio::spawn(compute_room_info(room, tx, roominfo_hash));
|
||||
}
|
||||
|
||||
// Wait for result of processing new updates
|
||||
let update_roominfo_rx = async {
|
||||
match self.update_roominfo_rx.as_mut() {
|
||||
Some(update_roominfo_rx) => Some(
|
||||
update_roominfo_rx
|
||||
.await
|
||||
.unwrap_or_else(|_| panic!("update_roominfo_rx for {} was dropped", self.room_id)),
|
||||
),
|
||||
None => std::future::pending().await,
|
||||
}
|
||||
};
|
||||
|
||||
let mut poll_buffers_timeline: FuturesUnordered<_> = self
|
||||
.buffers
|
||||
.iter_mut()
|
||||
.map(|buf| async {
|
||||
buf.items_rx.changed().await.unwrap();
|
||||
buf.items = buf.items_rx.borrow().clone(); // O(1) clone because imbl::Vector
|
||||
})
|
||||
.collect();
|
||||
|
||||
tokio::select! {
|
||||
new_update = self.poll_updates_rx.recv() => {
|
||||
tracing::trace!("Got update for {}", self.room_id);
|
||||
let (buf, roominfo) = new_update.expect("poll_updates_rx closed");
|
||||
let roominfo_hash = hash_roominfo(roominfo);
|
||||
if let Some(computed_roominfo) = self.computed_roominfo.as_ref() {
|
||||
if computed_roominfo.roominfo_hash == roominfo_hash {
|
||||
// No visible change, exit early
|
||||
tracing::trace!("no visible change to {}", self.room_id);
|
||||
return;
|
||||
}
|
||||
}
|
||||
tracing::trace!("visible change to {}", self.room_id);
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.update_roominfo_rx = Some(rx); // Prevents spawning more update_room_info() calls while
|
||||
// this one is still running
|
||||
let room = buf.client.get_room(&self.room_id).expect("client missing room");
|
||||
tokio::spawn(compute_room_info(room, tx, roominfo_hash));
|
||||
},
|
||||
computed_roominfo = update_roominfo_rx => {
|
||||
// Received result of a computation
|
||||
tracing::trace!("Got computed_roominfo {}", self.room_id);
|
||||
self.computed_roominfo = computed_roominfo;
|
||||
self.update_roominfo_rx = None;
|
||||
}
|
||||
_ = poll_buffers_timeline.next() => {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
@ -616,71 +796,8 @@ impl Buffer for RoomBuffer {
|
|||
}
|
||||
|
||||
async fn poll_updates(&mut self) {
|
||||
if self.update_roominfo_rx.is_none() && self.computed_roominfo.is_none() {
|
||||
// Initialize the computed_roominfo
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.update_roominfo_rx = Some(rx);
|
||||
let room = self
|
||||
.buffers
|
||||
.first()
|
||||
.expect("missing sub-buffer")
|
||||
.client
|
||||
.get_room(&self.room_id)
|
||||
.expect("client missing room");
|
||||
let roominfo_hash = hash_roominfo(room.clone_info());
|
||||
tokio::spawn(compute_room_info(room, tx, roominfo_hash));
|
||||
}
|
||||
|
||||
// Poll for new updates from the server
|
||||
let mut roominfo_update = self
|
||||
.buffers
|
||||
.iter_mut()
|
||||
.map(|buf| async {
|
||||
let roominfo = buf.poll_updates().await;
|
||||
(buf, roominfo)
|
||||
})
|
||||
.collect::<FuturesUnordered<_>>();
|
||||
|
||||
// Wait for result of processing new updates
|
||||
let update_roominfo_rx = async {
|
||||
match self.update_roominfo_rx.as_mut() {
|
||||
Some(update_roominfo_rx) => Some(
|
||||
update_roominfo_rx
|
||||
.await
|
||||
.unwrap_or_else(|_| panic!("update_roominfo_rx for {} was dropped", self.room_id)),
|
||||
),
|
||||
None => std::future::pending().await,
|
||||
}
|
||||
};
|
||||
|
||||
tokio::select! {
|
||||
new_update = roominfo_update.next() => {
|
||||
tracing::trace!("Got update for {}", self.room_id);
|
||||
let Some((buf, Some(roominfo))) = new_update else {
|
||||
tracing::trace!("None change to {}", self.room_id);
|
||||
return;
|
||||
};
|
||||
let roominfo_hash = hash_roominfo(roominfo);
|
||||
if let Some(computed_roominfo) = self.computed_roominfo.as_ref() {
|
||||
if computed_roominfo.roominfo_hash == roominfo_hash {
|
||||
// No visible change, exit early
|
||||
tracing::trace!("no visible change to {}", self.room_id);
|
||||
return;
|
||||
}
|
||||
}
|
||||
tracing::trace!("visible change to {}", self.room_id);
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.update_roominfo_rx = Some(rx); // Prevents spawning more update_room_info() calls while
|
||||
// this one is still running
|
||||
let room = buf.client.get_room(&self.room_id).expect("client missing room");
|
||||
tokio::spawn(compute_room_info(room, tx, roominfo_hash));
|
||||
},
|
||||
computed_roominfo = update_roominfo_rx => {
|
||||
// Received result of a computation
|
||||
tracing::trace!("Got computed_roominfo {}", self.room_id);
|
||||
self.computed_roominfo = computed_roominfo;
|
||||
self.update_roominfo_rx = None;
|
||||
}
|
||||
while !self.buffers.is_empty() {
|
||||
self.poll_updates_once().await;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -700,6 +817,7 @@ impl Buffer for RoomBuffer {
|
|||
BufferItemContent::SimpleText(text.clone())
|
||||
},
|
||||
OwnedBufferItemContent::Text { text, .. } => BufferItemContent::Text(text.clone()),
|
||||
OwnedBufferItemContent::Image { image, .. } => BufferItemContent::Image { image },
|
||||
OwnedBufferItemContent::Divider(text) => BufferItemContent::Divider(Text::raw(text)),
|
||||
OwnedBufferItemContent::Empty => BufferItemContent::Empty,
|
||||
},
|
||||
|
@ -716,7 +834,16 @@ impl Buffer for RoomBuffer {
|
|||
.first()
|
||||
.unwrap_or_else(|| panic!("No sub-buffer for {}", self.room_id))
|
||||
.back_pagination_request
|
||||
.fetch_max(num, Ordering::Relaxed);
|
||||
.update_if(|old| {
|
||||
if *old < num {
|
||||
log::info!("updating back_pagination_request for {}", self.room_id);
|
||||
*old = num;
|
||||
true
|
||||
} else {
|
||||
log::info!("NOT updating back_pagination_request for {}", self.room_id);
|
||||
false
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn unread_notification_counts(&self) -> UnreadNotificationsCount {
|
||||
|
@ -792,9 +919,14 @@ impl Buffer for RoomBuffer {
|
|||
if buf.items.len() < self.config.history.max_prefetch_unread
|
||||
&& self.config.history.prefetch_unread > 0
|
||||
{
|
||||
buf
|
||||
.back_pagination_request
|
||||
.fetch_max(self.config.history.prefetch_unread, Ordering::Relaxed);
|
||||
buf.back_pagination_request.update_if(|old| {
|
||||
if *old < self.config.history.prefetch_unread {
|
||||
*old = self.config.history.prefetch_unread;
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -140,6 +140,8 @@ impl Backlog {
|
|||
)
|
||||
.scroll(scroll)
|
||||
.into(),
|
||||
#[cfg(feature = "images")]
|
||||
BufferItemContent::Image { image } => crate::widgets::BottomAlignedImage::new(image).into(),
|
||||
BufferItemContent::Divider(text) => {
|
||||
if scroll == 0 {
|
||||
Divider::new(Paragraph::new(text).alignment(Alignment::Center)).into()
|
||||
|
|
|
@ -10,6 +10,8 @@ pub mod commands;
|
|||
pub mod components;
|
||||
pub mod config;
|
||||
pub mod html;
|
||||
#[cfg(feature = "images")]
|
||||
pub mod images;
|
||||
pub mod log;
|
||||
pub mod mode;
|
||||
pub mod plugins;
|
||||
|
|
|
@ -15,7 +15,7 @@ async fn tokio_main() -> Result<()> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
#[tokio::main(flavor = "current_thread")]
|
||||
async fn main() -> Result<()> {
|
||||
if let Err(e) = tokio_main().await {
|
||||
eprintln!("{} error: Something went wrong", env!("CARGO_PKG_NAME"));
|
||||
|
|
|
@ -51,6 +51,8 @@ pub enum BacklogItemWidget<'a> {
|
|||
Paragraph(BottomAlignedParagraph<'a>),
|
||||
Container(BottomAlignedContainer<'a>),
|
||||
Divider(Divider<'a>),
|
||||
#[cfg(feature = "images")]
|
||||
Image(BottomAlignedImage<'a>),
|
||||
Empty(EmptyWidget),
|
||||
}
|
||||
|
||||
|
@ -66,6 +68,61 @@ impl OverlappableWidget for EmptyWidget {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "images")]
|
||||
mod images {
|
||||
use super::*;
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
static ref PROTOCOL_PICKER: ratatui_image::picker::Picker = make_protocol_picker();
|
||||
}
|
||||
|
||||
fn make_protocol_picker() -> ratatui_image::picker::Picker {
|
||||
let mut picker = ratatui_image::picker::Picker::from_termios()
|
||||
.expect("ratatui_image::picker::Picker::from_termios() failed");
|
||||
picker.guess_protocol();
|
||||
picker
|
||||
}
|
||||
|
||||
pub struct BottomAlignedImage<'a>(&'a image::DynamicImage);
|
||||
|
||||
impl<'a> BottomAlignedImage<'a> {
|
||||
pub fn new(image: &'a image::DynamicImage) -> Self {
|
||||
BottomAlignedImage(image)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> OverlappableWidget for BottomAlignedImage<'a> {
|
||||
fn height(&self, _width: u16) -> u64 {
|
||||
20 // TODO
|
||||
}
|
||||
fn render_overlap(self, area: Rect, buf: &mut Buffer) -> (u16, u16) {
|
||||
let width = u16::min(100, area.width);
|
||||
let width_px: u32 = (width as u32) * (PROTOCOL_PICKER.font_size.0 as u32);
|
||||
let height_px = width_px * self.0.height() / self.0.width();
|
||||
let height = u32::min(u16::MAX.into(), height_px / (PROTOCOL_PICKER.font_size.1 as u32)) as u16;
|
||||
|
||||
let protocol = PROTOCOL_PICKER
|
||||
.clone()
|
||||
.new_protocol(
|
||||
self.0.clone(),
|
||||
Rect {
|
||||
x: area.x,
|
||||
y: area.y,
|
||||
width,
|
||||
height,
|
||||
},
|
||||
ratatui_image::Resize::Crop,
|
||||
)
|
||||
.expect("picker.new_protocol failed");
|
||||
let widget= ratatui_image::Image::new(protocol.as_ref());
|
||||
widget.render(area, buf); // TODO: scroll
|
||||
(area.width, area.height)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub use images::*;
|
||||
|
||||
/*
|
||||
impl<W: OverlappableWidget> Widget for W {
|
||||
fn render(self, area: Rect, buf: &mut Buffer) {
|
||||
|
|
Loading…
Reference in New Issue