ratatrix/src/app.rs

399 lines
14 KiB
Rust

/*
* Copyright (C) 2023 Valentin Lorentz
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::cell::RefCell;
use std::collections::{HashSet, VecDeque};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use color_eyre::eyre::{eyre, Result, WrapErr};
use crossterm::event::KeyEvent;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use nonempty::NonEmpty;
use ratatui::prelude::Rect;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use crate::{
action::Action,
buffers::{BufferId, Buffers, LogBuffer, RoomBuffer},
commands::RataCommands,
components::{Component, FpsCounter, Home},
config::Config,
mode::Mode,
tui,
};
pub struct App {
pub config: Arc<Config>,
pub clients: NonEmpty<matrix_sdk::Client>,
pub commands: RataCommands,
pub frame_rate: f64,
pub components: Vec<Box<dyn Component>>,
pub buffers: Buffers,
pub should_quit: Arc<AtomicBool>,
pub should_suspend: bool,
pub mode: Mode,
pub last_tick_key_events: RefCell<Vec<KeyEvent>>,
}
impl App {
pub async fn new(frame_rate: f64, log_receiver: mpsc::UnboundedReceiver<String>) -> Result<Self> {
let config = Config::new()?;
let datadir = config.config._data_dir.join("default");
let future_clients = config.accounts.clone().map(|conf| {
let datadir = datadir.clone();
tokio::spawn(async move {
let server_name = conf.user_id.server_name();
let user_dir = datadir.join(conf.user_id.to_string());
std::fs::create_dir_all(&user_dir)
.with_context(|| format!("Could not create {}", user_dir.display()))?;
// If there is at least one device state for that user, use it. Otherwise,
// generate a random device id
let device_dir = std::fs::read_dir(&user_dir)
.with_context(|| format!("Could not read {}", user_dir.display()))?
.next()
.and_then(|dir_entry| match dir_entry {
Ok(dir_entry) => Some(dir_entry.path()),
Err(e) => {
tracing::error!("Could not read entry in {}: {}", user_dir.display(), e);
None
},
})
.unwrap_or_else(|| user_dir.join(matrix_sdk::ruma::DeviceId::new().as_str()));
// Extract device id from the path
let device_id = device_dir
.as_path()
.file_name()
.expect("empty path to state store")
.to_str()
.ok_or_else(|| eyre!("device id in {} is not valid UTF8", device_dir.display()))?;
let client = matrix_sdk::Client::builder()
.server_name(server_name)
.sqlite_store(&device_dir, conf.data_passphrase.as_deref())
.build()
.await
.with_context(|| format!("Could not initialize client for {}", server_name))?;
client
.matrix_auth()
.login_username(&conf.user_id, &conf.password)
.initial_device_display_name(&conf.device_name)
.device_id(device_id)
.send()
.await
.with_context(|| format!("Could not login as {}", conf.user_id))?;
tracing::info!(
"Logged in as {}",
client
.user_id()
.ok_or_else(|| eyre!("Unexpectedly not logged in as {}", conf.user_id))?,
);
Ok::<_, color_eyre::eyre::Report>(client)
})
});
let mut clients = Vec::new();
for client in futures::future::join_all(future_clients).await {
clients.push(
client
.context("Could not join client init task")?
.context("Could not connect to all accounts")?,
);
}
let clients = NonEmpty::collect(clients).expect("map on NonEmpty returned empty vec");
let config = Arc::new(config);
let home = Home::new(config.clone());
let fps = FpsCounter::default();
let mode = Mode::Home;
let mut app = Self {
frame_rate,
components: vec![Box::new(home), Box::new(fps)],
should_quit: Arc::new(AtomicBool::new(false)),
should_suspend: false,
buffers: Buffers::new(Box::new(LogBuffer::new(log_receiver))),
clients,
config,
commands: RataCommands::new(),
mode,
last_tick_key_events: RefCell::new(Vec::new()),
};
crate::plugins::load_all(&mut app).context("Could not load plugins")?;
Ok(app)
}
pub async fn run(&mut self) -> Result<()> {
let (action_tx, mut action_rx) = mpsc::unbounded_channel();
let (sync_responses_tx, mut sync_responses_rx) = mpsc::unbounded_channel();
let mut tui = tui::Tui::new()?
.frame_rate(self.frame_rate)
.mouse(self.config.mouse.enable);
tui.enter()?;
for component in self.components.iter_mut() {
component.register_action_handler(action_tx.clone())?;
}
for component in self.components.iter_mut() {
component.init(tui.size()?)?;
}
self
.create_room_buffers(self.clients.clone().iter().flat_map(|client| {
tracing::info!("joined rooms {:?}", client.joined_rooms());
client
.joined_rooms()
.into_iter()
.map(move |room| (client.clone(), room.room_id().to_owned()))
}))
.await;
let sync_clients = self.clients.clone();
let mut sync_results = FuturesUnordered::new();
for client in sync_clients.iter() {
let user_id = client.user_id().expect("missing user id");
let server_name = user_id.server_name();
let mut sync_settings = matrix_sdk::config::SyncSettings::default();
if server_name == "matrix.org" {
// matrix.org is slow and frequently hits the timeout on initial sync,
// let's quadruple the default
sync_settings = sync_settings.timeout(std::time::Duration::from_secs(120));
}
tracing::info!("Syncing with {}...", user_id);
let should_quit = self.should_quit.clone();
let sync_responses_tx = sync_responses_tx.clone();
sync_results.push(client.sync_with_result_callback(sync_settings, move |res| {
let client2 = client.clone();
let should_quit = should_quit.clone();
let sync_responses_tx = sync_responses_tx.clone();
async move {
let sync_response = res.expect("Failed sync");
let user_id = client2.user_id().expect("missing user id").to_string();
sync_responses_tx
.send((client2, sync_response))
.expect("could not send sync response");
tracing::info!("Got updates for {}", user_id);
if should_quit.load(Ordering::Acquire) {
Ok(matrix_sdk::LoopCtrl::Break)
} else {
Ok(matrix_sdk::LoopCtrl::Continue)
}
}
}));
}
let mut changes_since_last_render = true;
loop {
tokio::select! {
e = tui.next() => {
if let Some(e) = e {
self.handle_tui_event(&action_tx, e.clone()).context("Error while handling TUI event")?;
for component in self.components.iter_mut() {
if let Some(action) = component.handle_events(Some(e.clone()))? {
action_tx.send(action).context("Error while sending action from component")?;
}
}
}
}
sync_response = sync_responses_rx.recv() => {
let (client, sync_response) = sync_response.expect("sync_responses_rx unexpectedly closed");
self.handle_sync_response(&action_tx, client, sync_response).await.context("Error while handling sync response")?;
}
poll_updates = self.buffers.poll_updates_once() => {
changes_since_last_render = true;
}
sync_result = sync_results.next() => {
if !self.should_quit.load(Ordering::Acquire) {
panic!("Sync ended unexpected: {:?}", sync_result);
}
}
}
while let Ok(action) = action_rx.try_recv() {
if action != Action::Render {
log::debug!("{action:?}");
}
match action {
Action::Quit => self.should_quit.store(true, Ordering::Release),
Action::Suspend => self.should_suspend = true,
Action::Resume => self.should_suspend = false,
Action::NextBuffer => {
changes_since_last_render = true;
// self.buffer implements wrap-around itself
self
.buffers
.set_active_index(self.buffers.active_index() as isize + 1)
},
Action::PreviousBuffer => {
changes_since_last_render = true;
// self.buffer implements wrap-around itself
self
.buffers
.set_active_index(self.buffers.active_index() as isize - 1)
},
Action::ToBuffer(buffer_id) => {
changes_since_last_render = true;
self.buffers.set_active_index(buffer_id)
},
Action::Resize(w, h) => {
changes_since_last_render = true;
tui.resize(Rect::new(0, 0, w, h))?;
tui.draw(|f| {
for component in self.components.iter_mut() {
let r = component.draw(f, f.size(), &self.buffers);
if let Err(e) = r {
action_tx
.send(Action::Error(format!("Failed to draw: {:?}", e)))
.unwrap();
}
}
})?;
},
Action::ShouldRender => {
changes_since_last_render = true;
},
Action::Render => {
if changes_since_last_render {
tui.draw(|f| {
for component in self.components.iter_mut() {
let r = component.draw(f, f.size(), &self.buffers);
if let Err(e) = r {
action_tx
.send(Action::Error(format!("Failed to draw: {:?}", e)))
.unwrap();
}
}
})?;
changes_since_last_render = false;
}
},
Action::RunCommand(ref command_line) => {
changes_since_last_render = true;
log::info!("Got command: {command_line}");
crate::commands::run_command(command_line, self, &action_tx)?;
},
_ => {},
}
for component in self.components.iter_mut() {
if let Some(action) = component.update(&action)? {
action_tx.send(action)?
};
}
}
if self.should_suspend {
tui.suspend()?;
action_tx.send(Action::Resume)?;
tui = tui::Tui::new()?
.frame_rate(self.frame_rate)
.mouse(self.config.mouse.enable);
tui.enter()?;
} else if self.should_quit.load(Ordering::Acquire) {
tui.stop()?;
break;
}
}
tui.exit()?;
Ok(())
}
fn handle_tui_event(
&mut self,
action_tx: &mpsc::UnboundedSender<Action>,
e: tui::Event,
) -> Result<()> {
match e {
tui::Event::Quit => action_tx.send(Action::Quit)?,
tui::Event::Render => action_tx.send(Action::Render)?,
tui::Event::Resize(x, y) => action_tx.send(Action::Resize(x, y))?,
tui::Event::Mouse(event) => {
// Don't go through the action queue, we need to process mouse events immediately
// or stuff may move on screen before we process the click
for component in &mut self.components {
if let Some(action) = component.handle_mouse_events(event)? {
action_tx.send(action)?;
}
}
},
tui::Event::Key(key) => {
if let Some(keymap) = self.config.keybindings.get(&self.mode) {
if let Some(command_line) = keymap.get(&vec![key]) {
action_tx.send(Action::RunCommand(command_line.clone()))?;
} else {
let mut last_tick_key_events = self.last_tick_key_events.borrow_mut();
// If the key was not handled as a single key action,
// then consider it for multi-key combinations.
last_tick_key_events.push(key);
// Check for multi-key combinations
if let Some(command_line) = keymap.get(&*last_tick_key_events) {
action_tx.send(Action::RunCommand(command_line.clone()))?;
}
}
};
},
_ => {},
}
Ok(())
}
async fn handle_sync_response(
&mut self,
action_tx: &mpsc::UnboundedSender<Action>,
client: matrix_sdk::Client,
sync_response: matrix_sdk::sync::SyncResponse,
) -> Result<()> {
let known_rooms: HashSet<Arc<matrix_sdk::ruma::OwnedRoomId>> = self
.buffers
.iter()
.flat_map(|buf| match buf.id() {
BufferId::Room(room_id) => Some(room_id),
_ => None,
})
.collect();
let new_rooms: Vec<_> = sync_response
.rooms
.join
.keys()
.filter(|room_id: &&matrix_sdk::ruma::OwnedRoomId| {
!known_rooms.contains(&Arc::new((*room_id).clone()))
})
.map(|room_id| (client.clone(), room_id.to_owned()))
.collect();
self.create_room_buffers(new_rooms.into_iter()).await;
Ok(())
}
async fn create_room_buffers(
&mut self,
rooms: impl Iterator<Item = (matrix_sdk::Client, matrix_sdk::ruma::OwnedRoomId)>,
) {
futures::future::join_all(
rooms
.map(|(client, room)| RoomBuffer::new(self.config.clone(), client, Arc::new(room)))
.map(|fut| fut.map(|res| res.expect("Failed to create RoomBuffer at startup"))),
)
.await
.into_iter()
.map(Box::new)
.for_each(|buffer| self.buffers.push(buffer));
}
}