diff --git a/crates/federate/src/lib.rs b/crates/federate/src/lib.rs index c72e51015..c5a29e813 100644 --- a/crates/federate/src/lib.rs +++ b/crates/federate/src/lib.rs @@ -1,5 +1,5 @@ use crate::{ - util::{retry_sleep_duration, spawn_cancellable}, + util::{retry_sleep_duration, CancellableTask}, worker::instance_worker, }; use activitypub_federation::config::FederationConfig; @@ -71,7 +71,7 @@ async fn start_stop_federation_workers( let stats_sender = stats_sender.clone(); workers.insert( instance.id, - spawn_cancellable(WORKER_EXIT_TIMEOUT, |stop| { + CancellableTask::spawn(WORKER_EXIT_TIMEOUT, |stop| { instance_worker( pool.clone(), instance, @@ -83,7 +83,7 @@ async fn start_stop_federation_workers( ); } else if !should_federate { if let Some(worker) = workers.remove(&instance.id) { - if let Err(e) = worker.await { + if let Err(e) = worker.cancel().await { tracing::error!("error stopping worker: {e}"); } } @@ -102,7 +102,8 @@ async fn start_stop_federation_workers( workers.len(), WORKER_EXIT_TIMEOUT ); - futures::future::join_all(workers.into_values()).await; + // the cancel futures need to be awaited concurrently for the shutdown processes to be triggered concurrently + futures::future::join_all(workers.into_values().map(|e| e.cancel())).await; exit_print.await?; Ok(()) } @@ -113,8 +114,8 @@ pub fn start_stop_federation_workers_cancellable( opts: Opts, pool: ActualDbPool, config: FederationConfig, -) -> impl Future> { - spawn_cancellable(WORKER_EXIT_TIMEOUT, move |c| { +) -> CancellableTask<(), impl Future>> { + CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |c| { start_stop_federation_workers(opts, pool, config, c) }) } diff --git a/crates/federate/src/main.rs b/crates/federate/src/main.rs index e17b23688..15b5dbe6f 100644 --- a/crates/federate/src/main.rs +++ b/crates/federate/src/main.rs @@ -40,7 +40,7 @@ async fn main() -> anyhow::Result<()> { let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())?; let mut terminate = 
tokio::signal::unix::signal(SignalKind::terminate())?; - let cancel = + let task = lemmy_federate::start_stop_federation_workers_cancellable(opts, pool, federation_config); tokio::select! { _ = tokio::signal::ctrl_c() => { @@ -53,6 +53,6 @@ async fn main() -> anyhow::Result<()> { tracing::warn!("Received terminate, shutting down gracefully..."); } } - cancel.await?; + task.cancel().await?; Ok(()) } diff --git a/crates/federate/src/util.rs b/crates/federate/src/util.rs index b138b1667..1fe6f9404 100644 --- a/crates/federate/src/util.rs +++ b/crates/federate/src/util.rs @@ -29,42 +29,59 @@ use std::{ use tokio::{task::JoinHandle, time::sleep}; use tokio_util::sync::CancellationToken; -/// spawn a task but with graceful shutdown -/// -/// only await the returned future when you want to cancel the task -pub fn spawn_cancellable( - timeout: Duration, - task: impl FnOnce(CancellationToken) -> F, -) -> impl Future> +pub struct CancellableTask where - F: Future> + Send + 'static, + F: Future>, { - let stop = CancellationToken::new(); - let task = task(stop.clone()); - let task: JoinHandle> = tokio::spawn(async move { - match task.await { - Ok(o) => Ok(o), - Err(e) => { - tracing::error!("worker errored out: {e}"); - // todo: if this error happens, requeue worker creation in main - Err(e) - } - } - }); - let abort = task.abort_handle(); - async move { - stop.cancel(); - tokio::select! { - r = task => { - Ok(r.context("could not join")??) 
- }, - _ = sleep(timeout) => { - abort.abort(); - tracing::warn!("Graceful shutdown timed out, aborting task"); - Err(anyhow!("task aborted due to timeout")) + f: F, +} + +impl CancellableTask +where + F: Future>, +{ + /// spawn a task but with graceful shutdown + pub fn spawn( + timeout: Duration, + task: impl FnOnce(CancellationToken) -> F, + ) -> CancellableTask>> + where + F: Future> + Send + 'static, + { + let stop = CancellationToken::new(); + let task = task(stop.clone()); + let task: JoinHandle> = tokio::spawn(async move { + match task.await { + Ok(o) => Ok(o), + Err(e) => { + tracing::error!("worker errored out: {e}"); + // todo: if this error happens, requeue worker creation in main + Err(e) } + } + }); + let abort = task.abort_handle(); + CancellableTask { + f: async move { + stop.cancel(); + tokio::select! { + r = task => { + Ok(r.context("could not join")??) + }, + _ = sleep(timeout) => { + abort.abort(); + tracing::warn!("Graceful shutdown timed out, aborting task"); + Err(anyhow!("task aborted due to timeout")) + } + } + }, } } + + /// send the cancel signal, wait up to the timeout for the task to stop gracefully, otherwise abort it + pub async fn cancel(self) -> Result { + self.f.await + } } /// assuming apub priv key and ids are immutable, then we don't need to have TTL