#730 Go async

Open
igalic wants to merge 46 commits from go/async into master
  1. +1
    -0
      .gitignore
  2. +1806
    -1883
      Cargo.lock
  3. +12
    -10
      Cargo.toml
  4. +0
    -2
      build.rs
  5. +9
    -5
      plume-common/Cargo.toml
  6. +4
    -3
      plume-common/src/activity_pub/inbox.rs
  7. +42
    -37
      plume-common/src/activity_pub/mod.rs
  8. +0
    -1
      plume-common/src/activity_pub/request.rs
  9. +0
    -3
      plume-common/src/activity_pub/sign.rs
  10. +0
    -2
      plume-common/src/lib.rs
  11. +17
    -6
      plume-models/Cargo.toml
  12. +6
    -4
      plume-models/src/admin.rs
  13. +20
    -16
      plume-models/src/api_tokens.rs
  14. +7
    -6
      plume-models/src/blogs.rs
  15. +8
    -8
      plume-models/src/comments.rs
  16. +6
    -6
      plume-models/src/db_conn.rs
  17. +2
    -1
      plume-models/src/headers.rs
  18. +0
    -1
      plume-models/src/inbox.rs
  19. +7
    -0
      plume-models/src/lib.rs
  20. +60
    -20
      plume-models/src/lists.rs
  21. +5
    -7
      plume-models/src/medias.rs
  22. +2
    -2
      plume-models/src/mentions.rs
  23. +30
    -14
      plume-models/src/plume_rocket.rs
  24. +17
    -12
      plume-models/src/posts.rs
  25. +28
    -8
      plume-models/src/timeline/query.rs
  26. +76
    -70
      plume-models/src/users.rs
  27. +2
    -1
      po/plume/ar.po
  28. +1
    -1
      rust-toolchain
  29. +3
    -1
      src/api/authorization.rs
  30. +25
    -16
      src/api/mod.rs
  31. +2
    -2
      src/api/posts.rs
  32. +22
    -22
      src/inbox.rs
  33. +3
    -23
      src/main.rs
  34. +31
    -18
      src/routes/blogs.rs
  35. +23
    -17
      src/routes/comments.rs
  36. +23
    -11
      src/routes/errors.rs
  37. +11
    -7
      src/routes/instance.rs
  38. +11
    -10
      src/routes/mod.rs
  39. +40
    -28
      src/routes/posts.rs
  40. +2
    -2
      src/routes/reshares.rs
  41. +8
    -6
      src/routes/session.rs
  42. +68
    -57
      src/routes/user.rs
  43. +36
    -6
      src/routes/well_known.rs
  44. +7
    -7
      src/template_utils.rs
  45. +4
    -0
      templates/base.rs.html
  46. +2
    -0
      templates/blogs/details.rs.html
  47. +2
    -0
      templates/blogs/edit.rs.html
  48. +2
    -0
      templates/posts/details.rs.html

+ 1
- 0
.gitignore View File

@@ -18,3 +18,4 @@ tags.*
search_index
.buildconfig
__pycache__
.vscode/

+ 1806
- 1883
Cargo.lock
File diff suppressed because it is too large
View File


+ 12
- 10
Cargo.toml View File

@@ -8,6 +8,7 @@ edition = "2018"
[dependencies]
activitypub = "0.1.3"
askama_escape = "0.1"
async-trait = "*"
atom_syndication = "0.6"
clap = "2.33"
colored = "1.8"
@@ -20,20 +21,19 @@ heck = "0.3.0"
lettre = "0.9.2"
lettre_email = "0.9.2"
num_cpus = "1.10"
rocket = "0.4.2"
rocket_contrib = { version = "0.4.2", features = ["json"] }
rocket_i18n = { git = "https://github.com/Plume-org/rocket_i18n", rev = "e922afa7c366038b3433278c03b1456b346074f2" }
rocket = { git = "https://github.com/SergioBenitez/Rocket", rev = "async" }
rocket_contrib = { git = "https://github.com/SergioBenitez/Rocket", rev = "async" , features = ["json"] }
rpassword = "4.0"
runtime-fmt = "0.4.0"
scheduled-thread-pool = "0.2.2"
serde = "1.0"
serde_json = "1.0"
serde_qs = "0.5"
shrinkwraprs = "0.2.1"
syntect = "3.3"
validator = "0.8"
validator_derive = "0.8"
webfinger = "0.4.1"
tokio = "0.2"
validator = "0.10"
validator_derive = "0.10"
webfinger = { git = "https://github.com/Plume-org/webfinger", rev = "4e8f12810c4a7ba7a07bbcb722cd265fdff512b6", features = ["async"] }

[[bin]]
name = "plume"
@@ -65,9 +65,11 @@ path = "plume-common"
[dependencies.plume-models]
path = "plume-models"

[dependencies.rocket_csrf]
git = "https://github.com/fdb-hiroshima/rocket_csrf"
rev = "29910f2829e7e590a540da3804336577b48c7b31"
[dependencies.rocket_i18n]
git = "https://github.com/Plume-org/rocket_i18n"
branch = "go-async"
default-features = false
features = ["rocket"]

[build-dependencies]
ructe = "0.9.0"


+ 0
- 2
build.rs View File

@@ -1,5 +1,3 @@
use rsass;

use ructe::Ructe;
use std::process::{Command, Stdio};
use std::{ffi::OsStr, fs::*, io::Write, path::*};


+ 9
- 5
plume-common/Cargo.toml View File

@@ -6,22 +6,22 @@ edition = "2018"

[dependencies]
activitypub = "0.1.1"
activitystreams-derive = "0.1.1"
activitystreams-derive = "0.2"
activitystreams-traits = "0.1.0"
array_tool = "1.0"
base64 = "0.10"
futures-util = "*"
heck = "0.3.0"
hex = "0.3"
hyper = "0.12.33"
hyper = "0.13"
openssl = "0.10.22"
rocket = "0.4.0"
reqwest = "0.9"
rocket = { git = "https://github.com/SergioBenitez/Rocket", rev = "async" }
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
shrinkwraprs = "0.2.1"
syntect = "3.3"
tokio = "0.1.22"
tokio = "0.2"
regex-syntax = { version = "0.6.17", default-features = false, features = ["unicode-perl"] }

[dependencies.chrono]
@@ -31,3 +31,7 @@ version = "0.4"
[dependencies.pulldown-cmark]
default-features = false
version = "0.2.0"

[dependencies.reqwest]
features = ["json", "blocking"]
version = "0.10"

+ 4
- 3
plume-common/src/activity_pub/inbox.rs View File

