Add removal of old rate limit buckets

pull/3009/head
dullbananas 2023-06-13 02:31:06 +00:00
parent fd618d6b5b
commit 6b4c3ec5f6
4 changed files with 68 additions and 11 deletions

View File

@@ -1,5 +1,6 @@
 use crate::{error::LemmyError, IpAddr};
 use actix_web::dev::{ConnectionInfo, Service, ServiceRequest, ServiceResponse, Transform};
+use enum_map::enum_map;
 use futures::future::{ok, Ready};
 use rate_limiter::{RateLimitStorage, RateLimitType};
 use serde::{Deserialize, Serialize};
@@ -9,6 +10,7 @@ use std::{
   rc::Rc,
   sync::{Arc, Mutex},
   task::{Context, Poll},
+  time::Duration,
 };
 use tokio::sync::{mpsc, mpsc::Sender, OnceCell};
 use typed_builder::TypedBuilder;
@@ -105,6 +107,33 @@ impl RateLimitCell {
     Ok(())
   }

+  /// Remove buckets older than the given duration
+  pub fn remove_older_than(&self, mut duration: Duration) {
+    let mut guard = self
+      .rate_limit
+      .lock()
+      .expect("Failed to lock rate limit mutex for reading");
+    let rate_limit = &guard.rate_limit_config;
+
+    // If any rate limit interval is greater than `duration`, then the largest interval is used instead. This preserves buckets that would not pass the rate limit check.
+    let max_interval_secs = enum_map! {
+      RateLimitType::Message => rate_limit.message_per_second,
+      RateLimitType::Post => rate_limit.post_per_second,
+      RateLimitType::Register => rate_limit.register_per_second,
+      RateLimitType::Image => rate_limit.image_per_second,
+      RateLimitType::Comment => rate_limit.comment_per_second,
+      RateLimitType::Search => rate_limit.search_per_second,
+    }
+    .into_values()
+    .max()
+    .and_then(|max| u64::try_from(max).ok())
+    .unwrap_or(0);
+
+    duration = std::cmp::max(duration, Duration::from_secs(max_interval_secs));
+
+    guard.rate_limiter.remove_older_than(duration)
+  }
+
   pub fn message(&self) -> RateLimitedGuard {
     self.kind(RateLimitType::Message)
   }
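
The clamp on `duration` is the important detail in `remove_older_than`: the requested cleanup window is raised to at least the largest configured interval, so a bucket that could still fail a rate-limit check is never discarded. A minimal standalone sketch of that idea, with the function name and the plain list of interval values assumed for illustration:

use std::time::Duration;

// Hypothetical helper mirroring the clamp above: if any configured interval
// exceeds the requested cleanup window, the largest interval wins, so buckets
// still inside a rate-limit window are preserved.
fn effective_cleanup_window(requested: Duration, interval_secs: &[i32]) -> Duration {
  let max_interval_secs = interval_secs
    .iter()
    .copied()
    .max()
    .and_then(|max| u64::try_from(max).ok())
    .unwrap_or(0);
  std::cmp::max(requested, Duration::from_secs(max_interval_secs))
}

fn main() {
  // A 10-second request is bumped up to the largest interval, 60 seconds.
  let window = effective_cleanup_window(Duration::from_secs(10), &[1, 5, 60]);
  assert_eq!(window, Duration::from_secs(60));
}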

View File

@@ -1,6 +1,9 @@
 use crate::IpAddr;
 use enum_map::{enum_map, EnumMap};
-use std::{collections::HashMap, time::Instant};
+use std::{
+  collections::HashMap,
+  time::{Duration, Instant},
+};
 use tracing::debug;

 #[derive(Debug, Clone)]
@@ -67,4 +70,14 @@ impl RateLimitStorage {
       true
     }
   }
+
+  /// Remove buckets older than the given duration
+  pub(super) fn remove_older_than(&mut self, duration: Duration) {
+    // Only retain buckets that were last used after `instant`
+    let Some(instant) = Instant::now().checked_sub(duration) else { return };
+
+    self
+      .buckets
+      .retain(|_ip_addr, buckets| buckets.values().all(|bucket| bucket.last_checked > instant));
+  }
 }
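
Note the retention predicate: an IP's entry is kept only if every one of its buckets was checked after the cutoff, so a single stale bucket drops the whole per-IP map. A standalone sketch of that behaviour with simplified stand-in types (the `Bucket` struct and string keys below are illustrative, not the crate's real storage):

use std::{
  collections::HashMap,
  time::{Duration, Instant},
};

struct Bucket {
  last_checked: Instant,
}

// Keep an entry only if all of its buckets were checked after the cutoff.
fn remove_older_than(buckets: &mut HashMap<String, Vec<Bucket>>, duration: Duration) {
  // `checked_sub` returns None when the duration reaches back before the
  // earliest instant the clock can represent; nothing can be that old.
  let Some(cutoff) = Instant::now().checked_sub(duration) else { return };

  buckets.retain(|_ip, b| b.iter().all(|bucket| bucket.last_checked > cutoff));
}

fn main() {
  let mut buckets = HashMap::new();
  buckets.insert(
    "127.0.0.1".to_string(),
    vec![Bucket { last_checked: Instant::now() }],
  );

  // Nothing is a week old yet, so the entry survives.
  remove_older_than(&mut buckets, Duration::from_secs(3600 * 24 * 7));
  assert_eq!(buckets.len(), 1);
}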

View File

@@ -119,21 +119,25 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> {
     .with(TracingMiddleware::default())
     .build();

+  let context = LemmyContext::create(
+    pool.clone(),
+    client.clone(),
+    secret.clone(),
+    rate_limit_cell.clone(),
+  );
+
   // Schedules various cleanup tasks for the DB
-  thread::spawn(move || {
-    scheduled_tasks::setup(db_url, user_agent).expect("Couldn't set up scheduled_tasks");
+  thread::spawn({
+    let context = context.clone();
+    move || {
+      scheduled_tasks::setup(db_url, user_agent, context).expect("Couldn't set up scheduled_tasks");
+    }
   });

   // Create Http server with websocket support
   let settings_bind = settings.clone();
   HttpServer::new(move || {
-    let context = LemmyContext::create(
-      pool.clone(),
-      client.clone(),
-      secret.clone(),
-      rate_limit_cell.clone(),
-    );
-
+    let context = context.clone();
     let federation_config = FederationConfig::builder()
       .domain(settings.hostname.clone())
       .app_data(context.clone())
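
The server-setup change is structural: `LemmyContext::create` is hoisted out of the `HttpServer::new` factory closure, so one context is built up front and cheap clones are handed to the scheduler thread and to each worker closure. A generic sketch of that pattern, with a placeholder `Context` type standing in for the real `LemmyContext`:

use std::{sync::Arc, thread};

// Placeholder for a cheaply clonable, Arc-backed application context.
#[derive(Clone)]
struct Context {
  inner: Arc<String>,
}

fn main() {
  // Build the shared state once...
  let context = Context {
    inner: Arc::new("shared state".to_owned()),
  };

  // ...clone it into the background thread, as the scheduled-tasks spawn does...
  let handle = thread::spawn({
    let context = context.clone();
    move || println!("scheduler sees: {}", context.inner)
  });

  // ...and keep the original for any further closures, e.g. a server factory.
  println!("server sees: {}", context.inner);
  handle.join().unwrap();
}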

View File

@@ -7,6 +7,7 @@ use diesel::{
 };
 // Import week days and WeekDay
 use diesel::{sql_query, PgConnection, RunQueryDsl};
+use lemmy_api_common::context::LemmyContext;
 use lemmy_db_schema::{
   schema::{
     activity,
@@ -27,7 +28,11 @@ use std::{thread, time::Duration};
 use tracing::info;

 /// Schedules various cleanup tasks for lemmy in a background thread
-pub fn setup(db_url: String, user_agent: String) -> Result<(), LemmyError> {
+pub fn setup(
+  db_url: String,
+  user_agent: String,
+  context_1: LemmyContext,
+) -> Result<(), LemmyError> {
   // Setup the connections
   let mut scheduler = Scheduler::new();

@@ -58,6 +63,12 @@ pub fn setup(db_url: String, user_agent: String) -> Result<(), LemmyError> {
     clear_old_activities(&mut conn_3);
   });

+  // Remove old rate limit buckets every week
+  scheduler.every(CTimeUnits::weeks(1)).run(move || {
+    let week = Duration::from_secs(3600 * 24 * 7);
+    context_1.settings_updated_channel().remove_older_than(week);
+  });
+
   scheduler.every(CTimeUnits::days(1)).run(move || {
     update_instance_software(&mut conn_4, &user_agent);
   });
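
Taken together, the weekly job closes the loop: whatever `context_1.settings_updated_channel()` returns, it exposes the new public `remove_older_than`, so every seven days the scheduler drops buckets older than one week (3600 × 24 × 7 = 604 800 seconds), and the clamp added in the first file may still widen that window if any configured rate-limit interval happens to be longer.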