diff --git a/api_tests/run-federation-test.sh b/api_tests/run-federation-test.sh index 3042fd344..ff74744a1 100755 --- a/api_tests/run-federation-test.sh +++ b/api_tests/run-federation-test.sh @@ -2,6 +2,7 @@ set -e export LEMMY_DATABASE_URL=postgres://lemmy:password@localhost:5432 +export LEMMY_SYNCHRONOUS_FEDERATION=1 # currently this is true in debug by default, but still. pushd .. cargo build rm target/lemmy_server || true diff --git a/crates/api_common/src/send_activity.rs b/crates/api_common/src/send_activity.rs index 84b2efb2d..897f102fe 100644 --- a/crates/api_common/src/send_activity.rs +++ b/crates/api_common/src/send_activity.rs @@ -17,14 +17,22 @@ use lemmy_db_schema::{ }, }; use lemmy_db_views::structs::PrivateMessageView; -use lemmy_utils::error::LemmyResult; -use once_cell::sync::OnceCell; +use lemmy_utils::{error::LemmyResult, SYNCHRONOUS_FEDERATION}; +use once_cell::sync::{Lazy, OnceCell}; +use tokio::{ + sync::{ + mpsc, + mpsc::{UnboundedReceiver, UnboundedSender, WeakUnboundedSender}, + Mutex, + }, + task::JoinHandle, +}; use url::Url; type MatchOutgoingActivitiesBoxed = Box fn(SendActivityData, &'a Data) -> BoxFuture<'a, LemmyResult<()>>>; -/// This static is necessary so that the api_common crates don't need to depend on lemmy_apub +/// This static is necessary so that activities can be sent out synchronously for tests and the api_common crates don't need to depend on lemmy_apub pub static MATCH_OUTGOING_ACTIVITIES: OnceCell = OnceCell::new(); #[derive(Debug)] @@ -54,16 +62,51 @@ pub enum SendActivityData { CreateReport(Url, Person, Community, String), } -pub struct ActivityChannel; +// TODO: instead of static, move this into LemmyContext. make sure that stopping the process with +// ctrl+c still works. +static ACTIVITY_CHANNEL: Lazy = Lazy::new(|| { + let (sender, receiver) = mpsc::unbounded_channel(); + let weak_sender = sender.downgrade(); + ActivityChannel { + weak_sender, + receiver: Mutex::new(receiver), + keepalive_sender: Mutex::new(Some(sender)), + } +}); + +pub struct ActivityChannel { + weak_sender: WeakUnboundedSender, + receiver: Mutex>, + keepalive_sender: Mutex>>, +} impl ActivityChannel { + pub async fn retrieve_activity() -> Option { + let mut lock = ACTIVITY_CHANNEL.receiver.lock().await; + lock.recv().await + } + pub async fn submit_activity( data: SendActivityData, context: &Data, ) -> LemmyResult<()> { - MATCH_OUTGOING_ACTIVITIES - .get() - .expect("retrieve function pointer")(data, context) - .await + if *SYNCHRONOUS_FEDERATION { + MATCH_OUTGOING_ACTIVITIES + .get() + .expect("retrieve function pointer")(data, context) + .await?; + } + // could do `ACTIVITY_CHANNEL.keepalive_sender.lock()` instead and get rid of weak_sender, + // not sure which way is more efficient + else if let Some(sender) = ACTIVITY_CHANNEL.weak_sender.upgrade() { + sender.send(data)?; + } + Ok(()) + } + + pub async fn close(outgoing_activities_task: JoinHandle>) -> LemmyResult<()> { + ACTIVITY_CHANNEL.keepalive_sender.lock().await.take(); + outgoing_activities_task.await??; + Ok(()) } } diff --git a/crates/api_crud/src/post/create.rs b/crates/api_crud/src/post/create.rs index 8cc6ffe62..d0b0f368c 100644 --- a/crates/api_crud/src/post/create.rs +++ b/crates/api_crud/src/post/create.rs @@ -37,6 +37,7 @@ use lemmy_utils::{ slurs::{check_slurs, check_slurs_opt}, validation::{check_url_scheme, clean_url_params, is_valid_body_field, is_valid_post_title}, }, + SYNCHRONOUS_FEDERATION, }; use tracing::Instrument; use url::Url; @@ -189,7 +190,11 @@ pub async fn create_post( Err(e) => Err(e).with_lemmy_type(LemmyErrorType::CouldntSendWebmention), } }; - spawn_try_task(task); + if *SYNCHRONOUS_FEDERATION { + task.await?; + } else { + spawn_try_task(task); + } }; build_post_response(&context, community_id, person_id, post_id).await diff --git a/crates/apub/src/activities/mod.rs b/crates/apub/src/activities/mod.rs index 29ec5bd30..53adc78df 100644 --- a/crates/apub/src/activities/mod.rs +++ b/crates/apub/src/activities/mod.rs @@ -45,6 +45,7 @@ use lemmy_db_views_actor::structs::{CommunityPersonBanView, CommunityView}; use lemmy_utils::{ error::{LemmyError, LemmyErrorExt, LemmyErrorType, LemmyResult}, spawn_try_task, + SYNCHRONOUS_FEDERATION, }; use serde::Serialize; use std::{ops::Deref, time::Duration}; @@ -220,6 +221,13 @@ where Ok(()) } +pub async fn handle_outgoing_activities(context: Data) -> LemmyResult<()> { + while let Some(data) = ActivityChannel::retrieve_activity().await { + match_outgoing_activities(data, &context.reset_request_count()).await? + } + Ok(()) +} + pub async fn match_outgoing_activities( data: SendActivityData, context: &Data, @@ -324,6 +332,10 @@ pub async fn match_outgoing_activities( } } }; - spawn_try_task(fed_task); + if *SYNCHRONOUS_FEDERATION { + fed_task.await?; + } else { + spawn_try_task(fed_task); + } Ok(()) } diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index c0553de31..1ef8a842c 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -18,6 +18,7 @@ pub mod version; use error::LemmyError; use futures::Future; +use once_cell::sync::Lazy; use std::time::Duration; use tracing::Instrument; @@ -37,6 +38,16 @@ macro_rules! location_info { }; } +/// if true, all federation should happen synchronously. useful for debugging and testing. +/// defaults to true on debug mode, false on releasemode +/// override to true by setting env LEMMY_SYNCHRONOUS_FEDERATION=1 +/// override to false by setting env LEMMY_SYNCHRONOUS_FEDERATION="" +pub static SYNCHRONOUS_FEDERATION: Lazy = Lazy::new(|| { + std::env::var("LEMMY_SYNCHRONOUS_FEDERATION") + .map(|s| !s.is_empty()) + .unwrap_or(cfg!(debug_assertions)) +}); + /// tokio::spawn, but accepts a future that may fail and also /// * logs errors /// * attaches the spawned task to the tracing span of the caller for better logging diff --git a/src/lib.rs b/src/lib.rs index e1c6d1fae..9ce1bfa00 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,14 +28,14 @@ use lemmy_api_common::{ context::LemmyContext, lemmy_db_views::structs::SiteView, request::build_user_agent, - send_activity::MATCH_OUTGOING_ACTIVITIES, + send_activity::{ActivityChannel, MATCH_OUTGOING_ACTIVITIES}, utils::{ check_private_instance_and_federation_enabled, local_site_rate_limit_to_rate_limit_config, }, }; use lemmy_apub::{ - activities::match_outgoing_activities, + activities::{handle_outgoing_activities, match_outgoing_activities}, VerifyUrlData, FEDERATION_HTTP_FETCH_LIMIT, }; @@ -203,6 +203,8 @@ pub async fn start_lemmy_server(args: CmdArgs) -> Result<(), LemmyError> { Box::pin(match_outgoing_activities(d, c)) })) .expect("set function pointer"); + let request_data = federation_config.to_request_data(); + let outgoing_activities_task = tokio::task::spawn(handle_outgoing_activities(request_data)); let server = if args.http_server { Some(create_http_server( @@ -245,6 +247,9 @@ pub async fn start_lemmy_server(args: CmdArgs) -> Result<(), LemmyError> { federate.cancel().await?; } + // Wait for outgoing apub sends to complete + ActivityChannel::close(outgoing_activities_task).await?; + Ok(()) }