Ignore incoming activities which have been received before (ref #1220)

This commit is contained in:
Felix Ableitner 2020-10-23 14:29:56 +02:00
parent 3d5647b16f
commit 6d17d5ead2
16 changed files with 169 additions and 91 deletions

View File

@ -73,7 +73,7 @@ pub(crate) async fn receive_create_comment(
websocket_id: None,
});
announce_if_community_is_local(create, &user, context, request_counter).await?;
announce_if_community_is_local(create, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}
@ -131,7 +131,7 @@ pub(crate) async fn receive_update_comment(
websocket_id: None,
});
announce_if_community_is_local(update, &user, context, request_counter).await?;
announce_if_community_is_local(update, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}
@ -183,7 +183,7 @@ pub(crate) async fn receive_like_comment(
websocket_id: None,
});
announce_if_community_is_local(like, &user, context, request_counter).await?;
announce_if_community_is_local(like, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}
@ -241,7 +241,7 @@ pub(crate) async fn receive_dislike_comment(
websocket_id: None,
});
announce_if_community_is_local(dislike, &user, context, request_counter).await?;
announce_if_community_is_local(dislike, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}
@ -276,8 +276,7 @@ pub(crate) async fn receive_delete_comment(
websocket_id: None,
});
let user = get_actor_as_user(&delete, context, request_counter).await?;
announce_if_community_is_local(delete, &user, context, request_counter).await?;
announce_if_community_is_local(delete, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}

View File

@ -57,7 +57,7 @@ pub(crate) async fn receive_undo_like_comment(
websocket_id: None,
});
announce_if_community_is_local(undo, &user, context, request_counter).await?;
announce_if_community_is_local(undo, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}
@ -109,7 +109,7 @@ pub(crate) async fn receive_undo_dislike_comment(
websocket_id: None,
});
announce_if_community_is_local(undo, &user, context, request_counter).await?;
announce_if_community_is_local(undo, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}
@ -145,8 +145,7 @@ pub(crate) async fn receive_undo_delete_comment(
websocket_id: None,
});
let user = get_actor_as_user(&undo, context, request_counter).await?;
announce_if_community_is_local(undo, &user, context, request_counter).await?;
announce_if_community_is_local(undo, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}
@ -182,7 +181,6 @@ pub(crate) async fn receive_undo_remove_comment(
websocket_id: None,
});
let mod_ = get_actor_as_user(&undo, context, request_counter).await?;
announce_if_community_is_local(undo, &mod_, context, request_counter).await?;
announce_if_community_is_local(undo, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}

View File

@ -1,4 +1,4 @@
use crate::activities::receive::{announce_if_community_is_local, get_actor_as_user};
use crate::activities::receive::announce_if_community_is_local;
use activitystreams::activity::{Delete, Remove, Undo};
use actix_web::HttpResponse;
use lemmy_db::{community::Community, community_view::CommunityView};
@ -33,8 +33,7 @@ pub(crate) async fn receive_delete_community(
websocket_id: None,
});
let user = get_actor_as_user(&delete, context, request_counter).await?;
announce_if_community_is_local(delete, &user, context, request_counter).await?;
announce_if_community_is_local(delete, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}
@ -95,8 +94,7 @@ pub(crate) async fn receive_undo_delete_community(
websocket_id: None,
});
let user = get_actor_as_user(&undo, context, request_counter).await?;
announce_if_community_is_local(undo, &user, context, request_counter).await?;
announce_if_community_is_local(undo, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}
@ -128,7 +126,6 @@ pub(crate) async fn receive_undo_remove_community(
websocket_id: None,
});
let mod_ = get_actor_as_user(&undo, context, request_counter).await?;
announce_if_community_is_local(undo, &mod_, context, request_counter).await?;
announce_if_community_is_local(undo, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}

View File

