Compare commits

...

4 Commits

Author SHA1 Message Date
Val Lorentz 79635e1e4e [WIP] images
Some checks failed
CI / lint (push) Failing after 45s
CI / Build and test (, 1.76.0) (push) Failing after 1m3s
CI / Build and test (, beta) (push) Failing after 1m6s
CI / Build and test (, nightly) (push) Failing after 48s
2024-02-10 23:11:40 +01:00
Val Lorentz 11bf79e95e rustfmt
Some checks failed
CI / lint (push) Successful in 3m33s
CI / Build and test (, nightly) (push) Failing after 1m33s
CI / Build and test (, beta) (push) Successful in 6m31s
CI / Build and test (, 1.76.0) (push) Successful in 6m34s
2024-02-10 20:28:09 +01:00
Val Lorentz e627e3b9a2 Update ratatui to match ansi-to-tui's version
Some checks failed
CI / Build and test (, nightly) (push) Waiting to run
CI / lint (push) Failing after 29s
CI / Build and test (, 1.76.0) (push) Has been cancelled
CI / Build and test (, beta) (push) Has been cancelled
2024-02-10 20:25:23 +01:00
Val Lorentz fa6c171916 Use long-running tasks for polling
Some checks failed
CI / lint (push) Failing after 33s
CI / Build and test (, 1.73.0) (push) Failing after 49s
CI / Build and test (, nightly) (push) Failing after 1m33s
CI / Build and test (, beta) (push) Failing after 3m17s
Instead of making poll_updates() create and cancel tasks every time it is
called (ie. on every tick)

