create struct to hold cancellable task for readability

This commit is contained in:
phiresky 2023-07-30 18:16:27 +00:00
parent 82c22439b8
commit 14479cefd2
3 changed files with 57 additions and 39 deletions

View File

@ -1,5 +1,5 @@
use crate::{ use crate::{
util::{retry_sleep_duration, spawn_cancellable}, util::{retry_sleep_duration, CancellableTask},
worker::instance_worker, worker::instance_worker,
}; };
use activitypub_federation::config::FederationConfig; use activitypub_federation::config::FederationConfig;
@ -71,7 +71,7 @@ async fn start_stop_federation_workers<T: Clone + Send + Sync + 'static>(
let stats_sender = stats_sender.clone(); let stats_sender = stats_sender.clone();
workers.insert( workers.insert(
instance.id, instance.id,
spawn_cancellable(WORKER_EXIT_TIMEOUT, |stop| { CancellableTask::spawn(WORKER_EXIT_TIMEOUT, |stop| {
instance_worker( instance_worker(
pool.clone(), pool.clone(),
instance, instance,
@ -83,7 +83,7 @@ async fn start_stop_federation_workers<T: Clone + Send + Sync + 'static>(
); );
} else if !should_federate { } else if !should_federate {
if let Some(worker) = workers.remove(&instance.id) { if let Some(worker) = workers.remove(&instance.id) {
if let Err(e) = worker.await { if let Err(e) = worker.cancel().await {
tracing::error!("error stopping worker: {e}"); tracing::error!("error stopping worker: {e}");
} }
} }
@ -102,7 +102,8 @@ async fn start_stop_federation_workers<T: Clone + Send + Sync + 'static>(
workers.len(), workers.len(),
WORKER_EXIT_TIMEOUT WORKER_EXIT_TIMEOUT
); );
futures::future::join_all(workers.into_values()).await; // the cancel futures need to be awaited concurrently for the shutdown processes to be triggered concurrently
futures::future::join_all(workers.into_values().map(|e| e.cancel())).await;
exit_print.await?; exit_print.await?;
Ok(()) Ok(())
} }
@ -113,8 +114,8 @@ pub fn start_stop_federation_workers_cancellable(
opts: Opts, opts: Opts,
pool: ActualDbPool, pool: ActualDbPool,
config: FederationConfig<impl Clone + Send + Sync + 'static>, config: FederationConfig<impl Clone + Send + Sync + 'static>,
) -> impl Future<Output = anyhow::Result<()>> { ) -> CancellableTask<(), impl Future<Output = anyhow::Result<()>>> {
spawn_cancellable(WORKER_EXIT_TIMEOUT, move |c| { CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |c| {
start_stop_federation_workers(opts, pool, config, c) start_stop_federation_workers(opts, pool, config, c)
}) })
} }

View File

@ -40,7 +40,7 @@ async fn main() -> anyhow::Result<()> {
let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())?; let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())?;
let mut terminate = tokio::signal::unix::signal(SignalKind::terminate())?; let mut terminate = tokio::signal::unix::signal(SignalKind::terminate())?;
let cancel = let task =
lemmy_federate::start_stop_federation_workers_cancellable(opts, pool, federation_config); lemmy_federate::start_stop_federation_workers_cancellable(opts, pool, federation_config);
tokio::select! { tokio::select! {
_ = tokio::signal::ctrl_c() => { _ = tokio::signal::ctrl_c() => {
@ -53,6 +53,6 @@ async fn main() -> anyhow::Result<()> {
tracing::warn!("Received terminate, shutting down gracefully..."); tracing::warn!("Received terminate, shutting down gracefully...");
} }
} }
cancel.await?; task.cancel().await?;
Ok(()) Ok(())
} }

View File

@ -29,42 +29,59 @@ use std::{
use tokio::{task::JoinHandle, time::sleep}; use tokio::{task::JoinHandle, time::sleep};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
/// spawn a task but with graceful shutdown pub struct CancellableTask<R: Send + 'static, F>
///
/// only await the returned future when you want to cancel the task
pub fn spawn_cancellable<R: Send + 'static, F>(
timeout: Duration,
task: impl FnOnce(CancellationToken) -> F,
) -> impl Future<Output = Result<R>>
where where
F: Future<Output = Result<R>> + Send + 'static, F: Future<Output = Result<R>>,
{ {
let stop = CancellationToken::new(); f: F,
let task = task(stop.clone()); }
let task: JoinHandle<Result<R>> = tokio::spawn(async move {
match task.await { impl<R: Send + 'static, F> CancellableTask<R, F>
Ok(o) => Ok(o), where
Err(e) => { F: Future<Output = Result<R>>,
tracing::error!("worker errored out: {e}"); {
// todo: if this error happens, requeue worker creation in main /// spawn a task but with graceful shutdown
Err(e) pub fn spawn(
} timeout: Duration,
} task: impl FnOnce(CancellationToken) -> F,
}); ) -> CancellableTask<R, impl Future<Output = Result<R>>>
let abort = task.abort_handle(); where
async move { F: Future<Output = Result<R>> + Send + 'static,
stop.cancel(); {
tokio::select! { let stop = CancellationToken::new();
r = task => { let task = task(stop.clone());
Ok(r.context("could not join")??) let task: JoinHandle<Result<R>> = tokio::spawn(async move {
}, match task.await {
_ = sleep(timeout) => { Ok(o) => Ok(o),
abort.abort(); Err(e) => {
tracing::warn!("Graceful shutdown timed out, aborting task"); tracing::error!("worker errored out: {e}");
Err(anyhow!("task aborted due to timeout")) // todo: if this error happens, requeue worker creation in main
Err(e)
} }
}
});
let abort = task.abort_handle();
CancellableTask {
f: async move {
stop.cancel();
tokio::select! {
r = task => {
Ok(r.context("could not join")??)
},
_ = sleep(timeout) => {
abort.abort();
tracing::warn!("Graceful shutdown timed out, aborting task");
Err(anyhow!("task aborted due to timeout"))
}
}
},
} }
} }
/// cancel the cancel signal, wait for timeout for the task to stop gracefully, otherwise abort it
pub async fn cancel(self) -> Result<R, anyhow::Error> {
self.f.await
}
} }
/// assuming apub priv key and ids are immutable, then we don't need to have TTL /// assuming apub priv key and ids are immutable, then we don't need to have TTL