diff --git a/crates/apub/src/activities/create_or_update/private_message.rs b/crates/apub/src/activities/create_or_update/private_message.rs index 452c39207..74f833051 100644 --- a/crates/apub/src/activities/create_or_update/private_message.rs +++ b/crates/apub/src/activities/create_or_update/private_message.rs @@ -12,15 +12,8 @@ use activitypub_federation::{ protocol::verification::verify_domains_match, traits::{ActivityHandler, Actor, Object}, }; -use lemmy_api_common::{ - context::LemmyContext, - private_message::{CreatePrivateMessage, EditPrivateMessage, PrivateMessageResponse}, -}; -use lemmy_db_schema::{ - newtypes::PersonId, - source::{activity::ActivitySendTargets, person::Person, private_message::PrivateMessage}, - traits::Crud, -}; +use lemmy_api_common::context::LemmyContext; +use lemmy_db_schema::source::activity::ActivitySendTargets; use lemmy_db_views::structs::PrivateMessageView; use lemmy_utils::error::LemmyError; use url::Url; diff --git a/crates/apub/src/activities/deletion/delete_user.rs b/crates/apub/src/activities/deletion/delete_user.rs index 929ac2e88..694ba3877 100644 --- a/crates/apub/src/activities/deletion/delete_user.rs +++ b/crates/apub/src/activities/deletion/delete_user.rs @@ -10,11 +10,7 @@ use activitypub_federation::{ protocol::verification::verify_urls_match, traits::{ActivityHandler, Actor}, }; -use lemmy_api_common::{ - context::LemmyContext, - person::{DeleteAccount, DeleteAccountResponse}, - utils::{delete_user_account, local_user_view_from_jwt}, -}; +use lemmy_api_common::{context::LemmyContext, utils::delete_user_account}; use lemmy_db_schema::source::{activity::ActivitySendTargets, person::Person}; use lemmy_utils::error::LemmyError; use url::Url; diff --git a/crates/apub/src/activities/following/follow.rs b/crates/apub/src/activities/following/follow.rs index 09218d2ab..d2754dd58 100644 --- a/crates/apub/src/activities/following/follow.rs +++ b/crates/apub/src/activities/following/follow.rs @@ -20,7 +20,7 @@ use lemmy_api_common::context::LemmyContext; use lemmy_db_schema::{ source::{ activity::ActivitySendTargets, - community::{Community, CommunityFollower, CommunityFollowerForm}, + community::{CommunityFollower, CommunityFollowerForm}, person::{PersonFollower, PersonFollowerForm}, }, traits::Followable, diff --git a/crates/federate/src/federation_queue_state.rs b/crates/federate/src/federation_queue_state.rs index e048f120d..cf6a33a81 100644 --- a/crates/federate/src/federation_queue_state.rs +++ b/crates/federate/src/federation_queue_state.rs @@ -19,7 +19,7 @@ pub struct FederationQueueState { impl FederationQueueState { /// load or return a default empty value pub async fn load(pool: &mut DbPool<'_>, domain_: &str) -> Result { - use lemmy_db_schema::schema::federation_queue_state::dsl::*; + use lemmy_db_schema::schema::federation_queue_state::dsl::federation_queue_state; let conn = &mut get_conn(pool).await?; Ok( federation_queue_state @@ -37,8 +37,8 @@ impl FederationQueueState { ) } pub async fn upsert(pool: &mut DbPool<'_>, state: &FederationQueueState) -> Result<()> { + use lemmy_db_schema::schema::federation_queue_state::dsl::{domain, federation_queue_state}; let conn = &mut get_conn(pool).await?; - use lemmy_db_schema::schema::federation_queue_state::dsl::*; state .insert_into(federation_queue_state) diff --git a/crates/federate/src/lib.rs b/crates/federate/src/lib.rs index f0abaa737..34e595f9e 100644 --- a/crates/federate/src/lib.rs +++ b/crates/federate/src/lib.rs @@ -53,7 +53,7 @@ async fn start_stop_federation_workers( let mut total_count = 0; let mut dead_count = 0; let mut disallowed_count = 0; - for (instance, allowed) in Instance::read_all_with_blocked(pool2).await?.into_iter() { + for (instance, allowed) in Instance::read_all_with_blocked(pool2).await? { if instance.id.inner() % opts.process_count != process_index { continue; } @@ -102,7 +102,7 @@ async fn start_stop_federation_workers( WORKER_EXIT_TIMEOUT ); // the cancel futures need to be awaited concurrently for the shutdown processes to be triggered concurrently - futures::future::join_all(workers.into_values().map(|e| e.cancel())).await; + futures::future::join_all(workers.into_values().map(util::CancellableTask::cancel)).await; exit_print.await?; Ok(()) } @@ -124,7 +124,7 @@ async fn receive_print_stats( pool: ActualDbPool, mut receiver: UnboundedReceiver, ) { - let mut pool = &mut DbPool::Pool(&pool); + let pool = &mut DbPool::Pool(&pool); let mut printerval = tokio::time::interval(Duration::from_secs(60)); printerval.tick().await; // skip first let mut stats = HashMap::new(); @@ -133,13 +133,13 @@ async fn receive_print_stats( ele = receiver.recv() => { let Some(ele) = ele else { tracing::info!("done. quitting"); - print_stats(&mut pool, &stats).await; + print_stats(pool, &stats).await; return; }; stats.insert(ele.domain.clone(), ele); }, _ = printerval.tick() => { - print_stats(&mut pool, &stats).await; + print_stats(pool, &stats).await; } } } @@ -154,7 +154,10 @@ async fn print_stats(pool: &mut DbPool<'_>, stats: &HashMap, stats: &HashMap 0 { + tracing::info!("{}: Ok. {} behind", stat.domain, behind); } else { - if behind > 0 { - tracing::info!("{}: Ok. {} behind", stat.domain, behind); - } else { - ok_count += 1; - } + ok_count += 1; } } tracing::info!("{ok_count} others up to date"); diff --git a/crates/federate/src/util.rs b/crates/federate/src/util.rs index 46a78956c..47bb1253c 100644 --- a/crates/federate/src/util.rs +++ b/crates/federate/src/util.rs @@ -135,6 +135,7 @@ pub fn intern_url<'a>(url: impl Into>) -> Arc { /// this should maybe be a newtype like all the other PersonId CommunityId etc. pub type ActivityId = i64; +type CachedActivityInfo = Option>; /// activities are immutable so cache does not need to have TTL /// May return None if the corresponding id does not exist or is a received activity. /// Holes in serials are expected behaviour in postgresql @@ -142,8 +143,8 @@ pub type ActivityId = i64; pub async fn get_activity_cached( pool: &mut DbPool<'_>, activity_id: ActivityId, -) -> Result>> { - static ACTIVITIES: Lazy>>> = +) -> Result { + static ACTIVITIES: Lazy> = Lazy::new(|| Cache::builder().max_capacity(10000).build()); ACTIVITIES .try_get_with(activity_id, async { @@ -186,7 +187,7 @@ pub async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result /// how long to sleep based on how many retries have already happened pub fn retry_sleep_duration(retry_count: i32) -> Duration { - Duration::from_secs_f64(10.0 * 2.0_f64.powf(retry_count as f64)) + Duration::from_secs_f64(10.0 * 2.0_f64.powf(f64::from(retry_count))) } #[derive(QueryableByName)] diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index 240b37d65..a3d7ac379 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -44,19 +44,15 @@ pub async fn instance_worker( stop: CancellationToken, stats_sender: UnboundedSender, ) -> Result<(), anyhow::Error> { - let mut pool = &mut DbPool::Pool(&pool); + let pool = &mut DbPool::Pool(&pool); let mut last_full_communities_fetch = Utc.timestamp_nanos(0); let mut last_incremental_communities_fetch = Utc.timestamp_nanos(0); let mut last_state_insert = Utc.timestamp_nanos(0); - let mut followed_communities: HashMap>> = get_communities( - &mut pool, - instance.id, - &mut last_incremental_communities_fetch, - ) - .await?; - let site = Site::read_from_instance_id(&mut pool, instance.id).await?; + let mut followed_communities: HashMap>> = + get_communities(pool, instance.id, &mut last_incremental_communities_fetch).await?; + let site = Site::read_from_instance_id(pool, instance.id).await?; - let mut state = FederationQueueState::load(&mut pool, &instance.domain).await?; + let mut state = FederationQueueState::load(pool, &instance.domain).await?; if state.fail_count > 0 { // before starting queue, sleep remaining duration let elapsed = (Utc::now() - state.last_retry).to_std()?; @@ -67,7 +63,7 @@ pub async fn instance_worker( } } while !stop.is_cancelled() { - let latest_id = get_latest_activity_id(&mut pool).await?; + let latest_id = get_latest_activity_id(pool).await?; let mut id = state.last_successful_id; if id == latest_id { // no more work to be done, wait before rechecking @@ -83,7 +79,7 @@ pub async fn instance_worker( { id += 1; processed_activities += 1; - let Some(ele) = get_activity_cached(&mut pool, id).await? else { + let Some(ele) = get_activity_cached(pool, id).await? else { state.last_successful_id = id; continue; }; @@ -96,7 +92,7 @@ pub async fn instance_worker( let Some(actor_apub_id) = &activity.actor_apub_id else { continue; // activity was inserted before persistent queue was activated }; - let actor = get_actor_cached(&mut pool, activity.actor_type, actor_apub_id).await?; + let actor = get_actor_cached(pool, activity.actor_type, actor_apub_id).await?; let inbox_urls = inbox_urls.into_iter().map(|e| (*e).clone()).collect(); let requests = prepare_raw(object, actor.as_ref(), inbox_urls, &data) @@ -116,7 +112,7 @@ pub async fn instance_worker( state.fail_count ); stats_sender.send(state.clone())?; - FederationQueueState::upsert(&mut pool, &state).await?; + FederationQueueState::upsert(pool, &state).await?; req = sign_raw(&task, &data, REQWEST_TIMEOUT).await?; // resign request tokio::select! { () = sleep(retry_delay) => {}, @@ -132,9 +128,11 @@ pub async fn instance_worker( state.fail_count = 0; } - if Utc::now() - last_state_insert > chrono::Duration::from_std(SAVE_STATE_EVERY_TIME).unwrap() { + if Utc::now() - last_state_insert + > chrono::Duration::from_std(SAVE_STATE_EVERY_TIME).expect("not negative") + { last_state_insert = Utc::now(); - FederationQueueState::upsert(&mut pool, &state).await?; + FederationQueueState::upsert(pool, &state).await?; stats_sender.send(state.clone())?; } { @@ -142,20 +140,15 @@ pub async fn instance_worker( if (Utc::now() - last_incremental_communities_fetch) > chrono::Duration::seconds(10) { // process additions every 10s followed_communities.extend( - get_communities( - &mut pool, - instance.id, - &mut last_incremental_communities_fetch, - ) - .await?, + get_communities(pool, instance.id, &mut last_incremental_communities_fetch).await?, ); } if (Utc::now() - last_full_communities_fetch) > chrono::Duration::seconds(300) { // process removals every 5min last_full_communities_fetch = Utc.timestamp_nanos(0); followed_communities = - get_communities(&mut pool, instance.id, &mut last_full_communities_fetch).await?; - last_incremental_communities_fetch = last_full_communities_fetch.clone(); + get_communities(pool, instance.id, &mut last_full_communities_fetch).await?; + last_incremental_communities_fetch = last_full_communities_fetch; } } } @@ -183,7 +176,7 @@ fn get_inbox_urls( } for t in &targets.community_followers_of { if let Some(urls) = followed_communities.get(t) { - inbox_urls.extend(urls.iter().map(|e| e.clone())); + inbox_urls.extend(urls.iter().map(std::clone::Clone::clone)); } } for inbox in &targets.inboxes { @@ -210,7 +203,7 @@ async fn get_communities( .fold(HashMap::new(), |mut map, (c, u)| { map .entry(c) - .or_insert_with(|| HashSet::new()) + .or_insert_with(HashSet::new) .insert(intern_url(Cow::Owned(u.into()))); map }),