minor readability

This commit is contained in:
phiresky 2023-08-03 13:08:32 +00:00
parent 7e72ad87fe
commit de50f37a39

View File

@ -13,7 +13,7 @@ use lemmy_apub::activity_lists::SharedInboxActivities;
use lemmy_db_schema::{ use lemmy_db_schema::{
newtypes::{CommunityId, InstanceId}, newtypes::{CommunityId, InstanceId},
source::{activity::SentActivity, instance::Instance, site::Site}, source::{activity::SentActivity, instance::Instance, site::Site},
utils::{ActualDbPool, DbPool}, utils::DbPool,
}; };
use lemmy_db_views_actor::structs::CommunityFollowerView; use lemmy_db_views_actor::structs::CommunityFollowerView;
use lemmy_utils::{error::LemmyErrorExt2, REQWEST_TIMEOUT}; use lemmy_utils::{error::LemmyErrorExt2, REQWEST_TIMEOUT};
@ -71,24 +71,22 @@ impl InstanceWorker {
&mut self, &mut self,
pool: &mut DbPool<'_>, pool: &mut DbPool<'_>,
) -> Result<(), anyhow::Error> { ) -> Result<(), anyhow::Error> {
self.update_communities(pool).await; let save_state_every = chrono::Duration::from_std(SAVE_STATE_EVERY_TIME).expect("not negative");
self.initial_fail_sleep().await;
self.update_communities(pool).await?;
self.initial_fail_sleep().await?;
while !self.stop.is_cancelled() { while !self.stop.is_cancelled() {
self.loop_batch(pool).await?; self.loop_batch(pool).await?;
if self.stop.is_cancelled() { if self.stop.is_cancelled() {
break; break;
} }
if Utc::now() - self.last_state_insert if (Utc::now() - self.last_state_insert) > save_state_every {
> chrono::Duration::from_std(SAVE_STATE_EVERY_TIME).expect("not negative") self.save_and_send_state(pool).await?;
{
self.last_state_insert = Utc::now();
FederationQueueState::upsert(pool, &self.state).await?;
self.stats_sender.send(self.state.clone())?;
} }
self.update_communities(pool).await; self.update_communities(pool).await?;
} }
// final update of state in db // final update of state in db
FederationQueueState::upsert(pool, &self.state).await?; self.save_and_send_state(pool).await?;
Ok(()) Ok(())
} }
@ -172,8 +170,7 @@ impl InstanceWorker {
activity.id, activity.id,
self.state.fail_count self.state.fail_count
); );
self.stats_sender.send(self.state.clone())?; self.save_and_send_state(pool).await?;
FederationQueueState::upsert(pool, &self.state).await?;
req = sign_raw(&task, &self.context, REQWEST_TIMEOUT).await?; // resign request req = sign_raw(&task, &self.context, REQWEST_TIMEOUT).await?; // resign request
tokio::select! { tokio::select! {
() = sleep(retry_delay) => {}, () = sleep(retry_delay) => {},
@ -257,4 +254,10 @@ impl InstanceWorker {
new_last_fetch, new_last_fetch,
)) ))
} }
async fn save_and_send_state(&mut self, pool: &mut DbPool<'_>) -> Result<()> {
self.last_state_insert = Utc::now();
FederationQueueState::upsert(pool, &self.state).await?;
self.stats_sender.send(self.state.clone())?;
Ok(())
}
} }