From 7e72ad87fe07f6f2f8e41b505819c986fe090af3 Mon Sep 17 00:00:00 2001
From: phiresky
Date: Thu, 3 Aug 2023 12:58:46 +0000
Subject: [PATCH] make worker a struct for readability

---
 crates/federate/src/lib.rs    |  21 ++-
 crates/federate/src/worker.rs | 337 ++++++++++++++++++++--------------
 2 files changed, 212 insertions(+), 146 deletions(-)

diff --git a/crates/federate/src/lib.rs b/crates/federate/src/lib.rs
index 34e595f9e..9312c5637 100644
--- a/crates/federate/src/lib.rs
+++ b/crates/federate/src/lib.rs
@@ -1,11 +1,12 @@
 use crate::{
   util::{retry_sleep_duration, CancellableTask},
-  worker::instance_worker,
+  worker::InstanceWorker,
 };
 use activitypub_federation::config::FederationConfig;
 use chrono::{Local, Timelike};
 use clap::Parser;
 use federation_queue_state::FederationQueueState;
+use lemmy_api_common::context::LemmyContext;
 use lemmy_db_schema::{
   source::instance::Instance,
   utils::{ActualDbPool, DbPool},
@@ -36,10 +37,10 @@ pub struct Opts {
   pub process_index: i32,
 }
 
-async fn start_stop_federation_workers(
+async fn start_stop_federation_workers(
   opts: Opts,
   pool: ActualDbPool,
-  federation_config: FederationConfig,
+  federation_config: FederationConfig<LemmyContext>,
   cancel: CancellationToken,
 ) -> anyhow::Result<()> {
   let mut workers = HashMap::new();
@@ -68,16 +69,20 @@ async fn start_stop_federation_workers(
       let should_federate = allowed && !is_dead;
       if !workers.contains_key(&instance.id) && should_federate {
         let stats_sender = stats_sender.clone();
+        let context = federation_config.to_request_data();
+        let pool = pool.clone();
         workers.insert(
           instance.id,
-          CancellableTask::spawn(WORKER_EXIT_TIMEOUT, |stop| {
-            instance_worker(
-              pool.clone(),
+          CancellableTask::spawn(WORKER_EXIT_TIMEOUT, |stop| async move {
+            InstanceWorker::init_and_loop(
               instance,
-              federation_config.to_request_data(),
+              context,
+              &mut DbPool::Pool(&pool),
               stop,
               stats_sender,
             )
+            .await?;
+            Ok(())
           }),
         );
       } else if !should_federate {
@@ -112,7 +117,7 @@ async fn start_stop_federation_workers(
 pub fn start_stop_federation_workers_cancellable(
   opts: Opts,
   pool: ActualDbPool,
-  config: FederationConfig,
+  config: FederationConfig<LemmyContext>,
 ) -> CancellableTask<()> {
   CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |c| {
     start_stop_federation_workers(opts, pool, config, c)
diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs
index 578f8ac03..f81cacf84 100644
--- a/crates/federate/src/worker.rs
+++ b/crates/federate/src/worker.rs
@@ -8,6 +8,8 @@ use activitypub_federation::{
 };
 use anyhow::Result;
 use chrono::{DateTime, TimeZone, Utc};
+use lemmy_api_common::context::LemmyContext;
+use lemmy_apub::activity_lists::SharedInboxActivities;
 use lemmy_db_schema::{
   newtypes::{CommunityId, InstanceId},
   source::{activity::SentActivity, instance::Instance, site::Site},
@@ -26,174 +28,233 @@ use tokio_util::sync::CancellationToken;
 static CHECK_SAVE_STATE_EVERY_IT: i64 = 100;
 static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(10);
 
-/// loop fetch new activities from db and send them to the inboxes of the given instances
-/// this worker only returns if (a) there is an internal error or (b) the cancellation token is cancelled (graceful exit)
-pub async fn instance_worker(
-  pool: ActualDbPool,
+pub(crate) struct InstanceWorker {
   instance: Instance,
-  data: Data,
+  site: Option<Site>,
+  followed_communities: HashMap<CommunityId, HashSet<Url>>,
   stop: CancellationToken,
+  context: Data<LemmyContext>,
   stats_sender: UnboundedSender<FederationQueueState>,
-) -> Result<(), anyhow::Error> {
-  let pool = &mut DbPool::Pool(&pool);
-  let mut last_full_communities_fetch = Utc.timestamp_nanos(0);
-  let mut last_incremental_communities_fetch = Utc.timestamp_nanos(0);
-  let mut last_state_insert = Utc.timestamp_nanos(0);
-  let mut followed_communities: HashMap<CommunityId, HashSet<Url>> =
-    get_communities(pool, instance.id, &mut last_incremental_communities_fetch).await?;
-  let site = Site::read_from_instance_id(pool, instance.id).await?;
+  last_full_communities_fetch: DateTime<Utc>,
+  last_incremental_communities_fetch: DateTime<Utc>,
+  state: FederationQueueState,
+  last_state_insert: DateTime<Utc>,
+}
 
-  let mut state = FederationQueueState::load(pool, &instance.domain).await?;
-  if state.fail_count > 0 {
-    // before starting queue, sleep remaining duration
-    let elapsed = (Utc::now() - state.last_retry).to_std()?;
-    let remaining = retry_sleep_duration(state.fail_count) - elapsed;
-    tokio::select! {
-      () = sleep(remaining) => {},
-      () = stop.cancelled() => { return Ok(()); }
-    }
+impl InstanceWorker {
+  pub(crate) async fn init_and_loop(
+    instance: Instance,
+    context: Data<LemmyContext>,
+    pool: &mut DbPool<'_>, // in theory there's a ref to the pool in context, but i couldn't get that to work wrt lifetimes
+    stop: CancellationToken,
+    stats_sender: UnboundedSender<FederationQueueState>,
+  ) -> Result<(), anyhow::Error> {
+    let site = Site::read_from_instance_id(pool, instance.id).await?;
+    let state = FederationQueueState::load(pool, &instance.domain).await?;
+    let mut worker = InstanceWorker {
+      instance,
+      site,
+      followed_communities: HashMap::new(),
+      stop,
+      context,
+      stats_sender,
+      last_full_communities_fetch: Utc.timestamp_nanos(0),
+      last_incremental_communities_fetch: Utc.timestamp_nanos(0),
+      state,
+      last_state_insert: Utc.timestamp_nanos(0),
+    };
+    worker.loop_until_stopped(pool).await
   }
-  while !stop.is_cancelled() {
+  /// loop fetch new activities from db and send them to the inboxes of the given instances
+  /// this worker only returns if (a) there is an internal error or (b) the cancellation token is cancelled (graceful exit)
+  pub(crate) async fn loop_until_stopped(
+    &mut self,
+    pool: &mut DbPool<'_>,
+  ) -> Result<(), anyhow::Error> {
+    self.update_communities(pool).await;
+    self.initial_fail_sleep().await;
+    while !self.stop.is_cancelled() {
+      self.loop_batch(pool).await?;
+      if self.stop.is_cancelled() {
+        break;
+      }
+      if Utc::now() - self.last_state_insert
+        > chrono::Duration::from_std(SAVE_STATE_EVERY_TIME).expect("not negative")
+      {
+        self.last_state_insert = Utc::now();
+        FederationQueueState::upsert(pool, &self.state).await?;
+        self.stats_sender.send(self.state.clone())?;
+      }
+      self.update_communities(pool).await;
+    }
+    // final update of state in db
+    FederationQueueState::upsert(pool, &self.state).await?;
+    Ok(())
+  }
+
+  async fn initial_fail_sleep(&mut self) -> Result<()> {
+    // before starting queue, sleep remaining duration if last request failed
+    if self.state.fail_count > 0 {
+      let elapsed = (Utc::now() - self.state.last_retry).to_std()?;
+      let remaining = retry_sleep_duration(self.state.fail_count) - elapsed;
+      tokio::select! {
+        () = sleep(remaining) => {},
+        () = self.stop.cancelled() => {}
+      }
+    }
+    Ok(())
+  }
+  async fn loop_batch(&mut self, pool: &mut DbPool<'_>) -> Result<()> {
     let latest_id = get_latest_activity_id(pool).await?;
-    let mut id = state.last_successful_id;
+    let mut id = self.state.last_successful_id;
     if id == latest_id {
       // no more work to be done, wait before rechecking
       tokio::select! {
-        () = sleep(Duration::from_secs(10)) => { continue; },
-        () = stop.cancelled() => { return Ok(()); }
+        () = sleep(Duration::from_secs(10)) => {},
+        () = self.stop.cancelled() => {}
       }
+      return Ok(());
     }
     let mut processed_activities = 0;
-    'batch: while id < latest_id
+    while id < latest_id
       && processed_activities < CHECK_SAVE_STATE_EVERY_IT
-      && !stop.is_cancelled()
+      && !self.stop.is_cancelled()
     {
       id += 1;
       processed_activities += 1;
       let Some(ele) = get_activity_cached(pool, id).await? else {
-        state.last_successful_id = id;
+        self.state.last_successful_id = id;
         continue;
       };
-      let (activity, object) = (&ele.0, &ele.1);
-      let inbox_urls = get_inbox_urls(&instance, &site, &followed_communities, activity);
-      if inbox_urls.is_empty() {
-        state.last_successful_id = id;
-        continue;
+      self.send_retry_loop(pool, &ele.0, &ele.1).await?;
+      if self.stop.is_cancelled() {
+        return Ok(());
       }
-      let Some(actor_apub_id) = &activity.actor_apub_id else {
-        continue; // activity was inserted before persistent queue was activated
-      };
-      let actor = get_actor_cached(pool, activity.actor_type, actor_apub_id).await?;
+      // send success!
+      self.state.last_successful_id = id;
+      self.state.fail_count = 0;
+    }
+    Ok(())
+  }
 
-      let inbox_urls = inbox_urls.into_iter().collect();
-      let requests = prepare_raw(object, actor.as_ref(), inbox_urls, &data)
-        .await
-        .into_anyhow()?;
-      for task in requests {
-        // usually only one due to shared inbox
-        let mut req = sign_raw(&task, &data, REQWEST_TIMEOUT).await?;
-        tracing::info!("sending out {}", task);
-        while let Err(e) = send_raw(&task, &data, req).await {
-          state.fail_count += 1;
-          state.last_retry = Utc::now();
-          let retry_delay: Duration = retry_sleep_duration(state.fail_count);
-          tracing::info!(
-            "{}: retrying {id} attempt {} with delay {retry_delay:.2?}. ({e})",
-            instance.domain,
-            state.fail_count
-          );
-          stats_sender.send(state.clone())?;
-          FederationQueueState::upsert(pool, &state).await?;
-          req = sign_raw(&task, &data, REQWEST_TIMEOUT).await?; // resign request
-          tokio::select! {
-            () = sleep(retry_delay) => {},
-            () = stop.cancelled() => {
-              // save state to db and exit
-              break 'batch;
-            }
+  /** this function will only return if (a) send succeeded or (b) worker cancelled */
+  async fn send_retry_loop(
+    &mut self,
+    pool: &mut DbPool<'_>,
+    activity: &SentActivity,
+    object: &SharedInboxActivities,
+  ) -> Result<()> {
+    let inbox_urls = self.get_inbox_urls(activity);
+    if inbox_urls.is_empty() {
+      self.state.last_successful_id = activity.id;
+      return Ok(());
+    }
+    let Some(actor_apub_id) = &activity.actor_apub_id else {
+      return Ok(()); // activity was inserted before persistent queue was activated
+    };
+    let actor = get_actor_cached(pool, activity.actor_type, actor_apub_id).await?;
+
+    let inbox_urls = inbox_urls.into_iter().collect();
+    let requests = prepare_raw(object, actor.as_ref(), inbox_urls, &self.context)
+      .await
+      .into_anyhow()?;
+    for task in requests {
+      // usually only one due to shared inbox
+      let mut req = sign_raw(&task, &self.context, REQWEST_TIMEOUT).await?;
+      tracing::info!("sending out {}", task);
+      while let Err(e) = send_raw(&task, &self.context, req).await {
+        self.state.fail_count += 1;
+        self.state.last_retry = Utc::now();
+        let retry_delay: Duration = retry_sleep_duration(self.state.fail_count);
+        tracing::info!(
+          "{}: retrying {} attempt {} with delay {retry_delay:.2?}. ({e})",
+          self.instance.domain,
+          activity.id,
+          self.state.fail_count
+        );
+        self.stats_sender.send(self.state.clone())?;
+        FederationQueueState::upsert(pool, &self.state).await?;
+        req = sign_raw(&task, &self.context, REQWEST_TIMEOUT).await?; // resign request
+        tokio::select! {
+          () = sleep(retry_delay) => {},
+          () = self.stop.cancelled() => {
+            // save state to db and exit
+            return Ok(());
          }
        }
      }
-      // send success!
-      state.last_successful_id = id;
-      state.fail_count = 0;
    }
+    Ok(())
+  }
 
-    if Utc::now() - last_state_insert
-      > chrono::Duration::from_std(SAVE_STATE_EVERY_TIME).expect("not negative")
-    {
-      last_state_insert = Utc::now();
-      FederationQueueState::upsert(pool, &state).await?;
-      stats_sender.send(state.clone())?;
-    }
-    {
-      // update communities
-      if (Utc::now() - last_incremental_communities_fetch) > chrono::Duration::seconds(10) {
-        // process additions every 10s
-        followed_communities.extend(
-          get_communities(pool, instance.id, &mut last_incremental_communities_fetch).await?,
-        );
-      }
-      if (Utc::now() - last_full_communities_fetch) > chrono::Duration::seconds(300) {
-        // process removals every 5min
-        last_full_communities_fetch = Utc.timestamp_nanos(0);
-        followed_communities =
-          get_communities(pool, instance.id, &mut last_full_communities_fetch).await?;
-        last_incremental_communities_fetch = last_full_communities_fetch;
+  /// get inbox urls of sending the given activity to the given instance
+  /// most often this will return 0 values (if instance doesn't care about the activity)
+  /// or 1 value (the shared inbox)
+  /// > 1 values only happens for non-lemmy software
+  fn get_inbox_urls(&self, activity: &SentActivity) -> HashSet<Url> {
+    let mut inbox_urls: HashSet<Url> = HashSet::new();
+
+    if activity.send_all_instances {
+      if let Some(site) = &self.site {
+        // Nutomic: Most non-lemmy software wont have a site row. That means it cant handle these activities. So handling it like this is fine.
+        inbox_urls.insert(site.inbox_url.inner().clone());
       }
     }
-  }
-
-  Ok(())
-}
-
-/// get inbox urls of sending the given activity to the given instance
-/// most often this will return 0 values (if instance doesn't care about the activity)
-/// or 1 value (the shared inbox)
-/// > 1 values only happens for non-lemmy software
-fn get_inbox_urls(
-  instance: &Instance,
-  site: &Option<Site>,
-  followed_communities: &HashMap<CommunityId, HashSet<Url>>,
-  activity: &SentActivity,
-) -> HashSet<Url> {
-  let mut inbox_urls: HashSet<Url> = HashSet::new();
-
-  if activity.send_all_instances {
-    if let Some(site) = &site {
-      // Nutomic: Most non-lemmy software wont have a site row. That means it cant handle these activities. So handling it like this is fine.
-      inbox_urls.insert(site.inbox_url.inner().clone());
+    for t in &activity.send_community_followers_of {
+      if let Some(urls) = self.followed_communities.get(t) {
+        inbox_urls.extend(urls.iter().map(std::clone::Clone::clone));
+      }
     }
-  }
-  for t in &activity.send_community_followers_of {
-    if let Some(urls) = followed_communities.get(t) {
-      inbox_urls.extend(urls.iter().map(std::clone::Clone::clone));
+    for inbox in &activity.send_inboxes {
+      if inbox.domain() != Some(&self.instance.domain) {
+        continue;
+      }
+      inbox_urls.insert(inbox.inner().clone());
     }
+    inbox_urls
   }
-  for inbox in &activity.send_inboxes {
-    if inbox.domain() != Some(&instance.domain) {
-      continue;
-    }
-    inbox_urls.insert(inbox.inner().clone());
-  }
-  inbox_urls
-}
-/// get a list of local communities with the remote inboxes on the given instance that cares about them
-async fn get_communities(
-  pool: &mut DbPool<'_>,
-  instance_id: InstanceId,
-  last_fetch: &mut DateTime<Utc>,
-) -> Result<HashMap<CommunityId, HashSet<Url>>> {
-  let e = *last_fetch;
-  *last_fetch = Utc::now(); // update to time before fetch to ensure overlap
-  Ok(
-    CommunityFollowerView::get_instance_followed_community_inboxes(pool, instance_id, e)
-      .await?
-      .into_iter()
-      .fold(HashMap::new(), |mut map, (c, u)| {
-        map.entry(c).or_insert_with(HashSet::new).insert(u.into());
-        map
-      }),
-  )
+  async fn update_communities(&mut self, pool: &mut DbPool<'_>) -> Result<()> {
+    if (Utc::now() - self.last_full_communities_fetch) > chrono::Duration::seconds(300) {
+      // process removals every 5min
+      self.last_full_communities_fetch = Utc.timestamp_nanos(0);
+      (self.followed_communities, self.last_full_communities_fetch) = self
+        .get_communities(pool, self.instance.id, self.last_full_communities_fetch)
+        .await?;
+      self.last_incremental_communities_fetch = self.last_full_communities_fetch;
+    }
+    if (Utc::now() - self.last_incremental_communities_fetch) > chrono::Duration::seconds(10) {
+      let (news, time) = self
+        .get_communities(
+          pool,
+          self.instance.id,
+          self.last_incremental_communities_fetch,
+        )
+        .await?;
+      // process additions every 10s
+      self.followed_communities.extend(news);
+      self.last_incremental_communities_fetch = time;
+    }
+    Ok(())
+  }
+
+  /// get a list of local communities with the remote inboxes on the given instance that cares about them
+  async fn get_communities(
+    &mut self,
+    pool: &mut DbPool<'_>,
+    instance_id: InstanceId,
+    last_fetch: DateTime<Utc>,
+  ) -> Result<(HashMap<CommunityId, HashSet<Url>>, DateTime<Utc>)> {
+    let new_last_fetch = Utc::now(); // update to time before fetch to ensure overlap
+    Ok((
+      CommunityFollowerView::get_instance_followed_community_inboxes(pool, instance_id, last_fetch)
+        .await?
+        .into_iter()
+        .fold(HashMap::new(), |mut map, (c, u)| {
+          map.entry(c).or_insert_with(HashSet::new).insert(u.into());
+          map
+        }),
+      new_last_fetch,
+    ))
+  }
 }