@ -39,7 +39,6 @@ where
/// community, the activity is announced to all community followers.
async fn announce_if_community_is_local<T, Kind>(
activity: T,
user: &User_,
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<(), LemmyError>
@ -62,7 +61,7 @@ where
if community.local {
community
.send_announce(activity.into_any_base()?, &user, context)
.send_announce(activity.into_any_base()?, context)
.await?;
}
Ok(())

View File

@ -51,7 +51,7 @@ pub(crate) async fn receive_create_post(
websocket_id: None,
});
announce_if_community_is_local(create, &user, context, request_counter).await?;
announce_if_community_is_local(create, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}
@ -89,7 +89,7 @@ pub(crate) async fn receive_update_post(
websocket_id: None,
});
announce_if_community_is_local(update, &user, context, request_counter).await?;
announce_if_community_is_local(update, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}
@ -134,7 +134,7 @@ pub(crate) async fn receive_like_post(
websocket_id: None,
});
announce_if_community_is_local(like, &user, context, request_counter).await?;
announce_if_community_is_local(like, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}
@ -185,7 +185,7 @@ pub(crate) async fn receive_dislike_post(
websocket_id: None,
});
announce_if_community_is_local(dislike, &user, context, request_counter).await?;
announce_if_community_is_local(dislike, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}
@ -214,8 +214,7 @@ pub(crate) async fn receive_delete_post(
websocket_id: None,
});
let user = get_actor_as_user(&delete, context, request_counter).await?;
announce_if_community_is_local(delete, &user, context, request_counter).await?;
announce_if_community_is_local(delete, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}

View File

@ -52,7 +52,7 @@ pub(crate) async fn receive_undo_like_post(
websocket_id: None,
});
announce_if_community_is_local(undo, &user, context, request_counter).await?;
announce_if_community_is_local(undo, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}
@ -98,7 +98,7 @@ pub(crate) async fn receive_undo_dislike_post(
websocket_id: None,
});
announce_if_community_is_local(undo, &user, context, request_counter).await?;
announce_if_community_is_local(undo, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}
@ -127,8 +127,7 @@ pub(crate) async fn receive_undo_delete_post(
websocket_id: None,
});
let user = get_actor_as_user(&undo, context, request_counter).await?;
announce_if_community_is_local(undo, &user, context, request_counter).await?;
announce_if_community_is_local(undo, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}
@ -158,7 +157,6 @@ pub(crate) async fn receive_undo_remove_post(
websocket_id: None,
});
let mod_ = get_actor_as_user(&undo, context, request_counter).await?;
announce_if_community_is_local(undo, &mod_, context, request_counter).await?;
announce_if_community_is_local(undo, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}

View File

