remove json column, use separate array columns instead

This commit is contained in:
phiresky 2023-08-03 11:27:17 +00:00
parent 0f775341a6
commit ef60dc0560
8 changed files with 155 additions and 102 deletions

View File

@ -205,7 +205,17 @@ where
ap_id: activity.id().clone().into(), ap_id: activity.id().clone().into(),
data: serde_json::to_value(activity.clone())?, data: serde_json::to_value(activity.clone())?,
sensitive, sensitive,
send_targets, send_inboxes: send_targets
.inboxes
.into_iter()
.map(|e| Some(e.into()))
.collect(),
send_all_instances: send_targets.all_instances,
send_community_followers_of: send_targets
.community_followers_of
.into_iter()
.map(|e| Some(e.0))
.collect(),
actor_type: actor.actor_type(), actor_type: actor.actor_type(),
actor_apub_id: actor.id().into(), actor_apub_id: actor.id().into(),
}; };

View File

@ -70,10 +70,7 @@ mod tests {
#![allow(clippy::indexing_slicing)] #![allow(clippy::indexing_slicing)]
use super::*; use super::*;
use crate::{ use crate::{source::activity::ActorType, utils::build_db_pool_for_tests};
source::activity::{ActivitySendTargets, ActorType},
utils::build_db_pool_for_tests,
};
use serde_json::json; use serde_json::json;
use serial_test::serial; use serial_test::serial;
use url::Url; use url::Url;
@ -117,7 +114,9 @@ mod tests {
.unwrap() .unwrap()
.into(), .into(),
actor_type: ActorType::Person, actor_type: ActorType::Person,
send_targets: ActivitySendTargets::empty(), send_all_instances: false,
send_community_followers_of: vec![],
send_inboxes: vec![],
}; };
SentActivity::create(pool, form).await.unwrap(); SentActivity::create(pool, form).await.unwrap();

View File

@ -162,7 +162,7 @@ pub struct CustomEmojiId(i32);
pub struct LtreeDef(pub String); pub struct LtreeDef(pub String);
#[repr(transparent)] #[repr(transparent)]
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Debug)] #[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Debug, Hash)]
#[cfg_attr(feature = "full", derive(AsExpression, FromSqlRow))] #[cfg_attr(feature = "full", derive(AsExpression, FromSqlRow))]
#[cfg_attr(feature = "full", diesel(sql_type = diesel::sql_types::Text))] #[cfg_attr(feature = "full", diesel(sql_type = diesel::sql_types::Text))]
pub struct DbUrl(pub(crate) Box<Url>); pub struct DbUrl(pub(crate) Box<Url>);

View File

