diff --git a/crates/api_common/src/send_activity.rs b/crates/api_common/src/send_activity.rs index a2bc9a6de..6c91258ec 100644 --- a/crates/api_common/src/send_activity.rs +++ b/crates/api_common/src/send_activity.rs @@ -4,10 +4,13 @@ use futures::future::BoxFuture; use lemmy_db_schema::source::post::Post; use lemmy_utils::{error::LemmyResult, SYNCHRONOUS_FEDERATION}; use once_cell::sync::{Lazy, OnceCell}; -use tokio::sync::{ - mpsc, - mpsc::{UnboundedReceiver, UnboundedSender}, - Mutex, +use tokio::{ + sync::{ + mpsc, + mpsc::{UnboundedReceiver, UnboundedSender, WeakUnboundedSender}, + Mutex, + }, + task::JoinHandle, }; type MatchOutgoingActivitiesBoxed = @@ -21,17 +24,22 @@ pub enum SendActivityData { CreatePost(Post), } +// TODO: instead of static, move this into LemmyContext. make sure that stopping the process with +// ctrl+c still works. static ACTIVITY_CHANNEL: Lazy = Lazy::new(|| { let (sender, receiver) = mpsc::unbounded_channel(); + let weak_sender = sender.downgrade(); ActivityChannel { - sender, + weak_sender, receiver: Mutex::new(receiver), + keepalive_sender: Mutex::new(Some(sender)), } }); pub struct ActivityChannel { - sender: UnboundedSender, + weak_sender: WeakUnboundedSender, receiver: Mutex>, + keepalive_sender: Mutex>>, } impl ActivityChannel { @@ -49,10 +57,18 @@ impl ActivityChannel { .get() .expect("retrieve function pointer")(data, context) .await?; - } else { - let lock = &ACTIVITY_CHANNEL.sender; - lock.send(data)?; + } + // could do `ACTIVITY_CHANNEL.keepalive_sender.lock()` instead and get rid of weak_sender, + // not sure which way is more efficient + else if let Some(sender) = ACTIVITY_CHANNEL.weak_sender.upgrade() { + sender.send(data)?; } Ok(()) } + + pub async fn close(outgoing_activities_task: JoinHandle>) -> LemmyResult<()> { + ACTIVITY_CHANNEL.keepalive_sender.lock().await.take(); + outgoing_activities_task.await??; + Ok(()) + } } diff --git a/src/lib.rs b/src/lib.rs index e07ae2685..4950aff82 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,7 +21,7 @@ use lemmy_api_common::{ context::LemmyContext, lemmy_db_views::structs::SiteView, request::build_user_agent, - send_activity::MATCH_OUTGOING_ACTIVITIES, + send_activity::{ActivityChannel, MATCH_OUTGOING_ACTIVITIES}, utils::{ check_private_instance_and_federation_enabled, local_site_rate_limit_to_rate_limit_config, @@ -227,7 +227,7 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> { .await?; // Wait for outgoing apub sends to complete - outgoing_activities_task.await??; + ActivityChannel::close(outgoing_activities_task).await?; Ok(()) }