From c19211f255588c024418b02047d79e1c2a176c84 Mon Sep 17 00:00:00 2001 From: phiresky Date: Sun, 23 Jul 2023 22:04:28 +0000 Subject: [PATCH] make federation workers function callable from outside --- Cargo.lock | 171 +++++++++++++++++++++------------- crates/federate/Cargo.toml | 1 + crates/federate/src/main.rs | 131 +++++++++++++++++--------- crates/federate/src/util.rs | 2 +- crates/federate/src/worker.rs | 2 +- 5 files changed, 197 insertions(+), 110 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8fbe09244..0d3a8c026 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -402,6 +402,55 @@ dependencies = [ "libc", ] +[[package]] +name = "anstream" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ca84f3628370c59db74ee214b3263d58f9aadd9b4fe7e711fd87dc452b7f163" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is-terminal", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a30da5c5f2d5e72842e00bcb57657162cdabef0931f40e2deb9b4140440cecd" + +[[package]] +name = "anstyle-parse" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "938874ff5980b03a87c5524b3ae5b59cf99b1d6bc836848df7bc5ada9643c333" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ca11d4be1bab0c8bc8734a9aa7bf4ee8316d462a08c6ac5052f888fef5b494b" +dependencies = [ + "windows-sys 0.48.0", +] + +[[package]] +name = "anstyle-wincon" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "180abfa45703aebe0093f79badacc01b8fd4ea2e35118747e5811127f926e188" +dependencies = [ + "anstyle", + "windows-sys 0.48.0", +] + [[package]] name = "anyhow" version = "1.0.71" @@ -715,9 +764,9 @@ checksum = 
"bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.3.1" +version = "2.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6776fc96284a0bb647b615056fc496d1fe1644a7ab01829818a6d91cae888b84" +checksum = "630be753d4e58660abd17930c71b647fe46c27ea6b63cc59e1e3851406972e42" [[package]] name = "bitvec" @@ -904,40 +953,44 @@ dependencies = [ [[package]] name = "clap" -version = "4.0.32" +version = "4.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7db700bc935f9e43e88d00b0850dae18a63773cfbec6d8e070fccf7fef89a39" +checksum = "5fd304a20bff958a57f04c4e96a2e7594cc4490a0e809cbd48bb6437edaa452d" dependencies = [ - "bitflags 1.3.2", + "clap_builder", "clap_derive", - "clap_lex", - "is-terminal", "once_cell", +] + +[[package]] +name = "clap_builder" +version = "4.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01c6a3f08f1fe5662a35cfe393aec09c4df95f60ee93b7556505260f75eee9e1" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", "strsim", - "termcolor", ] [[package]] name = "clap_derive" -version = "4.0.21" +version = "4.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0177313f9f02afc995627906bbd8967e2be069f5261954222dac78290c2b9014" +checksum = "54a9bb5758fc5dfe728d1019941681eccaf0cf8a4189b692a0ee2f2ecf90a050" dependencies = [ "heck", - "proc-macro-error", "proc-macro2", "quote", - "syn 1.0.103", + "syn 2.0.25", ] [[package]] name = "clap_lex" -version = "0.3.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d4198f73e42b4936b35b5bb248d81d2b595ecb170da0bac7655c54eedfa8da8" -dependencies = [ - "os_str_bytes", -] +checksum = "2da6da31387c7e4ef160ffab6d5e7f00c42626fe39aea70a7b0f1773f7dd6c1b" [[package]] name = "clokwerk" @@ -991,6 +1044,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"3d7b894f5411737b7867f4827955924d7c254fc9f4d91a6aad6b097804b1018b" +[[package]] +name = "colorchoice" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" + [[package]] name = "combine" version = "4.6.6" @@ -1438,7 +1497,7 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7a532c1f99a0f596f6960a60d1e119e91582b24b39e2d83a190e61262c3ef0c" dependencies = [ - "bitflags 2.3.1", + "bitflags 2.3.3", "byteorder", "chrono", "diesel_derives", @@ -2125,15 +2184,6 @@ dependencies = [ "libc", ] -[[package]] -name = "hermit-abi" -version = "0.2.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7" -dependencies = [ - "libc", -] - [[package]] name = "hermit-abi" version = "0.3.2" @@ -2484,14 +2534,13 @@ checksum = "879d54834c8c76457ef4293a689b2a8c59b076067ad77b15efafbb05f92a592b" [[package]] name = "is-terminal" -version = "0.4.2" +version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28dfb6c8100ccc63462345b67d1bbc3679177c75ee4bf59bf29c8b1d110b8189" +checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b" dependencies = [ - "hermit-abi 0.2.6", - "io-lifetimes", - "rustix 0.36.5", - "windows-sys 0.42.0", + "hermit-abi 0.3.2", + "rustix 0.38.3", + "windows-sys 0.48.0", ] [[package]] @@ -2796,6 +2845,7 @@ dependencies = [ "async-trait", "bytes", "chrono", + "clap", "dashmap", "diesel", "diesel-async", @@ -3020,6 +3070,12 @@ version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" +[[package]] +name = "linux-raw-sys" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"09fc20d2ca12cb9f044c93e3bd6d32d523e6e2ec3db4f7b2939cd99026ecd3f0" + [[package]] name = "local-channel" version = "0.1.3" @@ -3608,12 +3664,6 @@ dependencies = [ "hashbrown 0.12.3", ] -[[package]] -name = "os_str_bytes" -version = "6.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b7820b9daea5457c9f21c69448905d723fbd21136ccf521748f23fd49e723ee" - [[package]] name = "overload" version = "0.1.1" @@ -4029,30 +4079,6 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c" -[[package]] -name = "proc-macro-error" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" -dependencies = [ - "proc-macro-error-attr", - "proc-macro2", - "quote", - "syn 1.0.103", - "version_check", -] - -[[package]] -name = "proc-macro-error-attr" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" -dependencies = [ - "proc-macro2", - "quote", - "version_check", -] - [[package]] name = "proc-macro2" version = "1.0.64" @@ -4595,6 +4621,19 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "rustix" +version = "0.38.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac5ffa1efe7548069688cd7028f32591853cd7b5b756d41bcffd2353e4fc75b4" +dependencies = [ + "bitflags 2.3.3", + "errno 0.3.1", + "libc", + "linux-raw-sys 0.4.3", + "windows-sys 0.48.0", +] + [[package]] name = "rustls" version = "0.20.7" @@ -5925,6 +5964,12 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5190c9442dcdaf0ddd50f37420417d219ae5261bbf5db120d0f9bab996c9cba1" +[[package]] +name = "utf8parse" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" 
+checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" + [[package]] name = "uuid" version = "1.4.0" diff --git a/crates/federate/Cargo.toml b/crates/federate/Cargo.toml index 06794499b..5c1c1a9e7 100644 --- a/crates/federate/Cargo.toml +++ b/crates/federate/Cargo.toml @@ -16,6 +16,7 @@ anyhow.workspace = true async-trait = "0.1.71" bytes = "1.4.0" chrono.workspace = true +clap = { version = "4.3.19", features = ["derive"] } dashmap = "5.5.0" diesel = { workspace = true, features = ["postgres", "chrono", "serde_json"] } diesel-async = { workspace = true, features = ["deadpool", "postgres"] } diff --git a/crates/federate/src/main.rs b/crates/federate/src/main.rs index 142f2b078..f150b1ded 100644 --- a/crates/federate/src/main.rs +++ b/crates/federate/src/main.rs @@ -4,7 +4,9 @@ use crate::{ }; use activitypub_federation::config::FederationConfig; use chrono::{Local, Timelike}; +use clap::Parser; use federation_queue_state::FederationQueueState; +use futures::Future; use lemmy_api_common::request::build_user_agent; use lemmy_apub::{VerifyUrlData, FEDERATION_HTTP_FETCH_LIMIT}; use lemmy_db_schema::{ @@ -15,12 +17,16 @@ use lemmy_utils::{error::LemmyErrorExt2, settings::SETTINGS, REQWEST_TIMEOUT}; use reqwest::Client; use reqwest_middleware::ClientBuilder; use reqwest_tracing::TracingMiddleware; -use std::{collections::HashMap, time::Duration}; +use std::{ + collections::{HashMap, HashSet}, + time::Duration, +}; use tokio::{ signal::unix::SignalKind, sync::mpsc::{unbounded_channel, UnboundedReceiver}, time::sleep, }; +use tokio_util::sync::CancellationToken; mod federation_queue_state; mod util; @@ -28,46 +34,35 @@ mod worker; static WORKER_EXIT_TIMEOUT: Duration = Duration::from_secs(30); -#[tokio::main] -async fn main() -> anyhow::Result<()> { - tracing_subscriber::fmt::init(); - let settings = SETTINGS.to_owned(); - // TODO: wait until migrations are applied? 
or are they safe from race conditions and i can just call run_migrations here as well? - let pool = build_db_pool(&settings).await.into_anyhow()?; - let user_agent = build_user_agent(&settings); - let reqwest_client = Client::builder() - .user_agent(user_agent.clone()) - .timeout(REQWEST_TIMEOUT) - .connect_timeout(REQWEST_TIMEOUT) - .build()?; +#[derive(Parser, Debug)] +pub struct Opts { + /// how many processes you are starting in total + #[arg(default_value_t = 1)] + pub process_count: i32, + /// the index of this process (1-based: 1 - process_count) + #[arg(default_value_t = 1)] + pub process_index: i32, +} - let client = ClientBuilder::new(reqwest_client.clone()) - .with(TracingMiddleware::default()) - .build(); - - let federation_config = FederationConfig::builder() - .domain(settings.hostname.clone()) - .app_data(()) - .client(client.clone()) - .http_fetch_limit(FEDERATION_HTTP_FETCH_LIMIT) - .http_signature_compat(true) - .url_verifier(Box::new(VerifyUrlData(pool.clone()))) - .build() - .await?; - let process_num = 1 - 1; // todo: pass these in via command line args - let process_count = 1; +/// starts and stops federation workers depending on which instances are on db +async fn start_stop_federation_workers( + opts: Opts, + pool: ActualDbPool, + federation_config: FederationConfig, + cancel: CancellationToken, +) -> anyhow::Result<()> { let mut workers = HashMap::new(); let (stats_sender, stats_receiver) = unbounded_channel(); let exit_print = tokio::spawn(receive_print_stats(pool.clone(), stats_receiver)); - let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())?; - let mut terminate = tokio::signal::unix::signal(SignalKind::terminate())?; let pool2 = &mut DbPool::Pool(&pool); loop { - for (instance, should_federate) in Instance::read_all_with_blocked(pool2).await?.into_iter() { - if instance.id.inner() % process_count != process_num { + let dead: HashSet = HashSet::from_iter(Instance::dead_instances(pool2).await?); + for (instance, 
allowed) in Instance::read_all_with_blocked(pool2).await?.into_iter() { + if instance.id.inner() % opts.process_count != opts.process_index - 1 { continue; } + let should_federate = allowed && !dead.contains(&instance.domain); if !workers.contains_key(&instance.id) && should_federate { let stats_sender = stats_sender.clone(); workers.insert( @@ -92,18 +87,7 @@ async fn main() -> anyhow::Result<()> { } tokio::select! { () = sleep(Duration::from_secs(60)) => {}, - _ = tokio::signal::ctrl_c() => { - tracing::warn!("Received ctrl-c, shutting down gracefully..."); - break; - } - _ = interrupt.recv() => { - tracing::warn!("Received interrupt, shutting down gracefully..."); - break; - } - _ = terminate.recv() => { - tracing::warn!("Received terminate, shutting down gracefully..."); - break; - } + _ = cancel.cancelled() => { break; } } } drop(stats_sender); @@ -117,9 +101,65 @@ async fn main() -> anyhow::Result<()> { Ok(()) } +pub fn start_stop_federation_workers_cancellable( + opts: Opts, + pool: ActualDbPool, + config: FederationConfig, +) -> impl Future> { + spawn_cancellable(WORKER_EXIT_TIMEOUT, move |c| { + start_stop_federation_workers(opts, pool, config, c) + }) +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + let opts = Opts::parse(); + let settings = SETTINGS.to_owned(); + // TODO: wait until migrations are applied? or are they safe from race conditions and i can just call run_migrations here as well? 
+ let pool = build_db_pool(&settings).await.into_anyhow()?; + let user_agent = build_user_agent(&settings); + let reqwest_client = Client::builder() + .user_agent(user_agent.clone()) + .timeout(REQWEST_TIMEOUT) + .connect_timeout(REQWEST_TIMEOUT) + .build()?; + + let client = ClientBuilder::new(reqwest_client.clone()) + .with(TracingMiddleware::default()) + .build(); + + let federation_config = FederationConfig::builder() + .domain(settings.hostname.clone()) + .app_data(()) + .client(client.clone()) + .http_fetch_limit(FEDERATION_HTTP_FETCH_LIMIT) + .http_signature_compat(true) + .url_verifier(Box::new(VerifyUrlData(pool.clone()))) + .build() + .await?; + let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())?; + let mut terminate = tokio::signal::unix::signal(SignalKind::terminate())?; + + let cancel = start_stop_federation_workers_cancellable(opts, pool, federation_config); + tokio::select! { + _ = tokio::signal::ctrl_c() => { + tracing::warn!("Received ctrl-c, shutting down gracefully..."); + } + _ = interrupt.recv() => { + tracing::warn!("Received interrupt, shutting down gracefully..."); + } + _ = terminate.recv() => { + tracing::warn!("Received terminate, shutting down gracefully..."); + } + } + cancel.await?; + Ok(()) +} + /// every 60s, print the state for every instance. 
exits if the receiver is done (all senders dropped) async fn receive_print_stats( - mut pool: ActualDbPool, + pool: ActualDbPool, mut receiver: UnboundedReceiver, ) { let mut pool = &mut DbPool::Pool(&pool); @@ -142,6 +182,7 @@ async fn receive_print_stats( } } } + async fn print_stats(pool: &mut DbPool<'_>, stats: &HashMap) { let last_id = crate::util::get_latest_activity_id(pool).await; let Ok(last_id) = last_id else { diff --git a/crates/federate/src/util.rs b/crates/federate/src/util.rs index e53c338ab..0b007e70d 100644 --- a/crates/federate/src/util.rs +++ b/crates/federate/src/util.rs @@ -13,7 +13,7 @@ use lemmy_db_schema::{ person::Person, site::Site, }, - traits::{ApubActor, Crud}, + traits::ApubActor, utils::{get_conn, DbPool}, }; use moka::future::Cache; diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index 1b6b043f5..34883c688 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -40,7 +40,7 @@ static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(10); pub async fn instance_worker( pool: ActualDbPool, instance: Instance, - data: Data<()>, + data: Data, stop: CancellationToken, stats_sender: UnboundedSender, ) -> Result<(), anyhow::Error> {