@ -1,21 +1,21 @@
// @generated automatically by Diesel CLI. // @generated automatically by Diesel CLI.
pub mod sql_types { pub mod sql_types {
#[derive(diesel::sql_types::SqlType)] #[derive(diesel::sql_types::SqlType)]
#[diesel(postgres_type(name = "actor_type_enum"))] #[diesel(postgres_type(name = "actor_type_enum"))]
pub struct ActorTypeEnum; pub struct ActorTypeEnum;
#[derive(diesel::sql_types::SqlType)] #[derive(diesel::sql_types::SqlType)]
#[diesel(postgres_type(name = "listing_type_enum"))] #[diesel(postgres_type(name = "listing_type_enum"))]
pub struct ListingTypeEnum; pub struct ListingTypeEnum;
#[derive(diesel::sql_types::SqlType)] #[derive(diesel::sql_types::SqlType)]
#[diesel(postgres_type(name = "registration_mode_enum"))] #[diesel(postgres_type(name = "registration_mode_enum"))]
pub struct RegistrationModeEnum; pub struct RegistrationModeEnum;
#[derive(diesel::sql_types::SqlType)] #[derive(diesel::sql_types::SqlType)]
#[diesel(postgres_type(name = "sort_type_enum"))] #[diesel(postgres_type(name = "sort_type_enum"))]
pub struct SortTypeEnum; pub struct SortTypeEnum;
} }
diesel::table! { diesel::table! {
@ -409,9 +409,9 @@ diesel::table! {
totp_2fa_secret -> Nullable<Text>, totp_2fa_secret -> Nullable<Text>,
totp_2fa_url -> Nullable<Text>, totp_2fa_url -> Nullable<Text>,
open_links_in_new_tab -> Bool, open_links_in_new_tab -> Bool,
infinite_scroll_enabled -> Bool,
blur_nsfw -> Bool, blur_nsfw -> Bool,
auto_expand -> Bool, auto_expand -> Bool,
infinite_scroll_enabled -> Bool,
} }
} }
@ -805,7 +805,9 @@ diesel::table! {
data -> Json, data -> Json,
sensitive -> Bool, sensitive -> Bool,
published -> Timestamp, published -> Timestamp,
send_targets -> Jsonb, send_inboxes -> Array<Nullable<Text>>,
send_community_followers_of -> Array<Nullable<Int4>>,
send_all_instances -> Bool,
actor_type -> ActorTypeEnum, actor_type -> ActorTypeEnum,
actor_apub_id -> Nullable<Text>, actor_apub_id -> Nullable<Text>,
} }
@ -953,69 +955,69 @@ diesel::joinable!(site_language -> site (site_id));
diesel::joinable!(tagline -> local_site (local_site_id)); diesel::joinable!(tagline -> local_site (local_site_id));
diesel::allow_tables_to_appear_in_same_query!( diesel::allow_tables_to_appear_in_same_query!(
admin_purge_comment, admin_purge_comment,
admin_purge_community, admin_purge_community,
admin_purge_person, admin_purge_person,
admin_purge_post, admin_purge_post,
captcha_answer, captcha_answer,
comment, comment,
comment_aggregates, comment_aggregates,
comment_like, comment_like,
comment_reply, comment_reply,
comment_report, comment_report,
comment_saved, comment_saved,
community, community,
community_aggregates, community_aggregates,
community_block, community_block,
community_follower, community_follower,
community_language, community_language,
community_moderator, community_moderator,
community_person_ban, community_person_ban,
custom_emoji, custom_emoji,
custom_emoji_keyword, custom_emoji_keyword,
email_verification, email_verification,
federation_allowlist, federation_allowlist,
federation_blocklist, federation_blocklist,
federation_queue_state, federation_queue_state,
instance, instance,
language, language,
local_site, local_site,
local_site_rate_limit, local_site_rate_limit,
local_user, local_user,
local_user_language, local_user_language,
mod_add, mod_add,
mod_add_community, mod_add_community,
mod_ban, mod_ban,
mod_ban_from_community, mod_ban_from_community,
mod_feature_post, mod_feature_post,
mod_hide_community, mod_hide_community,
mod_lock_post, mod_lock_post,
mod_remove_comment, mod_remove_comment,
mod_remove_community, mod_remove_community,
mod_remove_post, mod_remove_post,
mod_transfer_community, mod_transfer_community,
password_reset_request, password_reset_request,
person, person,
person_aggregates, person_aggregates,
person_ban, person_ban,
person_block, person_block,
person_follower, person_follower,
person_mention, person_mention,
person_post_aggregates, person_post_aggregates,
post, post,
post_aggregates, post_aggregates,
post_like, post_like,
post_read, post_read,
post_report, post_report,
post_saved, post_saved,
private_message, private_message,
private_message_report, private_message_report,
received_activity, received_activity,
registration_application, registration_application,
secret, secret,
sent_activity, sent_activity,
site, site,
site_aggregates, site_aggregates,
site_language, site_language,
tagline, tagline,
); );

View File

