Create room buffers when new rooms are returned by server sync

This commit is contained in:
2023-11-04 20:06:44 +01:00
parent 6f8c3a19b2
commit d5e5639ba7
3 changed files with 63 additions and 33 deletions

View File

@ -15,8 +15,9 @@
*/
use std::cell::RefCell;
use std::collections::VecDeque;
use std::collections::{HashSet, VecDeque};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use color_eyre::eyre::{eyre, Result, WrapErr};
use crossterm::event::KeyEvent;
@ -45,7 +46,7 @@ pub struct App {
pub frame_rate: f64,
pub components: Vec<Box<dyn Component>>,
pub buffers: Buffers,
pub should_quit: AtomicBool,
pub should_quit: Arc<AtomicBool>,
pub should_suspend: bool,
pub mode: Mode,
pub last_tick_key_events: RefCell<Vec<KeyEvent>>,
@ -129,7 +130,7 @@ impl App {
tick_rate,
frame_rate,
components: vec![Box::new(home), Box::new(fps)],
should_quit: AtomicBool::new(false),
should_quit: Arc::new(AtomicBool::new(false)),
should_suspend: false,
buffers: Buffers::new(Box::new(LogBuffer::new(log_receiver))),
clients,
@ -164,48 +165,42 @@ impl App {
component.init(tui.size()?)?;
}
futures::future::join_all(
self
.clients
.iter()
.flat_map(|client| {
client
.joined_rooms()
.into_iter()
.map(|room| (client.clone(), room))
})
.map(|(client, room)| RoomBuffer::new(client, room.room_id().to_owned()))
.map(|fut| fut.map(|res| res.expect("Failed to create RoomBuffer at startup"))),
)
.await
.into_iter()
.map(|res| res)
.map(Box::new)
.for_each(|buffer| self.buffers.push(buffer));
self
.create_room_buffers(self.clients.clone().iter().flat_map(|client| {
tracing::info!("joined rooms {:?}", client.joined_rooms());
client
.joined_rooms()
.into_iter()
.map(move |room| (client.clone(), room.room_id().to_owned()))
}))
.await;
let sync_clients = self.clients.clone();
let mut sync_results = FuturesUnordered::new();
for client in sync_clients.iter() {
client.add_event_handler(
|ev: matrix_sdk::ruma::events::room::message::SyncRoomMessageEvent| async move {
// TODO
},
);
let server_name = client.user_id().expect("missing user id").server_name();
let user_id = client.user_id().expect("missing user id");
let server_name = user_id.server_name();
let mut sync_settings = matrix_sdk::config::SyncSettings::default();
if server_name == "matrix.org" {
// matrix.org is slow and frequently hits the timeout on initial sync,
// let's quadruple the default
sync_settings = sync_settings.timeout(std::time::Duration::from_secs(120));
}
sync_results.push(client.sync_with_result_callback(sync_settings, |res| {
tracing::info!("Syncing with {}...", user_id);
let should_quit = self.should_quit.clone();
let sync_responses_tx = sync_responses_tx.clone();
sync_results.push(client.sync_with_result_callback(sync_settings, move |res| {
let client2 = client.clone();
async {
let should_quit = should_quit.clone();
let sync_responses_tx = sync_responses_tx.clone();
async move {
let sync_response = res.expect("Failed sync");
let user_id = client2.user_id().expect("missing user id").to_string();
sync_responses_tx
.send((client2, sync_response))
.expect("could not send sync response");
if self.should_quit.load(Ordering::Acquire) {
tracing::info!("Synced with {}", user_id);
if should_quit.load(Ordering::Acquire) {
Ok(matrix_sdk::LoopCtrl::Break)
} else {
Ok(matrix_sdk::LoopCtrl::Continue)
@ -364,12 +359,39 @@ impl App {
}
async fn handle_sync_response(
&self,
&mut self,
action_tx: &mpsc::UnboundedSender<Action>,
client: matrix_sdk::Client,
sync_response: matrix_sdk::sync::SyncResponse,
) -> Result<()> {
// TODO
let known_rooms: HashSet<&matrix_sdk::ruma::RoomId> =
self.buffers.iter().flat_map(|buf| buf.room_id()).collect();
let new_rooms: Vec<_> = sync_response
.rooms
.join
.keys()
.filter(|room_id: &&matrix_sdk::ruma::OwnedRoomId| {
!known_rooms.contains::<matrix_sdk::ruma::RoomId>(room_id.as_ref())
})
.map(|room_id| (client.clone(), room_id.to_owned()))
.collect();
self.create_room_buffers(new_rooms.into_iter()).await;
Ok(())
}
async fn create_room_buffers(
&mut self,
rooms: impl Iterator<Item = (matrix_sdk::Client, matrix_sdk::ruma::OwnedRoomId)>,
) {
futures::future::join_all(
rooms
.map(|(client, room)| RoomBuffer::new(client, room))
.map(|fut| fut.map(|res| res.expect("Failed to create RoomBuffer at startup"))),
)
.await
.into_iter()
.map(|res| res)
.map(Box::new)
.for_each(|buffer| self.buffers.push(buffer));
}
}

View File

@ -28,6 +28,10 @@ pub use room::RoomBuffer;
pub trait Buffer: Send + Sync {
/// A short human-readable name for the room, eg. to show in compact buflist
fn short_name(&self) -> String;
/// If this is a room buffer, returns the associated room id.
fn room_id(&self) -> Option<&matrix_sdk::ruma::RoomId> {
None
}
/// Returns if there are any updates to apply.
async fn poll_updates(&mut self);
fn content(&self) -> Vec<ratatui::text::Text>; // TODO: make this lazy, only the last few are used

View File

@ -22,7 +22,7 @@ use eyeball_im::VectorDiff;
use futures::{FutureExt, Stream, StreamExt};
use itertools::Itertools;
use matrix_sdk::async_trait;
use matrix_sdk::ruma::OwnedRoomId;
use matrix_sdk::ruma::{OwnedRoomId, RoomId};
use matrix_sdk::Client;
use matrix_sdk::Room;
use matrix_sdk_ui::timeline::{
@ -290,6 +290,10 @@ impl Buffer for RoomBuffer {
self.room_id.as_str().to_owned()
}
fn room_id(&self) -> Option<&RoomId> {
Some(&self.room_id)
}
async fn poll_updates(&mut self) {
futures::future::join_all(
self