phiresky 2023-08-02 23:11:47 +00:00
parent bddcbab4cd
commit e6e96a71e6
7 changed files with 40 additions and 56 deletions

View File

@@ -12,15 +12,8 @@ use activitypub_federation::{
protocol::verification::verify_domains_match,
traits::{ActivityHandler, Actor, Object},
};
-use lemmy_api_common::{
-context::LemmyContext,
-private_message::{CreatePrivateMessage, EditPrivateMessage, PrivateMessageResponse},
-};
-use lemmy_db_schema::{
-newtypes::PersonId,
-source::{activity::ActivitySendTargets, person::Person, private_message::PrivateMessage},
-traits::Crud,
-};
+use lemmy_api_common::context::LemmyContext;
+use lemmy_db_schema::source::activity::ActivitySendTargets;
use lemmy_db_views::structs::PrivateMessageView;
use lemmy_utils::error::LemmyError;
use url::Url;

View File

@@ -10,11 +10,7 @@ use activitypub_federation::{
protocol::verification::verify_urls_match,
traits::{ActivityHandler, Actor},
};
-use lemmy_api_common::{
-context::LemmyContext,
-person::{DeleteAccount, DeleteAccountResponse},
-utils::{delete_user_account, local_user_view_from_jwt},
-};
+use lemmy_api_common::{context::LemmyContext, utils::delete_user_account};
use lemmy_db_schema::source::{activity::ActivitySendTargets, person::Person};
use lemmy_utils::error::LemmyError;
use url::Url;

View File

@@ -20,7 +20,7 @@ use lemmy_api_common::context::LemmyContext;
use lemmy_db_schema::{
source::{
activity::ActivitySendTargets,
-community::{Community, CommunityFollower, CommunityFollowerForm},
+community::{CommunityFollower, CommunityFollowerForm},
person::{PersonFollower, PersonFollowerForm},
},
traits::Followable,

View File

@@ -19,7 +19,7 @@ pub struct FederationQueueState {
impl FederationQueueState {
/// load or return a default empty value
pub async fn load(pool: &mut DbPool<'_>, domain_: &str) -> Result<FederationQueueState> {
-use lemmy_db_schema::schema::federation_queue_state::dsl::*;
+use lemmy_db_schema::schema::federation_queue_state::dsl::federation_queue_state;
let conn = &mut get_conn(pool).await?;
Ok(
federation_queue_state
@@ -37,8 +37,8 @@
)
}
pub async fn upsert(pool: &mut DbPool<'_>, state: &FederationQueueState) -> Result<()> {
+use lemmy_db_schema::schema::federation_queue_state::dsl::{domain, federation_queue_state};
let conn = &mut get_conn(pool).await?;
-use lemmy_db_schema::schema::federation_queue_state::dsl::*;
state
.insert_into(federation_queue_state)

View File

@@ -53,7 +53,7 @@ async fn start_stop_federation_workers<T: Clone + Send + Sync + 'static>(
let mut total_count = 0;
let mut dead_count = 0;
let mut disallowed_count = 0;
-for (instance, allowed) in Instance::read_all_with_blocked(pool2).await?.into_iter() {
+for (instance, allowed) in Instance::read_all_with_blocked(pool2).await? {
if instance.id.inner() % opts.process_count != process_index {
continue;
}
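
The `instance.id.inner() % opts.process_count != process_index` check above shards instances across federation-sender processes so that each instance is handled by exactly one process. A minimal std-only sketch of that assignment (the helper and the sample values are illustrative, not part of the commit):

```rust
// Hypothetical helper illustrating the modulo sharding used in the loop above.
fn assigned_to_this_process(instance_id: i64, process_count: i64, process_index: i64) -> bool {
    // each instance id maps to exactly one process index
    instance_id % process_count == process_index
}

fn main() {
    let instance_ids = [1i64, 2, 3, 4, 5, 6];
    let (process_count, process_index) = (2, 0);
    let mine: Vec<i64> = instance_ids
        .iter()
        .copied()
        .filter(|id| assigned_to_this_process(*id, process_count, process_index))
        .collect();
    println!("process {process_index} handles instances {mine:?}"); // [2, 4, 6]
}
```
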
@@ -102,7 +102,7 @@ async fn start_stop_federation_workers<T: Clone + Send + Sync + 'static>(
WORKER_EXIT_TIMEOUT
);
// the cancel futures need to be awaited concurrently for the shutdown processes to be triggered concurrently
-futures::future::join_all(workers.into_values().map(|e| e.cancel())).await;
+futures::future::join_all(workers.into_values().map(util::CancellableTask::cancel)).await;
exit_print.await?;
Ok(())
}
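
The comment above explains why the shutdown futures go through `join_all` rather than being awaited one by one; the new code also passes the method path `util::CancellableTask::cancel` directly instead of wrapping it in a closure (clippy's `redundant_closure_for_method_calls`). A rough sketch of the concurrency point, assuming the `tokio` and `futures` crates; `cancel_worker` is a made-up stand-in for a graceful shutdown:

```rust
use std::time::{Duration, Instant};

// Stand-in for a graceful shutdown such as CancellableTask::cancel().
async fn cancel_worker(id: u32) {
    tokio::time::sleep(Duration::from_millis(100)).await;
    println!("worker {id} stopped");
}

#[tokio::main]
async fn main() {
    let start = Instant::now();
    // join_all polls all futures concurrently, so five 100ms shutdowns
    // take roughly 100ms total instead of 500ms when awaited sequentially.
    futures::future::join_all((0u32..5).map(cancel_worker)).await;
    println!("all workers stopped in {:?}", start.elapsed());
}
```
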
@@ -124,7 +124,7 @@ async fn receive_print_stats(
pool: ActualDbPool,
mut receiver: UnboundedReceiver<FederationQueueState>,
) {
-let mut pool = &mut DbPool::Pool(&pool);
+let pool = &mut DbPool::Pool(&pool);
let mut printerval = tokio::time::interval(Duration::from_secs(60));
printerval.tick().await; // skip first
let mut stats = HashMap::new();
@@ -133,13 +133,13 @@
ele = receiver.recv() => {
let Some(ele) = ele else {
tracing::info!("done. quitting");
-print_stats(&mut pool, &stats).await;
+print_stats(pool, &stats).await;
return;
};
stats.insert(ele.domain.clone(), ele);
},
_ = printerval.tick() => {
-print_stats(&mut pool, &stats).await;
+print_stats(pool, &stats).await;
}
}
}
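
For context, the loop being touched here races the stats channel against a print interval with `tokio::select!`, prints on every tick, and prints a final summary when all senders hang up. A self-contained sketch of that shape, assuming the `tokio` crate (the domain and counter values are made up):

```rust
use std::collections::HashMap;
use std::time::Duration;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel::<(String, u64)>();
    tokio::spawn(async move {
        tx.send(("example.com".to_string(), 3)).ok();
        // tx is dropped here, which closes the channel and ends the loop below
    });
    let mut printerval = tokio::time::interval(Duration::from_millis(50));
    printerval.tick().await; // the first tick completes immediately, so skip it
    let mut stats: HashMap<String, u64> = HashMap::new();
    loop {
        tokio::select! {
            msg = rx.recv() => {
                let Some((domain, behind)) = msg else {
                    println!("done. final stats: {stats:?}");
                    return;
                };
                stats.insert(domain, behind);
            }
            _ = printerval.tick() => {
                println!("periodic stats: {stats:?}");
            }
        }
    }
}
```
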
@@ -154,7 +154,10 @@ async fn print_stats(pool: &mut DbPool<'_>, stats: &HashMap<String, FederationQu
// it's expected that the values are a bit out of date, everything < SAVE_STATE_EVERY should be considered up to date
tracing::info!(
"Federation state as of {}:",
-Local::now().with_nanosecond(0).unwrap().to_rfc3339()
+Local::now()
+.with_nanosecond(0)
+.expect("0 is valid nanos")
+.to_rfc3339()
);
// todo: less noisy output (only output failing instances and summary for successful)
// todo: more stats (act/sec, avg http req duration)
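
The change above only swaps `.unwrap()` for `.expect(...)` with a message and reflows the call chain; the expression itself truncates the timestamp to whole seconds before logging it. A small sketch of that formatting, assuming the `chrono` crate:

```rust
use chrono::{Local, Timelike};

fn main() {
    // zero out the nanoseconds so the RFC 3339 string has no sub-second noise;
    // with_nanosecond(0) can only fail for out-of-range values, hence the expect message
    let now = Local::now()
        .with_nanosecond(0)
        .expect("0 is valid nanos");
    println!("Federation state as of {}:", now.to_rfc3339());
}
```
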
@@ -169,12 +172,10 @@ async fn print_stats(pool: &mut DbPool<'_>, stats: &HashMap<String, FederationQu
stat.fail_count,
retry_sleep_duration(stat.fail_count)
);
-} else {
-if behind > 0 {
-tracing::info!("{}: Ok. {} behind", stat.domain, behind);
-} else {
-ok_count += 1;
-}
-}
+} else if behind > 0 {
+tracing::info!("{}: Ok. {} behind", stat.domain, behind);
+} else {
+ok_count += 1;
+}
}
tracing::info!("{ok_count} others up to date");

View File

@@ -135,6 +135,7 @@ pub fn intern_url<'a>(url: impl Into<Cow<'a, Url>>) -> Arc<Url> {
/// this should maybe be a newtype like all the other PersonId CommunityId etc.
pub type ActivityId = i64;
+type CachedActivityInfo = Option<Arc<(SentActivity, SharedInboxActivities)>>;
/// activities are immutable so cache does not need to have TTL
/// May return None if the corresponding id does not exist or is a received activity.
/// Holes in serials are expected behaviour in postgresql
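
The new `CachedActivityInfo` alias exists so the cache's value type and the function's return type in the next hunk are written once; clippy flags the spelled-out nested type as `type_complexity`. A toy illustration with placeholder types (none of these stand-ins are the real Lemmy structs):

```rust
use std::sync::Arc;

struct SentActivity;          // placeholder
struct SharedInboxActivities; // placeholder

// one name for the nested type, reused by the cache and the function signature
type CachedActivityInfo = Option<Arc<(SentActivity, SharedInboxActivities)>>;

fn lookup(found: bool) -> CachedActivityInfo {
    found.then(|| Arc::new((SentActivity, SharedInboxActivities)))
}

fn main() {
    assert!(lookup(true).is_some());
    assert!(lookup(false).is_none());
}
```
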
@@ -142,8 +143,8 @@ pub type ActivityId = i64;
pub async fn get_activity_cached(
pool: &mut DbPool<'_>,
activity_id: ActivityId,
-) -> Result<Option<Arc<(SentActivity, SharedInboxActivities)>>> {
-static ACTIVITIES: Lazy<Cache<ActivityId, Option<Arc<(SentActivity, SharedInboxActivities)>>>> =
+) -> Result<CachedActivityInfo> {
+static ACTIVITIES: Lazy<Cache<ActivityId, CachedActivityInfo>> =
Lazy::new(|| Cache::builder().max_capacity(10000).build());
ACTIVITIES
.try_get_with(activity_id, async {
@@ -186,7 +187,7 @@ pub async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result<ActivityId>
/// how long to sleep based on how many retries have already happened
pub fn retry_sleep_duration(retry_count: i32) -> Duration {
-Duration::from_secs_f64(10.0 * 2.0_f64.powf(retry_count as f64))
+Duration::from_secs_f64(10.0 * 2.0_f64.powf(f64::from(retry_count)))
}
#[derive(QueryableByName)]
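
The `retry_sleep_duration` change above replaces `retry_count as f64` with the lossless `f64::from(retry_count)`; the formula itself is a plain exponential backoff of 10 s doubled per failure. A runnable copy of it with a few sample values:

```rust
use std::time::Duration;

// same formula as above: 10s * 2^retry_count
fn retry_sleep_duration(retry_count: i32) -> Duration {
    Duration::from_secs_f64(10.0 * 2.0_f64.powf(f64::from(retry_count)))
}

fn main() {
    for retries in [0, 1, 3, 6] {
        // 10s, 20s, 80s, 640s
        println!("{retries} consecutive fails -> sleep {:?}", retry_sleep_duration(retries));
    }
}
```
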

View File

@@ -44,19 +44,15 @@ pub async fn instance_worker(
stop: CancellationToken,
stats_sender: UnboundedSender<FederationQueueState>,
) -> Result<(), anyhow::Error> {
-let mut pool = &mut DbPool::Pool(&pool);
+let pool = &mut DbPool::Pool(&pool);
let mut last_full_communities_fetch = Utc.timestamp_nanos(0);
let mut last_incremental_communities_fetch = Utc.timestamp_nanos(0);
let mut last_state_insert = Utc.timestamp_nanos(0);
-let mut followed_communities: HashMap<CommunityId, HashSet<Arc<Url>>> = get_communities(
-&mut pool,
-instance.id,
-&mut last_incremental_communities_fetch,
-)
-.await?;
-let site = Site::read_from_instance_id(&mut pool, instance.id).await?;
+let mut followed_communities: HashMap<CommunityId, HashSet<Arc<Url>>> =
+get_communities(pool, instance.id, &mut last_incremental_communities_fetch).await?;
+let site = Site::read_from_instance_id(pool, instance.id).await?;
-let mut state = FederationQueueState::load(&mut pool, &instance.domain).await?;
+let mut state = FederationQueueState::load(pool, &instance.domain).await?;
if state.fail_count > 0 {
// before starting queue, sleep remaining duration
let elapsed = (Utc::now() - state.last_retry).to_std()?;
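
The branch above handles a worker restart in the middle of a backoff: instead of sleeping the full retry delay again, it subtracts the time already elapsed since `state.last_retry`. A std-only sketch of that computation (the helper names are made up; the real code works with chrono timestamps):

```rust
use std::time::Duration;

fn retry_sleep_duration(retry_count: i32) -> Duration {
    Duration::from_secs_f64(10.0 * 2.0_f64.powf(f64::from(retry_count)))
}

// None means the delay has already fully passed and the queue can start right away.
fn remaining_delay(fail_count: i32, elapsed_since_last_retry: Duration) -> Option<Duration> {
    retry_sleep_duration(fail_count).checked_sub(elapsed_since_last_retry)
}

fn main() {
    // 3 fails -> 80s delay; 50s already elapsed -> sleep the remaining 30s
    println!("{:?}", remaining_delay(3, Duration::from_secs(50)));
    // delay already over -> None, start immediately
    println!("{:?}", remaining_delay(1, Duration::from_secs(120)));
}
```
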
@@ -67,7 +63,7 @@ pub async fn instance_worker(
}
}
while !stop.is_cancelled() {
-let latest_id = get_latest_activity_id(&mut pool).await?;
+let latest_id = get_latest_activity_id(pool).await?;
let mut id = state.last_successful_id;
if id == latest_id {
// no more work to be done, wait before rechecking
@@ -83,7 +79,7 @@ pub async fn instance_worker(
{
id += 1;
processed_activities += 1;
-let Some(ele) = get_activity_cached(&mut pool, id).await? else {
+let Some(ele) = get_activity_cached(pool, id).await? else {
state.last_successful_id = id;
continue;
};
@@ -96,7 +92,7 @@ pub async fn instance_worker(
let Some(actor_apub_id) = &activity.actor_apub_id else {
continue; // activity was inserted before persistent queue was activated
};
-let actor = get_actor_cached(&mut pool, activity.actor_type, actor_apub_id).await?;
+let actor = get_actor_cached(pool, activity.actor_type, actor_apub_id).await?;
let inbox_urls = inbox_urls.into_iter().map(|e| (*e).clone()).collect();
let requests = prepare_raw(object, actor.as_ref(), inbox_urls, &data)
@@ -116,7 +112,7 @@ pub async fn instance_worker(
state.fail_count
);
stats_sender.send(state.clone())?;
-FederationQueueState::upsert(&mut pool, &state).await?;
+FederationQueueState::upsert(pool, &state).await?;
req = sign_raw(&task, &data, REQWEST_TIMEOUT).await?; // resign request
tokio::select! {
() = sleep(retry_delay) => {},
@@ -132,9 +128,11 @@ pub async fn instance_worker(
state.fail_count = 0;
}
-if Utc::now() - last_state_insert > chrono::Duration::from_std(SAVE_STATE_EVERY_TIME).unwrap() {
+if Utc::now() - last_state_insert
+> chrono::Duration::from_std(SAVE_STATE_EVERY_TIME).expect("not negative")
+{
last_state_insert = Utc::now();
-FederationQueueState::upsert(&mut pool, &state).await?;
+FederationQueueState::upsert(pool, &state).await?;
stats_sender.send(state.clone())?;
}
{
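
The reworked condition above persists the queue state at most once per `SAVE_STATE_EVERY_TIME`, so a crash loses only a bounded amount of progress. A std-only sketch of that throttle (the constant's value and the `maybe_persist` stand-in are illustrative):

```rust
use std::time::{Duration, Instant};

const SAVE_STATE_EVERY_TIME: Duration = Duration::from_millis(10); // illustrative value

struct Worker {
    last_state_insert: Instant,
    last_successful_id: i64,
}

impl Worker {
    fn maybe_persist(&mut self) {
        if self.last_state_insert.elapsed() > SAVE_STATE_EVERY_TIME {
            self.last_state_insert = Instant::now();
            // stands in for FederationQueueState::upsert + stats_sender.send
            println!("persisting state: last_successful_id = {}", self.last_successful_id);
        }
    }
}

fn main() {
    let mut w = Worker { last_state_insert: Instant::now(), last_successful_id: 42 };
    w.maybe_persist(); // too soon, does nothing
    std::thread::sleep(Duration::from_millis(20));
    w.maybe_persist(); // past the threshold, persists
}
```
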
@@ -142,20 +140,15 @@ pub async fn instance_worker(
if (Utc::now() - last_incremental_communities_fetch) > chrono::Duration::seconds(10) {
// process additions every 10s
followed_communities.extend(
-get_communities(
-&mut pool,
-instance.id,
-&mut last_incremental_communities_fetch,
-)
-.await?,
+get_communities(pool, instance.id, &mut last_incremental_communities_fetch).await?,
);
}
if (Utc::now() - last_full_communities_fetch) > chrono::Duration::seconds(300) {
// process removals every 5min
last_full_communities_fetch = Utc.timestamp_nanos(0);
followed_communities =
-get_communities(&mut pool, instance.id, &mut last_full_communities_fetch).await?;
-last_incremental_communities_fetch = last_full_communities_fetch.clone();
+get_communities(pool, instance.id, &mut last_full_communities_fetch).await?;
+last_incremental_communities_fetch = last_full_communities_fetch;
}
}
}
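
The two branches above refresh the followed-communities map on different schedules: cheap incremental additions every 10 seconds, and a full rebuild every 5 minutes so unfollowed communities eventually drop out. A simplified sketch with time modelled as plain seconds (the real code compares chrono timestamps and queries the DB via `get_communities`):

```rust
fn refresh(now_secs: u64, last_incremental: &mut u64, last_full: &mut u64) {
    if now_secs - *last_incremental > 10 {
        *last_incremental = now_secs;
        println!("t={now_secs}: extend followed_communities with newly followed communities");
    }
    if now_secs - *last_full > 300 {
        *last_full = now_secs;
        *last_incremental = now_secs; // the full fetch covers incremental additions too
        println!("t={now_secs}: rebuild followed_communities from scratch (handles unfollows)");
    }
}

fn main() {
    let (mut last_incremental, mut last_full) = (0, 0);
    for t in [5, 15, 301] {
        refresh(t, &mut last_incremental, &mut last_full);
    }
    // t=15 triggers the incremental branch; t=301 triggers both branches
}
```
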
@@ -183,7 +176,7 @@ fn get_inbox_urls(
}
for t in &targets.community_followers_of {
if let Some(urls) = followed_communities.get(t) {
-inbox_urls.extend(urls.iter().map(|e| e.clone()));
+inbox_urls.extend(urls.iter().map(std::clone::Clone::clone));
}
}
for inbox in &targets.inboxes {
@@ -210,7 +203,7 @@ async fn get_communities(
.fold(HashMap::new(), |mut map, (c, u)| {
map
.entry(c)
-.or_insert_with(|| HashSet::new())
+.or_insert_with(HashSet::new)
.insert(intern_url(Cow::Owned(u.into())));
map
}),
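
The final hunk is clippy's `redundant_closure`: `HashSet::new` is passed to `or_insert_with` directly instead of through `|| HashSet::new()`. For reference, the surrounding fold groups (community, inbox URL) pairs into a `HashMap<_, HashSet<_>>`; a toy version with integers and string slices standing in for the Lemmy types:

```rust
use std::collections::{HashMap, HashSet};

fn main() {
    let pairs = [
        (1, "https://a.example/inbox"),
        (1, "https://b.example/inbox"),
        (2, "https://c.example/inbox"),
    ];
    let grouped: HashMap<i32, HashSet<&str>> =
        pairs.into_iter().fold(HashMap::new(), |mut map, (c, u)| {
            // the constructor is passed directly, as in the change above
            map.entry(c).or_insert_with(HashSet::new).insert(u);
            map
        });
    assert_eq!(grouped[&1].len(), 2);
    println!("{grouped:?}");
}
```
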