@ -3,13 +3,15 @@ use crate::{
schema::sent_activity, schema::sent_activity,
}; };
use diesel::{ use diesel::{
backend::Backend,
deserialize::FromSql, deserialize::FromSql,
pg::{Pg, PgValue}, pg::{Pg, PgValue},
serialize::{Output, ToSql}, serialize::{Output, ToSql},
sql_types::Jsonb, sql_types::{Array, Jsonb, Nullable},
Queryable,
}; };
use serde_json::Value; use serde_json::Value;
use std::{collections::HashSet, fmt::Debug, io::Write}; use std::{collections::HashSet, fmt::Debug, hash::Hash, io::Write};
use url::Url; use url::Url;
#[derive( #[derive(
@ -72,17 +74,50 @@ pub struct SentActivity {
pub data: Value, pub data: Value,
pub sensitive: bool, pub sensitive: bool,
pub published: chrono::NaiveDateTime, pub published: chrono::NaiveDateTime,
pub send_targets: ActivitySendTargets, #[diesel(deserialize_as = ArrayToHashSet<DbUrl>)]
pub send_inboxes: HashSet<DbUrl>,
#[diesel(deserialize_as = ArrayToHashSet<CommunityId>)]
pub send_community_followers_of: HashSet<CommunityId>,
pub send_all_instances: bool,
pub actor_type: ActorType, pub actor_type: ActorType,
pub actor_apub_id: Option<DbUrl>, pub actor_apub_id: Option<DbUrl>,
} }
/// Wrapper to remove the `Option` from array values and convert them to a `HashSet`.
///
/// Named as the `deserialize_as` target for the `send_inboxes` and
/// `send_community_followers_of` fields of `SentActivity`; the inner set is
/// recovered through the `From<ArrayToHashSet<T>> for HashSet<T>` conversion.
pub struct ArrayToHashSet<T>(HashSet<T>);
/// Loads an SQL array with nullable elements as an [`ArrayToHashSet`].
///
/// The column is first read as `Vec<Option<T1>>`; each element must then be
/// non-null, and the values are collected into a `HashSet` (so duplicate
/// entries collapse into one).
impl<DB, T1, T2> Queryable<Array<Nullable<T2>>, DB> for ArrayToHashSet<T1>
where
  DB: Backend,
  T1: FromSql<T2, DB> + Hash + Eq,
  Vec<Option<T1>>: FromSql<Array<Nullable<T2>>, DB>,
{
  type Row = Vec<Option<T1>>;

  /// Converts the raw row into the wrapped set.
  ///
  /// # Errors
  /// Fails with the message `array with null element` as soon as a `None`
  /// element is encountered.
  fn build(row: Self::Row) -> diesel::deserialize::Result<Self> {
    row
      .into_iter()
      // Build the boxed error lazily: `ok_or(..into())` would allocate it for
      // every element, even on the all-values-present path.
      .map(|e| e.ok_or_else(|| "array with null element".into()))
      .collect::<diesel::deserialize::Result<HashSet<T1>>>()
      .map(ArrayToHashSet)
  }
}
impl<T> From<ArrayToHashSet<T>> for HashSet<T> {
fn from(val: ArrayToHashSet<T>) -> Self {
val.0
}
}
#[derive(Insertable)] #[derive(Insertable)]
#[diesel(table_name = sent_activity)] #[diesel(table_name = sent_activity)]
pub struct SentActivityForm { pub struct SentActivityForm {
pub ap_id: DbUrl, pub ap_id: DbUrl,
pub data: Value, pub data: Value,
pub sensitive: bool, pub sensitive: bool,
pub send_targets: ActivitySendTargets, pub send_inboxes: Vec<Option<DbUrl>>,
pub send_community_followers_of: Vec<Option<i32>>,
pub send_all_instances: bool,
pub actor_type: ActorType, pub actor_type: ActorType,
pub actor_apub_id: DbUrl, pub actor_apub_id: DbUrl,
} }

View File

