diff --git a/crates/federate/src/lib.rs b/crates/federate/src/lib.rs index c5a29e813..f0abaa737 100644 --- a/crates/federate/src/lib.rs +++ b/crates/federate/src/lib.rs @@ -6,7 +6,6 @@ use activitypub_federation::config::FederationConfig; use chrono::{Local, Timelike}; use clap::Parser; use federation_queue_state::FederationQueueState; -use futures::Future; use lemmy_db_schema::{ source::instance::Instance, utils::{ActualDbPool, DbPool}, @@ -114,7 +113,7 @@ pub fn start_stop_federation_workers_cancellable( opts: Opts, pool: ActualDbPool, config: FederationConfig, -) -> CancellableTask<(), impl Future>> { +) -> CancellableTask<()> { CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |c| { start_stop_federation_workers(opts, pool, config, c) }) diff --git a/crates/federate/src/util.rs b/crates/federate/src/util.rs index 1fe6f9404..46a78956c 100644 --- a/crates/federate/src/util.rs +++ b/crates/federate/src/util.rs @@ -23,28 +23,23 @@ use serde_json::Value; use std::{ borrow::{Borrow, Cow}, future::Future, + pin::Pin, sync::Arc, time::Duration, }; use tokio::{task::JoinHandle, time::sleep}; use tokio_util::sync::CancellationToken; -pub struct CancellableTask -where - F: Future>, -{ - f: F, +pub struct CancellableTask { + f: Pin> + Send + 'static>>, } -impl CancellableTask -where - F: Future>, -{ +impl CancellableTask { /// spawn a task but with graceful shutdown - pub fn spawn( + pub fn spawn( timeout: Duration, task: impl FnOnce(CancellationToken) -> F, - ) -> CancellableTask>> + ) -> CancellableTask where F: Future> + Send + 'static, { @@ -62,7 +57,7 @@ where }); let abort = task.abort_handle(); CancellableTask { - f: async move { + f: Box::pin(async move { stop.cancel(); tokio::select! { r = task => { @@ -74,7 +69,7 @@ where Err(anyhow!("task aborted due to timeout")) } } - }, + }), } }