remove separate lemmy_federate entry point

This commit is contained in:
phiresky 2023-08-03 14:59:12 +00:00
parent 5c686f75c4
commit 8a6b3c248a
6 changed files with 111 additions and 112 deletions

3
Cargo.lock generated
View File

@ -2822,7 +2822,6 @@ dependencies = [
"async-trait",
"bytes",
"chrono",
"clap",
"diesel",
"diesel-async",
"enum_delegate",
@ -2880,6 +2879,7 @@ dependencies = [
"actix-web",
"actix-web-prom",
"chrono",
"clap",
"clokwerk",
"console-subscriber",
"diesel",
@ -2891,6 +2891,7 @@ dependencies = [
"lemmy_api_crud",
"lemmy_apub",
"lemmy_db_schema",
"lemmy_federate",
"lemmy_routes",
"lemmy_utils",
"opentelemetry 0.19.0",

View File

@ -166,3 +166,5 @@ tokio-postgres-rustls = { workspace = true }
chrono = { workspace = true }
prometheus = { version = "0.13.3", features = ["process"], optional = true }
actix-web-prom = { version = "0.6.0", optional = true }
clap = { version = "4.3.19", features = ["derive"] }
lemmy_federate = { version = "0.18.1", path = "crates/federate" }

View File

@ -32,7 +32,6 @@ tracing.workspace = true
async-trait = "0.1.71"
bytes = "1.4.0"
clap = { version = "4.3.19", features = ["derive"] }
enum_delegate = "0.2.0"
moka = { version = "0.11.2", features = ["future"] }
openssl = "0.10.55"

View File

@ -4,7 +4,6 @@ use crate::{
};
use activitypub_federation::config::FederationConfig;
use chrono::{Local, Timelike};
use clap::Parser;
use federation_queue_state::FederationQueueState;
use lemmy_api_common::context::LemmyContext;
use lemmy_db_schema::{
@ -27,13 +26,10 @@ mod worker;
static WORKER_EXIT_TIMEOUT: Duration = Duration::from_secs(30);
#[derive(Parser, Debug)]
pub struct Opts {
/// how many processes you are starting in total
#[arg(default_value_t = 1)]
pub process_count: i32,
/// the index of this process (1-based: 1 - process_count)
#[arg(default_value_t = 1)]
pub process_index: i32,
}

View File

@ -1,58 +0,0 @@
use activitypub_federation::config::FederationConfig;
use clap::Parser;
use lemmy_api_common::request::build_user_agent;
use lemmy_apub::{VerifyUrlData, FEDERATION_HTTP_FETCH_LIMIT};
use lemmy_db_schema::utils::build_db_pool;
use lemmy_federate::Opts;
use lemmy_utils::{error::LemmyErrorExt2, settings::SETTINGS, REQWEST_TIMEOUT};
use reqwest::Client;
use reqwest_middleware::ClientBuilder;
use reqwest_tracing::TracingMiddleware;
use tokio::signal::unix::SignalKind;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let opts = Opts::parse();
let settings = SETTINGS.to_owned();
// TODO: wait until migrations are applied? or are they safe from race conditions and i can just call run_migrations here as well?
let pool = build_db_pool(&settings).await.into_anyhow()?;
let user_agent = build_user_agent(&settings);
let reqwest_client = Client::builder()
.user_agent(user_agent.clone())
.timeout(REQWEST_TIMEOUT)
.connect_timeout(REQWEST_TIMEOUT)
.build()?;
let client = ClientBuilder::new(reqwest_client.clone())
.with(TracingMiddleware::default())
.build();
let federation_config = FederationConfig::builder()
.domain(settings.hostname.clone())
.app_data(())
.client(client.clone())
.http_fetch_limit(FEDERATION_HTTP_FETCH_LIMIT)
.http_signature_compat(true)
.url_verifier(Box::new(VerifyUrlData(pool.clone())))
.build()
.await?;
let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())?;
let mut terminate = tokio::signal::unix::signal(SignalKind::terminate())?;
let task =
lemmy_federate::start_stop_federation_workers_cancellable(opts, pool, federation_config);
tokio::select! {
_ = tokio::signal::ctrl_c() => {
tracing::warn!("Received ctrl-c, shutting down gracefully...");
}
_ = interrupt.recv() => {
tracing::warn!("Received interrupt, shutting down gracefully...");
}
_ = terminate.recv() => {
tracing::warn!("Received terminate, shutting down gracefully...");
}
}
task.cancel().await?;
Ok(())
}

View File

@ -17,6 +17,7 @@ use actix_web::{
HttpServer,
Result,
};
use clap::{ArgAction, Parser};
use lemmy_api_common::{
context::LemmyContext,
lemmy_db_views::structs::SiteView,
@ -36,6 +37,7 @@ use lemmy_db_schema::{
source::secret::Secret,
utils::{build_db_pool, get_database_url, run_migrations},
};
use lemmy_federate::{start_stop_federation_workers_cancellable, Opts};
use lemmy_routes::{feeds, images, nodeinfo, webfinger};
use lemmy_utils::{
error::LemmyError,
@ -48,6 +50,7 @@ use reqwest::Client;
use reqwest_middleware::ClientBuilder;
use reqwest_tracing::TracingMiddleware;
use std::{env, thread, time::Duration};
use tokio::signal::unix::SignalKind;
use tracing::subscriber::set_global_default;
use tracing_actix_web::TracingLogger;
use tracing_error::ErrorLayer;
@ -61,15 +64,31 @@ use {
prometheus_metrics::serve_prometheus,
};
#[derive(Parser, Debug)]
struct CmdArgs {
#[arg(long, default_value_t = false)]
/// if you start multiple lemmy server instances set this to true on all but one of them
disable_scheduled_tasks: bool,
/// set to false to disable the http server
#[arg(long, default_value_t = true, action=ArgAction::Set)]
http_server: bool,
/// set to false to disable the outgoing federation in this process
#[arg(long, default_value_t = true, action=ArgAction::Set)]
send_activities: bool,
/// the index of this outgoing federation process (1-based). only useful if you want to split federation work into multiple servers
#[arg(long, default_value_t = 1)]
activity_worker_index: i32,
/// how many outgoing federation processes you are starting in total
#[arg(long, default_value_t = 1)]
activity_worker_count: i32,
}
/// Max timeout for http requests
pub(crate) const REQWEST_TIMEOUT: Duration = Duration::from_secs(10);
/// Placing the main function in lib.rs allows other crates to import it and embed Lemmy
pub async fn start_lemmy_server() -> Result<(), LemmyError> {
let args: Vec<String> = env::args().collect();
let scheduled_tasks_enabled = args.get(1) != Some(&"--disable-scheduled-tasks".to_string());
let args = CmdArgs::parse();
let scheduled_tasks_enabled = !args.disable_scheduled_tasks;
let settings = SETTINGS.to_owned();
// Run the DB migrations
@ -178,53 +197,93 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> {
let request_data = federation_config.to_request_data();
let outgoing_activities_task = tokio::task::spawn(handle_outgoing_activities(request_data));
// Create Http server with websocket support
HttpServer::new(move || {
let cors_origin = env::var("LEMMY_CORS_ORIGIN");
let cors_config = match (cors_origin, cfg!(debug_assertions)) {
(Ok(origin), false) => Cors::default()
.allowed_origin(&origin)
.allowed_origin(&settings.get_protocol_and_hostname()),
_ => Cors::default()
.allow_any_origin()
.allow_any_method()
.allow_any_header()
.expose_any_header()
.max_age(3600),
};
let server = if args.http_server {
let federation_config = federation_config.clone();
// Create Http server with websocket support
let server = HttpServer::new(move || {
let cors_origin = env::var("LEMMY_CORS_ORIGIN");
let cors_config = match (cors_origin, cfg!(debug_assertions)) {
(Ok(origin), false) => Cors::default()
.allowed_origin(&origin)
.allowed_origin(&settings.get_protocol_and_hostname()),
_ => Cors::default()
.allow_any_origin()
.allow_any_method()
.allow_any_header()
.expose_any_header()
.max_age(3600),
};
let app = App::new()
.wrap(middleware::Logger::new(
// This is the default log format save for the usage of %{r}a over %a to guarantee to record the client's (forwarded) IP and not the last peer address, since the latter is frequently just a reverse proxy
"%{r}a '%r' %s %b '%{Referer}i' '%{User-Agent}i' %T",
))
.wrap(middleware::Compress::default())
.wrap(cors_config)
.wrap(TracingLogger::<QuieterRootSpanBuilder>::new())
.wrap(ErrorHandlers::new().default_handler(jsonify_plain_text_errors))
.app_data(Data::new(context.clone()))
.app_data(Data::new(rate_limit_cell.clone()))
.wrap(FederationMiddleware::new(federation_config.clone()));
let app = App::new()
.wrap(middleware::Logger::new(
// This is the default log format save for the usage of %{r}a over %a to guarantee to record the client's (forwarded) IP and not the last peer address, since the latter is frequently just a reverse proxy
"%{r}a '%r' %s %b '%{Referer}i' '%{User-Agent}i' %T",
))
.wrap(middleware::Compress::default())
.wrap(cors_config)
.wrap(TracingLogger::<QuieterRootSpanBuilder>::new())
.wrap(ErrorHandlers::new().default_handler(jsonify_plain_text_errors))
.app_data(Data::new(context.clone()))
.app_data(Data::new(rate_limit_cell.clone()))
.wrap(FederationMiddleware::new(federation_config.clone()));
#[cfg(feature = "prometheus-metrics")]
let app = app.wrap(prom_api_metrics.clone());
#[cfg(feature = "prometheus-metrics")]
let app = app.wrap(prom_api_metrics.clone());
// The routes
app
.configure(|cfg| api_routes_http::config(cfg, rate_limit_cell))
.configure(|cfg| {
if federation_enabled {
lemmy_apub::http::routes::config(cfg);
webfinger::config(cfg);
}
})
.configure(feeds::config)
.configure(|cfg| images::config(cfg, pictrs_client.clone(), rate_limit_cell))
.configure(nodeinfo::config)
})
.bind((settings_bind.bind, settings_bind.port))?
.run()
.await?;
// The routes
app
.configure(|cfg| api_routes_http::config(cfg, rate_limit_cell))
.configure(|cfg| {
if federation_enabled {
lemmy_apub::http::routes::config(cfg);
webfinger::config(cfg);
}
})
.configure(feeds::config)
.configure(|cfg| images::config(cfg, pictrs_client.clone(), rate_limit_cell))
.configure(nodeinfo::config)
})
.disable_signals()
.bind((settings_bind.bind, settings_bind.port))?
.run();
let handle = server.handle();
tokio::task::spawn(server);
Some(handle)
} else {
None
};
let federate = if args.send_activities {
Some(start_stop_federation_workers_cancellable(
Opts {
process_index: args.activity_worker_index,
process_count: args.activity_worker_count,
},
pool.clone(),
federation_config.clone(),
))
} else {
None
};
let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())?;
let mut terminate = tokio::signal::unix::signal(SignalKind::terminate())?;
tokio::select! {
_ = tokio::signal::ctrl_c() => {
tracing::warn!("Received ctrl-c, shutting down gracefully...");
}
_ = interrupt.recv() => {
tracing::warn!("Received interrupt, shutting down gracefully...");
}
_ = terminate.recv() => {
tracing::warn!("Received terminate, shutting down gracefully...");
}
}
if let Some(server) = server {
server.stop(true).await;
}
if let Some(federate) = federate {
federate.cancel().await?;
}
// Wait for outgoing apub sends to complete
ActivityChannel::close(outgoing_activities_task).await?;