diff --git a/Cargo.lock b/Cargo.lock index b6eba6e3a..7704977a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2823,7 +2823,6 @@ dependencies = [ "bytes", "chrono", "clap", - "dashmap", "diesel", "diesel-async", "enum_delegate", diff --git a/crates/apub/src/activities/block/block_user.rs b/crates/apub/src/activities/block/block_user.rs index b761873fa..7fd5768fd 100644 --- a/crates/apub/src/activities/block/block_user.rs +++ b/crates/apub/src/activities/block/block_user.rs @@ -98,8 +98,7 @@ impl BlockUser { match target { SiteOrCommunity::Site(_) => { - let mut inboxes = ActivitySendTargets::empty(); - inboxes.set_all_instances(true); + let inboxes = ActivitySendTargets::to_all_instances(); send_lemmy_activity(context, block, mod_, inboxes, false).await } SiteOrCommunity::Community(c) => { diff --git a/crates/apub/src/activities/block/undo_block_user.rs b/crates/apub/src/activities/block/undo_block_user.rs index 4c8e245fd..43f1b235c 100644 --- a/crates/apub/src/activities/block/undo_block_user.rs +++ b/crates/apub/src/activities/block/undo_block_user.rs @@ -63,7 +63,7 @@ impl UndoBlockUser { let mut inboxes = ActivitySendTargets::to_inbox(user.shared_inbox_or_inbox()); match target { SiteOrCommunity::Site(_) => { - inboxes.set_all_instances(true); + inboxes.set_all_instances(); send_lemmy_activity(context, undo, mod_, inboxes, false).await } SiteOrCommunity::Community(c) => { diff --git a/crates/apub/src/activities/deletion/delete_user.rs b/crates/apub/src/activities/deletion/delete_user.rs index 694ba3877..782d4759b 100644 --- a/crates/apub/src/activities/deletion/delete_user.rs +++ b/crates/apub/src/activities/deletion/delete_user.rs @@ -38,8 +38,7 @@ pub async fn delete_user(person: Person, context: Data) -> Result< cc: vec![], }; - let mut inboxes = ActivitySendTargets::empty(); - inboxes.set_all_instances(true); + let inboxes = ActivitySendTargets::to_all_instances(); send_lemmy_activity(&context, delete, &actor, inboxes, true).await?; Ok(()) diff --git a/crates/db_schema/src/source/activity.rs b/crates/db_schema/src/source/activity.rs index a7b4e6687..57881877f 100644 --- a/crates/db_schema/src/source/activity.rs +++ b/crates/db_schema/src/source/activity.rs @@ -48,14 +48,16 @@ impl ActivitySendTargets { } pub fn to_local_community_followers(id: CommunityId) -> ActivitySendTargets { let mut a = ActivitySendTargets::empty(); - a.add_local_community_followers(id); + a.community_followers_of.insert(id); a } - pub fn add_local_community_followers(&mut self, id: CommunityId) { - self.community_followers_of.insert(id); + pub fn to_all_instances() -> ActivitySendTargets { + let mut a = ActivitySendTargets::empty(); + a.all_instances = true; + a } - pub fn set_all_instances(&mut self, b: bool) { - self.all_instances = b; + pub fn set_all_instances(&mut self) { + self.all_instances = true; } pub fn add_inbox(&mut self, inbox: Url) { diff --git a/crates/federate/Cargo.toml b/crates/federate/Cargo.toml index 5c1c1a9e7..c9121490b 100644 --- a/crates/federate/Cargo.toml +++ b/crates/federate/Cargo.toml @@ -11,31 +11,32 @@ repository.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -activitypub_federation.workspace = true -anyhow.workspace = true -async-trait = "0.1.71" -bytes = "1.4.0" -chrono.workspace = true -clap = { version = "4.3.19", features = ["derive"] } -dashmap = "5.5.0" -diesel = { workspace = true, features = ["postgres", "chrono", "serde_json"] } -diesel-async = { workspace = true, features = ["deadpool", "postgres"] } -enum_delegate = "0.2.0" -futures.workspace = true lemmy_api_common.workspace = true lemmy_apub.workspace = true lemmy_db_schema = { workspace = true, features = ["full"] } lemmy_db_views_actor.workspace = true lemmy_utils.workspace = true -moka = { version = "0.11.2", features = ["future"] } + +activitypub_federation.workspace = true +anyhow.workspace = true +futures.workspace = true +chrono.workspace = true +diesel = { workspace = true, features = ["postgres", "chrono", "serde_json"] } +diesel-async = { workspace = true, features = ["deadpool", "postgres"] } once_cell.workspace = true -openssl = "0.10.55" reqwest.workspace = true +serde_json.workspace = true +serde.workspace = true +tokio = { workspace = true, features = ["full"] } +tracing.workspace = true + +async-trait = "0.1.71" +bytes = "1.4.0" +clap = { version = "4.3.19", features = ["derive"] } +enum_delegate = "0.2.0" +moka = { version = "0.11.2", features = ["future"] } +openssl = "0.10.55" reqwest-middleware = "0.2.2" reqwest-tracing = "0.4.5" -serde.workspace = true -serde_json.workspace = true -tokio = { workspace = true, features = ["full"] } tokio-util = "0.7.8" -tracing.workspace = true tracing-subscriber = "0.3.17" diff --git a/crates/federate/src/util.rs b/crates/federate/src/util.rs index 11b6f366a..bf6a436bb 100644 --- a/crates/federate/src/util.rs +++ b/crates/federate/src/util.rs @@ -1,5 +1,4 @@ use anyhow::{anyhow, Context, Result}; -use dashmap::DashSet; use diesel::{prelude::*, sql_types::Int8}; use diesel_async::RunQueryDsl; use lemmy_apub::{ @@ -20,13 +19,7 @@ use moka::future::Cache; use once_cell::sync::Lazy; use reqwest::Url; use serde_json::Value; -use std::{ - borrow::{Borrow, Cow}, - future::Future, - pin::Pin, - sync::Arc, - time::Duration, -}; +use std::{future::Future, pin::Pin, sync::Arc, time::Duration}; use tokio::{task::JoinHandle, time::sleep}; use tokio_util::sync::CancellationToken; @@ -81,7 +74,7 @@ impl CancellableTask { /// assuming apub priv key and ids are immutable, then we don't need to have TTL /// TODO: capacity should be configurable maybe based on memory use -pub async fn get_actor_cached( +pub(crate) async fn get_actor_cached( pool: &mut DbPool<'_>, actor_type: ActorType, actor_apub_id: &Url, @@ -117,30 +110,15 @@ pub async fn get_actor_cached( .map_err(|e| anyhow::anyhow!("err getting actor: {e:?}")) } -/// intern urls to reduce memory usage -/// not sure if worth it -pub fn intern_url<'a>(url: impl Into>) -> Arc { - static INTERNED_URLS: Lazy>> = Lazy::new(DashSet::new); - let url: Cow<'a, Url> = url.into(); - return INTERNED_URLS - .get::(url.borrow()) - .map(|e| e.clone()) - .unwrap_or_else(|| { - let ret = Arc::new(url.into_owned()); - INTERNED_URLS.insert(ret.clone()); - ret - }); -} - /// this should maybe be a newtype like all the other PersonId CommunityId etc. -pub type ActivityId = i64; +pub(crate) type ActivityId = i64; type CachedActivityInfo = Option>; /// activities are immutable so cache does not need to have TTL /// May return None if the corresponding id does not exist or is a received activity. /// Holes in serials are expected behaviour in postgresql /// todo: cache size should probably be configurable / dependent on desired memory usage -pub async fn get_activity_cached( +pub(crate) async fn get_activity_cached( pool: &mut DbPool<'_>, activity_id: ActivityId, ) -> Result { @@ -165,7 +143,7 @@ pub async fn get_activity_cached( } /// return the most current activity id (with 1 second cache) -pub async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result { +pub(crate) async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result { static CACHE: Lazy> = Lazy::new(|| { Cache::builder() .time_to_live(Duration::from_secs(1)) @@ -186,7 +164,7 @@ pub async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result } /// how long to sleep based on how many retries have already happened -pub fn retry_sleep_duration(retry_count: i32) -> Duration { +pub(crate) fn retry_sleep_duration(retry_count: i32) -> Duration { Duration::from_secs_f64(10.0 * 2.0_f64.powf(f64::from(retry_count))) } diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index 86cacae7b..578f8ac03 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -1,12 +1,6 @@ use crate::{ federation_queue_state::FederationQueueState, - util::{ - get_activity_cached, - get_actor_cached, - get_latest_activity_id, - intern_url, - retry_sleep_duration, - }, + util::{get_activity_cached, get_actor_cached, get_latest_activity_id, retry_sleep_duration}, }; use activitypub_federation::{ activity_queue::{prepare_raw, send_raw, sign_raw}, @@ -23,10 +17,7 @@ use lemmy_db_views_actor::structs::CommunityFollowerView; use lemmy_utils::{error::LemmyErrorExt2, REQWEST_TIMEOUT}; use reqwest::Url; use std::{ - borrow::Cow, collections::{HashMap, HashSet}, - ops::Deref, - sync::Arc, time::Duration, }; use tokio::{sync::mpsc::UnboundedSender, time::sleep}; @@ -48,7 +39,7 @@ pub async fn instance_worker( let mut last_full_communities_fetch = Utc.timestamp_nanos(0); let mut last_incremental_communities_fetch = Utc.timestamp_nanos(0); let mut last_state_insert = Utc.timestamp_nanos(0); - let mut followed_communities: HashMap>> = + let mut followed_communities: HashMap> = get_communities(pool, instance.id, &mut last_incremental_communities_fetch).await?; let site = Site::read_from_instance_id(pool, instance.id).await?; @@ -94,7 +85,7 @@ pub async fn instance_worker( }; let actor = get_actor_cached(pool, activity.actor_type, actor_apub_id).await?; - let inbox_urls = inbox_urls.into_iter().map(|e| (*e).clone()).collect(); + let inbox_urls = inbox_urls.into_iter().collect(); let requests = prepare_raw(object, actor.as_ref(), inbox_urls, &data) .await .into_anyhow()?; @@ -163,15 +154,15 @@ pub async fn instance_worker( fn get_inbox_urls( instance: &Instance, site: &Option, - followed_communities: &HashMap>>, + followed_communities: &HashMap>, activity: &SentActivity, -) -> HashSet> { - let mut inbox_urls = HashSet::new(); +) -> HashSet { + let mut inbox_urls: HashSet = HashSet::new(); if activity.send_all_instances { if let Some(site) = &site { - // todo: when does an instance not have a site? - inbox_urls.insert(intern_url(Cow::Borrowed(site.inbox_url.deref()))); + // Nutomic: Most non-lemmy software wont have a site row. That means it cant handle these activities. So handling it like this is fine. + inbox_urls.insert(site.inbox_url.inner().clone()); } } for t in &activity.send_community_followers_of { @@ -183,7 +174,7 @@ fn get_inbox_urls( if inbox.domain() != Some(&instance.domain) { continue; } - inbox_urls.insert(intern_url(Cow::Borrowed(inbox.inner()))); + inbox_urls.insert(inbox.inner().clone()); } inbox_urls } @@ -193,7 +184,7 @@ async fn get_communities( pool: &mut DbPool<'_>, instance_id: InstanceId, last_fetch: &mut DateTime, -) -> Result>>> { +) -> Result>> { let e = *last_fetch; *last_fetch = Utc::now(); // update to time before fetch to ensure overlap Ok( @@ -201,10 +192,7 @@ async fn get_communities( .await? .into_iter() .fold(HashMap::new(), |mut map, (c, u)| { - map - .entry(c) - .or_insert_with(HashSet::new) - .insert(intern_url(Cow::Owned(u.into()))); + map.entry(c).or_insert_with(HashSet::new).insert(u.into()); map }), )