Compare commits

...

16 Commits

11
Cargo.lock generated

@ -729,7 +729,7 @@ name = "dashmap"
version = "4.0.0-rc6"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"once_cell 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"once_cell 1.5.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -1218,7 +1218,7 @@ name = "futures-task"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"once_cell 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"once_cell 1.5.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -2325,7 +2325,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "once_cell"
version = "1.4.0"
version = "1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
@ -2696,6 +2696,7 @@ dependencies = [
"ldap3 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"lindera-tantivy 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
"migrations_internals 1.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
"once_cell 1.5.2 (registry+https://github.com/rust-lang/crates.io-index)",
"openssl 0.10.30 (registry+https://github.com/rust-lang/crates.io-index)",
"plume-api 0.4.0",
"plume-common 0.4.0",
@ -3868,7 +3869,7 @@ dependencies = [
"murmurhash32 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"notify 4.0.15 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 1.13.0 (registry+https://github.com/rust-lang/crates.io-index)",
"once_cell 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"once_cell 1.5.2 (registry+https://github.com/rust-lang/crates.io-index)",
"owned-read 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
"owning_ref 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
"rayon 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
@ -4982,7 +4983,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum num-traits 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)" = "ac267bcc07f48ee5f8935ab0d24f316fb722d7a1292e2913f0cc196b29ffd611"
"checksum num_cpus 1.13.0 (registry+https://github.com/rust-lang/crates.io-index)" = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3"
"checksum object 0.20.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1ab52be62400ca80aa00285d25253d7f7c437b7375c4de678f5405d3afe82ca5"
"checksum once_cell 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0b631f7e854af39a1739f401cf34a8a013dfe09eac4fa4dba91e9768bd28168d"
"checksum once_cell 1.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "13bd41f508810a131401606d54ac32a467c97172d74ba7662562ebba5ad07fa0"
"checksum onig 5.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e4e723fc996fff1aeab8f62205f3e8528bf498bdd5eadb2784d2d31f30077947"
"checksum onig_sys 69.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3814583fad89f3c60ae0701d80e87e1fd3028741723deda72d0d4a0ecf0cb0db"
"checksum opaque-debug 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2839e79665f131bdb5782e51f2c6c9599c133c6098982a54c794358bf432529c"

@ -33,6 +33,7 @@ diesel-derive-newtype = "0.1.2"
glob = "0.3.0"
lindera-tantivy = { version = "0.1.3", optional = true }
riker = "0.4"
once_cell = "1.5.2"
[dependencies.chrono]
features = ["serde"]

@ -98,7 +98,6 @@ pub(crate) mod tests {
source: String::new(),
cover_id: None,
},
&rockets.searcher,
)
.unwrap();

@ -20,6 +20,8 @@ extern crate tantivy;
extern crate riker;
use plume_common::activity_pub::inbox::InboxError;
use riker::actors::{ActorSystem, ChannelRef};
use std::sync::Arc;
#[cfg(not(any(feature = "sqlite", feature = "postgres")))]
compile_error!("Either feature \"sqlite\" or \"postgres\" must be enabled for this crate.");
@ -32,6 +34,10 @@ pub type Connection = diesel::SqliteConnection;
#[cfg(all(not(feature = "sqlite"), feature = "postgres"))]
pub type Connection = diesel::PgConnection;
use once_cell::sync::OnceCell;
pub static ACTORS: OnceCell<Arc<ActorSystem>> = OnceCell::new();
pub static POST_CHAN: OnceCell<ChannelRef<crate::search::searcher::PostEvent>> = OnceCell::new();// TODO: define wrapper enum of Post
/// All the possible errors that can be encoutered in this crate
#[derive(Debug)]
pub enum Error {

@ -1,7 +1,17 @@
use crate::{
ap_url, blogs::Blog, instance::Instance, medias::Media, mentions::Mention, post_authors::*,
safe_string::SafeString, schema::posts, search::Searcher, search::UpdateDocument, tags::*,
timeline::*, users::User, Connection, Error, PlumeRocket, Result, CONFIG,
ap_url,
blogs::Blog,
instance::Instance,
medias::Media,
mentions::Mention,
post_authors::*,
safe_string::SafeString,
schema::posts,
search::{AddDocument, Searcher, UpdateDocument, PostEvent::*},
tags::*,
timeline::*,
users::User,
Connection, Error, PlumeRocket, Result, ACTORS, CONFIG,
};
use activitypub::{
activity::{Create, Delete, Update},
@ -65,7 +75,7 @@ impl Post {
find_by!(posts, find_by_ap_url, ap_url as &str);
last!(posts);
pub fn insert(conn: &Connection, new: NewPost, searcher: &Searcher) -> Result<Self> {
pub fn insert(conn: &Connection, new: NewPost) -> Result<Self> {
diesel::insert_into(posts::table)
.values(new)
.execute(conn)?;
@ -80,13 +90,22 @@ impl Post {
let _: Post = post.save_changes(conn)?;
}
searcher.add_document(&post)?;
if post.published {
let searcher_actor = ACTORS.get().unwrap().clone().select("/plume/user/searcher-actor").unwrap();
searcher_actor.try_tell(AddDocument(post.clone()), None);
}
Ok(post)
}
pub fn update(&self, conn: &Connection) -> Result<Self> {
diesel::update(self).set(self).execute(conn)?;
let post = Self::get(conn, self.id)?;
if post.published {
crate::POST_CHAN.get().unwrap().tell(Publish { msg: PostUpdated(post.clone()), topic: "post.updated".into() }, None);
}
Ok(post)
}
@ -561,7 +580,6 @@ impl FromId<PlumeRocket> for Post {
fn from_activity(c: &PlumeRocket, article: LicensedArticle) -> Result<Self> {
let conn = &*c.conn;
let searcher = &c.searcher;
let license = article.custom_props.license_string().unwrap_or_default();
let article = article.object;
@ -606,7 +624,6 @@ impl FromId<PlumeRocket> for Post {
source: article.ap_object_props.source_object::<Source>()?.content,
cover_id: cover,
},
searcher,
)?;
for author in authors {
@ -648,6 +665,9 @@ impl FromId<PlumeRocket> for Post {
Timeline::add_to_all_timelines(c, &post, Kind::Original)?;
let searcher_actor = ACTORS.get().unwrap().clone().select("/plume/user/searcher-actor").unwrap();
searcher_actor.try_tell(AddDocument(post.clone()), None);
Ok(post)
}
}
@ -729,7 +749,7 @@ impl AsObject<User, Update, &PlumeRocket> for PostUpdate {
fn activity(self, c: &PlumeRocket, actor: User, _id: &str) -> Result<()> {
let conn = &*c.conn;
let searcher_actor = &c.actors.select("searcher-actor").unwrap();
let searcher_actor = ACTORS.get().unwrap().clone().select("/plume/user/searcher-actor").unwrap();
let mut post = Post::from_id(c, &self.ap_url, None).map_err(|(_, e)| e)?;
if !post.is_author(conn, actor.id)? {
@ -835,7 +855,6 @@ mod tests {
source: "Hello".into(),
cover_id: None,
},
&r.searcher,
)
.unwrap();
PostAuthor::insert(

@ -1,5 +1,5 @@
mod query;
mod searcher;
pub mod searcher;
mod tokenizer;
pub use self::query::PlumeQuery as Query;
pub use self::searcher::*;
@ -145,9 +145,9 @@ pub(crate) mod tests {
source: "".to_owned(),
cover_id: None,
},
&searcher,
)
.unwrap();
searcher.add_document(&post).unwrap();
PostAuthor::insert(
conn,
NewPostAuthor {
@ -165,7 +165,7 @@ pub(crate) mod tests {
let newtitle = random_hex()[..8].to_owned();
post.title = newtitle.clone();
post.update(conn).unwrap();
searcher.update_document(&post).unwrap();
searcher.commit();
assert_eq!(
searcher.search_document(Query::from_str(&newtitle).unwrap(), (0, 1))[0].id,
@ -214,9 +214,9 @@ pub(crate) mod tests {
source: "".to_owned(),
cover_id: None,
},
&searcher,
)
.unwrap();
searcher.add_document(&post).unwrap();
searcher.commit();

@ -30,7 +30,7 @@ pub struct Searcher {
}
#[derive(Clone, Debug)]
pub struct AddDocument(Post);
pub struct AddDocument(pub Post);
#[derive(Clone, Debug)]
pub struct UpdateDocument(pub Post);
@ -38,37 +38,51 @@ pub struct UpdateDocument(pub Post);
#[derive(Clone, Debug)]
pub struct DeleteDocument(Post);
#[actor(AddDocument, UpdateDocument)]
pub struct SearcherActor(Searcher);
#[derive(Clone, Debug)]
pub enum PostEvent {
PostPublished(Post),
PostUpdated(Post),
PostDeleted(Post),
}
impl Actor for SearcherActor {
type Msg = SearcherActorMsg;
impl From<PostEvent> for Post {
fn from(event: PostEvent) -> Self {
use PostEvent::*;
// forwards Msg to the respective Receive<T> implementation
fn recv(&mut self, ctx: &Context<Self::Msg>, msg: Self::Msg, sender: Sender) {
self.receive(ctx, msg, sender);
match event {
PostPublished(post) => post,
PostUpdated(post) => post,
PostDeleted(post) => post,
}
}
}
impl Receive<AddDocument> for SearcherActor {
type Msg = SearcherActorMsg;
pub struct SearcherActor(Arc<Searcher>);
fn receive(&mut self, _ctx: &Context<Self::Msg>, msg: AddDocument, _sender: Sender) {
let _ = self.0.add_document(&msg.0);
impl Actor for SearcherActor {
type Msg = PostEvent;
// forwards Msg to the respective Receive<T> implementation
fn recv(&mut self, ctx: &Context<Self::Msg>, msg: Self::Msg, sender: Sender) {
self.receive(ctx, msg, sender);
}
}
impl Receive<UpdateDocument> for SearcherActor {
type Msg = SearcherActorMsg;
impl SearcherActor {
fn receive(&mut self, _ctx: &Context<PostEvent>, msg: PostEvent, _sender: Sender) {
use PostEvent::*;
fn receive(&mut self, _ctx: &Context<Self::Msg>, msg: UpdateDocument, _sender: Sender) {
let _ = self.0.update_document(&msg.0);
match msg {
PostPublished(post) => { self.0.add_document(&post).unwrap(); },
PostUpdated(post) => { self.0.update_document(&post).unwrap(); },
PostDeleted(post) => { self.0.delete_document(&post); },
}
}
}
impl ActorFactoryArgs<Arc<Searcher>> for SearcherActor {
fn create_args(searcher: Arc<Searcher>) -> Self {
SearcherActor(Arc::try_unwrap(searcher).ok().unwrap())
SearcherActor(searcher)
}
}

@ -408,7 +408,6 @@ mod tests {
source: "you must say GNU/Linux, not Linux!!!".to_string(),
cover_id: None,
},
&r.searcher,
)
.unwrap();
assert!(gnu_tl.matches(r, &gnu_post, Kind::Original).unwrap());
@ -428,7 +427,6 @@ mod tests {
source: "so is Microsoft".to_string(),
cover_id: None,
},
&r.searcher,
)
.unwrap();
assert!(!gnu_tl.matches(r, &non_free_post, Kind::Original).unwrap());
@ -481,7 +479,6 @@ mod tests {
subtitle: "".to_string(),
cover_id: None,
},
&r.searcher,
)
.unwrap();
assert!(my_tl.matches(r, &post, Kind::Original).unwrap()); // matches because of "blog in fav_blogs" (and there is no cover)
@ -503,7 +500,6 @@ mod tests {
subtitle: "".to_string(),
cover_id: None,
},
&r.searcher,
)
.unwrap();
assert!(!my_tl.matches(r, &post, Kind::Like(&users[1])).unwrap());
@ -549,7 +545,6 @@ mod tests {
source: "you must say GNU/Linux, not Linux!!!".to_string(),
cover_id: None,
},
&r.searcher,
)
.unwrap();
@ -568,7 +563,6 @@ mod tests {
source: "so is Microsoft".to_string(),
cover_id: None,
},
&r.searcher,
)
.unwrap();
@ -608,7 +602,6 @@ mod tests {
source: "you must say GNU/Linux, not Linux!!!".to_string(),
cover_id: None,
},
&r.searcher,
)
.unwrap();
gnu_post
@ -745,7 +738,6 @@ mod tests {
source: "you must say GNU/Linux, not Linux!!!".to_string(),
cover_id: None,
},
&r.searcher,
)
.unwrap();
gnu_post.update_tags(conn, vec![Tag::build_activity("free".to_owned()).unwrap()]).unwrap();
@ -779,7 +771,6 @@ mod tests {
source: "you must say GNU/Linux, not Linux!!!".to_string(),
cover_id: None,
},
&r.searcher,
)
.unwrap();

@ -7,7 +7,8 @@ use plume_api::posts::*;
use plume_common::{activity_pub::broadcast, utils::md_to_html};
use plume_models::{
blogs::Blog, db_conn::DbConn, instance::Instance, medias::Media, mentions::*, post_authors::*,
posts::*, safe_string::SafeString, tags::*, timeline::*, users::User, Error, PlumeRocket,
posts::*, safe_string::SafeString, tags::*, timeline::*, users::User,
Error, PlumeRocket,
};
#[get("/posts/<id>")]
@ -104,7 +105,6 @@ pub fn create(
rockets: PlumeRocket,
) -> Api<PostData> {
let conn = &*rockets.conn;
let search = &rockets.searcher;
let worker = &rockets.worker;
let author = User::get(conn, auth.0.user_id)?;
@ -154,7 +154,6 @@ pub fn create(
source: payload.source.clone(),
cover_id: payload.cover_id,
},
search,
)?;
PostAuthor::insert(

@ -14,8 +14,8 @@ extern crate riker;
use clap::App;
use plume_models::{
db_conn::init_pool, migrations::IMPORTED_MIGRATIONS, search::Searcher, search::SearcherActor,
CONFIG,
db_conn::init_pool, migrations::IMPORTED_MIGRATIONS, search::{Searcher, SearcherActor, PostEvent},
ACTORS, CONFIG, POST_CHAN,
};
use riker::actors::*;
use rocket_csrf::CsrfFairingBuilder;
@ -86,9 +86,15 @@ Then try to restart Plume.
let searcher = Arc::new(Searcher::new(dbpool.clone()));
let sys = SystemBuilder::new().name("plume").create().unwrap();
let _ = sys
let chan: ChannelRef<PostEvent> = channel("post_events", &sys).unwrap();
ACTORS.set(Arc::new(sys)).unwrap();
let searcher_actor = ACTORS
.get().unwrap()
.clone()
.actor_of_args::<SearcherActor, _>("searcher-actor", searcher.clone())
.unwrap();
POST_CHAN.set(chan).unwrap();
POST_CHAN.get().unwrap().tell(Subscribe { actor: Box::new(searcher_actor), topic: "*".into() }, None);
let commiter = searcher.clone();
workpool.execute_with_fixed_delay(
@ -235,7 +241,7 @@ Then try to restart Plume.
.manage(dbpool)
.manage(Arc::new(workpool))
.manage(searcher)
.manage(Arc::new(sys))
.manage(ACTORS.get().unwrap().clone())
.manage(include_i18n!())
.attach(
CsrfFairingBuilder::new()

@ -1,6 +1,5 @@
use chrono::Utc;
use heck::{CamelCase, KebabCase};
use riker::actors::*;
use rocket::request::LenientForm;
use rocket::response::{Flash, Redirect};
use rocket_i18n::I18n;
@ -27,7 +26,6 @@ use plume_models::{
post_authors::*,
posts::*,
safe_string::SafeString,
search::UpdateDocument,
tags::*,
timeline::*,
users::User,
@ -299,6 +297,8 @@ pub fn update(
post.source = form.content.clone();
post.license = form.license.clone();
post.cover_id = form.cover;
post.update(&*conn)
.expect("post::update: update error");
if post.published {
post.update_mentions(
@ -351,9 +351,6 @@ pub fn update(
}
}
let searcher_actor = rockets.actors.select("searcher-actor").unwrap();
searcher_actor.try_tell(UpdateDocument(post.clone()), None);
Flash::success(
Redirect::to(uri!(details: blog = blog, slug = new_slug, responding_to = _)),
i18n!(intl, "Your article has been updated."),
@ -476,7 +473,6 @@ pub fn create(
source: form.content.clone(),
cover_id: form.cover,
},
&rockets.searcher,
)
.expect("post::create: post save error");

Loading…
Cancel
Save