Poll room updates and display them

This commit is contained in:
2023-11-02 12:00:40 +01:00
parent e0bc599ce5
commit fda09e0127
4 changed files with 137 additions and 22 deletions

View File

@ -39,6 +39,7 @@ human-panic = "1.2.0"
# Internal
inventory = "0.3"
itertools = "0.11.0"
lazy_static = "1.4.0"
libc = "0.2.148"
log = "0.4.20"
@ -47,7 +48,12 @@ signal-hook = "0.3.17"
smallvec = "1.11.1"
# Matrix
eyeball-im = "0.4.1" # immutable data structures returned by matrix-sdk-ui
imbl = "2.0" # ditto
matrix-sdk = { git = "https://github.com/matrix-org/matrix-rust-sdk.git", rev = "91e7f2f7224b8ada17ab639d60da10dad98aeaf9", features = ["eyre", "markdown"] }
matrix-sdk-ui = { git = "https://github.com/matrix-org/matrix-rust-sdk.git", rev = "91e7f2f7224b8ada17ab639d60da10dad98aeaf9" }
#matrix-sdk = { path = "../matrix-rust-sdk/crates/matrix-sdk", features = ["eyre", "markdown"] }
#matrix-sdk-ui = { path = "../matrix-rust-sdk/crates/matrix-sdk-ui" }
# UI
ansi-to-tui = "3.1.0"

View File

@ -22,7 +22,7 @@ use std::sync::RwLock;
use color_eyre::eyre::{Result, WrapErr};
use crossterm::event::KeyEvent;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use futures::{FutureExt, StreamExt};
use nonempty::NonEmpty;
use ratatui::prelude::Rect;
use serde::{Deserialize, Serialize};
@ -155,14 +155,24 @@ impl App {
component.init(tui.size()?)?;
}
for client in &self.clients {
for room in client.joined_rooms() {
self.buffers.push(Box::new(RoomBuffer::new(
[client.clone()],
room.room_id().to_owned(),
)));
}
}
futures::future::join_all(
self
.clients
.iter()
.flat_map(|client| {
client
.joined_rooms()
.into_iter()
.map(|room| (client.clone(), room))
})
.map(|(client, room)| RoomBuffer::new(client, room.room_id().to_owned()))
.map(|fut| fut.map(|res| res.expect("Failed to create RoomBuffer at startup"))),
)
.await
.into_iter()
.map(|res| res)
.map(Box::new)
.for_each(|buffer| self.buffers.push(buffer));
let sync_clients = self.clients.clone();
let mut sync_results = FuturesUnordered::new();
@ -211,6 +221,9 @@ impl App {
let (client, sync_response) = sync_response.expect("sync_responses_rx unexpectedly closed");
self.handle_sync_response(&action_tx, client, sync_response).await.context("Error while handling sync response")?;
}
poll_updates = futures::future::join_all(
self.buffers.iter_mut().map(|buf| buf.poll_updates())
) => {}
sync_result = sync_results.next() => {
if !self.should_quit.load(Ordering::Acquire) {
panic!("Sync ended unexpected: {:?}", sync_result);

View File

@ -14,6 +14,7 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use matrix_sdk::async_trait;
use nonempty::NonEmpty;
mod log;
@ -21,9 +22,11 @@ pub use log::LogBuffer;
mod room;
pub use room::RoomBuffer;
pub trait Buffer {
#[async_trait]
pub trait Buffer: Send {
/// A short human-readable name for the room, eg. to show in compact buflist
fn short_name(&self) -> String;
async fn poll_updates(&mut self) {}
fn content(&self) -> ratatui::text::Text;
}
@ -44,6 +47,10 @@ impl Buffers {
self.buffers.iter().map(|buffer_box| &**buffer_box)
}
pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut Box<dyn Buffer>> {
self.buffers.iter_mut()
}
pub fn push(&mut self, buffer: Box<dyn Buffer>) {
self.buffers.push(buffer)
}

View File

@ -14,35 +14,124 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::pin::Pin;
use std::sync::Arc;
use color_eyre::eyre::{eyre, Result};
use eyeball_im::VectorDiff;
use futures::{FutureExt, Stream, StreamExt};
use itertools::Itertools;
use matrix_sdk::async_trait;
use matrix_sdk::ruma::OwnedRoomId;
use matrix_sdk::Client;
use matrix_sdk::Room;
use matrix_sdk_ui::timeline::{RoomExt, Timeline, TimelineItem};
use smallvec::SmallVec;
use tokio::pin;
use super::Buffer;
pub struct RoomBuffer {
// It's unlikely users will join the same room with more than one account;
// avoid a useless heap allocation for the usual case.
clients: SmallVec<[Client; 1]>,
room_id: OwnedRoomId,
pub struct SingleClientRoomBuffer {
client: Client,
items: imbl::vector::Vector<String>,
// TODO: get rid of this trait object, we know it's matrix_sdk_ui::timeline::TimelineStream
stream: Box<dyn Stream<Item = Vec<VectorDiff<Arc<TimelineItem>>>> + Send + Unpin>,
}
impl RoomBuffer {
pub fn new<Clients: IntoIterator<Item = Client>>(clients: Clients, room_id: OwnedRoomId) -> Self {
RoomBuffer {
clients: clients.into_iter().collect(),
room_id,
}
impl SingleClientRoomBuffer {
async fn poll_updates(&mut self) {
self.items.extend(
self
.stream
.next()
.await
.map(|change| format!("New item: {:#?}", change)),
);
}
}
pub struct RoomBuffer {
room_id: OwnedRoomId,
// It's unlikely users will join the same room with more than one account;
// avoid a useless heap allocation for the usual case.
buffers: SmallVec<[SingleClientRoomBuffer; 1]>,
}
fn f(b: RoomBuffer) {
g(b);
}
fn g<B: Send>(b: B) {}
impl RoomBuffer {
pub async fn new(initial_client: Client, room_id: OwnedRoomId) -> Result<Self> {
let mut self_ = RoomBuffer {
buffers: SmallVec::new(),
room_id,
};
self_.add_client(initial_client).await?;
Ok(self_)
}
pub async fn add_client(&mut self, client: Client) -> Result<()> {
let (items, stream) = client
.get_room(&self.room_id)
.ok_or_else(|| {
tracing::error!(
"Adding {:?} for {:?}, but it does not know this room ({} other clients know this room)",
client,
self.room_id,
self.buffers.len()
);
eyre!("Unknown room {} for client {:?}", self.room_id, client)
})?
.timeline_builder()
.build()
.await
.subscribe_batched()
.await;
tracing::info!(
"Added client for {}, initial items: {:?}",
self.room_id,
items
);
self.buffers.push(SingleClientRoomBuffer {
client,
items: items // FIXME: it's always empty. why?
.into_iter()
.map(|item| format!("Initial item: {:#?}", item))
.collect(),
stream: Box::new(stream),
});
Ok(())
}
}
#[async_trait]
impl Buffer for RoomBuffer {
fn short_name(&self) -> String {
self.room_id.as_str().to_owned()
}
async fn poll_updates(&mut self) {
futures::future::join_all(
self
.buffers
.iter_mut()
.map(SingleClientRoomBuffer::poll_updates),
)
.await;
}
fn content(&self) -> ratatui::text::Text {
format!("Timeline of {} goes here", self.room_id).into()
// TODO: merge buffers, etc.
self
.buffers
.first()
.unwrap_or_else(|| panic!("No sub-buffer for {}", self.room_id))
.items
.iter()
.join("\n")
.into()
}
}