This saves a lot of CPU, avoids a known memory leak that isn't patched
yet (https://github.com/jplatte/eyeball/pull/42).

It also seems to be a requirement to support async rendering code (eg.
to fetch images) as they cause `poll_updates` to be re-entrant, which
corrupted the timeline as `poll_updates` synchronized it through
`VectorDiff` and expected not to run again while a previous call was
still processing the previous `VectorDiff`.
2024-02-10 20:19:01 +01:00
10 changed files with 370 additions and 160 deletions

View File

@ -30,7 +30,7 @@ jobs:
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: '1.73.0'
toolchain: '1.76.0'
components: rustfmt
override: true
@ -55,7 +55,7 @@ jobs:
rust:
- nightly
- beta
- '1.73.0'
- '1.76.0'
features:
- ''

View File

@ -8,11 +8,19 @@ repository = "https://git.tf/val/ratatrix"
authors = ["Val Lorentz"]
license = "AGPL-3.0-only AND MIT"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
default = [
"images",
]
images = [
"dep:image",
"dep:ratatui-image",
]
[dependencies]
# Async
async-std = { version = "1.12.0", features = ["tokio1", "attributes"] }
futures = "0.3.28"
tokio = { version = "1.32.0", features = ["full"] }
tokio-util = "0.7.9"
@ -64,8 +72,8 @@ sorted-vec = "0.8.3"
eyeball = "0.8.7" # data structures observer returned by matrix-sdk-ui
eyeball-im = "0.4.2" # immutable data structures observer returned by matrix-sdk-ui
imbl = "2.0" # ditto
matrix-sdk = { git = "https://github.com/matrix-org/matrix-rust-sdk.git", rev = "5c37acb81ce624d83be54b5140cd60399b556fb2", features = ["eyre", "markdown"] }
matrix-sdk-ui = { git = "https://github.com/matrix-org/matrix-rust-sdk.git", rev = "5c37acb81ce624d83be54b5140cd60399b556fb2" }
matrix-sdk = { git = "https://github.com/matrix-org/matrix-rust-sdk.git", rev = "7bbd07cc7703051e5276b87f26bf88b6239d64a4", features = ["eyre", "markdown"] }
matrix-sdk-ui = { git = "https://github.com/matrix-org/matrix-rust-sdk.git", rev = "7bbd07cc7703051e5276b87f26bf88b6239d64a4" }
#matrix-sdk = { path = "../matrix-rust-sdk/crates/matrix-sdk", features = ["eyre", "markdown"] }
#matrix-sdk-ui = { path = "../matrix-rust-sdk/crates/matrix-sdk-ui" }
@ -73,13 +81,18 @@ matrix-sdk-ui = { git = "https://github.com/matrix-org/matrix-rust-sdk.git", rev
ansi-to-tui = "3.1.0"
chrono = "0.4.31"
crossterm = { version = "0.27.0", features = ["serde", "event-stream"] }
ratatui = { version = "0.24.0", features = ["serde", "macros"] }
ratatui = { version = "0.26.0", features = ["serde", "macros"] }
strip-ansi-escapes = "0.2.0"
tui-textarea = "0.3.0"
unicode-width = "0.1"
# UI (images)
ratatui-image = { version = "0.8.0", optional = true }
image = { version = "0.24.8", optional = true }
[patch.crates-io]
#ratatui = { path = "../ratatui", features = ["serde", "macros"] }
eyeball-im = { path = "../eyeball/eyeball-im" }
[dev-dependencies]
pretty_assertions = "1.4.0"

View File

@ -1,2 +1,2 @@
[toolchain]
channel = "1.73.0"
channel = "1.76.0"

View File

@ -214,7 +214,7 @@ impl App {
let (client, sync_response) = sync_response.expect("sync_responses_rx unexpectedly closed");
self.handle_sync_response(&action_tx, client, sync_response).await.context("Error while handling sync response")?;
}
poll_updates = self.buffers.poll_updates() => {
poll_updates = self.buffers.poll_updates_once() => {
changes_since_last_render = true;
}
sync_result = sync_results.next() => {

View File

@ -32,7 +32,7 @@ use crate::widgets::Prerender;
mod log;
pub use log::LogBuffer;
mod room;
pub(crate) mod room;
pub use room::RoomBuffer;
/// Maximum time before reordering the buffer list based on parent/child relationships.
@ -105,6 +105,10 @@ pub enum BufferItemContent<'buf> {
SimpleText(Text<'buf>),
/// Pairs of `(padding, content)`
Text(Vec<(String, Text<'buf>)>),
#[cfg(feature = "images")]
Image {
image: &'buf image::DynamicImage,
},
Divider(Text<'buf>),
Empty,
}
@ -196,7 +200,7 @@ impl Buffers {
}
}
pub async fn poll_updates(&mut self) {
pub async fn poll_updates_once(&mut self) {
let reorder_now = {
let next_reorder = self.next_reorder;
let mut updates_future = self

View File

@ -20,18 +20,21 @@ use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::{Arc, OnceLock};
use chrono::{offset::Local, DateTime};
use color_eyre::eyre::{eyre, Result};
use color_eyre::eyre::{eyre, Result, WrapErr};
use eyeball::SharedObservable;
use eyeball_im::VectorDiff;
use futures::future::join_all;
use futures::future::OptionFuture;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, Stream, StreamExt};
use image::{DynamicImage, ImageResult};
use itertools::Itertools;
use matrix_sdk::async_trait;
use matrix_sdk::deserialized_responses::SyncOrStrippedState;
use matrix_sdk::room::ParentSpace;
use matrix_sdk::ruma::events::fully_read::FullyReadEventContent;
use matrix_sdk::ruma::events::room::message::{
FormattedBody, MessageFormat, MessageType, TextMessageEventContent,
FormattedBody, ImageMessageEventContent, MessageFormat, MessageType, TextMessageEventContent,
};
use matrix_sdk::ruma::events::space::child::SpaceChildEventContent;
use matrix_sdk::ruma::events::RoomAccountDataEvent;
@ -48,7 +51,9 @@ use memuse::DynamicUsage;
use ratatui::text::Text;
use smallvec::SmallVec;
use sorted_vec::SortedVec;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot;
use tokio::sync::RwLock;
use super::{Buffer, BufferId, BufferItem, BufferItemContent, BufferSortKey, FullyReadStatus};
use crate::config::Config;
@ -69,6 +74,12 @@ pub enum OwnedBufferItemContent {
/// `(padding, content)` pairs
text: Vec<(String, Text<'static>)>,
},
#[cfg(feature = "images")]
Image {
event_id: Option<OwnedEventId>,
is_message: bool,
image: DynamicImage,
},
Divider(String),
Empty,
}
@ -86,6 +97,7 @@ impl DynamicUsage for OwnedBufferItemContent {
.map(|item| item.1.width() * item.1.height() * 4)
.sum() // FIXME: rough approx
},
OwnedBufferItemContent::Image { .. } => 10 * 1024 * 1024, // FIXME: that's just the maximum size
OwnedBufferItemContent::Divider(s) => s.dynamic_usage(),
OwnedBufferItemContent::Empty => 0,
}
@ -103,6 +115,7 @@ impl DynamicUsage for OwnedBufferItemContent {
.sum();
(area, Some(area * 12)) // FIXME: rough approx
},
OwnedBufferItemContent::Image { .. } => (0, Some(10 * 1024 * 1024)), // FIXME: that's just the bounds
OwnedBufferItemContent::Divider(s) => s.dynamic_usage_bounds(),
OwnedBufferItemContent::Empty => (0, Some(0)),
};
@ -116,6 +129,25 @@ impl DynamicUsage for OwnedBufferItemContent {
}
}
pub struct SingleClientRoomBufferInner {
latest_event_id: Option<OwnedEventId>,
timeline: Arc<Timeline>,
}
impl DynamicUsage for SingleClientRoomBufferInner {
fn dynamic_usage(&self) -> usize {
std::mem::size_of::<Self>()
}
fn dynamic_usage_bounds(&self) -> (usize, Option<usize>) {
(
std::mem::size_of::<Self>(),
Some(std::mem::size_of::<Self>()),
)
}
}
// FIXME: use a single Arc for all the values inside
#[derive(Clone)]
pub struct SingleClientRoomBuffer {
config: Arc<Config>,
room_id: Arc<OwnedRoomId>,
@ -123,31 +155,32 @@ pub struct SingleClientRoomBuffer {
/// Newest first
items: imbl::vector::Vector<(Option<u64>, OwnedBufferItemContent, Prerender)>,
latest_event_id: Option<OwnedEventId>,
// TODO: get rid of this trait object, we know it's matrix_sdk_ui::timeline::TimelineStream
timeline_stream: Box<dyn Stream<Item = Vec<VectorDiff<Arc<TimelineItem>>>> + Send + Sync + Unpin>,
timeline: Arc<Timeline>,
back_pagination_request: AtomicU16,
roominfo_subscriber: eyeball::Subscriber<RoomInfo>,
items_rx: tokio::sync::watch::Receiver<
imbl::vector::Vector<(Option<u64>, OwnedBufferItemContent, Prerender)>,
>,
back_pagination_request: SharedObservable<u16>,
inner: Arc<RwLock<SingleClientRoomBufferInner>>,
}
impl DynamicUsage for SingleClientRoomBuffer {
fn dynamic_usage(&self) -> usize {
std::mem::size_of::<Self>()
self.inner.blocking_read().dynamic_usage()
+ self
.items
.iter()
.map(|(_, content, prerender)| {
.map(|item| {
let (_, content, prerender) = item;
std::mem::size_of::<Option<u64>>() + content.dynamic_usage() + prerender.dynamic_usage()
})
.sum::<usize>()
}
fn dynamic_usage_bounds(&self) -> (usize, Option<usize>) {
let self_usage = std::mem::size_of::<Self>();
let (min, max) = self.inner.blocking_read().dynamic_usage_bounds();
crate::utils::sum_memory_bounds(
self_usage,
Some(self_usage),
min,
max,
self
.items
.iter()
@ -162,32 +195,60 @@ impl DynamicUsage for SingleClientRoomBuffer {
}
impl SingleClientRoomBuffer {
/// If the room info was updated, returns it
async fn poll_updates(&mut self) -> Option<RoomInfo> {
let back_pagination_request = self.back_pagination_request.swap(0, Ordering::Relaxed);
if back_pagination_request > 0 {
// TODO: run this concurrently with timeline_stream.next() below
self.spawn_back_pagination(back_pagination_request).await;
}
tokio::select! {
biased;
roominfo = self.roominfo_subscriber.next() => {
Some(roominfo.expect("reached end of roominfo_subscriber"))
},
changes = self.timeline_stream.next() => {
let Some(changes) = changes else { return None; };
for change in changes {
change
.map(|item| (Some(item.unique_id()), self.format_timeline_item(item), Prerender::new()))
.apply(&mut self.items);
async fn poll_updates(
self,
updates_tx: UnboundedSender<(SingleClientRoomBuffer, RoomInfo)>,
write_items: tokio::sync::watch::Sender<
imbl::vector::Vector<(Option<u64>, OwnedBufferItemContent, Prerender)>,
>,
mut roominfo_subscriber: eyeball::Subscriber<RoomInfo>,
mut timeline_stream: impl Stream<Item = Vec<VectorDiff<Arc<TimelineItem>>>> + Send + Sync + Unpin,
) {
loop {
tokio::select! {
biased;
roominfo = roominfo_subscriber.next() => {
match roominfo {
Some(roominfo) => {
updates_tx.send((self.clone(), roominfo)).expect("updates_tx closed");
}
None => {
log::info!("Closing {:?} client for {}", self.client, self.room_id);
break;
}
}
},
changes = timeline_stream.next() => {
if let Some(changes) = changes {
let changes = join_all(changes.into_iter().map(|change|
change.async_map(|tl_item| self.make_timeline_item(tl_item))
)).await;
write_items.send_modify(|items| for change in changes {
change.apply(items)
});
let mut inner = self.inner.write().await;
inner.latest_event_id = inner.timeline.latest_event().await.and_then(|e| e.event_id().map(ToOwned::to_owned));
}
}
self.latest_event_id = self.timeline.latest_event().await.and_then(|e| e.event_id().map(ToOwned::to_owned));
None
}
}
}
fn format_timeline_item(&self, tl_item: impl AsRef<TimelineItem>) -> OwnedBufferItemContent {
async fn make_timeline_item(
&self,
tl_item: impl AsRef<TimelineItem>,
) -> (Option<u64>, OwnedBufferItemContent, Prerender) {
(
Some(tl_item.as_ref().unique_id()),
self.format_timeline_item(tl_item).await,
Prerender::new(),
)
}
async fn format_timeline_item(
&self,
tl_item: impl AsRef<TimelineItem>,
) -> OwnedBufferItemContent {
match tl_item.as_ref().kind() {
TimelineItemKind::Event(event) => {
use matrix_sdk_ui::timeline::TimelineItemContent::*;
@ -241,6 +302,17 @@ impl SingleClientRoomBuffer {
MessageType::Text(TextMessageEventContent { body, .. }) => {
msg!(" ", "&lt;{}> {}", sender, escape_html(body))
},
#[cfg(feature = "images")]
MessageType::Image(content) => {
crate::images::decode_image(
&self.client,
&self.config,
event.event_id().map(ToOwned::to_owned),
&sender,
content,
)
.await
},
_ =>
// Fallback to text body
{
@ -443,31 +515,49 @@ impl SingleClientRoomBuffer {
}
}
async fn spawn_back_pagination(&self, num: u16) {
let room_id = self.room_id.clone();
let timeline = self.timeline.clone();
let mut back_pagination_status = timeline.back_pagination_status();
match back_pagination_status.get() {
BackPaginationStatus::Paginating | BackPaginationStatus::TimelineStartReached => {
// We are already waiting for a backfill from the server
},
BackPaginationStatus::Idle => {
tokio::spawn(async move {
tracing::debug!("Starting pagination for {}", room_id);
timeline
.paginate_backwards(matrix_sdk_ui::timeline::PaginationOptions::until_num_items(
num, num,
))
.await
.unwrap_or_else(|e| tracing::error!("Failed to paginate {} backward: {}", room_id, e));
tracing::debug!("Ended pagination for {}", room_id);
});
// Wait for the task we just spawned to change the status, so we don't risk starting
// a new one in the meantime
back_pagination_status.next().await;
},
async fn back_pagination_worker(self) {
let mut request_subscriber = self.back_pagination_request.subscribe();
let mut num_requested_items = request_subscriber.next_now();
loop {
if num_requested_items != 0 {
let inner = self.inner.read().await;
let room_id = self.room_id.clone();
let timeline = inner.timeline.clone();
let back_pagination_status = timeline.back_pagination_status();
match back_pagination_status.get() {
BackPaginationStatus::Paginating | BackPaginationStatus::TimelineStartReached => {
// We are already waiting for a backfill from the server
tracing::debug!("Pending pagination for {}", room_id);
},
BackPaginationStatus::Idle => {
tracing::debug!("Starting pagination for {}", room_id);
timeline
.paginate_backwards(matrix_sdk_ui::timeline::PaginationOptions::until_num_items(
num_requested_items,
num_requested_items,
))
.await
.unwrap_or_else(|e| {
tracing::error!("Failed to paginate {} backward: {}", room_id, e)
});
tracing::debug!("Ended pagination for {}", room_id);
self.back_pagination_request.set(0);
},
}
}
num_requested_items = match request_subscriber.next().await {
Some(num_requested_items) => num_requested_items,
None => break,
};
if num_requested_items != 0 {
log::debug!(
"got back_pagination_request for {}: {}",
self.room_id,
num_requested_items
);
}
}
log::debug!("end of back_pagination_worker for {}", self.room_id);
}
}
@ -478,6 +568,9 @@ pub struct RoomBuffer {
computed_roominfo: Option<ComputedRoomInfo>,
poll_updates_rx: UnboundedReceiver<(SingleClientRoomBuffer, RoomInfo)>,
poll_updates_tx: UnboundedSender<(SingleClientRoomBuffer, RoomInfo)>,
/// Set while we are already computing roominfo, to block other updates to the room.
/// Holds the next `self.computed_roominfo`.
update_roominfo_rx: Option<oneshot::Receiver<ComputedRoomInfo>>,
@ -511,10 +604,13 @@ impl RoomBuffer {
initial_client: Client,
room_id: Arc<OwnedRoomId>,
) -> Result<Self> {
let (poll_updates_tx, poll_updates_rx) = unbounded_channel();
let mut self_ = RoomBuffer {
config,
room_id,
computed_roominfo: None,
poll_updates_tx,
poll_updates_rx,
update_roominfo_rx: None,
buffers: SmallVec::new(),
};
@ -532,39 +628,123 @@ impl RoomBuffer {
);
eyre!("Unknown room {} for client {:?}", self.room_id, client)
})?;
let timeline = room.timeline_builder().build().await;
let timeline = room.timeline_builder().build().await.with_context(|| {
format!(
"Could not get timeline for {:?} from {:?}",
self.room_id, client
)
})?;
let (items, timeline_stream) = timeline.subscribe_batched().await;
tracing::info!(
"Added client for {}, initial items: {:?}",
self.room_id,
items
);
self.buffers.push(SingleClientRoomBuffer {
let items: imbl::Vector<_> = items // FIXME: it's always empty. why?
.into_iter()
.map(|item| {
(
None,
OwnedBufferItemContent::SimpleText {
event_id: None,
is_message: false,
text: Text::raw(format!("Initial item: {:#?}", item)),
},
Prerender::new(),
)
})
.collect();
let (write_items, items_rx) = tokio::sync::watch::channel(items.clone());
let buffer = SingleClientRoomBuffer {
inner: Arc::new(RwLock::new(SingleClientRoomBufferInner {
timeline: Arc::new(timeline),
latest_event_id: None,
})),
config: self.config.clone(),
room_id: self.room_id.clone(),
client,
timeline: Arc::new(timeline),
items: items // FIXME: it's always empty. why?
.into_iter()
.map(|item| {
(
None,
OwnedBufferItemContent::SimpleText {
event_id: None,
is_message: false,
text: Text::raw(format!("Initial item: {:#?}", item)),
},
Prerender::new(),
)
})
.collect(),
latest_event_id: None,
timeline_stream: Box::new(timeline_stream),
back_pagination_request: AtomicU16::new(0),
roominfo_subscriber: room.subscribe_info(),
});
back_pagination_request: SharedObservable::new(0),
items,
items_rx,
};
self.buffers.push(buffer.clone());
tokio::task::spawn(buffer.clone().poll_updates(
self.poll_updates_tx.clone(),
write_items,
room.subscribe_info(),
timeline_stream,
));
tokio::task::spawn(buffer.back_pagination_worker());
Ok(())
}
async fn poll_updates_once(&mut self) {
if self.update_roominfo_rx.is_none() && self.computed_roominfo.is_none() {
// Initialize the computed_roominfo
let (tx, rx) = oneshot::channel();
self.update_roominfo_rx = Some(rx);
let room = self
.buffers
.first()
.expect("missing sub-buffer")
.client
.get_room(&self.room_id)
.expect("client missing room");
let roominfo_hash = hash_roominfo(room.clone_info());
tokio::spawn(compute_room_info(room, tx, roominfo_hash));
}
// Wait for result of processing new updates
let update_roominfo_rx = async {
match self.update_roominfo_rx.as_mut() {
Some(update_roominfo_rx) => Some(
update_roominfo_rx
.await
.unwrap_or_else(|_| panic!("update_roominfo_rx for {} was dropped", self.room_id)),
),
None => std::future::pending().await,
}
};
let mut poll_buffers_timeline: FuturesUnordered<_> = self
.buffers
.iter_mut()
.map(|buf| async {
buf.items_rx.changed().await.unwrap();
buf.items = buf.items_rx.borrow().clone(); // O(1) clone because imbl::Vector
})
.collect();
tokio::select! {
new_update = self.poll_updates_rx.recv() => {
tracing::trace!("Got update for {}", self.room_id);
let (buf, roominfo) = new_update.expect("poll_updates_rx closed");
let roominfo_hash = hash_roominfo(roominfo);
if let Some(computed_roominfo) = self.computed_roominfo.as_ref() {
if computed_roominfo.roominfo_hash == roominfo_hash {
// No visible change, exit early
tracing::trace!("no visible change to {}", self.room_id);
return;
}
}
tracing::trace!("visible change to {}", self.room_id);
let (tx, rx) = oneshot::channel();
self.update_roominfo_rx = Some(rx); // Prevents spawning more update_room_info() calls while
// this one is still running
let room = buf.client.get_room(&self.room_id).expect("client missing room");
tokio::spawn(compute_room_info(room, tx, roominfo_hash));
},
computed_roominfo = update_roominfo_rx => {
// Received result of a computation
tracing::trace!("Got computed_roominfo {}", self.room_id);
self.computed_roominfo = computed_roominfo;
self.update_roominfo_rx = None;
}
_ = poll_buffers_timeline.next() => {
}
}
}
}
#[async_trait]
@ -616,71 +796,8 @@ impl Buffer for RoomBuffer {
}
async fn poll_updates(&mut self) {
if self.update_roominfo_rx.is_none() && self.computed_roominfo.is_none() {
// Initialize the computed_roominfo
let (tx, rx) = oneshot::channel();
self.update_roominfo_rx = Some(rx);
let room = self
.buffers
.first()
.expect("missing sub-buffer")
.client
.get_room(&self.room_id)
.expect("client missing room");
let roominfo_hash = hash_roominfo(room.clone_info());
tokio::spawn(compute_room_info(room, tx, roominfo_hash));
}
// Poll for new updates from the server
let mut roominfo_update = self
.buffers
.iter_mut()
.map(|buf| async {
let roominfo = buf.poll_updates().await;
(buf, roominfo)
})
.collect::<FuturesUnordered<_>>();
// Wait for result of processing new updates
let update_roominfo_rx = async {
match self.update_roominfo_rx.as_mut() {
Some(update_roominfo_rx) => Some(
update_roominfo_rx
.await
.unwrap_or_else(|_| panic!("update_roominfo_rx for {} was dropped", self.room_id)),
),
None => std::future::pending().await,
}
};
tokio::select! {
new_update = roominfo_update.next() => {
tracing::trace!("Got update for {}", self.room_id);
let Some((buf, Some(roominfo))) = new_update else {
tracing::trace!("None change to {}", self.room_id);
return;
};
let roominfo_hash = hash_roominfo(roominfo);
if let Some(computed_roominfo) = self.computed_roominfo.as_ref() {
if computed_roominfo.roominfo_hash == roominfo_hash {
// No visible change, exit early
tracing::trace!("no visible change to {}", self.room_id);
return;
}
}
tracing::trace!("visible change to {}", self.room_id);
let (tx, rx) = oneshot::channel();
self.update_roominfo_rx = Some(rx); // Prevents spawning more update_room_info() calls while
// this one is still running
let room = buf.client.get_room(&self.room_id).expect("client missing room");
tokio::spawn(compute_room_info(room, tx, roominfo_hash));
},
computed_roominfo = update_roominfo_rx => {
// Received result of a computation
tracing::trace!("Got computed_roominfo {}", self.room_id);
self.computed_roominfo = computed_roominfo;
self.update_roominfo_rx = None;
}
while !self.buffers.is_empty() {
self.poll_updates_once().await;
}
}
@ -700,6 +817,7 @@ impl Buffer for RoomBuffer {
BufferItemContent::SimpleText(text.clone())
},
OwnedBufferItemContent::Text { text, .. } => BufferItemContent::Text(text.clone()),
OwnedBufferItemContent::Image { image, .. } => BufferItemContent::Image { image },
OwnedBufferItemContent::Divider(text) => BufferItemContent::Divider(Text::raw(text)),
OwnedBufferItemContent::Empty => BufferItemContent::Empty,
},
@ -716,7 +834,16 @@ impl Buffer for RoomBuffer {
.first()
.unwrap_or_else(|| panic!("No sub-buffer for {}", self.room_id))
.back_pagination_request
.fetch_max(num, Ordering::Relaxed);
.update_if(|old| {
if *old < num {
log::info!("updating back_pagination_request for {}", self.room_id);
*old = num;
true
} else {
log::info!("NOT updating back_pagination_request for {}", self.room_id);
false
}
})
}
fn unread_notification_counts(&self) -> UnreadNotificationsCount {
@ -792,9 +919,14 @@ impl Buffer for RoomBuffer {
if buf.items.len() < self.config.history.max_prefetch_unread
&& self.config.history.prefetch_unread > 0
{
buf
.back_pagination_request
.fetch_max(self.config.history.prefetch_unread, Ordering::Relaxed);
buf.back_pagination_request.update_if(|old| {
if *old < self.config.history.prefetch_unread {
*old = self.config.history.prefetch_unread;
true
} else {
false
}
})
}
}
}

View File

@ -140,6 +140,8 @@ impl Backlog {
)
.scroll(scroll)
.into(),
#[cfg(feature = "images")]
BufferItemContent::Image { image } => crate::widgets::BottomAlignedImage::new(image).into(),
BufferItemContent::Divider(text) => {
if scroll == 0 {
Divider::new(Paragraph::new(text).alignment(Alignment::Center)).into()

View File

@ -10,6 +10,8 @@ pub mod commands;
pub mod components;
pub mod config;
pub mod html;
#[cfg(feature = "images")]
pub mod images;
pub mod log;
pub mod mode;
pub mod plugins;

View File

@ -15,7 +15,7 @@ async fn tokio_main() -> Result<()> {
Ok(())
}
#[tokio::main]
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
if let Err(e) = tokio_main().await {
eprintln!("{} error: Something went wrong", env!("CARGO_PKG_NAME"));

View File

@ -51,6 +51,8 @@ pub enum BacklogItemWidget<'a> {
Paragraph(BottomAlignedParagraph<'a>),
Container(BottomAlignedContainer<'a>),
Divider(Divider<'a>),
#[cfg(feature = "images")]
Image(BottomAlignedImage<'a>),
Empty(EmptyWidget),
}
@ -66,6 +68,61 @@ impl OverlappableWidget for EmptyWidget {
}
}
#[cfg(feature = "images")]
mod images {
use super::*;
lazy_static::lazy_static! {
static ref PROTOCOL_PICKER: ratatui_image::picker::Picker = make_protocol_picker();
}
fn make_protocol_picker() -> ratatui_image::picker::Picker {
let mut picker = ratatui_image::picker::Picker::from_termios()
.expect("ratatui_image::picker::Picker::from_termios() failed");
picker.guess_protocol();
picker
}
pub struct BottomAlignedImage<'a>(&'a image::DynamicImage);
impl<'a> BottomAlignedImage<'a> {
pub fn new(image: &'a image::DynamicImage) -> Self {
BottomAlignedImage(image)
}
}
impl<'a> OverlappableWidget for BottomAlignedImage<'a> {
fn height(&self, _width: u16) -> u64 {
20 // TODO
}
fn render_overlap(self, area: Rect, buf: &mut Buffer) -> (u16, u16) {
let width = u16::min(100, area.width);
let width_px: u32 = (width as u32) * (PROTOCOL_PICKER.font_size.0 as u32);
let height_px = width_px * self.0.height() / self.0.width();
let height = u32::min(u16::MAX.into(), height_px / (PROTOCOL_PICKER.font_size.1 as u32)) as u16;
let protocol = PROTOCOL_PICKER
.clone()
.new_protocol(
self.0.clone(),
Rect {
x: area.x,
y: area.y,
width,
height,
},
ratatui_image::Resize::Crop,
)
.expect("picker.new_protocol failed");
let widget= ratatui_image::Image::new(protocol.as_ref());
widget.render(area, buf); // TODO: scroll
(area.width, area.height)
}
}
}
pub use images::*;
/*
impl<W: OverlappableWidget> Widget for W {
fn render(self, area: Rect, buf: &mut Buffer) {