diff --git a/crates/apub/src/activities/block/block_user.rs b/crates/apub/src/activities/block/block_user.rs index 207c39903..cbc6658c9 100644 --- a/crates/apub/src/activities/block/block_user.rs +++ b/crates/apub/src/activities/block/block_user.rs @@ -10,7 +10,7 @@ use crate::{ }, activity_lists::AnnouncableActivities, insert_received_activity, - objects::{instance::remote_instance_inboxes, person::ApubPerson}, + objects::person::ApubPerson, protocol::activities::block::block_user::BlockUser, }; use activitypub_federation::{ diff --git a/crates/apub/src/activities/deletion/delete_user.rs b/crates/apub/src/activities/deletion/delete_user.rs index a4bd36919..a30393629 100644 --- a/crates/apub/src/activities/deletion/delete_user.rs +++ b/crates/apub/src/activities/deletion/delete_user.rs @@ -1,7 +1,7 @@ use crate::{ activities::{generate_activity_id, send_lemmy_activity, verify_is_public, verify_person}, insert_received_activity, - objects::{instance::remote_instance_inboxes, person::ApubPerson}, + objects::person::ApubPerson, protocol::activities::deletion::delete_user::DeleteUser, SendActivity, }; diff --git a/crates/apub/src/activities/mod.rs b/crates/apub/src/activities/mod.rs index e2513cb20..0c9a9313b 100644 --- a/crates/apub/src/activities/mod.rs +++ b/crates/apub/src/activities/mod.rs @@ -11,12 +11,16 @@ use activitypub_federation::{ traits::{ActivityHandler, Actor}, }; use anyhow::anyhow; -use lemmy_api_common::context::LemmyContext; +use lemmy_api_common::{ + context::LemmyContext, + send_activity::{ActivityChannel, SendActivityData}, +}; use lemmy_db_schema::{ newtypes::CommunityId, source::{ - activity::{ActivityInsertForm, ActivitySendTargets, ActorType}, + activity::{ActivitySendTargets, ActorType, SentActivity, SentActivityForm}, community::Community, + instance::Instance, }, traits::Crud, }; @@ -168,7 +172,7 @@ async fn send_lemmy_activity( data: &Data, activity: Activity, actor: &ActorT, - send_targets: ActivitySendTargets, + mut send_targets: ActivitySendTargets, sensitive: bool, ) -> Result<(), LemmyError> where @@ -188,7 +192,7 @@ where }) .await?; - inbox.retain(|i| { + send_targets.inboxes.retain(|i| { let domain = i.domain().expect("has domain").to_string(); !dead_instances.contains(&domain) }); @@ -200,10 +204,10 @@ where data: serde_json::to_value(activity.clone())?, sensitive, send_targets, + actor_type: actor.actor_type(), actor_apub_id: actor.id().into(), }; SentActivity::create(&mut data.pool(), form).await?; - send_activity(activity, actor, inbox, data).await?; Ok(()) } diff --git a/crates/db_schema/src/impls/activity.rs b/crates/db_schema/src/impls/activity.rs index 8c4a0a15d..a7aab933f 100644 --- a/crates/db_schema/src/impls/activity.rs +++ b/crates/db_schema/src/impls/activity.rs @@ -30,6 +30,14 @@ impl SentActivity { .first::(conn) .await } + pub async fn read(pool: &mut DbPool<'_>, object_id: i64) -> Result { + use crate::schema::sent_activity::dsl::{id, sent_activity}; + let conn = &mut get_conn(pool).await?; + sent_activity + .filter(id.eq(object_id)) + .first::(conn) + .await + } } impl ReceivedActivity { @@ -62,7 +70,10 @@ mod tests { #![allow(clippy::indexing_slicing)] use super::*; - use crate::utils::build_db_pool_for_tests; + use crate::{ + source::activity::{ActivitySendTargets, ActorType}, + utils::build_db_pool_for_tests, + }; use serde_json::json; use serial_test::serial; use url::Url; @@ -102,6 +113,9 @@ mod tests { ap_id: ap_id.clone(), data: data.clone(), sensitive, + actor_apub_id: Url::parse("http://example.com/u/exampleuser").unwrap(), + actor_type: ActorType::Person, + send_targets: ActivitySendTargets::empty(), }; SentActivity::create(pool, form).await.unwrap(); diff --git a/crates/db_schema/src/source/activity.rs b/crates/db_schema/src/source/activity.rs index 9e81f5515..195d953cb 100644 --- a/crates/db_schema/src/source/activity.rs +++ b/crates/db_schema/src/source/activity.rs @@ -1,6 +1,6 @@ use crate::{ newtypes::{CommunityId, DbUrl}, - schema::{activity, sent_activity}, + schema::sent_activity, }; use diesel::{ deserialize::FromSql, diff --git a/crates/federate/src/main.rs b/crates/federate/src/main.rs index b727324bd..142f2b078 100644 --- a/crates/federate/src/main.rs +++ b/crates/federate/src/main.rs @@ -9,7 +9,7 @@ use lemmy_api_common::request::build_user_agent; use lemmy_apub::{VerifyUrlData, FEDERATION_HTTP_FETCH_LIMIT}; use lemmy_db_schema::{ source::instance::Instance, - utils::{build_db_pool, DbPool}, + utils::{build_db_pool, ActualDbPool, DbPool}, }; use lemmy_utils::{error::LemmyErrorExt2, settings::SETTINGS, REQWEST_TIMEOUT}; use reqwest::Client; @@ -57,17 +57,14 @@ async fn main() -> anyhow::Result<()> { let process_num = 1 - 1; // todo: pass these in via command line args let process_count = 1; let mut workers = HashMap::new(); - let mut pool2 = DbPool::from(&pool); let (stats_sender, stats_receiver) = unbounded_channel(); - let exit_print = tokio::spawn(receive_print_stats(&mut pool2, stats_receiver)); + let exit_print = tokio::spawn(receive_print_stats(pool.clone(), stats_receiver)); let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())?; let mut terminate = tokio::signal::unix::signal(SignalKind::terminate())?; + let pool2 = &mut DbPool::Pool(&pool); loop { - for (instance, should_federate) in Instance::read_all_with_blocked(&mut pool2) - .await? - .into_iter() - { + for (instance, should_federate) in Instance::read_all_with_blocked(pool2).await?.into_iter() { if instance.id.inner() % process_count != process_num { continue; } @@ -77,7 +74,7 @@ async fn main() -> anyhow::Result<()> { instance.id, spawn_cancellable(WORKER_EXIT_TIMEOUT, |stop| { instance_worker( - pool2, + pool.clone(), instance, federation_config.to_request_data(), stop, @@ -122,9 +119,10 @@ async fn main() -> anyhow::Result<()> { /// every 60s, print the state for every instance. exits if the receiver is done (all senders dropped) async fn receive_print_stats( - mut pool: &mut DbPool<'_>, + mut pool: ActualDbPool, mut receiver: UnboundedReceiver, ) { + let mut pool = &mut DbPool::Pool(&pool); let mut printerval = tokio::time::interval(Duration::from_secs(60)); printerval.tick().await; // skip first let mut stats = HashMap::new(); diff --git a/crates/federate/src/util.rs b/crates/federate/src/util.rs index c2f589a86..e53c338ab 100644 --- a/crates/federate/src/util.rs +++ b/crates/federate/src/util.rs @@ -8,7 +8,7 @@ use lemmy_apub::{ }; use lemmy_db_schema::{ source::{ - activity::{Activity, ActorType}, + activity::{ActorType, SentActivity}, community::Community, person::Person, site::Site, @@ -122,8 +122,7 @@ pub fn intern_url<'a>(url: impl Into>) -> Arc { } /// this should maybe be a newtype like all the other PersonId CommunityId etc. -/// also should be i64 -pub type ActivityId = i32; +pub type ActivityId = i64; /// activities are immutable so cache does not need to have TTL /// May return None if the corresponding id does not exist or is a received activity. @@ -132,20 +131,16 @@ pub type ActivityId = i32; pub async fn get_activity_cached( pool: &mut DbPool<'_>, activity_id: ActivityId, -) -> Result>> { - static ACTIVITIES: Lazy>>> = +) -> Result>> { + static ACTIVITIES: Lazy>>> = Lazy::new(|| Cache::builder().max_capacity(10000).build()); ACTIVITIES .try_get_with(activity_id, async { - let row = Activity::read(pool, activity_id) + let row = SentActivity::read(pool, activity_id) .await .optional() .context("could not read activity")?; let Some(mut row) = row else { return anyhow::Result::<_, anyhow::Error>::Ok(None) }; - if row.send_targets.is_none() { - // must be a received activity - return Ok(None); - } // swap to avoid cloning let mut data = Value::Null; std::mem::swap(&mut row.data, &mut data); @@ -169,7 +164,7 @@ pub async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result let conn = &mut get_conn(pool).await?; let Sequence { last_value: latest_id, - } = diesel::sql_query("select last_value from activity_id_seq") + } = diesel::sql_query("select last_value from sent_activity_id_seq") .get_result(conn) .await?; anyhow::Result::<_, anyhow::Error>::Ok(latest_id as ActivityId) diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index 98e569943..1b6b043f5 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -16,8 +16,8 @@ use anyhow::Result; use chrono::{DateTime, TimeZone, Utc}; use lemmy_db_schema::{ newtypes::{CommunityId, InstanceId}, - source::{activity::Activity, instance::Instance, site::Site}, - utils::DbPool, + source::{activity::SentActivity, instance::Instance, site::Site}, + utils::{ActualDbPool, DbPool}, }; use lemmy_db_views_actor::structs::CommunityFollowerView; use lemmy_utils::{error::LemmyErrorExt2, REQWEST_TIMEOUT}; @@ -38,12 +38,13 @@ static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(10); /// loop fetch new activities from db and send them to the inboxes of the given instances /// this worker only returns if (a) there is an internal error or (b) the cancellation token is cancelled (graceful exit) pub async fn instance_worker( - mut pool: DbPool<'_>, + pool: ActualDbPool, instance: Instance, data: Data<()>, stop: CancellationToken, stats_sender: UnboundedSender, ) -> Result<(), anyhow::Error> { + let mut pool = &mut DbPool::Pool(&pool); let mut last_full_communities_fetch = Utc.timestamp_nanos(0); let mut last_incremental_communities_fetch = Utc.timestamp_nanos(0); let mut last_state_insert = Utc.timestamp_nanos(0); @@ -92,15 +93,13 @@ pub async fn instance_worker( state.last_successful_id = id; continue; } - let actor = { - // these should always be set for sent activities - let (Some(actor_type), Some(apub_id)) = (activity.actor_type, &activity.actor_apub_id) else { - tracing::warn!("activity {id} does not have actor_type or actor_apub_id set"); - state.last_successful_id = id; - continue; - }; - get_actor_cached(&mut pool, actor_type, apub_id.deref()).await? - }; + let actor = get_actor_cached( + &mut pool, + activity.actor_type, + activity.actor_apub_id.deref(), + ) + .await?; + let inbox_urls = inbox_urls.into_iter().map(|e| (*e).clone()).collect(); let requests = prepare_raw(object, actor.as_ref(), inbox_urls, &data) .await @@ -169,12 +168,10 @@ fn get_inbox_urls( instance: &Instance, site: &Option, followed_communities: &HashMap>>, - activity: &Activity, + activity: &SentActivity, ) -> HashSet> { let mut inbox_urls = HashSet::new(); - let Some(targets) = &activity.send_targets else { - return inbox_urls; - }; + let targets = &activity.send_targets; if targets.all_instances { if let Some(site) = &site { // todo: when does an instance not have a site?