mirror of
https://github.com/LemmyNet/lemmy.git
synced 2024-09-09 16:08:49 -06:00
make federation workers function callable from outside
This commit is contained in:
parent
a2e3fc870b
commit
c19211f255
171
Cargo.lock
generated
171
Cargo.lock
generated
@ -402,6 +402,55 @@ dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "anstream"
|
||||
version = "0.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0ca84f3628370c59db74ee214b3263d58f9aadd9b4fe7e711fd87dc452b7f163"
|
||||
dependencies = [
|
||||
"anstyle",
|
||||
"anstyle-parse",
|
||||
"anstyle-query",
|
||||
"anstyle-wincon",
|
||||
"colorchoice",
|
||||
"is-terminal",
|
||||
"utf8parse",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "anstyle"
|
||||
version = "1.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3a30da5c5f2d5e72842e00bcb57657162cdabef0931f40e2deb9b4140440cecd"
|
||||
|
||||
[[package]]
|
||||
name = "anstyle-parse"
|
||||
version = "0.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "938874ff5980b03a87c5524b3ae5b59cf99b1d6bc836848df7bc5ada9643c333"
|
||||
dependencies = [
|
||||
"utf8parse",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "anstyle-query"
|
||||
version = "1.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5ca11d4be1bab0c8bc8734a9aa7bf4ee8316d462a08c6ac5052f888fef5b494b"
|
||||
dependencies = [
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "anstyle-wincon"
|
||||
version = "1.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "180abfa45703aebe0093f79badacc01b8fd4ea2e35118747e5811127f926e188"
|
||||
dependencies = [
|
||||
"anstyle",
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "anyhow"
|
||||
version = "1.0.71"
|
||||
@ -715,9 +764,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
|
||||
|
||||
[[package]]
|
||||
name = "bitflags"
|
||||
version = "2.3.1"
|
||||
version = "2.3.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6776fc96284a0bb647b615056fc496d1fe1644a7ab01829818a6d91cae888b84"
|
||||
checksum = "630be753d4e58660abd17930c71b647fe46c27ea6b63cc59e1e3851406972e42"
|
||||
|
||||
[[package]]
|
||||
name = "bitvec"
|
||||
@ -904,40 +953,44 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "clap"
|
||||
version = "4.0.32"
|
||||
version = "4.3.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a7db700bc935f9e43e88d00b0850dae18a63773cfbec6d8e070fccf7fef89a39"
|
||||
checksum = "5fd304a20bff958a57f04c4e96a2e7594cc4490a0e809cbd48bb6437edaa452d"
|
||||
dependencies = [
|
||||
"bitflags 1.3.2",
|
||||
"clap_builder",
|
||||
"clap_derive",
|
||||
"clap_lex",
|
||||
"is-terminal",
|
||||
"once_cell",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "clap_builder"
|
||||
version = "4.3.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "01c6a3f08f1fe5662a35cfe393aec09c4df95f60ee93b7556505260f75eee9e1"
|
||||
dependencies = [
|
||||
"anstream",
|
||||
"anstyle",
|
||||
"clap_lex",
|
||||
"strsim",
|
||||
"termcolor",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "clap_derive"
|
||||
version = "4.0.21"
|
||||
version = "4.3.12"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0177313f9f02afc995627906bbd8967e2be069f5261954222dac78290c2b9014"
|
||||
checksum = "54a9bb5758fc5dfe728d1019941681eccaf0cf8a4189b692a0ee2f2ecf90a050"
|
||||
dependencies = [
|
||||
"heck",
|
||||
"proc-macro-error",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 1.0.103",
|
||||
"syn 2.0.25",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "clap_lex"
|
||||
version = "0.3.0"
|
||||
version = "0.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0d4198f73e42b4936b35b5bb248d81d2b595ecb170da0bac7655c54eedfa8da8"
|
||||
dependencies = [
|
||||
"os_str_bytes",
|
||||
]
|
||||
checksum = "2da6da31387c7e4ef160ffab6d5e7f00c42626fe39aea70a7b0f1773f7dd6c1b"
|
||||
|
||||
[[package]]
|
||||
name = "clokwerk"
|
||||
@ -991,6 +1044,12 @@ version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3d7b894f5411737b7867f4827955924d7c254fc9f4d91a6aad6b097804b1018b"
|
||||
|
||||
[[package]]
|
||||
name = "colorchoice"
|
||||
version = "1.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7"
|
||||
|
||||
[[package]]
|
||||
name = "combine"
|
||||
version = "4.6.6"
|
||||
@ -1438,7 +1497,7 @@ version = "2.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f7a532c1f99a0f596f6960a60d1e119e91582b24b39e2d83a190e61262c3ef0c"
|
||||
dependencies = [
|
||||
"bitflags 2.3.1",
|
||||
"bitflags 2.3.3",
|
||||
"byteorder",
|
||||
"chrono",
|
||||
"diesel_derives",
|
||||
@ -2125,15 +2184,6 @@ dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hermit-abi"
|
||||
version = "0.2.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hermit-abi"
|
||||
version = "0.3.2"
|
||||
@ -2484,14 +2534,13 @@ checksum = "879d54834c8c76457ef4293a689b2a8c59b076067ad77b15efafbb05f92a592b"
|
||||
|
||||
[[package]]
|
||||
name = "is-terminal"
|
||||
version = "0.4.2"
|
||||
version = "0.4.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "28dfb6c8100ccc63462345b67d1bbc3679177c75ee4bf59bf29c8b1d110b8189"
|
||||
checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b"
|
||||
dependencies = [
|
||||
"hermit-abi 0.2.6",
|
||||
"io-lifetimes",
|
||||
"rustix 0.36.5",
|
||||
"windows-sys 0.42.0",
|
||||
"hermit-abi 0.3.2",
|
||||
"rustix 0.38.3",
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -2796,6 +2845,7 @@ dependencies = [
|
||||
"async-trait",
|
||||
"bytes",
|
||||
"chrono",
|
||||
"clap",
|
||||
"dashmap",
|
||||
"diesel",
|
||||
"diesel-async",
|
||||
@ -3020,6 +3070,12 @@ version = "0.3.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519"
|
||||
|
||||
[[package]]
|
||||
name = "linux-raw-sys"
|
||||
version = "0.4.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "09fc20d2ca12cb9f044c93e3bd6d32d523e6e2ec3db4f7b2939cd99026ecd3f0"
|
||||
|
||||
[[package]]
|
||||
name = "local-channel"
|
||||
version = "0.1.3"
|
||||
@ -3608,12 +3664,6 @@ dependencies = [
|
||||
"hashbrown 0.12.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "os_str_bytes"
|
||||
version = "6.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9b7820b9daea5457c9f21c69448905d723fbd21136ccf521748f23fd49e723ee"
|
||||
|
||||
[[package]]
|
||||
name = "overload"
|
||||
version = "0.1.1"
|
||||
@ -4029,30 +4079,6 @@ version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c"
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro-error"
|
||||
version = "1.0.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
|
||||
dependencies = [
|
||||
"proc-macro-error-attr",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 1.0.103",
|
||||
"version_check",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro-error-attr"
|
||||
version = "1.0.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"version_check",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro2"
|
||||
version = "1.0.64"
|
||||
@ -4595,6 +4621,19 @@ dependencies = [
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustix"
|
||||
version = "0.38.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ac5ffa1efe7548069688cd7028f32591853cd7b5b756d41bcffd2353e4fc75b4"
|
||||
dependencies = [
|
||||
"bitflags 2.3.3",
|
||||
"errno 0.3.1",
|
||||
"libc",
|
||||
"linux-raw-sys 0.4.3",
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls"
|
||||
version = "0.20.7"
|
||||
@ -5925,6 +5964,12 @@ version = "0.1.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5190c9442dcdaf0ddd50f37420417d219ae5261bbf5db120d0f9bab996c9cba1"
|
||||
|
||||
[[package]]
|
||||
name = "utf8parse"
|
||||
version = "0.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
|
||||
|
||||
[[package]]
|
||||
name = "uuid"
|
||||
version = "1.4.0"
|
||||
|
@ -16,6 +16,7 @@ anyhow.workspace = true
|
||||
async-trait = "0.1.71"
|
||||
bytes = "1.4.0"
|
||||
chrono.workspace = true
|
||||
clap = { version = "4.3.19", features = ["derive"] }
|
||||
dashmap = "5.5.0"
|
||||
diesel = { workspace = true, features = ["postgres", "chrono", "serde_json"] }
|
||||
diesel-async = { workspace = true, features = ["deadpool", "postgres"] }
|
||||
|
@ -4,7 +4,9 @@ use crate::{
|
||||
};
|
||||
use activitypub_federation::config::FederationConfig;
|
||||
use chrono::{Local, Timelike};
|
||||
use clap::Parser;
|
||||
use federation_queue_state::FederationQueueState;
|
||||
use futures::Future;
|
||||
use lemmy_api_common::request::build_user_agent;
|
||||
use lemmy_apub::{VerifyUrlData, FEDERATION_HTTP_FETCH_LIMIT};
|
||||
use lemmy_db_schema::{
|
||||
@ -15,12 +17,16 @@ use lemmy_utils::{error::LemmyErrorExt2, settings::SETTINGS, REQWEST_TIMEOUT};
|
||||
use reqwest::Client;
|
||||
use reqwest_middleware::ClientBuilder;
|
||||
use reqwest_tracing::TracingMiddleware;
|
||||
use std::{collections::HashMap, time::Duration};
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
time::Duration,
|
||||
};
|
||||
use tokio::{
|
||||
signal::unix::SignalKind,
|
||||
sync::mpsc::{unbounded_channel, UnboundedReceiver},
|
||||
time::sleep,
|
||||
};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
mod federation_queue_state;
|
||||
mod util;
|
||||
@ -28,46 +34,35 @@ mod worker;
|
||||
|
||||
static WORKER_EXIT_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
tracing_subscriber::fmt::init();
|
||||
let settings = SETTINGS.to_owned();
|
||||
// TODO: wait until migrations are applied? or are they safe from race conditions and i can just call run_migrations here as well?
|
||||
let pool = build_db_pool(&settings).await.into_anyhow()?;
|
||||
let user_agent = build_user_agent(&settings);
|
||||
let reqwest_client = Client::builder()
|
||||
.user_agent(user_agent.clone())
|
||||
.timeout(REQWEST_TIMEOUT)
|
||||
.connect_timeout(REQWEST_TIMEOUT)
|
||||
.build()?;
|
||||
#[derive(Parser, Debug)]
|
||||
pub struct Opts {
|
||||
/// how many processes you are starting in total
|
||||
#[arg(default_value_t = 1)]
|
||||
pub process_count: i32,
|
||||
/// the index of this process (1-based: 1 - process_count)
|
||||
#[arg(default_value_t = 1)]
|
||||
pub process_index: i32,
|
||||
}
|
||||
|
||||
let client = ClientBuilder::new(reqwest_client.clone())
|
||||
.with(TracingMiddleware::default())
|
||||
.build();
|
||||
|
||||
let federation_config = FederationConfig::builder()
|
||||
.domain(settings.hostname.clone())
|
||||
.app_data(())
|
||||
.client(client.clone())
|
||||
.http_fetch_limit(FEDERATION_HTTP_FETCH_LIMIT)
|
||||
.http_signature_compat(true)
|
||||
.url_verifier(Box::new(VerifyUrlData(pool.clone())))
|
||||
.build()
|
||||
.await?;
|
||||
let process_num = 1 - 1; // todo: pass these in via command line args
|
||||
let process_count = 1;
|
||||
/// starts and stops federation workers depending on which instances are on db
|
||||
async fn start_stop_federation_workers<T: Clone + Send + Sync + 'static>(
|
||||
opts: Opts,
|
||||
pool: ActualDbPool,
|
||||
federation_config: FederationConfig<T>,
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut workers = HashMap::new();
|
||||
|
||||
let (stats_sender, stats_receiver) = unbounded_channel();
|
||||
let exit_print = tokio::spawn(receive_print_stats(pool.clone(), stats_receiver));
|
||||
let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())?;
|
||||
let mut terminate = tokio::signal::unix::signal(SignalKind::terminate())?;
|
||||
let pool2 = &mut DbPool::Pool(&pool);
|
||||
loop {
|
||||
for (instance, should_federate) in Instance::read_all_with_blocked(pool2).await?.into_iter() {
|
||||
if instance.id.inner() % process_count != process_num {
|
||||
let dead: HashSet<String> = HashSet::from_iter(Instance::dead_instances(pool2).await?);
|
||||
for (instance, allowed) in Instance::read_all_with_blocked(pool2).await?.into_iter() {
|
||||
if instance.id.inner() % opts.process_count != opts.process_index {
|
||||
continue;
|
||||
}
|
||||
let should_federate = allowed && !dead.contains(&instance.domain);
|
||||
if !workers.contains_key(&instance.id) && should_federate {
|
||||
let stats_sender = stats_sender.clone();
|
||||
workers.insert(
|
||||
@ -92,18 +87,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
}
|
||||
tokio::select! {
|
||||
() = sleep(Duration::from_secs(60)) => {},
|
||||
_ = tokio::signal::ctrl_c() => {
|
||||
tracing::warn!("Received ctrl-c, shutting down gracefully...");
|
||||
break;
|
||||
}
|
||||
_ = interrupt.recv() => {
|
||||
tracing::warn!("Received interrupt, shutting down gracefully...");
|
||||
break;
|
||||
}
|
||||
_ = terminate.recv() => {
|
||||
tracing::warn!("Received terminate, shutting down gracefully...");
|
||||
break;
|
||||
}
|
||||
_ = cancel.cancelled() => { break; }
|
||||
}
|
||||
}
|
||||
drop(stats_sender);
|
||||
@ -117,9 +101,65 @@ async fn main() -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn start_stop_federation_workers_cancellable(
|
||||
opts: Opts,
|
||||
pool: ActualDbPool,
|
||||
config: FederationConfig<impl Clone + Send + Sync + 'static>,
|
||||
) -> impl Future<Output = anyhow::Result<()>> {
|
||||
spawn_cancellable(WORKER_EXIT_TIMEOUT, move |c| {
|
||||
start_stop_federation_workers(opts, pool, config, c)
|
||||
})
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
tracing_subscriber::fmt::init();
|
||||
let opts = Opts::parse();
|
||||
let settings = SETTINGS.to_owned();
|
||||
// TODO: wait until migrations are applied? or are they safe from race conditions and i can just call run_migrations here as well?
|
||||
let pool = build_db_pool(&settings).await.into_anyhow()?;
|
||||
let user_agent = build_user_agent(&settings);
|
||||
let reqwest_client = Client::builder()
|
||||
.user_agent(user_agent.clone())
|
||||
.timeout(REQWEST_TIMEOUT)
|
||||
.connect_timeout(REQWEST_TIMEOUT)
|
||||
.build()?;
|
||||
|
||||
let client = ClientBuilder::new(reqwest_client.clone())
|
||||
.with(TracingMiddleware::default())
|
||||
.build();
|
||||
|
||||
let federation_config = FederationConfig::builder()
|
||||
.domain(settings.hostname.clone())
|
||||
.app_data(())
|
||||
.client(client.clone())
|
||||
.http_fetch_limit(FEDERATION_HTTP_FETCH_LIMIT)
|
||||
.http_signature_compat(true)
|
||||
.url_verifier(Box::new(VerifyUrlData(pool.clone())))
|
||||
.build()
|
||||
.await?;
|
||||
let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())?;
|
||||
let mut terminate = tokio::signal::unix::signal(SignalKind::terminate())?;
|
||||
|
||||
let cancel = start_stop_federation_workers_cancellable(opts, pool, federation_config);
|
||||
tokio::select! {
|
||||
_ = tokio::signal::ctrl_c() => {
|
||||
tracing::warn!("Received ctrl-c, shutting down gracefully...");
|
||||
}
|
||||
_ = interrupt.recv() => {
|
||||
tracing::warn!("Received interrupt, shutting down gracefully...");
|
||||
}
|
||||
_ = terminate.recv() => {
|
||||
tracing::warn!("Received terminate, shutting down gracefully...");
|
||||
}
|
||||
}
|
||||
cancel.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// every 60s, print the state for every instance. exits if the receiver is done (all senders dropped)
|
||||
async fn receive_print_stats(
|
||||
mut pool: ActualDbPool,
|
||||
pool: ActualDbPool,
|
||||
mut receiver: UnboundedReceiver<FederationQueueState>,
|
||||
) {
|
||||
let mut pool = &mut DbPool::Pool(&pool);
|
||||
@ -142,6 +182,7 @@ async fn receive_print_stats(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn print_stats(pool: &mut DbPool<'_>, stats: &HashMap<String, FederationQueueState>) {
|
||||
let last_id = crate::util::get_latest_activity_id(pool).await;
|
||||
let Ok(last_id) = last_id else {
|
||||
|
@ -13,7 +13,7 @@ use lemmy_db_schema::{
|
||||
person::Person,
|
||||
site::Site,
|
||||
},
|
||||
traits::{ApubActor, Crud},
|
||||
traits::ApubActor,
|
||||
utils::{get_conn, DbPool},
|
||||
};
|
||||
use moka::future::Cache;
|
||||
|
@ -40,7 +40,7 @@ static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(10);
|
||||
pub async fn instance_worker(
|
||||
pool: ActualDbPool,
|
||||
instance: Instance,
|
||||
data: Data<()>,
|
||||
data: Data<impl Clone>,
|
||||
stop: CancellationToken,
|
||||
stats_sender: UnboundedSender<FederationQueueState>,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
|
Loading…
Reference in New Issue
Block a user