/*
|
|
* Copyright (C) 2023 Valentin Lorentz
|
|
*
|
|
* This program is free software: you can redistribute it and/or modify
|
|
* it under the terms of the GNU Affero General Public License version 3,
|
|
* as published by the Free Software Foundation.
|
|
*
|
|
* This program is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* GNU Affero General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU Affero General Public License
|
|
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
*/
|
|
|
|
use std::cell::RefCell;
|
|
use std::collections::{HashSet, VecDeque};
|
|
use std::sync::atomic::{AtomicBool, Ordering};
|
|
use std::sync::Arc;
|
|
|
|
use color_eyre::eyre::{eyre, Result, WrapErr};
|
|
use crossterm::event::KeyEvent;
|
|
use futures::stream::FuturesUnordered;
|
|
use futures::{FutureExt, StreamExt};
|
|
use nonempty::NonEmpty;
|
|
use ratatui::prelude::Rect;
|
|
use serde::{Deserialize, Serialize};
|
|
use tokio::sync::mpsc;
|
|
|
|
use crate::{
|
|
action::Action,
|
|
buffers::{BufferId, Buffers, LogBuffer, RoomBuffer},
|
|
commands::RataCommands,
|
|
components::{Component, FpsCounter, Home},
|
|
config::Config,
|
|
mode::Mode,
|
|
tui,
|
|
};
|
|
|
|
/// Top-level application state: owns the Matrix clients, the UI components,
/// the buffer list, and the flags that drive the main event loop in
/// [`App::run`].
pub struct App {
  /// Shared, immutable application configuration.
  pub config: Arc<Config>,
  /// One logged-in Matrix client per configured account; guaranteed non-empty.
  pub clients: NonEmpty<matrix_sdk::Client>,
  /// Registry of commands runnable via keybindings or the command line.
  pub commands: RataCommands,
  /// Target TUI redraw rate, in frames per second.
  pub frame_rate: f64,
  /// UI components (Home, FPS counter, ...) that receive events and draw.
  pub components: Vec<Box<dyn Component>>,
  /// All open buffers (log buffer plus one buffer per joined room).
  pub buffers: Buffers,
  /// Set to true to stop the event loop; shared with the sync callbacks so
  /// they can break out of `sync_with_result_callback`.
  pub should_quit: Arc<AtomicBool>,
  /// Set to true to suspend the TUI (e.g. on Ctrl-Z) and restore it later.
  pub should_suspend: bool,
  /// Current input mode, used to select the active keymap.
  pub mode: Mode,
  /// Keys pressed since the last tick, accumulated for multi-key bindings.
  /// RefCell because it is mutated from `&self` contexts.
  pub last_tick_key_events: RefCell<Vec<KeyEvent>>,
}
|
|
|
|
impl App {
|
|
  /// Builds the application: loads configuration, logs in every configured
  /// account concurrently (one tokio task per account), creates the default
  /// components and the log buffer, then loads plugins.
  ///
  /// `frame_rate` is the target TUI redraw rate; `log_receiver` feeds the
  /// built-in [`LogBuffer`]. Fails if config loading, any account login, or
  /// plugin loading fails.
  pub async fn new(frame_rate: f64, log_receiver: mpsc::UnboundedReceiver<String>) -> Result<Self> {
    let config = Config::new()?;
    // Per-profile data directory. NOTE(review): "default" is hard-coded —
    // presumably multi-profile support is planned; verify.
    let datadir = config.config._data_dir.join("default");
    // Spawn one login task per account so all accounts connect concurrently.
    let future_clients = config.accounts.clone().map(|conf| {
      let datadir = datadir.clone();
      tokio::spawn(async move {
        let server_name = conf.user_id.server_name();
        let user_dir = datadir.join(conf.user_id.to_string());
        std::fs::create_dir_all(&user_dir)
          .with_context(|| format!("Could not create {}", user_dir.display()))?;
        // If there is at least one device state for that user, use it. Otherwise,
        // generate a random device id
        let device_dir = std::fs::read_dir(&user_dir)
          .with_context(|| format!("Could not read {}", user_dir.display()))?
          .next()
          .and_then(|dir_entry| match dir_entry {
            Ok(dir_entry) => Some(dir_entry.path()),
            Err(e) => {
              // Unreadable directory entry: log it and fall through to
              // generating a fresh device id below.
              tracing::error!("Could not read entry in {}: {}", user_dir.display(), e);
              None
            },
          })
          .unwrap_or_else(|| user_dir.join(matrix_sdk::ruma::DeviceId::new().as_str()));
        // Extract device id from the path
        let device_id = device_dir
          .as_path()
          .file_name()
          .expect("empty path to state store")
          .to_str()
          .ok_or_else(|| eyre!("device id in {} is not valid UTF8", device_dir.display()))?;
        // The sqlite state store lives in the per-device directory, so
        // reusing the directory restores the previous session's state.
        let client = matrix_sdk::Client::builder()
          .server_name(server_name)
          .sqlite_store(&device_dir, conf.data_passphrase.as_deref())
          .build()
          .await
          .with_context(|| format!("Could not initialize client for {}", server_name))?;
        // Log in with the recovered (or freshly generated) device id.
        client
          .matrix_auth()
          .login_username(&conf.user_id, &conf.password)
          .initial_device_display_name(&conf.device_name)
          .device_id(device_id)
          .send()
          .await
          .with_context(|| format!("Could not login as {}", conf.user_id))?;
        tracing::info!(
          "Logged in as {}",
          client
            .user_id()
            .ok_or_else(|| eyre!("Unexpectedly not logged in as {}", conf.user_id))?,
        );
        Ok::<_, color_eyre::eyre::Report>(client)
      })
    });
    // Wait for every login task; startup is aborted if any account fails
    // (either the task panicked/was cancelled, or login itself errored).
    let mut clients = Vec::new();
    for client in futures::future::join_all(future_clients).await {
      clients.push(
        client
          .context("Could not join client init task")?
          .context("Could not connect to all accounts")?,
      );
    }
    // Safe: `future_clients` came from mapping a NonEmpty, so `clients` has
    // at least one element.
    let clients = NonEmpty::collect(clients).expect("map on NonEmpty returned empty vec");

    let config = Arc::new(config);
    let home = Home::new(config.clone());
    let fps = FpsCounter::default();
    let mode = Mode::Home;

    let mut app = Self {
      frame_rate,
      components: vec![Box::new(home), Box::new(fps)],
      should_quit: Arc::new(AtomicBool::new(false)),
      should_suspend: false,
      buffers: Buffers::new(Box::new(LogBuffer::new(log_receiver))),
      clients,
      config,
      commands: RataCommands::new(),
      mode,
      last_tick_key_events: RefCell::new(Vec::new()),
    };
    // Plugins may register commands/components, so load them after the app
    // skeleton exists but before the event loop starts.
    crate::plugins::load_all(&mut app).context("Could not load plugins")?;
    Ok(app)
  }
|
|
|
|
  /// Runs the main event loop until quit: enters the TUI, creates buffers
  /// for already-joined rooms, starts one long-running sync per client, then
  /// multiplexes TUI events, sync responses, and buffer updates, draining
  /// the action queue after each wakeup.
  pub async fn run(&mut self) -> Result<()> {
    // Actions flow from components/handlers into the per-iteration drain
    // loop below; sync responses flow from the sync callbacks into
    // `handle_sync_response`.
    let (action_tx, mut action_rx) = mpsc::unbounded_channel();
    let (sync_responses_tx, mut sync_responses_rx) = mpsc::unbounded_channel();

    let mut tui = tui::Tui::new()?
      .frame_rate(self.frame_rate)
      .mouse(self.config.mouse.enable);
    tui.enter()?;

    for component in self.components.iter_mut() {
      component.register_action_handler(action_tx.clone())?;
    }

    for component in self.components.iter_mut() {
      component.init(tui.size()?)?;
    }

    // Open a buffer for every room each client has already joined; rooms
    // that appear later are handled by `handle_sync_response`.
    self
      .create_room_buffers(self.clients.clone().iter().flat_map(|client| {
        tracing::info!("joined rooms {:?}", client.joined_rooms());
        client
          .joined_rooms()
          .into_iter()
          .map(move |room| (client.clone(), room.room_id().to_owned()))
      }))
      .await;

    // Start one sync loop per client; each callback forwards the response
    // over `sync_responses_tx` and checks `should_quit` to decide whether
    // to keep syncing.
    let sync_clients = self.clients.clone();
    let mut sync_results = FuturesUnordered::new();
    for client in sync_clients.iter() {
      let user_id = client.user_id().expect("missing user id");
      let server_name = user_id.server_name();
      let mut sync_settings = matrix_sdk::config::SyncSettings::default();
      if server_name == "matrix.org" {
        // matrix.org is slow and frequently hits the timeout on initial sync,
        // let's quadruple the default
        sync_settings = sync_settings.timeout(std::time::Duration::from_secs(120));
      }
      tracing::info!("Syncing with {}...", user_id);
      let should_quit = self.should_quit.clone();
      let sync_responses_tx = sync_responses_tx.clone();
      sync_results.push(client.sync_with_result_callback(sync_settings, move |res| {
        let client2 = client.clone();
        let should_quit = should_quit.clone();
        let sync_responses_tx = sync_responses_tx.clone();
        async move {
          // NOTE(review): a failed sync panics the whole app here rather
          // than being surfaced as an error — confirm this is intended.
          let sync_response = res.expect("Failed sync");
          let user_id = client2.user_id().expect("missing user id").to_string();
          sync_responses_tx
            .send((client2, sync_response))
            .expect("could not send sync response");
          tracing::info!("Got updates for {}", user_id);
          if should_quit.load(Ordering::Acquire) {
            Ok(matrix_sdk::LoopCtrl::Break)
          } else {
            Ok(matrix_sdk::LoopCtrl::Continue)
          }
        }
      }));
    }

    // Render only when something changed since the last draw; starts true
    // so the first frame is drawn.
    let mut changes_since_last_render = true;
    loop {
      // Wake on whichever source fires first.
      tokio::select! {
        e = tui.next() => {
          if let Some(e) = e {
            self.handle_tui_event(&action_tx, e.clone()).context("Error while handling TUI event")?;
            // Give every component a chance to turn the event into an action.
            for component in self.components.iter_mut() {
              if let Some(action) = component.handle_events(Some(e.clone()))? {
                action_tx.send(action).context("Error while sending action from component")?;
              }
            }
          }
        }
        sync_response = sync_responses_rx.recv() => {
          let (client, sync_response) = sync_response.expect("sync_responses_rx unexpectedly closed");
          self.handle_sync_response(&action_tx, client, sync_response).await.context("Error while handling sync response")?;
        }
        poll_updates = self.buffers.poll_updates_once() => {
          // Buffer content changed (e.g. new log line or room event).
          changes_since_last_render = true;
        }
        sync_result = sync_results.next() => {
          // A sync loop ending is only expected while quitting.
          if !self.should_quit.load(Ordering::Acquire) {
            panic!("Sync ended unexpected: {:?}", sync_result);
          }
        }
      }

      // Drain all pending actions before sleeping again.
      while let Ok(action) = action_rx.try_recv() {
        if action != Action::Render {
          log::debug!("{action:?}");
        }
        match action {
          Action::Quit => self.should_quit.store(true, Ordering::Release),
          Action::Suspend => self.should_suspend = true,
          Action::Resume => self.should_suspend = false,
          Action::NextBuffer => {
            changes_since_last_render = true;
            // self.buffer implements wrap-around itself
            self
              .buffers
              .set_active_index(self.buffers.active_index() as isize + 1)
          },
          Action::PreviousBuffer => {
            changes_since_last_render = true;
            // self.buffer implements wrap-around itself
            self
              .buffers
              .set_active_index(self.buffers.active_index() as isize - 1)
          },
          Action::ToBuffer(buffer_id) => {
            changes_since_last_render = true;
            self.buffers.set_active_index(buffer_id)
          },
          Action::Resize(w, h) => {
            changes_since_last_render = true;
            tui.resize(Rect::new(0, 0, w, h))?;
            // Redraw immediately so the UI matches the new size.
            tui.draw(|f| {
              for component in self.components.iter_mut() {
                let r = component.draw(f, f.size(), &self.buffers);
                if let Err(e) = r {
                  action_tx
                    .send(Action::Error(format!("Failed to draw: {:?}", e)))
                    .unwrap();
                }
              }
            })?;
          },
          Action::ShouldRender => {
            changes_since_last_render = true;
          },
          Action::Render => {
            // Skip the draw entirely when nothing changed since last frame.
            if changes_since_last_render {
              tui.draw(|f| {
                for component in self.components.iter_mut() {
                  let r = component.draw(f, f.size(), &self.buffers);
                  if let Err(e) = r {
                    action_tx
                      .send(Action::Error(format!("Failed to draw: {:?}", e)))
                      .unwrap();
                  }
                }
              })?;
              changes_since_last_render = false;
            }
          },
          Action::RunCommand(ref command_line) => {
            changes_since_last_render = true;
            log::info!("Got command: {command_line}");
            crate::commands::run_command(command_line, self, &action_tx)?;
          },
          _ => {},
        }
        // Let components react to the action (possibly emitting new ones).
        for component in self.components.iter_mut() {
          if let Some(action) = component.update(&action)? {
            action_tx.send(action)?
          };
        }
      }

      if self.should_suspend {
        // Tear the TUI down, hand the terminal back, and rebuild it when
        // resumed.
        tui.suspend()?;
        action_tx.send(Action::Resume)?;
        tui = tui::Tui::new()?
          .frame_rate(self.frame_rate)
          .mouse(self.config.mouse.enable);
        tui.enter()?;
      } else if self.should_quit.load(Ordering::Acquire) {
        tui.stop()?;
        break;
      }
    }
    tui.exit()?;
    Ok(())
  }
|
|
|
|
  /// Translates a low-level TUI event into actions sent on `action_tx`.
  ///
  /// Mouse events bypass the action queue and go straight to the components;
  /// key events are looked up in the keymap for the current mode, first as a
  /// single key, then as the tail of a multi-key sequence.
  fn handle_tui_event(
    &mut self,
    action_tx: &mpsc::UnboundedSender<Action>,
    e: tui::Event,
  ) -> Result<()> {
    match e {
      tui::Event::Quit => action_tx.send(Action::Quit)?,
      tui::Event::Render => action_tx.send(Action::Render)?,
      tui::Event::Resize(x, y) => action_tx.send(Action::Resize(x, y))?,
      tui::Event::Mouse(event) => {
        // Don't go through the action queue, we need to process mouse events immediately
        // or stuff may move on screen before we process the click
        for component in &mut self.components {
          if let Some(action) = component.handle_mouse_events(event)? {
            action_tx.send(action)?;
          }
        }
      },
      tui::Event::Key(key) => {
        // Keymaps are per-mode; a mode without a keymap ignores all keys.
        if let Some(keymap) = self.config.keybindings.get(&self.mode) {
          if let Some(command_line) = keymap.get(&vec![key]) {
            action_tx.send(Action::RunCommand(command_line.clone()))?;
          } else {
            let mut last_tick_key_events = self.last_tick_key_events.borrow_mut();

            // If the key was not handled as a single key action,
            // then consider it for multi-key combinations.
            last_tick_key_events.push(key);

            // Check for multi-key combinations
            // NOTE(review): `last_tick_key_events` is never cleared in this
            // function — presumably it is reset on each tick elsewhere;
            // verify, otherwise stale keys accumulate forever.
            if let Some(command_line) = keymap.get(&*last_tick_key_events) {
              action_tx.send(Action::RunCommand(command_line.clone()))?;
            }
          }
        };
      },
      _ => {},
    }

    Ok(())
  }
|
|
|
|
async fn handle_sync_response(
|
|
&mut self,
|
|
action_tx: &mpsc::UnboundedSender<Action>,
|
|
client: matrix_sdk::Client,
|
|
sync_response: matrix_sdk::sync::SyncResponse,
|
|
) -> Result<()> {
|
|
let known_rooms: HashSet<Arc<matrix_sdk::ruma::OwnedRoomId>> = self
|
|
.buffers
|
|
.iter()
|
|
.flat_map(|buf| match buf.id() {
|
|
BufferId::Room(room_id) => Some(room_id),
|
|
_ => None,
|
|
})
|
|
.collect();
|
|
let new_rooms: Vec<_> = sync_response
|
|
.rooms
|
|
.join
|
|
.keys()
|
|
.filter(|room_id: &&matrix_sdk::ruma::OwnedRoomId| {
|
|
!known_rooms.contains(&Arc::new((*room_id).clone()))
|
|
})
|
|
.map(|room_id| (client.clone(), room_id.to_owned()))
|
|
.collect();
|
|
self.create_room_buffers(new_rooms.into_iter()).await;
|
|
Ok(())
|
|
}
|
|
|
|
async fn create_room_buffers(
|
|
&mut self,
|
|
rooms: impl Iterator<Item = (matrix_sdk::Client, matrix_sdk::ruma::OwnedRoomId)>,
|
|
) {
|
|
futures::future::join_all(
|
|
rooms
|
|
.map(|(client, room)| RoomBuffer::new(self.config.clone(), client, Arc::new(room)))
|
|
.map(|fut| fut.map(|res| res.expect("Failed to create RoomBuffer at startup"))),
|
|
)
|
|
.await
|
|
.into_iter()
|
|
.map(Box::new)
|
|
.for_each(|buffer| self.buffers.push(buffer));
|
|
}
|
|
}
|