This commit is contained in:
phiresky 2023-07-23 21:29:36 +00:00
parent 552120498b
commit a2e3fc870b
8 changed files with 53 additions and 45 deletions

View File

@ -10,7 +10,7 @@ use crate::{
}, },
activity_lists::AnnouncableActivities, activity_lists::AnnouncableActivities,
insert_received_activity, insert_received_activity,
objects::{instance::remote_instance_inboxes, person::ApubPerson}, objects::person::ApubPerson,
protocol::activities::block::block_user::BlockUser, protocol::activities::block::block_user::BlockUser,
}; };
use activitypub_federation::{ use activitypub_federation::{

View File

@ -1,7 +1,7 @@
use crate::{ use crate::{
activities::{generate_activity_id, send_lemmy_activity, verify_is_public, verify_person}, activities::{generate_activity_id, send_lemmy_activity, verify_is_public, verify_person},
insert_received_activity, insert_received_activity,
objects::{instance::remote_instance_inboxes, person::ApubPerson}, objects::person::ApubPerson,
protocol::activities::deletion::delete_user::DeleteUser, protocol::activities::deletion::delete_user::DeleteUser,
SendActivity, SendActivity,
}; };

View File

@ -11,12 +11,16 @@ use activitypub_federation::{
traits::{ActivityHandler, Actor}, traits::{ActivityHandler, Actor},
}; };
use anyhow::anyhow; use anyhow::anyhow;
use lemmy_api_common::context::LemmyContext; use lemmy_api_common::{
context::LemmyContext,
send_activity::{ActivityChannel, SendActivityData},
};
use lemmy_db_schema::{ use lemmy_db_schema::{
newtypes::CommunityId, newtypes::CommunityId,
source::{ source::{
activity::{ActivityInsertForm, ActivitySendTargets, ActorType}, activity::{ActivitySendTargets, ActorType, SentActivity, SentActivityForm},
community::Community, community::Community,
instance::Instance,
}, },
traits::Crud, traits::Crud,
}; };
@ -168,7 +172,7 @@ async fn send_lemmy_activity<Activity, ActorT>(
data: &Data<LemmyContext>, data: &Data<LemmyContext>,
activity: Activity, activity: Activity,
actor: &ActorT, actor: &ActorT,
send_targets: ActivitySendTargets, mut send_targets: ActivitySendTargets,
sensitive: bool, sensitive: bool,
) -> Result<(), LemmyError> ) -> Result<(), LemmyError>
where where
@ -188,7 +192,7 @@ where
}) })
.await?; .await?;
inbox.retain(|i| { send_targets.inboxes.retain(|i| {
let domain = i.domain().expect("has domain").to_string(); let domain = i.domain().expect("has domain").to_string();
!dead_instances.contains(&domain) !dead_instances.contains(&domain)
}); });
@ -200,10 +204,10 @@ where
data: serde_json::to_value(activity.clone())?, data: serde_json::to_value(activity.clone())?,
sensitive, sensitive,
send_targets, send_targets,
actor_type: actor.actor_type(),
actor_apub_id: actor.id().into(), actor_apub_id: actor.id().into(),
}; };
SentActivity::create(&mut data.pool(), form).await?; SentActivity::create(&mut data.pool(), form).await?;
send_activity(activity, actor, inbox, data).await?;
Ok(()) Ok(())
} }

View File

