From 284ebb33043ecbcb2afd41cfe737daffcf57830a Mon Sep 17 00:00:00 2001 From: phiresky Date: Fri, 1 Sep 2023 12:15:00 +0000 Subject: [PATCH] start federation based on latest id at the time --- crates/federate/src/federation_queue_state.rs | 4 ++-- crates/federate/src/worker.rs | 7 +++++++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/crates/federate/src/federation_queue_state.rs b/crates/federate/src/federation_queue_state.rs index 80708fa9e..164dbf786 100644 --- a/crates/federate/src/federation_queue_state.rs +++ b/crates/federate/src/federation_queue_state.rs @@ -17,7 +17,7 @@ pub struct FederationQueueState { } impl FederationQueueState { - /// load or return a default empty value + /// load state or return a default empty value pub async fn load(pool: &mut DbPool<'_>, domain_: &str) -> Result { use lemmy_db_schema::schema::federation_queue_state::dsl::{domain, federation_queue_state}; let conn = &mut get_conn(pool).await?; @@ -32,7 +32,7 @@ impl FederationQueueState { domain: domain_.to_owned(), fail_count: 0, last_retry: Utc.timestamp_nanos(0), - last_successful_id: 0, // todo: start at current id not from beginning + last_successful_id: 0, // this value is set to the most current id for new instances }), ) } diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index 37663b63a..11cfa5594 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -105,6 +105,13 @@ impl InstanceWorker { } async fn loop_batch(&mut self, pool: &mut DbPool<'_>) -> Result<()> { let latest_id = get_latest_activity_id(pool).await?; + if self.state.last_successful_id == 0 { + // this is the initial creation (instance first seen) of the federation queue for this instance + // skip all past activities: + self.state.last_successful_id = latest_id; + // save here to ensure it's not read as 0 again later if no activities have happened + self.save_and_send_state(pool).await?; + } let mut id = self.state.last_successful_id; if id == latest_id { // no more work to be done, wait before rechecking