diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index f81cacf84..c784e5ad3 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -13,7 +13,7 @@ use lemmy_apub::activity_lists::SharedInboxActivities; use lemmy_db_schema::{ newtypes::{CommunityId, InstanceId}, source::{activity::SentActivity, instance::Instance, site::Site}, - utils::{ActualDbPool, DbPool}, + utils::DbPool, }; use lemmy_db_views_actor::structs::CommunityFollowerView; use lemmy_utils::{error::LemmyErrorExt2, REQWEST_TIMEOUT}; @@ -71,24 +71,22 @@ impl InstanceWorker { &mut self, pool: &mut DbPool<'_>, ) -> Result<(), anyhow::Error> { - self.update_communities(pool).await; - self.initial_fail_sleep().await; + let save_state_every = chrono::Duration::from_std(SAVE_STATE_EVERY_TIME).expect("not negative"); + + self.update_communities(pool).await?; + self.initial_fail_sleep().await?; while !self.stop.is_cancelled() { self.loop_batch(pool).await?; if self.stop.is_cancelled() { break; } - if Utc::now() - self.last_state_insert - > chrono::Duration::from_std(SAVE_STATE_EVERY_TIME).expect("not negative") - { - self.last_state_insert = Utc::now(); - FederationQueueState::upsert(pool, &self.state).await?; - self.stats_sender.send(self.state.clone())?; + if (Utc::now() - self.last_state_insert) > save_state_every { + self.save_and_send_state(pool).await?; } - self.update_communities(pool).await; + self.update_communities(pool).await?; } // final update of state in db - FederationQueueState::upsert(pool, &self.state).await?; + self.save_and_send_state(pool).await?; Ok(()) } @@ -172,8 +170,7 @@ impl InstanceWorker { activity.id, self.state.fail_count ); - self.stats_sender.send(self.state.clone())?; - FederationQueueState::upsert(pool, &self.state).await?; + self.save_and_send_state(pool).await?; req = sign_raw(&task, &self.context, REQWEST_TIMEOUT).await?; // resign request tokio::select! { () = sleep(retry_delay) => {}, @@ -257,4 +254,10 @@ impl InstanceWorker { new_last_fetch, )) } + async fn save_and_send_state(&mut self, pool: &mut DbPool<'_>) -> Result<()> { + self.last_state_insert = Utc::now(); + FederationQueueState::upsert(pool, &self.state).await?; + self.stats_sender.send(self.state.clone())?; + Ok(()) + } }