Paginate the outbox responses. Fixes #669 (#681)

* Paginate the outbox responses. Fixes #669

* Address Ana's review

* Make outbox_fetch page through instance outboxes

* Fix infinite loop in fetch_outbox

* Fix off by one
pull/687/head
Violet White 5 years ago committed by Ana Gelez
parent 866465c603
commit 52d860d402

@ -1,4 +1,9 @@
use activitypub::{actor::Group, collection::OrderedCollection, object::Image, CustomObject};
use activitypub::{
actor::Group,
collection::{OrderedCollection, OrderedCollectionPage},
object::Image,
CustomObject,
};
use chrono::NaiveDateTime;
use diesel::{self, ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl, SaveChangesDsl};
use openssl::{
@ -22,7 +27,7 @@ use safe_string::SafeString;
use schema::blogs;
use search::Searcher;
use users::User;
use {Connection, Error, PlumeRocket, Result};
use {ap_url, Connection, Error, PlumeRocket, Result, ITEMS_PER_PAGE};
pub type CustomGroup = CustomObject<ApSignature, Group>;
@ -220,12 +225,49 @@ impl Blog {
coll.collection_props.items = serde_json::to_value(self.get_activities(conn)?)?;
coll.collection_props
.set_total_items_u64(self.get_activities(conn)?.len() as u64)?;
coll.collection_props
.set_first_link(Id::new(ap_url(&format!("{}?page=1", &self.outbox_url))))?;
coll.collection_props
.set_last_link(Id::new(ap_url(&format!(
"{}?page={}",
&self.outbox_url,
(self.get_activities(conn)?.len() as u64 + ITEMS_PER_PAGE as u64 - 1) as u64
/ ITEMS_PER_PAGE as u64
))))?;
Ok(ActivityStream::new(coll))
}
pub fn outbox_page(
&self,
conn: &Connection,
(min, max): (i32, i32),
) -> Result<ActivityStream<OrderedCollectionPage>> {
let mut coll = OrderedCollectionPage::default();
let acts = self.get_activity_page(&conn, (min, max))?;
//This still doesn't do anything because the outbox
//doesn't do anything yet
coll.collection_page_props.set_next_link(Id::new(&format!(
"{}?page={}",
&self.outbox_url,
min / ITEMS_PER_PAGE + 1
)))?;
coll.collection_page_props.set_prev_link(Id::new(&format!(
"{}?page={}",
&self.outbox_url,
min / ITEMS_PER_PAGE - 1
)))?;
coll.collection_props.items = serde_json::to_value(acts)?;
Ok(ActivityStream::new(coll))
}
fn get_activities(&self, _conn: &Connection) -> Result<Vec<serde_json::Value>> {
Ok(vec![])
}
fn get_activity_page(
&self,
_conn: &Connection,
(_min, _max): (i32, i32),
) -> Result<Vec<serde_json::Value>> {
Ok(vec![])
}
pub fn get_keypair(&self) -> Result<PKey<Private>> {
PKey::from_rsa(Rsa::private_key_from_pem(

@ -75,7 +75,7 @@ impl From<bcrypt::BcryptError> for Error {
Error::Signature
}
}
pub const ITEMS_PER_PAGE: i32 = 12;
impl From<openssl::error::ErrorStack> for Error {
fn from(_: openssl::error::ErrorStack) -> Self {
Error::Signature

@ -1,7 +1,7 @@
use activitypub::{
activity::Delete,
actor::Person,
collection::OrderedCollection,
collection::{OrderedCollection, OrderedCollectionPage},
object::{Image, Tombstone},
Activity, CustomObject, Endpoint,
};
@ -49,7 +49,7 @@ use safe_string::SafeString;
use schema::users;
use search::Searcher;
use timeline::Timeline;
use {ap_url, Connection, Error, PlumeRocket, Result};
use {ap_url, Connection, Error, PlumeRocket, Result, ITEMS_PER_PAGE};
pub type CustomPerson = CustomObject<ApSignature, Person>;
@ -320,21 +320,52 @@ impl User {
.load::<User>(conn)
.map_err(Error::from)
}
pub fn outbox(&self, conn: &Connection) -> Result<ActivityStream<OrderedCollection>> {
let acts = self.get_activities(conn)?;
let n_acts = acts.len();
let mut coll = OrderedCollection::default();
let first = &format!("{}?page=1", &self.outbox_url);
let last = &format!(
"{}?page={}",
&self.outbox_url,
self.get_activities_count(&conn) / i64::from(ITEMS_PER_PAGE) + 1
);
coll.collection_props.set_first_link(Id::new(first))?;
coll.collection_props.set_last_link(Id::new(last))?;
coll.collection_props
.set_total_items_u64(self.get_activities_count(&conn) as u64)?;
Ok(ActivityStream::new(coll))
}
pub fn outbox_page(
&self,
conn: &Connection,
(min, max): (i32, i32),
) -> Result<ActivityStream<OrderedCollectionPage>> {
let acts = self.get_activities_page(conn, (min, max))?;
let n_acts = self.get_activities_count(&conn);
let mut coll = OrderedCollectionPage::default();
if n_acts - i64::from(min) >= i64::from(ITEMS_PER_PAGE) {
coll.collection_page_props.set_next_link(Id::new(&format!(
"{}?page={}",
&self.outbox_url,
min / ITEMS_PER_PAGE + 2
)))?;
}
if min > 0 {
coll.collection_page_props.set_prev_link(Id::new(&format!(
"{}?page={}",
&self.outbox_url,
min / ITEMS_PER_PAGE
)))?;
}
coll.collection_props.items = serde_json::to_value(acts)?;
coll.collection_props.set_total_items_u64(n_acts as u64)?;
coll.collection_page_props
.set_part_of_link(Id::new(&self.outbox_url))?;
Ok(ActivityStream::new(coll))
}
pub fn fetch_outbox<T: Activity>(&self) -> Result<Vec<T>> {
fn fetch_outbox_page<T: Activity>(&self, url: &str) -> Result<(Vec<T>, Option<String>)> {
let mut res = ClientBuilder::new()
.connect_timeout(Some(std::time::Duration::from_secs(5)))
.build()?
.get(&self.outbox_url[..])
.get(url)
.header(
ACCEPT,
HeaderValue::from_str(
@ -347,12 +378,62 @@ impl User {
.send()?;
let text = &res.text()?;
let json: serde_json::Value = serde_json::from_str(text)?;
Ok(json["items"]
let items = json["items"]
.as_array()
.unwrap_or(&vec![])
.iter()
.filter_map(|j| serde_json::from_value(j.clone()).ok())
.collect::<Vec<T>>())
.collect::<Vec<T>>();
let next = match json.get("next") {
Some(x) => Some(x.as_str().unwrap().to_owned()),
None => None,
};
Ok((items, next))
}
pub fn fetch_outbox<T: Activity>(&self) -> Result<Vec<T>> {
let mut res = ClientBuilder::new()
.connect_timeout(Some(std::time::Duration::from_secs(5)))
.build()?
.get(&self.outbox_url[..])
.header(
ACCEPT,
HeaderValue::from_str(
&ap_accept_header()
.into_iter()
.collect::<Vec<_>>()
.join(", "),
)?,
)
.send()?;
let text = &res.text()?;
let json: serde_json::Value = serde_json::from_str(text)?;
if let Some(first) = json.get("first") {
let mut items: Vec<T> = Vec::new();
let mut next = first.as_str().unwrap().to_owned();
while let Ok((mut page, nxt)) = self.fetch_outbox_page(&next) {
if page.is_empty() {
break;
}
items.extend(page.drain(..));
if let Some(n) = nxt {
if n == next {
break;
}
next = n;
} else {
break;
}
}
Ok(items)
} else {
Ok(json["items"]
.as_array()
.unwrap_or(&vec![])
.iter()
.filter_map(|j| serde_json::from_value(j.clone()).ok())
.collect::<Vec<T>>())
}
}
pub fn fetch_followers_ids(&self) -> Result<Vec<String>> {
@ -379,14 +460,31 @@ impl User {
.filter_map(|j| serde_json::from_value(j.clone()).ok())
.collect::<Vec<String>>())
}
fn get_activities(&self, conn: &Connection) -> Result<Vec<serde_json::Value>> {
fn get_activities_count(&self, conn: &Connection) -> i64 {
use schema::post_authors;
use schema::posts;
let posts_by_self = PostAuthor::belonging_to(self).select(post_authors::post_id);
posts::table
.filter(posts::published.eq(true))
.filter(posts::id.eq_any(posts_by_self))
.count()
.first(conn)
.unwrap()
}
fn get_activities_page(
&self,
conn: &Connection,
(min, max): (i32, i32),
) -> Result<Vec<serde_json::Value>> {
use schema::post_authors;
use schema::posts;
let posts_by_self = PostAuthor::belonging_to(self).select(post_authors::post_id);
let posts = posts::table
.filter(posts::published.eq(true))
.filter(posts::id.eq_any(posts_by_self))
.order(posts::creation_date.desc())
.offset(min.into())
.limit((max - min).into())
.load::<Post>(conn)?;
Ok(posts
.into_iter()

@ -182,6 +182,7 @@ Then try to restart Plume
routes::blogs::details,
routes::blogs::activity_details,
routes::blogs::outbox,
routes::blogs::outbox_page,
routes::blogs::new,
routes::blogs::new_auth,
routes::blogs::create,
@ -262,6 +263,7 @@ Then try to restart Plume
routes::user::follow_auth,
routes::user::activity_details,
routes::user::outbox,
routes::user::outbox_page,
routes::user::inbox,
routes::user::ap_followers,
routes::user::new,

@ -1,4 +1,4 @@
use activitypub::collection::OrderedCollection;
use activitypub::collection::{OrderedCollection, OrderedCollectionPage};
use atom_syndication::{Entry, FeedBuilder};
use diesel::SaveChangesDsl;
use rocket::{
@ -347,7 +347,16 @@ pub fn outbox(name: String, rockets: PlumeRocket) -> Option<ActivityStream<Order
let blog = Blog::find_by_fqn(&rockets, &name).ok()?;
Some(blog.outbox(&*rockets.conn).ok()?)
}
#[allow(unused_variables)]
#[get("/~/<name>/outbox?<page>")]
pub fn outbox_page(
name: String,
page: Page,
rockets: PlumeRocket,
) -> Option<ActivityStream<OrderedCollectionPage>> {
let blog = Blog::find_by_fqn(&rockets, &name).ok()?;
Some(blog.outbox_page(&*rockets.conn, page.limits()).ok()?)
}
#[get("/~/<name>/atom.xml")]
pub fn atom_feed(name: String, rockets: PlumeRocket) -> Option<Content<String>> {
let blog = Blog::find_by_fqn(&rockets, &name).ok()?;

@ -1,5 +1,6 @@
#![warn(clippy::too_many_arguments)]
use atom_syndication::{ContentBuilder, Entry, EntryBuilder, LinkBuilder, Person, PersonBuilder};
use plume_models::{posts::Post, Connection, CONFIG, ITEMS_PER_PAGE};
use rocket::{
http::{
hyper::header::{CacheControl, CacheDirective, ETag, EntityTag},
@ -17,9 +18,6 @@ use std::{
};
use template_utils::Ructe;
use plume_models::{posts::Post, Connection, CONFIG};
const ITEMS_PER_PAGE: i32 = 12;
/// Special return type used for routes that "cannot fail", and instead
/// `Redirect`, or `Flash<Redirect>`, when we cannot deliver a `Ructe` Response
#[allow(clippy::large_enum_variant)]

@ -1,4 +1,7 @@
use activitypub::{activity::Create, collection::OrderedCollection};
use activitypub::{
activity::Create,
collection::{OrderedCollection, OrderedCollectionPage},
};
use atom_syndication::{Entry, FeedBuilder};
use diesel::SaveChangesDsl;
use rocket::{
@ -553,7 +556,15 @@ pub fn outbox(name: String, rockets: PlumeRocket) -> Option<ActivityStream<Order
let user = User::find_by_fqn(&rockets, &name).ok()?;
user.outbox(&*rockets.conn).ok()
}
#[get("/@/<name>/outbox?<page>")]
pub fn outbox_page(
name: String,
page: Page,
rockets: PlumeRocket,
) -> Option<ActivityStream<OrderedCollectionPage>> {
let user = User::find_by_fqn(&rockets, &name).ok()?;
user.outbox_page(&*rockets.conn, page.limits()).ok()
}
#[post("/@/<name>/inbox", data = "<data>")]
pub fn inbox(
name: String,

Loading…
Cancel
Save