@ -30,6 +30,14 @@ impl SentActivity {
.first::<Self>(conn) .first::<Self>(conn)
.await .await
} }
pub async fn read(pool: &mut DbPool<'_>, object_id: i64) -> Result<Self, Error> {
use crate::schema::sent_activity::dsl::{id, sent_activity};
let conn = &mut get_conn(pool).await?;
sent_activity
.filter(id.eq(object_id))
.first::<Self>(conn)
.await
}
} }
impl ReceivedActivity { impl ReceivedActivity {
@ -62,7 +70,10 @@ mod tests {
#![allow(clippy::indexing_slicing)] #![allow(clippy::indexing_slicing)]
use super::*; use super::*;
use crate::utils::build_db_pool_for_tests; use crate::{
source::activity::{ActivitySendTargets, ActorType},
utils::build_db_pool_for_tests,
};
use serde_json::json; use serde_json::json;
use serial_test::serial; use serial_test::serial;
use url::Url; use url::Url;
@ -102,6 +113,9 @@ mod tests {
ap_id: ap_id.clone(), ap_id: ap_id.clone(),
data: data.clone(), data: data.clone(),
sensitive, sensitive,
actor_apub_id: Url::parse("http://example.com/u/exampleuser").unwrap(),
actor_type: ActorType::Person,
send_targets: ActivitySendTargets::empty(),
}; };
SentActivity::create(pool, form).await.unwrap(); SentActivity::create(pool, form).await.unwrap();

View File

@ -1,6 +1,6 @@
use crate::{ use crate::{
newtypes::{CommunityId, DbUrl}, newtypes::{CommunityId, DbUrl},
schema::{activity, sent_activity}, schema::sent_activity,
}; };
use diesel::{ use diesel::{
deserialize::FromSql, deserialize::FromSql,

View File

@ -9,7 +9,7 @@ use lemmy_api_common::request::build_user_agent;
use lemmy_apub::{VerifyUrlData, FEDERATION_HTTP_FETCH_LIMIT}; use lemmy_apub::{VerifyUrlData, FEDERATION_HTTP_FETCH_LIMIT};
use lemmy_db_schema::{ use lemmy_db_schema::{
source::instance::Instance, source::instance::Instance,
utils::{build_db_pool, DbPool}, utils::{build_db_pool, ActualDbPool, DbPool},
}; };
use lemmy_utils::{error::LemmyErrorExt2, settings::SETTINGS, REQWEST_TIMEOUT}; use lemmy_utils::{error::LemmyErrorExt2, settings::SETTINGS, REQWEST_TIMEOUT};
use reqwest::Client; use reqwest::Client;
@ -57,17 +57,14 @@ async fn main() -> anyhow::Result<()> {
let process_num = 1 - 1; // todo: pass these in via command line args let process_num = 1 - 1; // todo: pass these in via command line args
let process_count = 1; let process_count = 1;
let mut workers = HashMap::new(); let mut workers = HashMap::new();
let mut pool2 = DbPool::from(&pool);
let (stats_sender, stats_receiver) = unbounded_channel(); let (stats_sender, stats_receiver) = unbounded_channel();
let exit_print = tokio::spawn(receive_print_stats(&mut pool2, stats_receiver)); let exit_print = tokio::spawn(receive_print_stats(pool.clone(), stats_receiver));
let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())?; let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())?;
let mut terminate = tokio::signal::unix::signal(SignalKind::terminate())?; let mut terminate = tokio::signal::unix::signal(SignalKind::terminate())?;
let pool2 = &mut DbPool::Pool(&pool);
loop { loop {
for (instance, should_federate) in Instance::read_all_with_blocked(&mut pool2) for (instance, should_federate) in Instance::read_all_with_blocked(pool2).await?.into_iter() {
.await?
.into_iter()
{
if instance.id.inner() % process_count != process_num { if instance.id.inner() % process_count != process_num {
continue; continue;
} }
@ -77,7 +74,7 @@ async fn main() -> anyhow::Result<()> {
instance.id, instance.id,
spawn_cancellable(WORKER_EXIT_TIMEOUT, |stop| { spawn_cancellable(WORKER_EXIT_TIMEOUT, |stop| {
instance_worker( instance_worker(
pool2, pool.clone(),
instance, instance,
federation_config.to_request_data(), federation_config.to_request_data(),
stop, stop,
@ -122,9 +119,10 @@ async fn main() -> anyhow::Result<()> {
/// every 60s, print the state for every instance. exits if the receiver is done (all senders dropped) /// every 60s, print the state for every instance. exits if the receiver is done (all senders dropped)
async fn receive_print_stats( async fn receive_print_stats(
mut pool: &mut DbPool<'_>, mut pool: ActualDbPool,
mut receiver: UnboundedReceiver<FederationQueueState>, mut receiver: UnboundedReceiver<FederationQueueState>,
) { ) {
let mut pool = &mut DbPool::Pool(&pool);
let mut printerval = tokio::time::interval(Duration::from_secs(60)); let mut printerval = tokio::time::interval(Duration::from_secs(60));
printerval.tick().await; // skip first printerval.tick().await; // skip first
let mut stats = HashMap::new(); let mut stats = HashMap::new();

View File

@ -8,7 +8,7 @@ use lemmy_apub::{
}; };
use lemmy_db_schema::{ use lemmy_db_schema::{
source::{ source::{
activity::{Activity, ActorType}, activity::{ActorType, SentActivity},
community::Community, community::Community,
person::Person, person::Person,
site::Site, site::Site,
@ -122,8 +122,7 @@ pub fn intern_url<'a>(url: impl Into<Cow<'a, Url>>) -> Arc<Url> {
} }
/// this should maybe be a newtype like all the other PersonId CommunityId etc. /// this should maybe be a newtype like all the other PersonId CommunityId etc.
/// also should be i64 pub type ActivityId = i64;
pub type ActivityId = i32;
/// activities are immutable so cache does not need to have TTL /// activities are immutable so cache does not need to have TTL
/// May return None if the corresponding id does not exist or is a received activity. /// May return None if the corresponding id does not exist or is a received activity.
@ -132,20 +131,16 @@ pub type ActivityId = i32;
pub async fn get_activity_cached( pub async fn get_activity_cached(
pool: &mut DbPool<'_>, pool: &mut DbPool<'_>,
activity_id: ActivityId, activity_id: ActivityId,
) -> Result<Option<Arc<(Activity, SharedInboxActivities)>>> { ) -> Result<Option<Arc<(SentActivity, SharedInboxActivities)>>> {
static ACTIVITIES: Lazy<Cache<ActivityId, Option<Arc<(Activity, SharedInboxActivities)>>>> = static ACTIVITIES: Lazy<Cache<ActivityId, Option<Arc<(SentActivity, SharedInboxActivities)>>>> =
Lazy::new(|| Cache::builder().max_capacity(10000).build()); Lazy::new(|| Cache::builder().max_capacity(10000).build());
ACTIVITIES ACTIVITIES
.try_get_with(activity_id, async { .try_get_with(activity_id, async {
let row = Activity::read(pool, activity_id) let row = SentActivity::read(pool, activity_id)
.await .await
.optional() .optional()
.context("could not read activity")?; .context("could not read activity")?;
let Some(mut row) = row else { return anyhow::Result::<_, anyhow::Error>::Ok(None) }; let Some(mut row) = row else { return anyhow::Result::<_, anyhow::Error>::Ok(None) };
if row.send_targets.is_none() {
// must be a received activity
return Ok(None);
}
// swap to avoid cloning // swap to avoid cloning
let mut data = Value::Null; let mut data = Value::Null;
std::mem::swap(&mut row.data, &mut data); std::mem::swap(&mut row.data, &mut data);
@ -169,7 +164,7 @@ pub async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result<ActivityId>
let conn = &mut get_conn(pool).await?; let conn = &mut get_conn(pool).await?;
let Sequence { let Sequence {
last_value: latest_id, last_value: latest_id,
} = diesel::sql_query("select last_value from activity_id_seq") } = diesel::sql_query("select last_value from sent_activity_id_seq")
.get_result(conn) .get_result(conn)
.await?; .await?;
anyhow::Result::<_, anyhow::Error>::Ok(latest_id as ActivityId) anyhow::Result::<_, anyhow::Error>::Ok(latest_id as ActivityId)

View File

@ -16,8 +16,8 @@ use anyhow::Result;
use chrono::{DateTime, TimeZone, Utc}; use chrono::{DateTime, TimeZone, Utc};
use lemmy_db_schema::{ use lemmy_db_schema::{
newtypes::{CommunityId, InstanceId}, newtypes::{CommunityId, InstanceId},
source::{activity::Activity, instance::Instance, site::Site}, source::{activity::SentActivity, instance::Instance, site::Site},
utils::DbPool, utils::{ActualDbPool, DbPool},
}; };
use lemmy_db_views_actor::structs::CommunityFollowerView; use lemmy_db_views_actor::structs::CommunityFollowerView;
use lemmy_utils::{error::LemmyErrorExt2, REQWEST_TIMEOUT}; use lemmy_utils::{error::LemmyErrorExt2, REQWEST_TIMEOUT};
@ -38,12 +38,13 @@ static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(10);
/// loop fetch new activities from db and send them to the inboxes of the given instances /// loop fetch new activities from db and send them to the inboxes of the given instances
/// this worker only returns if (a) there is an internal error or (b) the cancellation token is cancelled (graceful exit) /// this worker only returns if (a) there is an internal error or (b) the cancellation token is cancelled (graceful exit)
pub async fn instance_worker( pub async fn instance_worker(
mut pool: DbPool<'_>, pool: ActualDbPool,
instance: Instance, instance: Instance,
data: Data<()>, data: Data<()>,
stop: CancellationToken, stop: CancellationToken,
stats_sender: UnboundedSender<FederationQueueState>, stats_sender: UnboundedSender<FederationQueueState>,
) -> Result<(), anyhow::Error> { ) -> Result<(), anyhow::Error> {
let mut pool = &mut DbPool::Pool(&pool);
let mut last_full_communities_fetch = Utc.timestamp_nanos(0); let mut last_full_communities_fetch = Utc.timestamp_nanos(0);
let mut last_incremental_communities_fetch = Utc.timestamp_nanos(0); let mut last_incremental_communities_fetch = Utc.timestamp_nanos(0);
let mut last_state_insert = Utc.timestamp_nanos(0); let mut last_state_insert = Utc.timestamp_nanos(0);
@ -92,15 +93,13 @@ pub async fn instance_worker(
state.last_successful_id = id; state.last_successful_id = id;
continue; continue;
} }
let actor = { let actor = get_actor_cached(
// these should always be set for sent activities &mut pool,
let (Some(actor_type), Some(apub_id)) = (activity.actor_type, &activity.actor_apub_id) else { activity.actor_type,
tracing::warn!("activity {id} does not have actor_type or actor_apub_id set"); activity.actor_apub_id.deref(),
state.last_successful_id = id; )
continue; .await?;
};
get_actor_cached(&mut pool, actor_type, apub_id.deref()).await?
};
let inbox_urls = inbox_urls.into_iter().map(|e| (*e).clone()).collect(); let inbox_urls = inbox_urls.into_iter().map(|e| (*e).clone()).collect();
let requests = prepare_raw(object, actor.as_ref(), inbox_urls, &data) let requests = prepare_raw(object, actor.as_ref(), inbox_urls, &data)
.await .await
@ -169,12 +168,10 @@ fn get_inbox_urls(
instance: &Instance, instance: &Instance,
site: &Option<Site>, site: &Option<Site>,
followed_communities: &HashMap<CommunityId, HashSet<Arc<Url>>>, followed_communities: &HashMap<CommunityId, HashSet<Arc<Url>>>,
activity: &Activity, activity: &SentActivity,
) -> HashSet<Arc<Url>> { ) -> HashSet<Arc<Url>> {
let mut inbox_urls = HashSet::new(); let mut inbox_urls = HashSet::new();
let Some(targets) = &activity.send_targets else { let targets = &activity.send_targets;
return inbox_urls;
};
if targets.all_instances { if targets.all_instances {
if let Some(site) = &site { if let Some(site) = &site {
// todo: when does an instance not have a site? // todo: when does an instance not have a site?