@@ -279,8 +279,9 @@ pub trait FromId<C>: Sized {

/// Dereferences an ID
fn deref(id: &str) -> Result<Self::Object, (Option<serde_json::Value>, Self::Error)> {
reqwest::ClientBuilder::new()
.connect_timeout(Some(std::time::Duration::from_secs(5)))
// Use blocking reqwest API here, since defer cannot be async (yet)
reqwest::blocking::Client::builder()
.connect_timeout(std::time::Duration::from_secs(5))
.build()
.map_err(|_| (None, InboxError::DerefError.into()))?
.get(id)
@@ -296,7 +297,7 @@ pub trait FromId<C>: Sized {
)
.send()
.map_err(|_| (None, InboxError::DerefError))
.and_then(|mut r| {
.and_then(|r| {
let json: serde_json::Value = r
.json()
.map_err(|_| (None, InboxError::InvalidObject(None)))?;


+ 42
- 37
plume-common/src/activity_pub/mod.rs View File

@@ -1,14 +1,12 @@
use activitypub::{Activity, Link, Object};
use array_tool::vec::Uniq;
use reqwest::r#async::ClientBuilder;
use reqwest::ClientBuilder;
use rocket::{
http::Status,
request::{FromRequest, Request},
response::{Responder, Response},
response::{Responder, Response, Result},
Outcome,
};
use serde_json;
use tokio::prelude::*;

use self::sign::Signable;

@@ -62,39 +60,45 @@ impl<T> ActivityStream<T> {
ActivityStream(t)
}
}
impl<'r, O: Object> Responder<'r> for ActivityStream<O> {
fn respond_to(self, request: &Request<'_>) -> Result<Response<'r>, Status> {
#[rocket::async_trait]
impl<'r, O: Object + Send + 'r> Responder<'r> for ActivityStream<O> {
async fn respond_to(self, request: &'r Request<'_>) -> Result<'r> {
let mut json = serde_json::to_value(&self.0).map_err(|_| Status::InternalServerError)?;
json["@context"] = context();
serde_json::to_string(&json).respond_to(request).map(|r| {
Response::build_from(r)
let result = serde_json::to_string(&json).map_err(rocket::response::Debug);
match result.respond_to(request).await {
Ok(r) => Response::build_from(r)
.raw_header("Content-Type", "application/activity+json")
.finalize()
})
.ok(),
Err(e) => Err(e),
}
}
}

#[derive(Clone)]
pub struct ApRequest;
#[rocket::async_trait]
impl<'a, 'r> FromRequest<'a, 'r> for ApRequest {
type Error = ();

fn from_request(request: &'a Request<'r>) -> Outcome<Self, (Status, Self::Error), ()> {
async fn from_request(request: &'a Request<'r>) -> Outcome<Self, (Status, Self::Error), ()> {
request
.headers()
.get_one("Accept")
.map(|header| {
header
.split(',')
.map(|ct| match ct.trim() {
// bool for Forward: true if found a valid Content-Type for Plume first (HTML), false otherwise
"application/ld+json; profile=\"https://w3.org/ns/activitystreams\""
| "application/ld+json;profile=\"https://w3.org/ns/activitystreams\""
| "application/activity+json"
| "application/ld+json" => Outcome::Success(ApRequest),
"text/html" => Outcome::Forward(true),
_ => Outcome::Forward(false),
.map(|ct| {
match ct.trim() {
// bool for Forward: true if found a valid Content-Type for Plume first (HTML),
// false otherwise
"application/ld+json; profile=\"https://w3.org/ns/activitystreams\""
| "application/ld+json;profile=\"https://w3.org/ns/activitystreams\""
| "application/activity+json"
| "application/ld+json" => Outcome::Success(ApRequest),
"text/html" => Outcome::Forward(true),
_ => Outcome::Forward(false),
}
})
.fold(Outcome::Forward(false), |out, ct| {
if out.clone().forwarded().unwrap_or_else(|| out.is_success()) {
@@ -130,36 +134,38 @@ where
.sign(sender)
.expect("activity_pub::broadcast: signature error");

let mut rt = tokio::runtime::current_thread::Runtime::new()
.expect("Error while initializing tokio runtime for federation");
let client = ClientBuilder::new()
.connect_timeout(std::time::Duration::from_secs(5))
let rt = tokio::runtime::Builder::new()
.threaded_scheduler()
.build()
.expect("Can't build client");
.expect("Error while initializing tokio runtime for federation");
for inbox in boxes {
let body = signed.to_string();
let mut headers = request::headers();
headers.insert("Digest", request::Digest::digest(&body));
rt.spawn(
let sig = request::signature(sender, &headers)
.expect("activity_pub::broadcast: request signature error");
let client = ClientBuilder::new()
.connect_timeout(std::time::Duration::from_secs(5))
.build()
.expect("Can't build client");
rt.spawn(async move {
client
.post(&inbox)
.headers(headers.clone())
.header(
"Signature",
request::signature(sender, &headers)
.expect("activity_pub::broadcast: request signature error"),
)
.header("Signature", sig)
.body(body)
.send()
.and_then(|r| r.into_body().concat2())
.await
.unwrap()
.text()
.await
.map(move |response| {
println!("Successfully sent activity to inbox ({})", inbox);
println!("Response: \"{:?}\"\n", response)
})
.map_err(|e| println!("Error while sending to inbox ({:?})", e)),
);
.map_err(|e| println!("Error while sending to inbox ({:?})", e))
});
}
rt.run().unwrap();
}

#[derive(Shrinkwrap, Clone, Serialize, Deserialize)]
@@ -203,8 +209,7 @@ pub struct PublicKey {
pub public_key_pem: Option<serde_json::Value>,
}

#[derive(Clone, Debug, Default, UnitString)]
#[activitystreams(Hashtag)]
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct HashtagType;

#[derive(Clone, Debug, Default, Deserialize, Serialize, Properties)]


+ 0
- 1
plume-common/src/activity_pub/request.rs View File

@@ -1,4 +1,3 @@
use base64;
use chrono::{offset::Utc, DateTime};
use openssl::hash::{Hasher, MessageDigest};
use reqwest::header::{HeaderMap, HeaderValue, ACCEPT, CONTENT_TYPE, DATE, USER_AGENT};


+ 0
- 3
plume-common/src/activity_pub/sign.rs View File

@@ -1,10 +1,7 @@
use super::request;
use base64;
use chrono::{naive::NaiveDateTime, DateTime, Duration, Utc};
use hex;
use openssl::{pkey::PKey, rsa::Rsa, sha::sha256};
use rocket::http::HeaderMap;
use serde_json;

/// Returns (public key, private key)
pub fn gen_keypair() -> (Vec<u8>, Vec<u8>) {


+ 0
- 2
plume-common/src/lib.rs View File

@@ -2,9 +2,7 @@

#[macro_use]
extern crate activitystreams_derive;
use activitystreams_traits;

use serde;
#[macro_use]
extern crate shrinkwraprs;
#[macro_use]


+ 17
- 6
plume-models/Cargo.toml View File

@@ -10,24 +10,24 @@ ammonia = "2.1.1"
askama_escape = "0.1"
bcrypt = "0.5"
guid-create = "0.1"
futures = "0.3"
heck = "0.3.0"
itertools = "0.8.0"
lazy_static = "1.0"
migrations_internals= "1.4.0"
openssl = "0.10.22"
rocket = "0.4.0"
rocket_i18n = { git = "https://github.com/Plume-org/rocket_i18n", rev = "e922afa7c366038b3433278c03b1456b346074f2" }
reqwest = "0.9"
rocket = { git = "https://github.com/SergioBenitez/Rocket", rev = "async" }
scheduled-thread-pool = "0.2.2"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
tantivy = "0.10.1"
tokio = "0.2"
url = "2.1"
walkdir = "2.2"
webfinger = "0.4.1"
webfinger = { git = "https://github.com/Plume-org/webfinger", rev = "4e8f12810c4a7ba7a07bbcb722cd265fdff512b6", features = ["async"] }
whatlang = "0.7.1"
shrinkwraprs = "0.2.1"
shrinkwraprs = "0.3"
diesel-derive-newtype = "0.1.2"
glob = "0.3.0"

@@ -48,8 +48,19 @@ path = "../plume-common"
[dependencies.plume-macro]
path = "../plume-macro"

[dependencies.reqwest]
features = ["json", "blocking"]
version = "0.10"


[dependencies.rocket_i18n]
git = "https://github.com/Plume-org/rocket_i18n"
branch = "go-async"
default-features = false
features = ["rocket"]

[dev-dependencies]
diesel_migrations = "1.3.0"
diesel_migrations = "1.4.0"

[features]
postgres = ["diesel/postgres", "plume-macro/postgres" ]


+ 6
- 4
plume-models/src/admin.rs View File

@@ -8,11 +8,12 @@ use rocket::{
/// Wrapper around User to use as a request guard on pages reserved to admins.
pub struct Admin(pub User);

#[rocket::async_trait]
impl<'a, 'r> FromRequest<'a, 'r> for Admin {
type Error = ();

fn from_request(request: &'a Request<'r>) -> request::Outcome<Admin, ()> {
let user = request.guard::<User>()?;
async fn from_request(request: &'a Request<'r>) -> request::Outcome<Self, Self::Error> {
let user = try_outcome!(User::from_request(request).await);
if user.is_admin() {
Outcome::Success(Admin(user))
} else {
@@ -24,11 +25,12 @@ impl<'a, 'r> FromRequest<'a, 'r> for Admin {
/// Same as `Admin` but for moderators.
pub struct Moderator(pub User);

#[rocket::async_trait]
impl<'a, 'r> FromRequest<'a, 'r> for Moderator {
type Error = ();

fn from_request(request: &'a Request<'r>) -> request::Outcome<Moderator, ()> {
let user = request.guard::<User>()?;
async fn from_request(request: &'a Request<'r>) -> request::Outcome<Self, Self::Error> {
let user = try_outcome!(User::from_request(request).await);
if user.is_moderator() {
Outcome::Success(Moderator(user))
} else {


+ 20
- 16
plume-models/src/api_tokens.rs View File

@@ -76,32 +76,36 @@ pub enum TokenError {
DbError,
}

#[rocket::async_trait]
impl<'a, 'r> FromRequest<'a, 'r> for ApiToken {
type Error = TokenError;

fn from_request(request: &'a Request<'r>) -> request::Outcome<ApiToken, TokenError> {
async fn from_request(request: &'a Request<'r>) -> request::Outcome<Self, Self::Error> {
let headers: Vec<_> = request.headers().get("Authorization").collect();
if headers.len() != 1 {
return Outcome::Failure((Status::BadRequest, TokenError::NoHeader));
}

let mut parsed_header = headers[0].split(' ');
let auth_type = parsed_header.next().map_or_else(
|| Outcome::Failure((Status::BadRequest, TokenError::NoType)),
Outcome::Success,
)?;
let val = parsed_header.next().map_or_else(
|| Outcome::Failure((Status::BadRequest, TokenError::NoValue)),
Outcome::Success,
)?;

if auth_type == "Bearer" {
let conn = request
.guard::<DbConn>()
.map_failure(|_| (Status::InternalServerError, TokenError::DbError))?;
if let Ok(token) = ApiToken::find_by_value(&*conn, val) {
return Outcome::Success(token);
if let Some(auth_type) = parsed_header.next() {
if let Some(val) = parsed_header.next() {
if auth_type == "Bearer" {
if let Outcome::Success(conn) = DbConn::from_request(request).await {
if let Ok(token) = ApiToken::find_by_value(&*conn, val) {
return Outcome::Success(token);
}
} else {
return Outcome::Failure((
Status::InternalServerError,
TokenError::DbError,
));
}
}
} else {
return Outcome::Failure((Status::BadRequest, TokenError::NoValue));
}
} else {
return Outcome::Failure((Status::BadRequest, TokenError::NoType));
}

Outcome::Forward(())


+ 7
- 6
plume-models/src/blogs.rs View File

@@ -20,7 +20,6 @@ use plume_common::activity_pub::{
inbox::{AsActor, FromId},
sign, ActivityStream, ApSignature, Id, IntoId, PublicKey, Source,
};
use serde_json;
use url::Url;
use webfinger::*;

@@ -71,7 +70,8 @@ impl Blog {
insert!(blogs, NewBlog, |inserted, conn| {
let instance = inserted.get_instance(conn)?;
if inserted.outbox_url.is_empty() {
inserted.outbox_url = instance.compute_box(BLOG_PREFIX, &inserted.actor_id, "outbox");
inserted.outbox_url =
instance.compute_box(BLOG_PREFIX, &inserted.actor_id, r#"outbox"#);
}

if inserted.inbox_url.is_empty() {
@@ -132,7 +132,7 @@ impl Blog {
.map_err(Error::from)
}

pub fn find_by_fqn(c: &PlumeRocket, fqn: &str) -> Result<Blog> {
pub async fn find_by_fqn(c: &PlumeRocket, fqn: &str) -> Result<Blog> {
let from_db = blogs::table
.filter(blogs::fqn.eq(fqn))
.first(&*c.conn)
@@ -140,12 +140,13 @@ impl Blog {
if let Some(from_db) = from_db {
Ok(from_db)
} else {
Blog::fetch_from_webfinger(c, fqn)
Blog::fetch_from_webfinger(c, fqn).await
}
}

fn fetch_from_webfinger(c: &PlumeRocket, acct: &str) -> Result<Blog> {
resolve_with_prefix(Prefix::Group, acct.to_owned(), true)?
async fn fetch_from_webfinger(c: &PlumeRocket, acct: &str) -> Result<Blog> {
resolve_with_prefix(Prefix::Group, acct.to_owned(), true)
.await?
.links
.into_iter()
.find(|l| l.mime_type == Some(String::from("application/activity+json")))


+ 8
- 8
plume-models/src/comments.rs View File

@@ -17,6 +17,7 @@ use activitypub::{
};
use chrono::{self, NaiveDateTime};
use diesel::{self, ExpressionMethods, QueryDsl, RunQueryDsl, SaveChangesDsl};
use futures::stream::{self, StreamExt};
use plume_common::{
activity_pub::{
inbox::{AsActor, AsObject, FromId},
@@ -24,7 +25,6 @@ use plume_common::{
},
utils,
};
use serde_json;
use std::collections::HashSet;

#[derive(Queryable, Identifiable, Clone, AsChangeset)]
@@ -105,7 +105,7 @@ impl Comment {
.unwrap_or(false)
}

pub fn to_activity(&self, c: &PlumeRocket) -> Result<Note> {
pub async fn to_activity(&self, c: &PlumeRocket) -> Result<Note> {
let author = User::get(&c.conn, self.author_id)?;
let (html, mentions, _hashtags) = utils::md_to_html(
self.content.get().as_ref(),
@@ -132,18 +132,18 @@ impl Comment {
note.object_props.set_attributed_to_link(author.into_id())?;
note.object_props.set_to_link_vec(to)?;
note.object_props.set_tag_link_vec(
mentions
.into_iter()
.filter_map(|m| Mention::build_activity(c, &m).ok())
.collect::<Vec<link::Mention>>(),
stream::iter(mentions)
.filter_map(|m| async move { Mention::build_activity(c, &m).await.ok() })
.collect::<Vec<link::Mention>>()
.await,
)?;
Ok(note)
}

pub fn create_activity(&self, c: &PlumeRocket) -> Result<Create> {
pub async fn create_activity(&self, c: &PlumeRocket) -> Result<Create> {
let author = User::get(&c.conn, self.author_id)?;

let note = self.to_activity(c)?;
let note = self.to_activity(c).await?;
let mut act = Create::default();
act.create_props.set_actor_link(author.into_id())?;
act.create_props.set_object_object(note.clone())?;


+ 6
- 6
plume-models/src/db_conn.rs View File

@@ -7,7 +7,7 @@ use diesel::{dsl::sql_query, ConnectionError, RunQueryDsl};
use rocket::{
http::Status,
request::{self, FromRequest},
Outcome, Request, State,
Outcome, Request,
};
use std::ops::Deref;

@@ -21,14 +21,14 @@ pub struct DbConn(pub PooledConnection<ConnectionManager<Connection>>);
/// Attempts to retrieve a single connection from the managed database pool. If
/// no pool is currently managed, fails with an `InternalServerError` status. If
/// no connections are available, fails with a `ServiceUnavailable` status.
#[rocket::async_trait]
impl<'a, 'r> FromRequest<'a, 'r> for DbConn {
type Error = ();

fn from_request(request: &'a Request<'r>) -> request::Outcome<Self, Self::Error> {
let pool = request.guard::<State<'_, DbPool>>()?;
match pool.get() {
Ok(conn) => Outcome::Success(DbConn(conn)),
Err(_) => Outcome::Failure((Status::ServiceUnavailable, ())),
async fn from_request(request: &'a Request<'r>) -> request::Outcome<Self, Self::Error> {
match DbConn::from_request(request).await {
Outcome::Success(a) => Outcome::Success(a),
_ => Outcome::Failure((Status::ServiceUnavailable, ())),
}
}
}


+ 2
- 1
plume-models/src/headers.rs View File

@@ -6,10 +6,11 @@ use rocket::{

pub struct Headers<'r>(pub HeaderMap<'r>);

#[rocket::async_trait]
impl<'a, 'r> FromRequest<'a, 'r> for Headers<'r> {
type Error = ();

fn from_request(request: &'a Request<'r>) -> request::Outcome<Self, ()> {
async fn from_request(request: &'a Request<'r>) -> request::Outcome<Self, ()> {
let mut headers = HeaderMap::new();
for header in request.headers().clone().into_iter() {
headers.add(header);


+ 0
- 1
plume-models/src/inbox.rs View File

@@ -1,5 +1,4 @@
use activitypub::activity::*;
use serde_json;

use crate::{
comments::Comment,


+ 7
- 0
plume-models/src/lib.rs View File

@@ -4,6 +4,7 @@

#[macro_use]
extern crate diesel;
extern crate futures;
#[macro_use]
extern crate lazy_static;
#[macro_use]
@@ -75,6 +76,12 @@ impl From<std::option::NoneError> for Error {
}
}

impl From<Error> for std::option::NoneError {
fn from(_: Error) -> Self {
std::option::NoneError
}
}

impl From<url::ParseError> for Error {
fn from(_: url::ParseError) -> Self {
Error::Url


+ 60
- 20
plume-models/src/lists.rs View File

@@ -7,8 +7,8 @@ use crate::{
use diesel::{self, ExpressionMethods, QueryDsl, RunQueryDsl};
use std::convert::{TryFrom, TryInto};

/// Represent what a list is supposed to store. Represented in database as an integer
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
/// Represent what a list is supposed to store. Represented in database as an integer
pub enum ListType {
User,
Blog,
@@ -58,7 +58,11 @@ struct NewList<'a> {
}

macro_rules! func {
(@elem User $id:expr, $value:expr) => {
(
$(#[$outer:meta])*
@elem User $id:expr, $value:expr
) => {
$(#[$outer])*
NewListElem {
list_id: $id,
user_id: Some(*$value),
@@ -66,7 +70,11 @@ macro_rules! func {
word: None,
}
};
(@elem Blog $id:expr, $value:expr) => {
(
$(#[$outer:meta])*
@elem Blog $id:expr, $value:expr
) => {
$(#[$outer])*
NewListElem {
list_id: $id,
user_id: None,
@@ -74,7 +82,11 @@ macro_rules! func {
word: None,
}
};
(@elem Word $id:expr, $value:expr) => {
(
$(#[$outer:meta])*
@elem Word $id:expr, $value:expr
) => {
$(#[$outer])*
NewListElem {
list_id: $id,
user_id: None,
@@ -82,7 +94,11 @@ macro_rules! func {
word: Some($value),
}
};
(@elem Prefix $id:expr, $value:expr) => {
(
$(#[$outer:meta])*
@elem Prefix $id:expr, $value:expr
) => {
$(#[$outer])*
NewListElem {
list_id: $id,
user_id: None,
@@ -99,7 +115,11 @@ macro_rules! func {
(@out_type Word) => { String };
(@out_type Prefix) => { String };

(add: $fn:ident, $kind:ident) => {
(
$(#[$outer:meta])*
add: $fn:ident, $kind:ident
) => {
$(#[$outer])*
pub fn $fn(&self, conn: &Connection, vals: &[func!(@in_type $kind)]) -> Result<()> {
if self.kind() != ListType::$kind {
return Err(Error::InvalidValue);
@@ -116,7 +136,11 @@ macro_rules! func {
}
};

(list: $fn:ident, $kind:ident, $table:ident) => {
(
$(#[$outer:meta])*
list: $fn:ident, $kind:ident, $table:ident
) => {
$(#[$outer])*
pub fn $fn(&self, conn: &Connection) -> Result<Vec<func!(@out_type $kind)>> {
if self.kind() != ListType::$kind {
return Err(Error::InvalidValue);
@@ -132,7 +156,11 @@ macro_rules! func {



(set: $fn:ident, $kind:ident, $add:ident) => {
(
$(#[$outer:meta])*
set: $fn:ident, $kind:ident, $add:ident
) => {
$(#[$outer])*
pub fn $fn(&self, conn: &Connection, val: &[func!(@in_type $kind)]) -> Result<()> {
if self.kind() != ListType::$kind {
return Err(Error::InvalidValue);
@@ -246,23 +274,35 @@ impl List {
private::ListElem::prefix_in_list(conn, self, word)
}

/// Insert new users in a list
func! {add: add_users, User}
func! {
/// Insert new users in a list
add: add_users, User
}

/// Insert new blogs in a list
func! {add: add_blogs, Blog}
func! {
/// Insert new blogs in a list
add: add_blogs, Blog
}

/// Insert new words in a list
func! {add: add_words, Word}
func! {
/// Insert new words in a list
add: add_words, Word
}

/// Insert new prefixes in a list
func! {add: add_prefixes, Prefix}
func! {
/// Insert new prefixes in a list
add: add_prefixes, Prefix
}

/// Get all users in the list
func! {list: list_users, User, users}
func! {
/// Get all users in the list
list: list_users, User, users
}

/// Get all blogs in the list
func! {list: list_blogs, Blog, blogs}
func! {
/// Get all blogs in the list
list: list_blogs, Blog, blogs
}

/// Get all words in the list
pub fn list_words(&self, conn: &Connection) -> Result<Vec<String>> {


+ 5
- 7
plume-models/src/medias.rs View File

@@ -10,8 +10,8 @@ use plume_common::{
activity_pub::{inbox::FromId, Id},
utils::MediaProcessor,
};
use reqwest;
use std::{fs, path::Path};
use tokio::prelude::*;

#[derive(Clone, Identifiable, Queryable)]
pub struct Media {
@@ -197,7 +197,7 @@ impl Media {
}

// TODO: merge with save_remote?
pub fn from_activity(c: &PlumeRocket, image: &Image) -> Result<Media> {
pub async fn from_activity(c: &PlumeRocket, image: &Image) -> Result<Media> {
let conn = &*c.conn;
let remote_url = image.object_props.url_string().ok()?;
let ext = remote_url
@@ -211,11 +211,9 @@ impl Media {
ext
));

let mut dest = fs::File::create(path.clone()).ok()?;
reqwest::get(remote_url.as_str())
.ok()?
.copy_to(&mut dest)
.ok()?;
let mut dest = tokio::fs::File::create(path.clone()).await?;
let contents = reqwest::get(remote_url.as_str()).await?.bytes().await?;
dest.write_all(&contents).await?;

Media::insert(
conn,


+ 2
- 2
plume-models/src/mentions.rs View File

@@ -52,8 +52,8 @@ impl Mention {
}
}

pub fn build_activity(c: &PlumeRocket, ment: &str) -> Result<link::Mention> {
let user = User::find_by_fqn(c, ment)?;
pub async fn build_activity(c: &PlumeRocket, ment: &str) -> Result<link::Mention> {
let user = User::find_by_fqn(c, ment).await?;
let mut mention = link::Mention::default();
mention.link_props.set_href_string(user.ap_url)?;
mention.link_props.set_name_string(format!("@{}", ment))?;


+ 30
- 14
plume-models/src/plume_rocket.rs View File

@@ -20,20 +20,35 @@ mod module {
pub flash_msg: Option<(String, String)>,
}

#[rocket::async_trait]
impl<'a, 'r> FromRequest<'a, 'r> for PlumeRocket {
type Error = ();

fn from_request(request: &'a Request<'r>) -> request::Outcome<PlumeRocket, ()> {
let conn = request.guard::<DbConn>()?;
let intl = request.guard::<rocket_i18n::I18n>()?;
let user = request.guard::<users::User>().succeeded();
let worker = request.guard::<'_, State<'_, Arc<ScheduledThreadPool>>>()?;
let searcher = request.guard::<'_, State<'_, Arc<search::Searcher>>>()?;
let flash_msg = request.guard::<FlashMessage<'_, '_>>().succeeded();
async fn from_request(request: &'a Request<'r>) -> request::Outcome<Self, Self::Error> {
let conn = DbConn::from_request(request).await.succeeded().unwrap();
let intl = rocket_i18n::I18n::from_request(request)
.await
.succeeded()
.unwrap();
let user = users::User::from_request(request)
.await
.succeeded()
.unwrap();
let worker = request
.guard::<State<'_, Arc<ScheduledThreadPool>>>()
.await
.succeeded()
.unwrap();
let searcher = request
.guard::<State<'_, Arc<search::Searcher>>>()
.await
.succeeded()
.unwrap();
let flash_msg = request.guard::<FlashMessage<'_, '_>>().await.succeeded();
Outcome::Success(PlumeRocket {
conn,
intl,
user,
user: Some(user),
flash_msg: flash_msg.map(|f| (f.name().into(), f.msg().into())),
worker: worker.clone(),
searcher: searcher.clone(),
@@ -60,17 +75,18 @@ mod module {
pub worker: Arc<ScheduledThreadPool>,
}

#[rocket::async_trait]
impl<'a, 'r> FromRequest<'a, 'r> for PlumeRocket {
type Error = ();

fn from_request(request: &'a Request<'r>) -> request::Outcome<PlumeRocket, ()> {
let conn = request.guard::<DbConn>()?;
let user = request.guard::<users::User>().succeeded();
let worker = request.guard::<'_, State<'_, Arc<ScheduledThreadPool>>>()?;
let searcher = request.guard::<'_, State<'_, Arc<search::Searcher>>>()?;
async fn from_request(request: &'a Request<'r>) -> request::Outcome<Self, Self::Error> {
let conn = try_outcome!(DbConn::from_request(request).await);
let user = try_outcome!(users::User::from_request(request).await);
let worker = try_outcome!(request.guard::<'_, State<'_, Arc<ScheduledThreadPool>>>());
let searcher = try_outcome!(request.guard::<'_, State<'_, Arc<search::Searcher>>>());
Outcome::Success(PlumeRocket {
conn,
user,
user: Some(user),
worker: worker.clone(),
searcher: searcher.clone(),
})


+ 17
- 12
plume-models/src/posts.rs View File

@@ -19,8 +19,8 @@ use plume_common::{
},
utils::md_to_html,
};
use serde_json;
use std::collections::HashSet;
use tokio::runtime::Runtime;

pub type LicensedArticle = CustomObject<Licensed, Article>;

@@ -579,11 +579,11 @@ impl FromId<PlumeRocket> for Post {
}
});

let cover = article
.object_props
.icon_object::<Image>()
.ok()
.and_then(|img| Media::from_activity(&c, &img).ok().map(|m| m.id));
let image = article.object_props.icon_object::<Image>().ok().unwrap();
let mut r = Runtime::new().unwrap();
let cover =
Some(r.block_on(async { Media::from_activity(&c, &image).await.ok().unwrap().id }));

let title = article.object_props.name_string()?;
let post = Post::insert(
@@ -699,17 +699,22 @@ impl FromId<PlumeRocket> for PostUpdate {
}

fn from_activity(c: &PlumeRocket, updated: LicensedArticle) -> Result<Self> {
let image = updated
.object
.object_props
.icon_object::<Image>()
.ok()
.unwrap();
let mut r = Runtime::new().unwrap();
let cover =
Some(r.block_on(async { Media::from_activity(&c, &image).await.ok().unwrap().id }));

Ok(PostUpdate {
ap_url: updated.object.object_props.id_string()?,
title: updated.object.object_props.name_string().ok(),
subtitle: updated.object.object_props.summary_string().ok(),
content: updated.object.object_props.content_string().ok(),
cover: updated
.object
.object_props
.icon_object::<Image>()
.ok()
.and_then(|img| Media::from_activity(&c, &img).ok().map(|m| m.id)),
cover,
source: updated
.object
.ap_object_props


+ 28
- 8
plume-models/src/timeline/query.rs View File

@@ -7,7 +7,9 @@ use crate::{
users::User,
PlumeRocket, Result,
};
use futures::stream::{self, StreamExt};
use plume_common::activity_pub::inbox::AsActor;
use tokio::runtime::Runtime;
use whatlang::{self, Lang};

#[derive(Debug, Clone, PartialEq)]
@@ -295,15 +297,33 @@ impl WithList {
}
}
List::Array(list) => match self {
WithList::Blog => Ok(list
.iter()
.filter_map(|b| Blog::find_by_fqn(rocket, b).ok())
.any(|b| b.id == post.blog_id)),
WithList::Blog => {
let mut rt = Runtime::new().unwrap();
rt.block_on(async move {
Ok(stream::iter(list)
.filter_map(|b| async move {
Some(Blog::find_by_fqn(rocket, b).await.ok().unwrap())
})
.collect::<Vec<_>>()
.await
.into_iter()
.any(|b| b.id == post.blog_id))
})
}
WithList::Author { boosts, likes } => match kind {
Kind::Original => Ok(list
.iter()
.filter_map(|a| User::find_by_fqn(rocket, a).ok())
.any(|a| post.is_author(&rocket.conn, a.id).unwrap_or(false))),
Kind::Original => {
let mut rt = Runtime::new().unwrap();
rt.block_on(async move {
Ok(stream::iter(list)
.filter_map(|a| async move {
Some(User::find_by_fqn(rocket, a).await.ok().unwrap())
})
.collect::<Vec<_>>()
.await
.into_iter()
.any(|a| post.is_author(&rocket.conn, a.id).unwrap_or(false)))
})
}
Kind::Reshare(u) => {
if *boosts {
Ok(list.iter().any(|user| &u.fqn == user))


+ 76
- 70
plume-models/src/users.rs View File

@@ -11,7 +11,6 @@ use activitypub::{
object::{Image, Tombstone},
Activity, CustomObject, Endpoint,
};
use bcrypt;
use chrono::{NaiveDateTime, Utc};
use diesel::{self, BelongingToDsl, ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl};
use openssl::{
@@ -37,7 +36,6 @@ use rocket::{
outcome::IntoOutcome,
request::{self, FromRequest, Request},
};
use serde_json;
use std::{
cmp::PartialEq,
hash::{Hash, Hasher},
@@ -191,7 +189,7 @@ impl User {
.map_err(Error::from)
}

pub fn find_by_fqn(c: &PlumeRocket, fqn: &str) -> Result<User> {
pub async fn find_by_fqn(c: &PlumeRocket, fqn: &str) -> Result<User> {
let from_db = users::table
.filter(users::fqn.eq(fqn))
.first(&*c.conn)
@@ -199,12 +197,13 @@ impl User {
if let Some(from_db) = from_db {
Ok(from_db)
} else {
User::fetch_from_webfinger(c, fqn)
User::fetch_from_webfinger(c, fqn).await
}
}

fn fetch_from_webfinger(c: &PlumeRocket, acct: &str) -> Result<User> {
let link = resolve(acct.to_owned(), true)?
async fn fetch_from_webfinger(c: &PlumeRocket, acct: &str) -> Result<User> {
let link = resolve(acct.to_owned(), true)
.await?
.links
.into_iter()
.find(|l| l.mime_type == Some(String::from("application/activity+json")))
@@ -212,8 +211,9 @@ impl User {
User::from_id(c, link.href.as_ref()?, None).map_err(|(_, e)| e)
}

pub fn fetch_remote_interact_uri(acct: &str) -> Result<String> {
resolve(acct.to_owned(), true)?
pub async fn fetch_remote_interact_uri(acct: &str) -> Result<String> {
resolve(acct.to_owned(), true)
.await?
.links
.into_iter()
.find(|l| l.rel == "http://ostatus.org/schema/1.0/subscribe")
@@ -221,9 +221,9 @@ impl User {
.ok_or(Error::Webfinger)
}

fn fetch(url: &str) -> Result<CustomPerson> {
let mut res = ClientBuilder::new()
.connect_timeout(Some(std::time::Duration::from_secs(5)))
async fn fetch(url: &str) -> Result<CustomPerson> {
let res = ClientBuilder::new()
.connect_timeout(std::time::Duration::from_secs(5))
.build()?
.get(url)
.header(
@@ -235,8 +235,9 @@ impl User {
.join(", "),
)?,
)
.send()?;
let text = &res.text()?;
.send()
.await?;
let text = &res.text().await?;
// without this workaround, publicKey is not correctly deserialized
let ap_sign = serde_json::from_str::<ApSignature>(text)?;
let mut json = serde_json::from_str::<CustomPerson>(text)?;
@@ -244,48 +245,48 @@ impl User {
Ok(json)
}

pub fn fetch_from_url(c: &PlumeRocket, url: &str) -> Result<User> {
User::fetch(url).and_then(|json| User::from_activity(c, json))
pub async fn fetch_from_url(c: &PlumeRocket, url: &str) -> Result<User> {
let json = User::fetch(url).await?;
User::from_activity(c, json)
}

pub fn refetch(&self, conn: &Connection) -> Result<()> {
User::fetch(&self.ap_url.clone()).and_then(|json| {
let avatar = Media::save_remote(
conn,
json.object
.object_props
.icon_image()?
.object_props
.url_string()?,
&self,
)
.ok();
pub async fn refetch(&self, conn: &Connection) -> Result<()> {
let json = User::fetch(&self.ap_url.clone()).await?;
let avatar = Media::save_remote(
conn,
json.object
.object_props
.icon_image()?
.object_props
.url_string()?,
&self,
)
.ok();

diesel::update(self)
.set((
users::username.eq(json.object.ap_actor_props.preferred_username_string()?),
users::display_name.eq(json.object.object_props.name_string()?),
users::outbox_url.eq(json.object.ap_actor_props.outbox_string()?),
users::inbox_url.eq(json.object.ap_actor_props.inbox_string()?),
users::summary.eq(SafeString::new(
&json
.object
.object_props
.summary_string()
.unwrap_or_default(),
)),
users::followers_endpoint.eq(json.object.ap_actor_props.followers_string()?),
users::avatar_id.eq(avatar.map(|a| a.id)),
users::last_fetched_date.eq(Utc::now().naive_utc()),
users::public_key.eq(json
.custom_props
.public_key_publickey()?
.public_key_pem_string()?),
))
.execute(conn)
.map(|_| ())
.map_err(Error::from)
})
diesel::update(self)
.set((
users::username.eq(json.object.ap_actor_props.preferred_username_string()?),
users::display_name.eq(json.object.object_props.name_string()?),
users::outbox_url.eq(json.object.ap_actor_props.outbox_string()?),
users::inbox_url.eq(json.object.ap_actor_props.inbox_string()?),
users::summary.eq(SafeString::new(
&json
.object
.object_props
.summary_string()
.unwrap_or_default(),
)),
users::followers_endpoint.eq(json.object.ap_actor_props.followers_string()?),
users::avatar_id.eq(avatar.map(|a| a.id)),
users::last_fetched_date.eq(Utc::now().naive_utc()),
users::public_key.eq(json
.custom_props
.public_key_publickey()?
.public_key_pem_string()?),
))
.execute(conn)
.map(|_| ())
.map_err(Error::from)
}

pub fn hash_pass(pass: &str) -> Result<String> {
@@ -356,9 +357,10 @@ impl User {
.set_part_of_link(Id::new(&self.outbox_url))?;
Ok(ActivityStream::new(coll))
}
fn fetch_outbox_page<T: Activity>(&self, url: &str) -> Result<(Vec<T>, Option<String>)> {
let mut res = ClientBuilder::new()
.connect_timeout(Some(std::time::Duration::from_secs(5)))

async fn fetch_outbox_page<T: Activity>(&self, url: &str) -> Result<(Vec<T>, Option<String>)> {
let res = ClientBuilder::new()
.connect_timeout(std::time::Duration::from_secs(5))
.build()?
.get(url)
.header(
@@ -370,8 +372,9 @@ impl User {
.join(", "),
)?,
)
.send()?;
let text = &res.text()?;
.send()
.await?;
let text = &res.text().await?;
let json: serde_json::Value = serde_json::from_str(text)?;
let items = json["items"]
.as_array()
@@ -386,9 +389,9 @@ impl User {
};
Ok((items, next))
}
pub fn fetch_outbox<T: Activity>(&self) -> Result<Vec<T>> {
let mut res = ClientBuilder::new()
.connect_timeout(Some(std::time::Duration::from_secs(5)))
pub async fn fetch_outbox<T: Activity>(&self) -> Result<Vec<T>> {
let res = ClientBuilder::new()
.connect_timeout(std::time::Duration::from_secs(5))
.build()?
.get(&self.outbox_url[..])
.header(
@@ -400,13 +403,14 @@ impl User {
.join(", "),
)?,
)
.send()?;
let text = &res.text()?;
.send()
.await?;
let text = &res.text().await?;
let json: serde_json::Value = serde_json::from_str(text)?;
if let Some(first) = json.get("first") {
let mut items: Vec<T> = Vec::new();
let mut next = first.as_str().unwrap().to_owned();
while let Ok((mut page, nxt)) = self.fetch_outbox_page(&next) {
while let Ok((mut page, nxt)) = self.fetch_outbox_page(&next).await {
if page.is_empty() {
break;
}
@@ -431,9 +435,9 @@ impl User {
}
}

pub fn fetch_followers_ids(&self) -> Result<Vec<String>> {
let mut res = ClientBuilder::new()
.connect_timeout(Some(std::time::Duration::from_secs(5)))
pub async fn fetch_followers_ids(&self) -> Result<Vec<String>> {
let res = ClientBuilder::new()
.connect_timeout(std::time::Duration::from_secs(5))
.build()?
.get(&self.followers_endpoint[..])
.header(
@@ -445,8 +449,9 @@ impl User {
.join(", "),
)?,
)
.send()?;
let text = &res.text()?;
.send()
.await?;
let text = &res.text().await?;
let json: serde_json::Value = serde_json::from_str(text)?;
Ok(json["items"]
.as_array()
@@ -789,11 +794,12 @@ impl User {
}
}

#[rocket::async_trait]
impl<'a, 'r> FromRequest<'a, 'r> for User {
type Error = ();

fn from_request(request: &'a Request<'r>) -> request::Outcome<User, ()> {
let conn = request.guard::<DbConn>()?;
async fn from_request(request: &'a Request<'r>) -> request::Outcome<Self, Self::Error> {
let conn = try_outcome!(DbConn::from_request(request).await);
request
.cookies()
.get_private(AUTH_COOKIE)


+ 2
- 1
po/plume/ar.po View File

@@ -10,7 +10,8 @@ msgstr ""
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=UTF-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Plural-Forms: nplurals=6; plural=(n==0 ? 0 : n==1 ? 1 : n==2 ? 2 : n%100>=3 && n%100<=10 ? 3 : n%100>=11 && n%100<=99 ? 4 : 5);\n"
"Plural-Forms: nplurals=6; plural=(n==0 ? 0 : n==1 ? 1 : n==2 ? 2 : n%100>=3 "
"&& n%100<=10 ? 3 : n%100>=11 && n%100<=99 ? 4 : 5);\n"
"X-Crowdin-Project: plume\n"
"X-Crowdin-Language: ar\n"
"X-Crowdin-File: /master/po/plume/plume.pot\n"


+ 1
- 1
rust-toolchain View File

@@ -1 +1 @@
nightly-2020-01-15
nightly-2020-05-05

+ 3
- 1
src/api/authorization.rs View File

@@ -35,6 +35,7 @@ impl Scope for plume_models::posts::Post {

pub struct Authorization<A, S>(pub ApiToken, PhantomData<(A, S)>);

#[rocket::async_trait]
impl<'a, 'r, A, S> FromRequest<'a, 'r> for Authorization<A, S>
where
A: Action,
@@ -42,9 +43,10 @@ where
{
type Error = ();

fn from_request(request: &'a Request<'r>) -> request::Outcome<Authorization<A, S>, ()> {
async fn from_request(request: &'a Request<'r>) -> request::Outcome<Authorization<A, S>, ()> {
request
.guard::<ApiToken>()
.await
.map_failure(|_| (Status::Unauthorized, ()))
.and_then(|token| {
if token.can(A::to_str(), S::to_str()) {


+ 25
- 16
src/api/mod.rs View File

@@ -4,7 +4,6 @@ use rocket::{
response::{self, Responder},
};
use rocket_contrib::json::Json;
use serde_json;

use plume_common::utils::random_hex;
use plume_models::{api_tokens::*, apps::App, users::User, Error, PlumeRocket};
@@ -26,21 +25,31 @@ impl From<std::option::NoneError> for ApiError {
}
}

#[rocket::async_trait]
impl<'r> Responder<'r> for ApiError {
fn respond_to(self, req: &Request<'_>) -> response::Result<'r> {
async fn respond_to(self, req: &'r Request<'_>) -> response::Result<'r> {
match self.0 {
Error::NotFound => Json(json!({
"error": "Not found"
}))
.respond_to(req),
Error::Unauthorized => Json(json!({
"error": "You are not authorized to access this resource"
}))
.respond_to(req),
_ => Json(json!({
"error": "Server error"
}))
.respond_to(req),
Error::NotFound => {
Json(json!({
"error": "Not found"
}))
.respond_to(req)
.await
}
Error::Unauthorized => {
Json(json!({
"error": "You are not authorized to access this resource"
}))
.respond_to(req)
.await
}
_ => {
Json(json!({
"error": "Server error"
}))
.respond_to(req)
.await
}
}
}
}
@@ -55,14 +64,14 @@ pub struct OAuthRequest {
}

#[get("/oauth2?<query..>")]
pub fn oauth(
pub async fn oauth(
query: Form<OAuthRequest>,
rockets: PlumeRocket,
) -> Result<Json<serde_json::Value>, ApiError> {
let conn = &*rockets.conn;
let app = App::find_by_client_id(conn, &query.client_id)?;
if app.client_secret == query.client_secret {
if let Ok(user) = User::find_by_fqn(&rockets, &query.username) {
if let Ok(user) = User::find_by_fqn(&rockets, &query.username).await {
if user.auth(&query.password) {
let token = ApiToken::insert(
conn,


+ 2
- 2
src/api/posts.rs View File

@@ -98,7 +98,7 @@ pub fn list(
}

#[post("/posts", data = "<payload>")]
pub fn create(
pub async fn create(
auth: Authorization<Write, Post>,
payload: Json<NewPostData>,
rockets: PlumeRocket,
@@ -192,7 +192,7 @@ pub fn create(
for m in mentions.into_iter() {
Mention::from_activity(
&*conn,
&Mention::build_activity(&rockets, &m)?,
&Mention::build_activity(&rockets, &m).await?,
post.id,
true,
true,


+ 22
- 22
src/inbox.rs View File

@@ -9,9 +9,9 @@ use plume_models::{
use rocket::{data::*, http::Status, response::status, Outcome::*, Request};
use rocket_contrib::json::*;
use serde::Deserialize;
use std::io::Read;
use tokio::io::AsyncReadExt;

pub fn handle_incoming(
pub async fn handle_incoming(
rockets: PlumeRocket,
data: SignedJson<serde_json::Value>,
headers: Headers<'_>,
@@ -32,6 +32,7 @@ pub fn handle_incoming(
// maybe we just know an old key?
actor
.refetch(conn)
.await
.and_then(|_| User::get(conn, actor.id))
.and_then(|u| {
if verify_http_headers(&u, &headers.0, &sig).is_secure() || act.clone().verify(&u) {
@@ -73,32 +74,31 @@ impl<'a, T: Deserialize<'a>> FromData<'a> for SignedJson<T> {
type Owned = String;
type Borrowed = str;

fn transform(
r: &Request<'_>,
d: Data,
) -> Transform<rocket::data::Outcome<Self::Owned, Self::Error>> {
let size_limit = r.limits().get("json").unwrap_or(JSON_LIMIT);
let mut s = String::with_capacity(512);
match d.open().take(size_limit).read_to_string(&mut s) {
Ok(_) => Transform::Borrowed(Success(s)),
Err(e) => Transform::Borrowed(Failure((Status::BadRequest, JsonError::Io(e)))),
}
fn transform<'r>(r: &'r Request, d: Data) -> TransformFuture<'r, Self::Owned, Self::Error> {
Box::pin(async move {
let size_limit = r.limits().get("json").unwrap_or(JSON_LIMIT);
let mut s = String::with_capacity(512);
let outcome = match d.open().take(size_limit).read_to_string(&mut s).await {
Ok(_) => Success(s),
Err(e) => Failure((Status::BadRequest, JsonError::Io(e))),
};
Transform::Borrowed(outcome)
})
}

fn from_data(
_: &Request<'_>,
o: Transformed<'a, Self>,
) -> rocket::data::Outcome<Self, Self::Error> {
let string = o.borrowed()?;
match serde_json::from_str(&string) {
Ok(v) => Success(SignedJson(Digest::from_body(&string), Json(v))),
Err(e) => {
if e.is_data() {
Failure((Status::UnprocessableEntity, JsonError::Parse(string, e)))
} else {
Failure((Status::BadRequest, JsonError::Parse(string, e)))
) -> FromDataFuture<'a, Self, Self::Error> {
Box::pin(async move {
let string = try_outcome!(o.borrowed());
match serde_json::from_str(&string) {
Ok(v) => Success(SignedJson(Digest::from_body(&string), Json(v))),
Err(e) if e.is_data() => {
return Failure((Status::UnprocessableEntity, JsonError::Parse(string, e)))
}
Err(e) => Failure((Status::BadRequest, JsonError::Parse(string, e))),
}
}
})
}
}

+ 3
- 23
src/main.rs View File

@@ -1,16 +1,15 @@
#![allow(clippy::too_many_arguments)]
#![feature(decl_macro, proc_macro_hygiene, try_trait)]
#![feature(proc_macro_hygiene, try_trait)]

#[macro_use]
extern crate gettext_macros;
#[macro_use]
extern crate rocket;
#[macro_use]
extern crate runtime_fmt;
#[macro_use]
extern crate serde_json;
#[macro_use]
extern crate validator_derive;
extern crate validator;

use clap::App;
use diesel::r2d2::ConnectionManager;
@@ -21,7 +20,6 @@ use plume_models::{
search::{Searcher as UnmanagedSearcher, SearcherError},
Connection, Error, CONFIG,
};
use rocket_csrf::CsrfFairingBuilder;
use scheduled_thread_pool::ScheduledThreadPool;
use std::process::exit;
use std::sync::{Arc, Mutex};
@@ -275,25 +273,7 @@ Then try to restart Plume
.manage(dbpool)
.manage(Arc::new(workpool))
.manage(searcher)
.manage(include_i18n!())
.attach(
CsrfFairingBuilder::new()
.set_default_target(
"/csrf-violation?target=<uri>".to_owned(),
rocket::http::Method::Post,
)
.add_exceptions(vec![
("/inbox".to_owned(), "/inbox".to_owned(), None),
(
"/@/<name>/inbox".to_owned(),
"/@/<name>/inbox".to_owned(),
None,
),
("/api/<path..>".to_owned(), "/api/<path..>".to_owned(), None),
])
.finalize()
.expect("main: csrf fairing creation error"),
);
.manage(include_i18n!());

#[cfg(feature = "test")]
let rocket = rocket.mount("/test", routes![test_routes::health,]);


+ 31
- 18
src/routes/blogs.rs View File

@@ -19,10 +19,14 @@ use plume_models::{
};

#[get("/~/<name>?<page>", rank = 2)]
pub fn details(name: String, page: Option<Page>, rockets: PlumeRocket) -> Result<Ructe, ErrorPage> {
pub async fn details(
name: String,
page: Option<Page>,
rockets: PlumeRocket,
) -> Result<Ructe, ErrorPage> {
let page = page.unwrap_or_default();
let conn = &*rockets.conn;
let blog = Blog::find_by_fqn(&rockets, &name)?;
let blog = Blog::find_by_fqn(&rockets, &name).await?;
let posts = Post::blog_page(conn, &blog, page.limits())?;
let articles_count = Post::count_for_blog(conn, &blog)?;
let authors = &blog.list_authors(conn)?;
@@ -38,12 +42,12 @@ pub fn details(name: String, page: Option<Page>, rockets: PlumeRocket) -> Result
}

#[get("/~/<name>", rank = 1)]
pub fn activity_details(
pub async fn activity_details(
name: String,
rockets: PlumeRocket,
_ap: ApRequest,
) -> Option<ActivityStream<CustomGroup>> {
let blog = Blog::find_by_fqn(&rockets, &name).ok()?;
let blog = Blog::find_by_fqn(&rockets, &name).await?;
Some(ActivityStream::new(blog.to_activity(&*rockets.conn).ok()?))
}

@@ -83,7 +87,7 @@ fn valid_slug(title: &str) -> Result<(), ValidationError> {
}

#[post("/blogs/new", data = "<form>")]
pub fn create(form: LenientForm<NewBlogForm>, rockets: PlumeRocket) -> RespondOrRedirect {
pub async fn create(form: LenientForm<NewBlogForm>, rockets: PlumeRocket) -> RespondOrRedirect {
let slug = utils::make_actor_id(&form.title);
let conn = &*rockets.conn;
let intl = &rockets.intl.catalog;
@@ -93,7 +97,7 @@ pub fn create(form: LenientForm<NewBlogForm>, rockets: PlumeRocket) -> RespondOr
Ok(_) => ValidationErrors::new(),
Err(e) => e,
};
if Blog::find_by_fqn(&rockets, &slug).is_ok() {
if Blog::find_by_fqn(&rockets, &slug).await.is_ok() {
errors.add(
"title",
ValidationError {
@@ -143,9 +147,11 @@ pub fn create(form: LenientForm<NewBlogForm>, rockets: PlumeRocket) -> RespondOr
}

#[post("/~/<name>/delete")]
pub fn delete(name: String, rockets: PlumeRocket) -> RespondOrRedirect {
pub async fn delete(name: String, rockets: PlumeRocket) -> RespondOrRedirect {
let conn = &*rockets.conn;
let blog = Blog::find_by_fqn(&rockets, &name).expect("blog::delete: blog not found");
let blog = Blog::find_by_fqn(&rockets, &name)
.await
.expect("blog::delete: blog not found");

if rockets
.user
@@ -184,9 +190,9 @@ pub struct EditForm {
}

#[get("/~/<name>/edit")]
pub fn edit(name: String, rockets: PlumeRocket) -> Result<Ructe, ErrorPage> {
pub async fn edit(name: String, rockets: PlumeRocket) -> Result<Ructe, ErrorPage> {
let conn = &*rockets.conn;
let blog = Blog::find_by_fqn(&rockets, &name)?;
let blog = Blog::find_by_fqn(&rockets, &name).await?;
if rockets
.user
.clone()
@@ -233,14 +239,16 @@ fn check_media(conn: &Connection, id: i32, user: &User) -> bool {
}

#[put("/~/<name>/edit", data = "<form>")]
pub fn update(
pub async fn update(
name: String,
form: LenientForm<EditForm>,
rockets: PlumeRocket,
) -> RespondOrRedirect {
let conn = &*rockets.conn;
let intl = &rockets.intl.catalog;
let mut blog = Blog::find_by_fqn(&rockets, &name).expect("blog::update: blog not found");
let mut blog = Blog::find_by_fqn(&rockets, &name)
.await
.expect("blog::update: blog not found");
if !rockets
.user
.clone()
@@ -342,23 +350,28 @@ pub fn update(
}

#[get("/~/<name>/outbox")]
pub fn outbox(name: String, rockets: PlumeRocket) -> Option<ActivityStream<OrderedCollection>> {
let blog = Blog::find_by_fqn(&rockets, &name).ok()?;
pub async fn outbox(
name: String,
rockets: PlumeRocket,
) -> Option<ActivityStream<OrderedCollection>> {
let blog = Blog::find_by_fqn(&rockets, &name).await?;
Some(blog.outbox(&*rockets.conn).ok()?)
}

#[allow(unused_variables)]
#[get("/~/<name>/outbox?<page>")]
pub fn outbox_page(
pub async fn outbox_page(
name: String,
page: Page,
rockets: PlumeRocket,
) -> Option<ActivityStream<OrderedCollectionPage>> {
let blog = Blog::find_by_fqn(&rockets, &name).ok()?;
let blog = Blog::find_by_fqn(&rockets, &name).await?;
Some(blog.outbox_page(&*rockets.conn, page.limits()).ok()?)
}

#[get("/~/<name>/atom.xml")]
pub fn atom_feed(name: String, rockets: PlumeRocket) -> Option<Content<String>> {
let blog = Blog::find_by_fqn(&rockets, &name).ok()?;
pub async fn atom_feed(name: String, rockets: PlumeRocket) -> Option<Content<String>> {
let blog = Blog::find_by_fqn(&rockets, &name).await?;
let conn = &*rockets.conn;
let entries = Post::get_recents_for_blog(&*conn, &blog, 15).ok()?;
let uri = Instance::get_local()


+ 23
- 17
src/routes/comments.rs View File

@@ -22,13 +22,13 @@ use plume_models::{
#[derive(Default, FromForm, Debug, Validate)]
pub struct NewCommentForm {
pub responding_to: Option<i32>,
#[validate(length(min = "1", message = "Your comment can't be empty"))]
#[validate(length(min = 1, message = "Your comment can't be empty"))]
pub content: String,
pub warning: String,
}

#[post("/~/<blog_name>/<slug>/comment", data = "<form>")]
pub fn create(
pub async fn create(
blog_name: String,
slug: String,
form: LenientForm<NewCommentForm>,
@@ -36,10 +36,12 @@ pub fn create(
rockets: PlumeRocket,
) -> Result<Flash<Redirect>, Ructe> {
let conn = &*rockets.conn;
let blog = Blog::find_by_fqn(&rockets, &blog_name).expect("comments::create: blog error");
let blog = Blog::find_by_fqn(&rockets, &blog_name)
.await
.expect("comments::create: blog error");
let post = Post::find_by_slug(&*conn, &slug, blog.id).expect("comments::create: post error");
form.validate()
.map(|_| {
match form.validate() {
Ok(_ok) => {
let (html, mentions, _hashtags) = utils::md_to_html(
form.content.as_ref(),
Some(
@@ -66,6 +68,7 @@ pub fn create(
.expect("comments::create: insert error");
let new_comment = comm
.create_activity(&rockets)
.await
.expect("comments::create: activity error");

// save mentions
@@ -73,6 +76,7 @@ pub fn create(
Mention::from_activity(
&*conn,
&Mention::build_activity(&rockets, &ment)
.await
.expect("comments::create: build mention error"),
comm.id,
false,
@@ -90,14 +94,14 @@ pub fn create(
.worker
.execute(move || broadcast(&user_clone, new_comment, dest));

Flash::success(
Ok(Flash::success(
Redirect::to(
uri!(super::posts::details: blog = blog_name, slug = slug, responding_to = _),
),
i18n!(&rockets.intl.catalog, "Your comment has been posted."),
)
})
.map_err(|errors| {
))
}
Err(errors) => {
// TODO: de-duplicate this code
let comments = CommentTree::from_post(&*conn, &post, Some(&user))
.expect("comments::create: comments error");
@@ -106,7 +110,7 @@ pub fn create(
.responding_to
.and_then(|r| Comment::get(&*conn, r).ok());

render!(posts::details(
Err(render!(posts::details(
&rockets.to_context(),
post.clone(),
blog,
@@ -133,8 +137,9 @@ pub fn create(
post.get_authors(&*conn)
.expect("comments::create: authors error")[0]
.clone()
))
})
)))
}
}
}

#[post("/~/<blog>/<slug>/comment/<id>/delete")]
@@ -174,15 +179,16 @@ pub fn delete(
}

#[get("/~/<_blog>/<_slug>/comment/<id>")]
pub fn activity_pub(
pub async fn activity_pub(
_blog: String,
_slug: String,
id: i32,
_ap: ApRequest,
rockets: PlumeRocket,
) -> Option<ActivityStream<Note>> {
Comment::get(&*rockets.conn, id)
.and_then(|c| c.to_activity(&rockets))
.ok()
.map(ActivityStream::new)
let c = match Comment::get(&*rockets.conn, id) {
Ok(c) => c.to_activity(&rockets).await.ok(),
Err(_) => None,
};
c.map(ActivityStream::new)
}

+ 23
- 11
src/routes/errors.rs View File

@@ -1,6 +1,7 @@
use crate::template_utils::{IntoContext, Ructe};
use plume_models::{Error, PlumeRocket};
use rocket::{
request::FromRequest,
response::{self, Responder},
Request,
};
@@ -14,35 +15,46 @@ impl From<Error> for ErrorPage {
}
}

#[rocket::async_trait]
impl<'r> Responder<'r> for ErrorPage {
fn respond_to(self, req: &Request<'_>) -> response::Result<'r> {
let rockets = req.guard::<PlumeRocket>().unwrap();
async fn respond_to(self, req: &'r Request<'_>) -> response::Result<'r> {
let rockets = PlumeRocket::from_request(req).await.unwrap();

match self.0 {
Error::NotFound => render!(errors::not_found(&rockets.to_context())).respond_to(req),
Error::NotFound => {
render!(errors::not_found(&rockets.to_context()))
.respond_to(req)
.await
}
Error::Unauthorized => {
render!(errors::not_found(&rockets.to_context())).respond_to(req)
render!(errors::not_found(&rockets.to_context()))
.respond_to(req)
.await
}
_ => {
render!(errors::not_found(&rockets.to_context()))
.respond_to(req)
.await
}
_ => render!(errors::not_found(&rockets.to_context())).respond_to(req),
}
}
}

#[catch(404)]
pub fn not_found(req: &Request<'_>) -> Ructe {
let rockets = req.guard::<PlumeRocket>().unwrap();
pub async fn not_found(req: &Request<'_>) -> Ructe {
let rockets = req.guard::<PlumeRocket>().await.unwrap();
render!(errors::not_found(&rockets.to_context()))
}

#[catch(422)]
pub fn unprocessable_entity(req: &Request<'_>) -> Ructe {
let rockets = req.guard::<PlumeRocket>().unwrap();
pub async fn unprocessable_entity(req: &Request<'_>) -> Ructe {
let rockets = req.guard::<PlumeRocket>().await.unwrap();
render!(errors::unprocessable_entity(&rockets.to_context()))
}

#[catch(500)]
pub fn server_error(req: &Request<'_>) -> Ructe {
let rockets = req.guard::<PlumeRocket>().unwrap();
pub async fn server_error(req: &Request<'_>) -> Ructe {
let rockets = req.guard::<PlumeRocket>().await.unwrap();
render!(errors::server_error(&rockets.to_context()))
}



+ 11
- 7
src/routes/instance.rs View File

@@ -5,7 +5,7 @@ use rocket::{
use rocket_contrib::json::Json;
use rocket_i18n::I18n;
use scheduled_thread_pool::ScheduledThreadPool;
use serde_json;
use std::path::PathBuf;
use std::str::FromStr;
use validator::{Validate, ValidationErrors};

@@ -76,12 +76,12 @@ pub fn admin_mod(_mod: Moderator, rockets: PlumeRocket) -> Ructe {

#[derive(Clone, FromForm, Validate)]
pub struct InstanceSettingsForm {
#[validate(length(min = "1"))]
#[validate(length(min = 1))]
pub name: String,
pub open_registrations: bool,
pub short_description: SafeString,
pub long_description: SafeString,
#[validate(length(min = "1"))]
#[validate(length(min = 1))]
pub default_license: String,