@ -32,7 +32,7 @@ use std::{
use tokio::{sync::mpsc::UnboundedSender, time::sleep}; use tokio::{sync::mpsc::UnboundedSender, time::sleep};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
/// save state to db every n sends if there's no failures (otherwise state is saved after every attempt) /// save state to db every n sends if there's no failures (otherwise state is saved after every attempt)
static SAVE_STATE_EVERY_IT: i64 = 100; static CHECK_SAVE_STATE_EVERY_IT: i64 = 100;
static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(10); static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(10);
/// loop fetch new activities from db and send them to the inboxes of the given instances /// loop fetch new activities from db and send them to the inboxes of the given instances
@ -74,7 +74,7 @@ pub async fn instance_worker(
} }
let mut processed_activities = 0; let mut processed_activities = 0;
'batch: while id < latest_id 'batch: while id < latest_id
&& processed_activities < SAVE_STATE_EVERY_IT && processed_activities < CHECK_SAVE_STATE_EVERY_IT
&& !stop.is_cancelled() && !stop.is_cancelled()
{ {
id += 1; id += 1;
@ -167,23 +167,23 @@ fn get_inbox_urls(
activity: &SentActivity, activity: &SentActivity,
) -> HashSet<Arc<Url>> { ) -> HashSet<Arc<Url>> {
let mut inbox_urls = HashSet::new(); let mut inbox_urls = HashSet::new();
let targets = &activity.send_targets;
if targets.all_instances { if activity.send_all_instances {
if let Some(site) = &site { if let Some(site) = &site {
// todo: when does an instance not have a site? // todo: when does an instance not have a site?
inbox_urls.insert(intern_url(Cow::Borrowed(site.inbox_url.deref()))); inbox_urls.insert(intern_url(Cow::Borrowed(site.inbox_url.deref())));
} }
} }
for t in &targets.community_followers_of { for t in &activity.send_community_followers_of {
if let Some(urls) = followed_communities.get(t) { if let Some(urls) = followed_communities.get(t) {
inbox_urls.extend(urls.iter().map(std::clone::Clone::clone)); inbox_urls.extend(urls.iter().map(std::clone::Clone::clone));
} }
} }
for inbox in &targets.inboxes { for inbox in &activity.send_inboxes {
if inbox.domain() != Some(&instance.domain) { if inbox.domain() != Some(&instance.domain) {
continue; continue;
} }
inbox_urls.insert(intern_url(Cow::Borrowed(inbox))); inbox_urls.insert(intern_url(Cow::Borrowed(inbox.inner())));
} }
inbox_urls inbox_urls
} }

View File

@ -1,5 +1,7 @@
ALTER TABLE sent_activity ALTER TABLE sent_activity
DROP COLUMN send_targets, DROP COLUMN send_inboxes,
DROP COLUMN send_community_followers_of,
DROP COLUMN send_all_instances,
DROP COLUMN actor_apub_id, DROP COLUMN actor_apub_id,
DROP COLUMN actor_type; DROP COLUMN actor_type;

View File

@ -4,15 +4,20 @@ CREATE TYPE actor_type_enum AS enum(
'person' 'person'
); );
-- actor_apub_id only null for old entries -- actor_apub_id only null for old entries before this migration
ALTER TABLE sent_activity ALTER TABLE sent_activity
ADD COLUMN send_targets jsonb NOT NULL DEFAULT '{"inboxes": [], "community_followers_of": [], "all_instances": false}', ADD COLUMN send_inboxes text[] NOT NULL DEFAULT '{}', -- list of specific inbox urls
ADD COLUMN send_community_followers_of integer[] NOT NULL DEFAULT '{}',
ADD COLUMN send_all_instances boolean NOT NULL DEFAULT FALSE,
ADD COLUMN actor_type actor_type_enum NOT NULL DEFAULT 'person', ADD COLUMN actor_type actor_type_enum NOT NULL DEFAULT 'person',
ADD COLUMN actor_apub_id text DEFAULT NULL; ADD COLUMN actor_apub_id text DEFAULT NULL;
ALTER TABLE sent_activity ALTER TABLE sent_activity
ALTER COLUMN send_targets DROP DEFAULT, ALTER COLUMN send_inboxes DROP DEFAULT,
ALTER COLUMN actor_type DROP DEFAULT; ALTER COLUMN send_community_followers_of DROP DEFAULT,
ALTER COLUMN send_all_instances DROP DEFAULT,
ALTER COLUMN actor_type DROP DEFAULT,
ALTER COLUMN actor_apub_id DROP DEFAULT;
CREATE TABLE federation_queue_state( CREATE TABLE federation_queue_state(
domain text PRIMARY KEY, domain text PRIMARY KEY,