From d5e5639ba7fcd4b6ff825c7ecf88fecb897d5fba Mon Sep 17 00:00:00 2001 From: Val Lorentz Date: Sat, 4 Nov 2023 20:06:44 +0100 Subject: [PATCH] Create room buffers when new rooms are returned by server sync --- src/app.rs | 86 ++++++++++++++++++++++++++++----------------- src/buffers/mod.rs | 4 +++ src/buffers/room.rs | 6 +++- 3 files changed, 63 insertions(+), 33 deletions(-) diff --git a/src/app.rs b/src/app.rs index 577debc..3a38940 100644 --- a/src/app.rs +++ b/src/app.rs @@ -15,8 +15,9 @@ */ use std::cell::RefCell; -use std::collections::VecDeque; +use std::collections::{HashSet, VecDeque}; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; use color_eyre::eyre::{eyre, Result, WrapErr}; use crossterm::event::KeyEvent; @@ -45,7 +46,7 @@ pub struct App { pub frame_rate: f64, pub components: Vec>, pub buffers: Buffers, - pub should_quit: AtomicBool, + pub should_quit: Arc, pub should_suspend: bool, pub mode: Mode, pub last_tick_key_events: RefCell>, @@ -129,7 +130,7 @@ impl App { tick_rate, frame_rate, components: vec![Box::new(home), Box::new(fps)], - should_quit: AtomicBool::new(false), + should_quit: Arc::new(AtomicBool::new(false)), should_suspend: false, buffers: Buffers::new(Box::new(LogBuffer::new(log_receiver))), clients, @@ -164,48 +165,42 @@ impl App { component.init(tui.size()?)?; } - futures::future::join_all( - self - .clients - .iter() - .flat_map(|client| { - client - .joined_rooms() - .into_iter() - .map(|room| (client.clone(), room)) - }) - .map(|(client, room)| RoomBuffer::new(client, room.room_id().to_owned())) - .map(|fut| fut.map(|res| res.expect("Failed to create RoomBuffer at startup"))), - ) - .await - .into_iter() - .map(|res| res) - .map(Box::new) - .for_each(|buffer| self.buffers.push(buffer)); + self + .create_room_buffers(self.clients.clone().iter().flat_map(|client| { + tracing::info!("joined rooms {:?}", client.joined_rooms()); + client + .joined_rooms() + .into_iter() + .map(move |room| (client.clone(), room.room_id().to_owned())) + })) + .await; let sync_clients = self.clients.clone(); let mut sync_results = FuturesUnordered::new(); for client in sync_clients.iter() { - client.add_event_handler( - |ev: matrix_sdk::ruma::events::room::message::SyncRoomMessageEvent| async move { - // TODO - }, - ); - let server_name = client.user_id().expect("missing user id").server_name(); + let user_id = client.user_id().expect("missing user id"); + let server_name = user_id.server_name(); let mut sync_settings = matrix_sdk::config::SyncSettings::default(); if server_name == "matrix.org" { // matrix.org is slow and frequently hits the timeout on initial sync, // let's quadruple the default sync_settings = sync_settings.timeout(std::time::Duration::from_secs(120)); } - sync_results.push(client.sync_with_result_callback(sync_settings, |res| { + tracing::info!("Syncing with {}...", user_id); + let should_quit = self.should_quit.clone(); + let sync_responses_tx = sync_responses_tx.clone(); + sync_results.push(client.sync_with_result_callback(sync_settings, move |res| { let client2 = client.clone(); - async { + let should_quit = should_quit.clone(); + let sync_responses_tx = sync_responses_tx.clone(); + async move { let sync_response = res.expect("Failed sync"); + let user_id = client2.user_id().expect("missing user id").to_string(); sync_responses_tx .send((client2, sync_response)) .expect("could not send sync response"); - if self.should_quit.load(Ordering::Acquire) { + tracing::info!("Synced with {}", user_id); + if should_quit.load(Ordering::Acquire) { Ok(matrix_sdk::LoopCtrl::Break) } else { Ok(matrix_sdk::LoopCtrl::Continue) @@ -364,12 +359,39 @@ impl App { } async fn handle_sync_response( - &self, + &mut self, action_tx: &mpsc::UnboundedSender, client: matrix_sdk::Client, sync_response: matrix_sdk::sync::SyncResponse, ) -> Result<()> { - // TODO + let known_rooms: HashSet<&matrix_sdk::ruma::RoomId> = + self.buffers.iter().flat_map(|buf| buf.room_id()).collect(); + let new_rooms: Vec<_> = sync_response + .rooms + .join + .keys() + .filter(|room_id: &&matrix_sdk::ruma::OwnedRoomId| { + !known_rooms.contains::(room_id.as_ref()) + }) + .map(|room_id| (client.clone(), room_id.to_owned())) + .collect(); + self.create_room_buffers(new_rooms.into_iter()).await; Ok(()) } + + async fn create_room_buffers( + &mut self, + rooms: impl Iterator, + ) { + futures::future::join_all( + rooms + .map(|(client, room)| RoomBuffer::new(client, room)) + .map(|fut| fut.map(|res| res.expect("Failed to create RoomBuffer at startup"))), + ) + .await + .into_iter() + .map(|res| res) + .map(Box::new) + .for_each(|buffer| self.buffers.push(buffer)); + } } diff --git a/src/buffers/mod.rs b/src/buffers/mod.rs index 92b90a8..5f74768 100644 --- a/src/buffers/mod.rs +++ b/src/buffers/mod.rs @@ -28,6 +28,10 @@ pub use room::RoomBuffer; pub trait Buffer: Send + Sync { /// A short human-readable name for the room, eg. to show in compact buflist fn short_name(&self) -> String; + /// If this is a room buffer, returns the associated room id. + fn room_id(&self) -> Option<&matrix_sdk::ruma::RoomId> { + None + } /// Returns if there are any updates to apply. async fn poll_updates(&mut self); fn content(&self) -> Vec; // TODO: make this lazy, only the last few are used diff --git a/src/buffers/room.rs b/src/buffers/room.rs index bc1ef4e..17c94d8 100644 --- a/src/buffers/room.rs +++ b/src/buffers/room.rs @@ -22,7 +22,7 @@ use eyeball_im::VectorDiff; use futures::{FutureExt, Stream, StreamExt}; use itertools::Itertools; use matrix_sdk::async_trait; -use matrix_sdk::ruma::OwnedRoomId; +use matrix_sdk::ruma::{OwnedRoomId, RoomId}; use matrix_sdk::Client; use matrix_sdk::Room; use matrix_sdk_ui::timeline::{ @@ -290,6 +290,10 @@ impl Buffer for RoomBuffer { self.room_id.as_str().to_owned() } + fn room_id(&self) -> Option<&RoomId> { + Some(&self.room_id) + } + async fn poll_updates(&mut self) { futures::future::join_all( self