From bfa1e57f7154bc3238fa3cec101e1cf39b12e5d9 Mon Sep 17 00:00:00 2001
From: phiresky
Date: Mon, 24 Jul 2023 08:33:21 +0000
Subject: [PATCH] split federate bin/lib

---
 crates/federate/src/lib.rs  | 175 +++++++++++++++++++++++++++++++++++
 crates/federate/src/main.rs | 179 +----------------------------------
 crates/utils/translations   |   2 +-
 3 files changed, 181 insertions(+), 175 deletions(-)
 create mode 100644 crates/federate/src/lib.rs

diff --git a/crates/federate/src/lib.rs b/crates/federate/src/lib.rs
new file mode 100644
index 000000000..0c88c3a6e
--- /dev/null
+++ b/crates/federate/src/lib.rs
@@ -0,0 +1,175 @@
+use crate::{
+  util::{retry_sleep_duration, spawn_cancellable},
+  worker::instance_worker,
+};
+use activitypub_federation::config::FederationConfig;
+use chrono::{Local, Timelike};
+use clap::Parser;
+use federation_queue_state::FederationQueueState;
+use futures::Future;
+use lemmy_db_schema::{
+  source::instance::Instance,
+  utils::{ActualDbPool, DbPool},
+};
+use std::{
+  collections::{HashMap, HashSet},
+  time::Duration,
+};
+use tokio::{
+  sync::mpsc::{unbounded_channel, UnboundedReceiver},
+  time::sleep,
+};
+use tokio_util::sync::CancellationToken;
+
+mod federation_queue_state;
+mod util;
+mod worker;
+
+static WORKER_EXIT_TIMEOUT: Duration = Duration::from_secs(30);
+
+#[derive(Parser, Debug)]
+pub struct Opts {
+  /// how many processes you are starting in total
+  #[arg(default_value_t = 1)]
+  pub process_count: i32,
+  /// the index of this process (1-based: 1 - process_count)
+  #[arg(default_value_t = 1)]
+  pub process_index: i32,
+}
+
+async fn start_stop_federation_workers(
+  opts: Opts,
+  pool: ActualDbPool,
+  federation_config: FederationConfig<()>,
+  cancel: CancellationToken,
+) -> anyhow::Result<()> {
+  let mut workers = HashMap::new();
+
+  let (stats_sender, stats_receiver) = unbounded_channel();
+  let exit_print = tokio::spawn(receive_print_stats(pool.clone(), stats_receiver));
+  let pool2 = &mut DbPool::Pool(&pool);
+  let process_index = opts.process_index - 1;
+  loop {
+    let dead: HashSet<String> = HashSet::from_iter(Instance::dead_instances(pool2).await?);
+    let mut total_count = 0;
+    let mut dead_count = 0;
+    let mut disallowed_count = 0;
+    for (instance, allowed) in Instance::read_all_with_blocked(pool2).await?.into_iter() {
+      if instance.id.inner() % opts.process_count != process_index {
+        continue;
+      }
+      total_count += 1;
+      if !allowed {
+        disallowed_count += 1;
+      }
+      let is_dead = dead.contains(&instance.domain);
+      if is_dead {
+        dead_count += 1;
+      }
+      let should_federate = allowed && !is_dead;
+      if !workers.contains_key(&instance.id) && should_federate {
+        let stats_sender = stats_sender.clone();
+        workers.insert(
+          instance.id,
+          spawn_cancellable(WORKER_EXIT_TIMEOUT, |stop| {
+            instance_worker(
+              pool.clone(),
+              instance,
+              federation_config.to_request_data(),
+              stop,
+              stats_sender,
+            )
+          }),
+        );
+      } else if !should_federate {
+        if let Some(worker) = workers.remove(&instance.id) {
+          if let Err(e) = worker.await {
+            tracing::error!("error stopping worker: {e}");
+          }
+        }
+      }
+    }
+    let worker_count = workers.len();
+    tracing::info!("Federating to {worker_count}/{total_count} instances ({dead_count} dead, {disallowed_count} disallowed)");
+    tokio::select! {
+      () = sleep(Duration::from_secs(60)) => {},
+      _ = cancel.cancelled() => { break; }
+    }
+  }
+  drop(stats_sender);
+  tracing::warn!(
+    "Waiting for {} workers ({:.2?} max)",
+    workers.len(),
+    WORKER_EXIT_TIMEOUT
+  );
+  futures::future::join_all(workers.into_values()).await;
+  exit_print.await?;
+  Ok(())
+}
+
+/// starts and stops federation workers depending on which instances are on db
+/// await the returned future to stop/cancel all workers gracefully
+pub fn start_stop_federation_workers_cancellable(
+  opts: Opts,
+  pool: ActualDbPool,
+  config: FederationConfig<()>,
+) -> impl Future<Output = anyhow::Result<()>> {
+  spawn_cancellable(WORKER_EXIT_TIMEOUT, move |c| {
+    start_stop_federation_workers(opts, pool, config, c)
+  })
+}
+
+/// every 60s, print the state for every instance. exits if the receiver is done (all senders dropped)
+async fn receive_print_stats(
+  pool: ActualDbPool,
+  mut receiver: UnboundedReceiver<FederationQueueState>,
+) {
+  let mut pool = &mut DbPool::Pool(&pool);
+  let mut printerval = tokio::time::interval(Duration::from_secs(60));
+  printerval.tick().await; // skip first
+  let mut stats = HashMap::new();
+  loop {
+    tokio::select! {
+      ele = receiver.recv() => {
+        let Some(ele) = ele else {
+          tracing::info!("done. quitting");
+          print_stats(&mut pool, &stats).await;
+          return;
+        };
+        stats.insert(ele.domain.clone(), ele);
+      },
+      _ = printerval.tick() => {
+        print_stats(&mut pool, &stats).await;
+      }
+    }
+  }
+}
+
+async fn print_stats(pool: &mut DbPool<'_>, stats: &HashMap<String, FederationQueueState>) {
+  let last_id = crate::util::get_latest_activity_id(pool).await;
+  let Ok(last_id) = last_id else {
+    tracing::error!("could not get last id");
+    return;
+  };
+  // it's expected that the values are a bit out of date, everything < SAVE_STATE_EVERY should be considered up to date
+  tracing::info!(
+    "Federation state as of {}:",
+    Local::now().with_nanosecond(0).unwrap().to_rfc3339()
+  );
+  // todo: less noisy output (only output failing instances and summary for successful)
+  // todo: more stats (act/sec, avg http req duration)
+  for stat in stats.values() {
+    let behind = last_id - stat.last_successful_id;
+    if stat.fail_count > 0 {
+      tracing::info!(
+        "{}: Warning. {} behind, {} consecutive fails, current retry delay {:.2?}",
+        stat.domain,
+        behind,
+        stat.fail_count,
+        retry_sleep_duration(stat.fail_count)
+      );
+    } else {
+      tracing::info!("{}: Ok. {} behind", stat.domain, behind);
+    }
+  }
+}
diff --git a/crates/federate/src/main.rs b/crates/federate/src/main.rs
index d73d58233..e17b23688 100644
--- a/crates/federate/src/main.rs
+++ b/crates/federate/src/main.rs
@@ -1,129 +1,14 @@
-use crate::{
-  util::{retry_sleep_duration, spawn_cancellable},
-  worker::instance_worker,
-};
 use activitypub_federation::config::FederationConfig;
-use chrono::{Local, Timelike};
 use clap::Parser;
-use federation_queue_state::FederationQueueState;
-use futures::Future;
 use lemmy_api_common::request::build_user_agent;
 use lemmy_apub::{VerifyUrlData, FEDERATION_HTTP_FETCH_LIMIT};
-use lemmy_db_schema::{
-  source::instance::Instance,
-  utils::{build_db_pool, ActualDbPool, DbPool},
-};
+use lemmy_db_schema::utils::build_db_pool;
+use lemmy_federate::Opts;
 use lemmy_utils::{error::LemmyErrorExt2, settings::SETTINGS, REQWEST_TIMEOUT};
 use reqwest::Client;
 use reqwest_middleware::ClientBuilder;
 use reqwest_tracing::TracingMiddleware;
-use std::{
-  collections::{HashMap, HashSet},
-  time::Duration,
-};
-use tokio::{
-  signal::unix::SignalKind,
-  sync::mpsc::{unbounded_channel, UnboundedReceiver},
-  time::sleep,
-};
-use tokio_util::sync::CancellationToken;
-
-mod federation_queue_state;
-mod util;
-mod worker;
-
-static WORKER_EXIT_TIMEOUT: Duration = Duration::from_secs(30);
-
-#[derive(Parser, Debug)]
-pub struct Opts {
-  /// how many processes you are starting in total
-  #[arg(default_value_t = 1)]
-  pub process_count: i32,
-  /// the index of this process (1-based: 1 - process_count)
-  #[arg(default_value_t = 1)]
-  pub process_index: i32,
-}
-
-/// starts and stops federation workers depending on which instances are on db
-async fn start_stop_federation_workers(
-  opts: Opts,
-  pool: ActualDbPool,
-  federation_config: FederationConfig<()>,
-  cancel: CancellationToken,
-) -> anyhow::Result<()> {
-  let mut workers = HashMap::new();
-
-  let (stats_sender, stats_receiver) = unbounded_channel();
-  let exit_print = tokio::spawn(receive_print_stats(pool.clone(), stats_receiver));
-  let pool2 = &mut DbPool::Pool(&pool);
-  let process_index = opts.process_index - 1;
-  loop {
-    let dead: HashSet<String> = HashSet::from_iter(Instance::dead_instances(pool2).await?);
-    let mut total_count = 0;
-    let mut dead_count = 0;
-    let mut disallowed_count = 0;
-    for (instance, allowed) in Instance::read_all_with_blocked(pool2).await?.into_iter() {
-      if instance.id.inner() % opts.process_count != process_index {
-        continue;
-      }
-      total_count += 1;
-      if !allowed {
-        disallowed_count += 1;
-      }
-      let is_dead = dead.contains(&instance.domain);
-      if is_dead {
-        dead_count += 1;
-      }
-      let should_federate = allowed && !is_dead;
-      if !workers.contains_key(&instance.id) && should_federate {
-        let stats_sender = stats_sender.clone();
-        workers.insert(
-          instance.id,
-          spawn_cancellable(WORKER_EXIT_TIMEOUT, |stop| {
-            instance_worker(
-              pool.clone(),
-              instance,
-              federation_config.to_request_data(),
-              stop,
-              stats_sender,
-            )
-          }),
-        );
-      } else if !should_federate {
-        if let Some(worker) = workers.remove(&instance.id) {
-          if let Err(e) = worker.await {
-            tracing::error!("error stopping worker: {e}");
-          }
-        }
-      }
-    }
-    let worker_count = workers.len();
-    tracing::info!("Federating to {worker_count}/{total_count} instances ({dead_count} dead, {disallowed_count} disallowed)");
-    tokio::select! {
-      () = sleep(Duration::from_secs(60)) => {},
-      _ = cancel.cancelled() => { break; }
-    }
-  }
-  drop(stats_sender);
-  tracing::warn!(
-    "Waiting for {} workers ({:.2?} max)",
-    workers.len(),
-    WORKER_EXIT_TIMEOUT
-  );
-  futures::future::join_all(workers.into_values()).await;
-  exit_print.await?;
-  Ok(())
-}
-
-pub fn start_stop_federation_workers_cancellable(
-  opts: Opts,
-  pool: ActualDbPool,
-  config: FederationConfig<()>,
-) -> impl Future<Output = anyhow::Result<()>> {
-  spawn_cancellable(WORKER_EXIT_TIMEOUT, move |c| {
-    start_stop_federation_workers(opts, pool, config, c)
-  })
-}
+use tokio::signal::unix::SignalKind;
 
 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
@@ -155,7 +40,8 @@ async fn main() -> anyhow::Result<()> {
   let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())?;
   let mut terminate = tokio::signal::unix::signal(SignalKind::terminate())?;
 
-  let cancel = start_stop_federation_workers_cancellable(opts, pool, federation_config);
+  let cancel =
+    lemmy_federate::start_stop_federation_workers_cancellable(opts, pool, federation_config);
   tokio::select! {
     _ = tokio::signal::ctrl_c() => {
       tracing::warn!("Received ctrl-c, shutting down gracefully...");
@@ -170,58 +56,3 @@ async fn main() -> anyhow::Result<()> {
   cancel.await?;
   Ok(())
 }
-
-/// every 60s, print the state for every instance. exits if the receiver is done (all senders dropped)
-async fn receive_print_stats(
-  pool: ActualDbPool,
-  mut receiver: UnboundedReceiver<FederationQueueState>,
-) {
-  let mut pool = &mut DbPool::Pool(&pool);
-  let mut printerval = tokio::time::interval(Duration::from_secs(60));
-  printerval.tick().await; // skip first
-  let mut stats = HashMap::new();
-  loop {
-    tokio::select! {
-      ele = receiver.recv() => {
-        let Some(ele) = ele else {
-          tracing::info!("done. quitting");
-          print_stats(&mut pool, &stats).await;
-          return;
-        };
-        stats.insert(ele.domain.clone(), ele);
-      },
-      _ = printerval.tick() => {
-        print_stats(&mut pool, &stats).await;
-      }
-    }
-  }
-}
-
-async fn print_stats(pool: &mut DbPool<'_>, stats: &HashMap<String, FederationQueueState>) {
-  let last_id = crate::util::get_latest_activity_id(pool).await;
-  let Ok(last_id) = last_id else {
-    tracing::error!("could not get last id");
-    return;
-  };
-  // it's expected that the values are a bit out of date, everything < SAVE_STATE_EVERY should be considered up to date
-  tracing::info!(
-    "Federation state as of {}:",
-    Local::now().with_nanosecond(0).unwrap().to_rfc3339()
-  );
-  // todo: less noisy output (only output failing instances and summary for successful)
-  // todo: more stats (act/sec, avg http req duration)
-  for stat in stats.values() {
-    let behind = last_id - stat.last_successful_id;
-    if stat.fail_count > 0 {
-      tracing::info!(
-        "{}: Warning. {} behind, {} consecutive fails, current retry delay {:.2?}",
-        stat.domain,
-        behind,
-        stat.fail_count,
-        retry_sleep_duration(stat.fail_count)
-      );
-    } else {
-      tracing::info!("{}: Ok. {} behind", stat.domain, behind);
-    }
-  }
-}
diff --git a/crates/utils/translations b/crates/utils/translations
index 713ceed9c..f9ed06989 160000
--- a/crates/utils/translations
+++ b/crates/utils/translations
@@ -1 +1 @@
-Subproject commit 713ceed9c7ef84deaa222e68361e670e0763cd83
+Subproject commit f9ed0698944cb6d44dc733677a4634b7bc916536
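
Note: lib.rs relies on util::spawn_cancellable for both the per-instance workers
and the top-level start_stop_federation_workers_cancellable, but util.rs is not
part of this patch. The following is a minimal sketch of the contract the call
sites appear to assume (a hypothetical shape for illustration, not the actual
util.rs implementation): the closure receives a CancellationToken, the task
starts running immediately, and awaiting the returned future cancels the token
and waits up to the given timeout for a clean exit.

use std::{future::Future, time::Duration};
use tokio_util::sync::CancellationToken;

// Hypothetical sketch of the spawn_cancellable contract assumed by lib.rs;
// the real helper lives in crates/federate/src/util.rs and may differ.
fn spawn_cancellable<F>(
  timeout: Duration,
  task: impl FnOnce(CancellationToken) -> F,
) -> impl Future<Output = anyhow::Result<()>>
where
  F: Future<Output = anyhow::Result<()>> + Send + 'static,
{
  let token = CancellationToken::new();
  // the worker starts right away and watches `token` for shutdown requests
  let handle = tokio::spawn(task(token.clone()));
  async move {
    // awaiting the returned future requests shutdown...
    token.cancel();
    // ...and gives the worker up to `timeout` to finish cleanly
    match tokio::time::timeout(timeout, handle).await {
      Ok(joined) => joined?,
      Err(_) => Err(anyhow::anyhow!("worker did not exit within {timeout:?}")),
    }
  }
}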
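
The Opts struct moved into lib.rs is what lets several federation processes run
side by side: each process only claims the instances whose id falls in its
residue class modulo process_count (the 1-based process_index is shifted to
0-based before the comparison). A standalone illustration of that partitioning
rule (example code, not part of the patch):

// Standalone illustration of the sharding rule in start_stop_federation_workers:
// a process with 1-based index `process_index` out of `process_count` handles
// exactly the instances where id % process_count == process_index - 1.
fn is_handled_by(instance_id: i32, process_count: i32, process_index: i32) -> bool {
  instance_id % process_count == process_index - 1
}

fn main() {
  // with two processes, even ids go to process 1 and odd ids to process 2
  assert!(is_handled_by(4, 2, 1));
  assert!(is_handled_by(7, 2, 2));
  assert!(!is_handled_by(7, 2, 1));
  // every instance id is claimed by exactly one of the two processes
  for id in 0..10 {
    assert_eq!(1, (1..=2).filter(|p| is_handled_by(id, 2, *p)).count());
  }
  println!("sharding example ok");
}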