diff --git a/Cargo.lock b/Cargo.lock index 7704977a9..08fbd70b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2822,7 +2822,6 @@ dependencies = [ "async-trait", "bytes", "chrono", - "clap", "diesel", "diesel-async", "enum_delegate", @@ -2880,6 +2879,7 @@ dependencies = [ "actix-web", "actix-web-prom", "chrono", + "clap", "clokwerk", "console-subscriber", "diesel", @@ -2891,6 +2891,7 @@ dependencies = [ "lemmy_api_crud", "lemmy_apub", "lemmy_db_schema", + "lemmy_federate", "lemmy_routes", "lemmy_utils", "opentelemetry 0.19.0", diff --git a/Cargo.toml b/Cargo.toml index e2899a13a..751e6b30a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -166,3 +166,5 @@ tokio-postgres-rustls = { workspace = true } chrono = { workspace = true } prometheus = { version = "0.13.3", features = ["process"], optional = true } actix-web-prom = { version = "0.6.0", optional = true } +clap = { version = "4.3.19", features = ["derive"] } +lemmy_federate = { version = "0.18.1", path = "crates/federate" } diff --git a/crates/federate/Cargo.toml b/crates/federate/Cargo.toml index c9121490b..0c394d9f4 100644 --- a/crates/federate/Cargo.toml +++ b/crates/federate/Cargo.toml @@ -32,7 +32,6 @@ tracing.workspace = true async-trait = "0.1.71" bytes = "1.4.0" -clap = { version = "4.3.19", features = ["derive"] } enum_delegate = "0.2.0" moka = { version = "0.11.2", features = ["future"] } openssl = "0.10.55" diff --git a/crates/federate/src/lib.rs b/crates/federate/src/lib.rs index 9312c5637..97620100a 100644 --- a/crates/federate/src/lib.rs +++ b/crates/federate/src/lib.rs @@ -4,7 +4,6 @@ use crate::{ }; use activitypub_federation::config::FederationConfig; use chrono::{Local, Timelike}; -use clap::Parser; use federation_queue_state::FederationQueueState; use lemmy_api_common::context::LemmyContext; use lemmy_db_schema::{ @@ -27,13 +26,10 @@ mod worker; static WORKER_EXIT_TIMEOUT: Duration = Duration::from_secs(30); -#[derive(Parser, Debug)] pub struct Opts { /// how many processes you are starting in total - #[arg(default_value_t = 1)] pub process_count: i32, /// the index of this process (1-based: 1 - process_count) - #[arg(default_value_t = 1)] pub process_index: i32, } diff --git a/crates/federate/src/main.rs b/crates/federate/src/main.rs deleted file mode 100644 index 15b5dbe6f..000000000 --- a/crates/federate/src/main.rs +++ /dev/null @@ -1,58 +0,0 @@ -use activitypub_federation::config::FederationConfig; -use clap::Parser; -use lemmy_api_common::request::build_user_agent; -use lemmy_apub::{VerifyUrlData, FEDERATION_HTTP_FETCH_LIMIT}; -use lemmy_db_schema::utils::build_db_pool; -use lemmy_federate::Opts; -use lemmy_utils::{error::LemmyErrorExt2, settings::SETTINGS, REQWEST_TIMEOUT}; -use reqwest::Client; -use reqwest_middleware::ClientBuilder; -use reqwest_tracing::TracingMiddleware; -use tokio::signal::unix::SignalKind; - -#[tokio::main] -async fn main() -> anyhow::Result<()> { - tracing_subscriber::fmt::init(); - let opts = Opts::parse(); - let settings = SETTINGS.to_owned(); - // TODO: wait until migrations are applied? or are they safe from race conditions and i can just call run_migrations here as well? - let pool = build_db_pool(&settings).await.into_anyhow()?; - let user_agent = build_user_agent(&settings); - let reqwest_client = Client::builder() - .user_agent(user_agent.clone()) - .timeout(REQWEST_TIMEOUT) - .connect_timeout(REQWEST_TIMEOUT) - .build()?; - - let client = ClientBuilder::new(reqwest_client.clone()) - .with(TracingMiddleware::default()) - .build(); - - let federation_config = FederationConfig::builder() - .domain(settings.hostname.clone()) - .app_data(()) - .client(client.clone()) - .http_fetch_limit(FEDERATION_HTTP_FETCH_LIMIT) - .http_signature_compat(true) - .url_verifier(Box::new(VerifyUrlData(pool.clone()))) - .build() - .await?; - let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())?; - let mut terminate = tokio::signal::unix::signal(SignalKind::terminate())?; - - let task = - lemmy_federate::start_stop_federation_workers_cancellable(opts, pool, federation_config); - tokio::select! { - _ = tokio::signal::ctrl_c() => { - tracing::warn!("Received ctrl-c, shutting down gracefully..."); - } - _ = interrupt.recv() => { - tracing::warn!("Received interrupt, shutting down gracefully..."); - } - _ = terminate.recv() => { - tracing::warn!("Received terminate, shutting down gracefully..."); - } - } - task.cancel().await?; - Ok(()) -} diff --git a/src/lib.rs b/src/lib.rs index 4950aff82..4072df0f3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,6 +17,7 @@ use actix_web::{ HttpServer, Result, }; +use clap::{ArgAction, Parser}; use lemmy_api_common::{ context::LemmyContext, lemmy_db_views::structs::SiteView, @@ -36,6 +37,7 @@ use lemmy_db_schema::{ source::secret::Secret, utils::{build_db_pool, get_database_url, run_migrations}, }; +use lemmy_federate::{start_stop_federation_workers_cancellable, Opts}; use lemmy_routes::{feeds, images, nodeinfo, webfinger}; use lemmy_utils::{ error::LemmyError, @@ -48,6 +50,7 @@ use reqwest::Client; use reqwest_middleware::ClientBuilder; use reqwest_tracing::TracingMiddleware; use std::{env, thread, time::Duration}; +use tokio::signal::unix::SignalKind; use tracing::subscriber::set_global_default; use tracing_actix_web::TracingLogger; use tracing_error::ErrorLayer; @@ -61,15 +64,31 @@ use { prometheus_metrics::serve_prometheus, }; +#[derive(Parser, Debug)] +struct CmdArgs { + #[arg(long, default_value_t = false)] + /// if you start multiple lemmy server instances set this to true on all but one of them + disable_scheduled_tasks: bool, + /// set to false to disable the http server + #[arg(long, default_value_t = true, action=ArgAction::Set)] + http_server: bool, + /// set to false to disable the outgoing federation in this process + #[arg(long, default_value_t = true, action=ArgAction::Set)] + send_activities: bool, + /// the index of this outgoing federation process (1-based). only useful if you want to split federation work into multiple servers + #[arg(long, default_value_t = 1)] + activity_worker_index: i32, + /// how many outgoing federation processes you are starting in total + #[arg(long, default_value_t = 1)] + activity_worker_count: i32, +} /// Max timeout for http requests pub(crate) const REQWEST_TIMEOUT: Duration = Duration::from_secs(10); /// Placing the main function in lib.rs allows other crates to import it and embed Lemmy pub async fn start_lemmy_server() -> Result<(), LemmyError> { - let args: Vec = env::args().collect(); - - let scheduled_tasks_enabled = args.get(1) != Some(&"--disable-scheduled-tasks".to_string()); - + let args = CmdArgs::parse(); + let scheduled_tasks_enabled = !args.disable_scheduled_tasks; let settings = SETTINGS.to_owned(); // Run the DB migrations @@ -178,53 +197,93 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> { let request_data = federation_config.to_request_data(); let outgoing_activities_task = tokio::task::spawn(handle_outgoing_activities(request_data)); - // Create Http server with websocket support - HttpServer::new(move || { - let cors_origin = env::var("LEMMY_CORS_ORIGIN"); - let cors_config = match (cors_origin, cfg!(debug_assertions)) { - (Ok(origin), false) => Cors::default() - .allowed_origin(&origin) - .allowed_origin(&settings.get_protocol_and_hostname()), - _ => Cors::default() - .allow_any_origin() - .allow_any_method() - .allow_any_header() - .expose_any_header() - .max_age(3600), - }; + let server = if args.http_server { + let federation_config = federation_config.clone(); + // Create Http server with websocket support + let server = HttpServer::new(move || { + let cors_origin = env::var("LEMMY_CORS_ORIGIN"); + let cors_config = match (cors_origin, cfg!(debug_assertions)) { + (Ok(origin), false) => Cors::default() + .allowed_origin(&origin) + .allowed_origin(&settings.get_protocol_and_hostname()), + _ => Cors::default() + .allow_any_origin() + .allow_any_method() + .allow_any_header() + .expose_any_header() + .max_age(3600), + }; - let app = App::new() - .wrap(middleware::Logger::new( - // This is the default log format save for the usage of %{r}a over %a to guarantee to record the client's (forwarded) IP and not the last peer address, since the latter is frequently just a reverse proxy - "%{r}a '%r' %s %b '%{Referer}i' '%{User-Agent}i' %T", - )) - .wrap(middleware::Compress::default()) - .wrap(cors_config) - .wrap(TracingLogger::::new()) - .wrap(ErrorHandlers::new().default_handler(jsonify_plain_text_errors)) - .app_data(Data::new(context.clone())) - .app_data(Data::new(rate_limit_cell.clone())) - .wrap(FederationMiddleware::new(federation_config.clone())); + let app = App::new() + .wrap(middleware::Logger::new( + // This is the default log format save for the usage of %{r}a over %a to guarantee to record the client's (forwarded) IP and not the last peer address, since the latter is frequently just a reverse proxy + "%{r}a '%r' %s %b '%{Referer}i' '%{User-Agent}i' %T", + )) + .wrap(middleware::Compress::default()) + .wrap(cors_config) + .wrap(TracingLogger::::new()) + .wrap(ErrorHandlers::new().default_handler(jsonify_plain_text_errors)) + .app_data(Data::new(context.clone())) + .app_data(Data::new(rate_limit_cell.clone())) + .wrap(FederationMiddleware::new(federation_config.clone())); - #[cfg(feature = "prometheus-metrics")] - let app = app.wrap(prom_api_metrics.clone()); + #[cfg(feature = "prometheus-metrics")] + let app = app.wrap(prom_api_metrics.clone()); - // The routes - app - .configure(|cfg| api_routes_http::config(cfg, rate_limit_cell)) - .configure(|cfg| { - if federation_enabled { - lemmy_apub::http::routes::config(cfg); - webfinger::config(cfg); - } - }) - .configure(feeds::config) - .configure(|cfg| images::config(cfg, pictrs_client.clone(), rate_limit_cell)) - .configure(nodeinfo::config) - }) - .bind((settings_bind.bind, settings_bind.port))? - .run() - .await?; + // The routes + app + .configure(|cfg| api_routes_http::config(cfg, rate_limit_cell)) + .configure(|cfg| { + if federation_enabled { + lemmy_apub::http::routes::config(cfg); + webfinger::config(cfg); + } + }) + .configure(feeds::config) + .configure(|cfg| images::config(cfg, pictrs_client.clone(), rate_limit_cell)) + .configure(nodeinfo::config) + }) + .disable_signals() + .bind((settings_bind.bind, settings_bind.port))? + .run(); + let handle = server.handle(); + tokio::task::spawn(server); + Some(handle) + } else { + None + }; + let federate = if args.send_activities { + Some(start_stop_federation_workers_cancellable( + Opts { + process_index: args.activity_worker_index, + process_count: args.activity_worker_count, + }, + pool.clone(), + federation_config.clone(), + )) + } else { + None + }; + let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())?; + let mut terminate = tokio::signal::unix::signal(SignalKind::terminate())?; + + tokio::select! { + _ = tokio::signal::ctrl_c() => { + tracing::warn!("Received ctrl-c, shutting down gracefully..."); + } + _ = interrupt.recv() => { + tracing::warn!("Received interrupt, shutting down gracefully..."); + } + _ = terminate.recv() => { + tracing::warn!("Received terminate, shutting down gracefully..."); + } + } + if let Some(server) = server { + server.stop(true).await; + } + if let Some(federate) = federate { + federate.cancel().await?; + } // Wait for outgoing apub sends to complete ActivityChannel::close(outgoing_activities_task).await?;