@ -95,7 +95,7 @@ impl ActorType for Community {
.set_to(public())
.set_many_ccs(vec![self.get_followers_url()?]);
send_to_community_followers(delete, self, context, None).await?;
send_to_community_followers(delete, self, context).await?;
Ok(())
}
@ -121,7 +121,7 @@ impl ActorType for Community {
.set_to(public())
.set_many_ccs(vec![self.get_followers_url()?]);
send_to_community_followers(undo, self, context, None).await?;
send_to_community_followers(undo, self, context).await?;
Ok(())
}
@ -134,7 +134,7 @@ impl ActorType for Community {
.set_to(public())
.set_many_ccs(vec![self.get_followers_url()?]);
send_to_community_followers(remove, self, context, None).await?;
send_to_community_followers(remove, self, context).await?;
Ok(())
}
@ -155,7 +155,7 @@ impl ActorType for Community {
.set_to(public())
.set_many_ccs(vec![self.get_followers_url()?]);
send_to_community_followers(undo, self, context, None).await?;
send_to_community_followers(undo, self, context).await?;
Ok(())
}
@ -164,7 +164,6 @@ impl ActorType for Community {
async fn send_announce(
&self,
activity: AnyBase,
sender: &User_,
context: &LemmyContext,
) -> Result<(), LemmyError> {
let mut announce = Announce::new(self.actor_id.to_owned(), activity);
@ -174,13 +173,7 @@ impl ActorType for Community {
.set_to(public())
.set_many_ccs(vec![self.get_followers_url()?]);
send_to_community_followers(
announce,
self,
context,
Some(sender.get_shared_inbox_url()?),
)
.await?;
send_to_community_followers(announce, self, context).await?;
Ok(())
}

View File

@ -121,7 +121,6 @@ impl ActorType for User_ {
async fn send_announce(
&self,
_activity: AnyBase,
_sender: &User_,
_context: &LemmyContext,
) -> Result<(), LemmyError> {
unimplemented!()

View File

@ -74,24 +74,19 @@ pub async fn send_to_community_followers<T, Kind>(
activity: T,
community: &Community,
context: &LemmyContext,
sender_shared_inbox: Option<Url>,
) -> Result<(), LemmyError>
where
T: AsObject<Kind> + Extends<Kind> + Debug + BaseExt<Kind>,
Kind: Serialize,
<T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
{
// dont send to the local instance, nor to the instance where the activity originally came from,
// because that would result in a database error (same data inserted twice)
let community_shared_inbox = community.get_shared_inbox_url()?;
let follower_inboxes: Vec<Url> = community
.get_follower_inboxes(context.pool())
.await?
.iter()
.filter(|inbox| Some(inbox) != sender_shared_inbox.as_ref().as_ref())
.filter(|inbox| inbox != &&community_shared_inbox)
.filter(|inbox| check_is_apub_id_valid(inbox).is_ok())
.unique()
.filter(|inbox| inbox.host_str() != Some(&Settings::get().hostname))
.filter(|inbox| check_is_apub_id_valid(inbox).is_ok())
.map(|inbox| inbox.to_owned())
.collect();
debug!(
@ -133,7 +128,7 @@ where
// if this is a local community, we need to do an announce from the community instead
if community.local {
community
.send_announce(activity.into_any_base()?, creator, context)
.send_announce(activity.into_any_base()?, context)
.await?;
} else {
let inbox = community.get_shared_inbox_url()?;
@ -223,7 +218,8 @@ where
// This is necessary because send_comment and send_comment_mentions
// might send the same ap_id
if insert_into_db {
insert_activity(actor.user_id(), activity.clone(), true, pool).await?;
let id = activity.id().context(location_info!())?;
insert_activity(id, actor.user_id(), activity.clone(), true, pool).await?;
}
for i in inboxes {

View File

@ -3,6 +3,7 @@ use crate::{
check_is_apub_id_valid,
extensions::signatures::verify_signature,
fetcher::get_or_fetch_and_upsert_user,
inbox::{get_activity_id, is_activity_already_known},
insert_activity,
ActorType,
};
@ -80,6 +81,11 @@ pub async fn community_inbox(
verify_signature(&request, &user)?;
let activity_id = get_activity_id(&activity, user_uri)?;
if is_activity_already_known(context.pool(), &activity_id).await? {
return Ok(HttpResponse::Ok().finish());
}
let any_base = activity.clone().into_any_base()?;
let kind = activity.kind().context(location_info!())?;
let user_id = user.id;
@ -88,7 +94,14 @@ pub async fn community_inbox(
ValidTypes::Undo => handle_undo_follow(any_base, user, community, &context).await,
};
insert_activity(user_id, activity.clone(), false, context.pool()).await?;
insert_activity(
&activity_id,
user_id,
activity.clone(),
false,
context.pool(),
)
.await?;
res
}

View File

@ -1,3 +1,37 @@
use activitystreams::base::{BaseExt, Extends};
use anyhow::Context;
use lemmy_db::{activity::Activity, DbPool};
use lemmy_structs::blocking;
use lemmy_utils::{location_info, LemmyError};
use serde::{export::fmt::Debug, Serialize};
use url::Url;
pub mod community_inbox;
pub mod shared_inbox;
pub mod user_inbox;
pub(crate) fn get_activity_id<T, Kind>(activity: &T, creator_uri: &Url) -> Result<Url, LemmyError>
where
T: BaseExt<Kind> + Extends<Kind> + Debug,
Kind: Serialize,
<T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
{
let creator_domain = creator_uri.host_str().context(location_info!())?;
let activity_id = activity.id(creator_domain)?;
Ok(activity_id.context(location_info!())?.to_owned())
}
pub(crate) async fn is_activity_already_known(
pool: &DbPool,
activity_id: &Url,
) -> Result<bool, LemmyError> {
let activity_id = activity_id.to_string();
let existing = blocking(pool, move |conn| {
Activity::read_from_apub_id(&conn, &activity_id)
})
.await?;
match existing {
Ok(_) => Ok(true),
Err(_) => Ok(false),
}
}

View File

@ -42,6 +42,7 @@ use crate::{
check_is_apub_id_valid,
extensions::signatures::verify_signature,
fetcher::get_or_fetch_and_upsert_actor,
inbox::{get_activity_id, is_activity_already_known},
insert_activity,
ActorType,
};
@ -104,6 +105,11 @@ pub async fn shared_inbox(
let actor = get_or_fetch_and_upsert_actor(&actor_id, &context, request_counter).await?;
verify_signature(&request, actor.as_ref())?;
let activity_id = get_activity_id(&activity, &actor_id)?;
if is_activity_already_known(context.pool(), &activity_id).await? {
return Ok(HttpResponse::Ok().finish());
}
let any_base = activity.clone().into_any_base()?;
let kind = activity.kind().context(location_info!())?;
let res = match kind {
@ -119,7 +125,14 @@ pub async fn shared_inbox(
ValidTypes::Undo => receive_undo(&context, any_base, actor_id, request_counter).await,
};
insert_activity(actor.user_id(), activity.clone(), false, context.pool()).await?;
insert_activity(
&activity_id,
actor.user_id(),
activity.clone(),
false,
context.pool(),
)
.await?;
res
}
@ -142,6 +155,9 @@ async fn receive_announce(
let inner_id = object.id().context(location_info!())?.to_owned();
check_is_apub_id_valid(&inner_id)?;
if is_activity_already_known(context.pool(), &inner_id).await? {
return Ok(HttpResponse::Ok().finish());
}
match kind {
Some("Create") => receive_create(context, object, inner_id, request_counter).await,

View File

@ -3,6 +3,7 @@ use crate::{
check_is_apub_id_valid,
extensions::signatures::verify_signature,
fetcher::{get_or_fetch_and_upsert_actor, get_or_fetch_and_upsert_community},
inbox::{get_activity_id, is_activity_already_known},
insert_activity,
ActorType,
FromApub,
@ -83,6 +84,11 @@ pub async fn user_inbox(
let actor = get_or_fetch_and_upsert_actor(actor_uri, &context, request_counter).await?;
verify_signature(&request, actor.as_ref())?;
let activity_id = get_activity_id(&activity, actor_uri)?;
if is_activity_already_known(context.pool(), &activity_id).await? {
return Ok(HttpResponse::Ok().finish());
}
let any_base = activity.clone().into_any_base()?;
let kind = activity.kind().context(location_info!())?;
let res = match kind {
@ -101,7 +107,14 @@ pub async fn user_inbox(
}
};
insert_activity(actor.user_id(), activity.clone(), false, context.pool()).await?;
insert_activity(
&activity_id,
actor.user_id(),
activity.clone(),
false,
context.pool(),
)
.await?;
res
}

View File

@ -22,7 +22,7 @@ use activitystreams::{
};
use activitystreams_ext::{Ext1, Ext2};
use anyhow::{anyhow, Context};
use lemmy_db::{activity::do_insert_activity, user::User_, DbPool};
use lemmy_db::{activity::Activity, user::User_, DbPool};
use lemmy_structs::blocking;
use lemmy_utils::{location_info, settings::Settings, LemmyError};
use lemmy_websocket::LemmyContext;
@ -195,7 +195,6 @@ pub trait ActorType {
async fn send_announce(
&self,
activity: AnyBase,
sender: &User_,
context: &LemmyContext,
) -> Result<(), LemmyError>;
@ -249,16 +248,18 @@ pub trait ActorType {
/// Store a sent or received activity in the database, for logging purposes. These records are not
/// persistent.
pub async fn insert_activity<T>(
ap_id: &Url,
user_id: i32,
data: T,
activity: T,
local: bool,
pool: &DbPool,
) -> Result<(), LemmyError>
where
T: Serialize + std::fmt::Debug + Send + 'static,
{
let ap_id = ap_id.to_string();
blocking(pool, move |conn| {
do_insert_activity(conn, user_id, &data, local)
Activity::insert(conn, ap_id, user_id, &activity, local)
})
.await??;
Ok(())

View File

@ -12,6 +12,7 @@ use std::{
#[table_name = "activity"]
pub struct Activity {
pub id: i32,
pub ap_id: String,
pub user_id: i32,
pub data: Value,
pub local: bool,
@ -22,6 +23,7 @@ pub struct Activity {
#[derive(Insertable, AsChangeset)]
#[table_name = "activity"]
pub struct ActivityForm {
pub ap_id: String,
pub user_id: i32,
pub data: Value,
pub local: bool,
@ -53,30 +55,39 @@ impl Crud<ActivityForm> for Activity {
}
}
pub fn do_insert_activity<T>(
conn: &PgConnection,
user_id: i32,
data: &T,
local: bool,
) -> Result<Activity, IoError>
where
T: Serialize + Debug,
{
debug!("inserting activity for user {}: ", user_id);
debug!("{}", serde_json::to_string_pretty(&data)?);
let activity_form = ActivityForm {
user_id,
data: serde_json::to_value(&data)?,
local,
updated: None,
};
let result = Activity::create(&conn, &activity_form);
match result {
Ok(s) => Ok(s),
Err(e) => Err(IoError::new(
ErrorKind::Other,
format!("Failed to insert activity into database: {}", e),
)),
impl Activity {
pub fn insert<T>(
conn: &PgConnection,
ap_id: String,
user_id: i32,
data: &T,
local: bool,
) -> Result<Self, IoError>
where
T: Serialize + Debug,
{
debug!("inserting activity for user {}: ", user_id);
debug!("{}", serde_json::to_string_pretty(&data)?);
let activity_form = ActivityForm {
ap_id,
user_id,
data: serde_json::to_value(&data)?,
local,
updated: None,
};
let result = Activity::create(&conn, &activity_form);
match result {
Ok(s) => Ok(s),
Err(e) => Err(IoError::new(
ErrorKind::Other,
format!("Failed to insert activity into database: {}", e),
)),
}
}
pub fn read_from_apub_id(conn: &PgConnection, object_id: &str) -> Result<Self, Error> {
use crate::schema::activity::dsl::*;
activity.filter(ap_id.eq(object_id)).first::<Self>(conn)
}
}
@ -125,16 +136,24 @@ mod tests {
let inserted_creator = User_::create(&conn, &creator_form).unwrap();
let ap_id =
"https://enterprise.lemmy.ml/activities/delete/f1b5d57c-80f8-4e03-a615-688d552e946c";
let test_json: Value = serde_json::from_str(
r#"{
"street": "Article Circle Expressway 1",
"city": "North Pole",
"postcode": "99705",
"state": "Alaska"
}"#,
"@context": "https://www.w3.org/ns/activitystreams",
"id": "https://enterprise.lemmy.ml/activities/delete/f1b5d57c-80f8-4e03-a615-688d552e946c",
"type": "Delete",
"actor": "https://enterprise.lemmy.ml/u/riker",
"to": "https://www.w3.org/ns/activitystreams#Public",
"cc": [
"https://enterprise.lemmy.ml/c/main/"
],
"object": "https://enterprise.lemmy.ml/post/32"
}"#,
)
.unwrap();
let activity_form = ActivityForm {
ap_id: ap_id.to_string(),
user_id: inserted_creator.id,
data: test_json.to_owned(),
local: true,
@ -144,6 +163,7 @@ mod tests {
let inserted_activity = Activity::create(&conn, &activity_form).unwrap();
let expected_activity = Activity {
ap_id: ap_id.to_string(),
id: inserted_activity.id,
user_id: inserted_creator.id,
data: test_json,
@ -153,9 +173,11 @@ mod tests {
};
let read_activity = Activity::read(&conn, inserted_activity.id).unwrap();
let read_activity_by_apub_id = Activity::read_from_apub_id(&conn, ap_id).unwrap();
User_::delete(&conn, inserted_creator.id).unwrap();
assert_eq!(expected_activity, read_activity);
assert_eq!(expected_activity, read_activity_by_apub_id);
assert_eq!(expected_activity, inserted_activity);
}
}

View File

@ -1,6 +1,7 @@
table! {
activity (id) {
id -> Int4,
ap_id -> Text,
user_id -> Int4,
data -> Jsonb,
local -> Bool,