plume-model: refactor Searcher to have its own DbPool #809
10 zmienionych plików z 226 dodań i 155 usunięć
|
@ -1,8 +1,7 @@
|
|||
use dotenv;
|
||||
|
||||
use clap::App;
|
||||
use diesel::Connection;
|
||||
use plume_models::{instance::Instance, Connection as Conn, CONFIG};
|
||||
use plume_models::{db_conn::init_pool, instance::Instance};
|
||||
use std::io::{self, prelude::*};
|
||||
|
||||
mod instance;
|
||||
|
@ -26,22 +25,27 @@ fn main() {
|
|||
Err(ref e) if e.not_found() => eprintln!("no .env was found"),
|
||||
e => e.map(|_| ()).unwrap(),
|
||||
}
|
||||
let conn = Conn::establish(CONFIG.database_url.as_str());
|
||||
let _ = conn.as_ref().map(|conn| Instance::cache_local(conn));
|
||||
let db_pool = init_pool()
|
||||
.expect("Couldn't create a database pool, please check DATABASE_URL in your .env");
|
||||
let _ = db_pool
|
||||
.get()
|
||||
.as_ref()
|
||||
.map(|conn| Instance::cache_local(conn));
|
||||
|
||||
match matches.subcommand() {
|
||||
("instance", Some(args)) => {
|
||||
instance::run(args, &conn.expect("Couldn't connect to the database."))
|
||||
}
|
||||
("migration", Some(args)) => {
|
||||
migration::run(args, &conn.expect("Couldn't connect to the database."))
|
||||
}
|
||||
("search", Some(args)) => {
|
||||
search::run(args, &conn.expect("Couldn't connect to the database."))
|
||||
}
|
||||
("users", Some(args)) => {
|
||||
users::run(args, &conn.expect("Couldn't connect to the database."))
|
||||
}
|
||||
("instance", Some(args)) => instance::run(
|
||||
args,
|
||||
&db_pool.get().expect("Couldn't connect to the database."),
|
||||
),
|
||||
("migration", Some(args)) => migration::run(
|
||||
args,
|
||||
&db_pool.get().expect("Couldn't connect to the database."),
|
||||
),
|
||||
("search", Some(args)) => search::run(args, db_pool),
|
||||
("users", Some(args)) => users::run(
|
||||
args,
|
||||
&db_pool.get().expect("Couldn't connect to the database."),
|
||||
),
|
||||
_ => app.print_help().expect("Couldn't print help"),
|
||||
};
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use clap::{App, Arg, ArgMatches, SubCommand};
|
||||
|
||||
use plume_models::{search::Searcher, Connection, CONFIG};
|
||||
use plume_models::{db_conn::DbPool, search::Searcher, CONFIG};
|
||||
use std::fs::{read_dir, remove_file};
|
||||
use std::io::ErrorKind;
|
||||
use std::path::Path;
|
||||
|
@ -52,7 +52,7 @@ pub fn command<'a, 'b>() -> App<'a, 'b> {
|
|||
)
|
||||
}
|
||||
|
||||
pub fn run<'a>(args: &ArgMatches<'a>, conn: &Connection) {
|
||||
pub fn run<'a>(args: &ArgMatches<'a>, conn: DbPool) {
|
||||
let conn = conn;
|
||||
match args.subcommand() {
|
||||
("init", Some(x)) => init(x, conn),
|
||||
|
@ -63,7 +63,7 @@ pub fn run<'a>(args: &ArgMatches<'a>, conn: &Connection) {
|
|||
}
|
||||
}
|
||||
|
||||
fn init<'a>(args: &ArgMatches<'a>, conn: &Connection) {
|
||||
fn init<'a>(args: &ArgMatches<'a>, db_pool: DbPool) {
|
||||
let path = args
|
||||
.value_of("path")
|
||||
.map(|p| Path::new(p).join("search_index"))
|
||||
|
@ -82,8 +82,8 @@ fn init<'a>(args: &ArgMatches<'a>, conn: &Connection) {
|
|||
}
|
||||
};
|
||||
if can_do || force {
|
||||
let searcher = Searcher::create(&path, &CONFIG.search_tokenizers).unwrap();
|
||||
refill(args, conn, Some(searcher));
|
||||
let searcher = Searcher::create(&path, db_pool.clone(), &CONFIG.search_tokenizers).unwrap();
|
||||
refill(args, db_pool, Some(searcher));
|
||||
} else {
|
||||
eprintln!(
|
||||
"Can't create new index, {} exist and is not empty",
|
||||
|
@ -92,16 +92,16 @@ fn init<'a>(args: &ArgMatches<'a>, conn: &Connection) {
|
|||
}
|
||||
}
|
||||
|
||||
fn refill<'a>(args: &ArgMatches<'a>, conn: &Connection, searcher: Option<Searcher>) {
|
||||
fn refill<'a>(args: &ArgMatches<'a>, db_pool: DbPool, searcher: Option<Searcher>) {
|
||||
let path = args.value_of("path");
|
||||
let path = match path {
|
||||
Some(path) => Path::new(path).join("search_index"),
|
||||
None => Path::new(&CONFIG.search_index).to_path_buf(),
|
||||
};
|
||||
let searcher =
|
||||
searcher.unwrap_or_else(|| Searcher::open(&path, &CONFIG.search_tokenizers).unwrap());
|
||||
let searcher = searcher
|
||||
.unwrap_or_else(|| Searcher::open(&path, db_pool, &CONFIG.search_tokenizers).unwrap());
|
||||
|
||||
searcher.fill(conn).expect("Couldn't import post");
|
||||
searcher.fill().expect("Couldn't import post");
|
||||
println!("Commiting result");
|
||||
searcher.commit();
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use crate::Connection;
|
||||
use crate::{instance::Instance, Connection, CONFIG};
|
||||
use diesel::r2d2::{
|
||||
ConnectionManager, CustomizeConnection, Error as ConnError, Pool, PooledConnection,
|
||||
};
|
||||
|
@ -13,6 +13,20 @@ use std::ops::Deref;
|
|||
|
||||
pub type DbPool = Pool<ConnectionManager<Connection>>;
|
||||
|
||||
/// Initializes a database pool.
|
||||
pub fn init_pool() -> Option<DbPool> {
|
||||
let manager = ConnectionManager::<Connection>::new(CONFIG.database_url.as_str());
|
||||
let mut builder = DbPool::builder()
|
||||
.connection_customizer(Box::new(PragmaForeignKey))
|
||||
.min_idle(CONFIG.db_min_idle);
|
||||
if let Some(max_size) = CONFIG.db_max_size {
|
||||
builder = builder.max_size(max_size);
|
||||
};
|
||||
let pool = builder.build(manager).ok()?;
|
||||
Instance::cache_local(&pool.get().unwrap());
|
||||
Some(pool)
|
||||
}
|
||||
|
||||
// From rocket documentation
|
||||
|
||||
// Connection request guard type: a wrapper around an r2d2 pooled connection.
|
||||
|
|
|
@ -40,6 +40,7 @@ pub enum Error {
|
|||
Io(std::io::Error),
|
||||
MissingApProperty,
|
||||
NotFound,
|
||||
DbPool,
|
||||
Request,
|
||||
SerDe,
|
||||
Search(search::SearcherError),
|
||||
|
@ -303,6 +304,10 @@ mod tests {
|
|||
db_conn::DbConn((*DB_POOL).get().unwrap())
|
||||
}
|
||||
|
||||
pub fn pool<'a>() -> db_conn::DbPool {
|
||||
(*DB_POOL).clone()
|
||||
}
|
||||
|
||||
lazy_static! {
|
||||
static ref DB_POOL: db_conn::DbPool = {
|
||||
let pool = db_conn::DbPool::builder()
|
||||
|
|
|
@ -2,7 +2,7 @@ pub use self::module::PlumeRocket;
|
|||
|
||||
#[cfg(not(test))]
|
||||
mod module {
|
||||
use crate::{db_conn::DbConn, search, users};
|
||||
use crate::{db_conn::DbConn, users};
|
||||
use rocket::{
|
||||
request::{self, FlashMessage, FromRequest, Request},
|
||||
Outcome, State,
|
||||
|
@ -15,7 +15,6 @@ mod module {
|
|||
pub conn: DbConn,
|
||||
pub intl: rocket_i18n::I18n,
|
||||
pub user: Option<users::User>,
|
||||
pub searcher: Arc<search::Searcher>,
|
||||
pub worker: Arc<ScheduledThreadPool>,
|
||||
pub flash_msg: Option<(String, String)>,
|
||||
}
|
||||
|
@ -28,7 +27,6 @@ mod module {
|
|||
let intl = request.guard::<rocket_i18n::I18n>()?;
|
||||
let user = request.guard::<users::User>().succeeded();
|
||||
let worker = request.guard::<'_, State<'_, Arc<ScheduledThreadPool>>>()?;
|
||||
let searcher = request.guard::<'_, State<'_, Arc<search::Searcher>>>()?;
|
||||
let flash_msg = request.guard::<FlashMessage<'_, '_>>().succeeded();
|
||||
Outcome::Success(PlumeRocket {
|
||||
conn,
|
||||
|
@ -36,7 +34,6 @@ mod module {
|
|||
user,
|
||||
flash_msg: flash_msg.map(|f| (f.name().into(), f.msg().into())),
|
||||
worker: worker.clone(),
|
||||
searcher: searcher.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -56,7 +53,6 @@ mod module {
|
|||
pub struct PlumeRocket {
|
||||
pub conn: DbConn,
|
||||
pub user: Option<users::User>,
|
||||
pub searcher: Arc<search::Searcher>,
|
||||
pub worker: Arc<ScheduledThreadPool>,
|
||||
}
|
||||
|
||||
|
@ -67,12 +63,10 @@ mod module {
|
|||
let conn = request.guard::<DbConn>()?;
|
||||
let user = request.guard::<users::User>().succeeded();
|
||||
let worker = request.guard::<'_, State<'_, Arc<ScheduledThreadPool>>>()?;
|
||||
let searcher = request.guard::<'_, State<'_, Arc<search::Searcher>>>()?;
|
||||
Outcome::Success(PlumeRocket {
|
||||
conn,
|
||||
user,
|
||||
worker: worker.clone(),
|
||||
searcher: searcher.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -78,14 +78,14 @@ impl Post {
|
|||
let _: Post = post.save_changes(conn)?;
|
||||
}
|
||||
|
||||
searcher.add_document(conn, &post)?;
|
||||
searcher.add_document(&post)?;
|
||||
Ok(post)
|
||||
}
|
||||
|
||||
pub fn update(&self, conn: &Connection, searcher: &Searcher) -> Result<Self> {
|
||||
diesel::update(self).set(self).execute(conn)?;
|
||||
let post = Self::get(conn, self.id)?;
|
||||
searcher.update_document(conn, &post)?;
|
||||
searcher.update_document(&post)?;
|
||||
Ok(post)
|
||||
}
|
||||
|
||||
|
|
|
@ -7,7 +7,7 @@ pub use self::tokenizer::TokenizerKind;
|
|||
|
||||
#[cfg(test)]
|
||||
pub(crate) mod tests {
|
||||
use super::{Query, Searcher, TokenizerKind};
|
||||
use super::{Query, Searcher};
|
||||
use diesel::Connection;
|
||||
use plume_common::utils::random_hex;
|
||||
use std::env::temp_dir;
|
||||
|
@ -20,15 +20,16 @@ pub(crate) mod tests {
|
|||
posts::{NewPost, Post},
|
||||
safe_string::SafeString,
|
||||
tests::db,
|
||||
tests::pool,
|
||||
CONFIG,
|
||||
};
|
||||
|
||||
pub(crate) fn get_searcher(tokenizers: &SearchTokenizerConfig) -> Searcher {
|
||||
let dir = temp_dir().join(&format!("plume-test-{}", random_hex()));
|
||||
if dir.exists() {
|
||||
Searcher::open(&dir, tokenizers)
|
||||
Searcher::open(&dir, pool(), tokenizers)
|
||||
} else {
|
||||
Searcher::create(&dir, tokenizers)
|
||||
Searcher::create(&dir, pool(), tokenizers)
|
||||
}
|
||||
.unwrap()
|
||||
}
|
||||
|
@ -103,20 +104,20 @@ pub(crate) mod tests {
|
|||
fn open() {
|
||||
let dir = temp_dir().join(format!("plume-test-{}", random_hex()));
|
||||
{
|
||||
Searcher::create(&dir, &CONFIG.search_tokenizers).unwrap();
|
||||
Searcher::create(&dir, pool(), &CONFIG.search_tokenizers).unwrap();
|
||||
}
|
||||
Searcher::open(&dir, &CONFIG.search_tokenizers).unwrap();
|
||||
Searcher::open(&dir, pool(), &CONFIG.search_tokenizers).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn create() {
|
||||
let dir = temp_dir().join(format!("plume-test-{}", random_hex()));
|
||||
|
||||
assert!(Searcher::open(&dir, &CONFIG.search_tokenizers).is_err());
|
||||
assert!(Searcher::open(&dir, pool(), &CONFIG.search_tokenizers).is_err());
|
||||
{
|
||||
Searcher::create(&dir, &CONFIG.search_tokenizers).unwrap();
|
||||
Searcher::create(&dir, pool(), &CONFIG.search_tokenizers).unwrap();
|
||||
}
|
||||
Searcher::open(&dir, &CONFIG.search_tokenizers).unwrap(); //verify it's well created
|
||||
Searcher::open(&dir, pool(), &CONFIG.search_tokenizers).unwrap(); //verify it's well created
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -158,7 +159,7 @@ pub(crate) mod tests {
|
|||
|
||||
searcher.commit();
|
||||
assert_eq!(
|
||||
searcher.search_document(conn, Query::from_str(&title).unwrap(), (0, 1))[0].id,
|
||||
searcher.search_document(Query::from_str(&title).unwrap(), (0, 1))[0].id,
|
||||
post.id
|
||||
);
|
||||
|
||||
|
@ -167,17 +168,17 @@ pub(crate) mod tests {
|
|||
post.update(conn, &searcher).unwrap();
|
||||
searcher.commit();
|
||||
assert_eq!(
|
||||
searcher.search_document(conn, Query::from_str(&newtitle).unwrap(), (0, 1))[0].id,
|
||||
searcher.search_document(Query::from_str(&newtitle).unwrap(), (0, 1))[0].id,
|
||||
post.id
|
||||
);
|
||||
assert!(searcher
|
||||
.search_document(conn, Query::from_str(&title).unwrap(), (0, 1))
|
||||
.search_document(Query::from_str(&title).unwrap(), (0, 1))
|
||||
.is_empty());
|
||||
|
||||
post.delete(conn, &searcher).unwrap();
|
||||
searcher.commit();
|
||||
assert!(searcher
|
||||
.search_document(conn, Query::from_str(&newtitle).unwrap(), (0, 1))
|
||||
.search_document(Query::from_str(&newtitle).unwrap(), (0, 1))
|
||||
.is_empty());
|
||||
Ok(())
|
||||
});
|
||||
|
|
|
@ -1,10 +1,11 @@
|
|||
use crate::{
|
||||
config::SearchTokenizerConfig, instance::Instance, posts::Post, schema::posts,
|
||||
search::query::PlumeQuery, tags::Tag, Connection, Result,
|
||||
config::SearchTokenizerConfig, db_conn::DbPool, instance::Instance, posts::Post, schema::posts,
|
||||
search::query::PlumeQuery, tags::Tag, Error, Result, CONFIG,
|
||||
};
|
||||
use chrono::Datelike;
|
||||
use chrono::{Datelike, Utc};
|
||||
use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl};
|
||||
use itertools::Itertools;
|
||||
use rocket::request::{self, FromRequest, Outcome, Request, State};
|
||||
use std::{cmp, fs::create_dir_all, io, path::Path, sync::Mutex};
|
||||
use tantivy::{
|
||||
collector::TopDocs, directory::MmapDirectory, schema::*, Index, IndexReader, IndexWriter,
|
||||
|
@ -25,9 +26,110 @@ pub struct Searcher {
|
|||
index: Index,
|
||||
reader: IndexReader,
|
||||
writer: Mutex<Option<IndexWriter>>,
|
||||
dbpool: DbPool,
|
||||
}
|
||||
|
||||
impl Searcher {
|
||||
/// Initializes a new `Searcher`, ready to be used by
|
||||
/// Plume.
|
||||
///
|
||||
/// The main task of this function is to try everything
|
||||
/// to get a valid `Searcher`:
|
||||
///
|
||||
/// - first it tries to open the search index normally (using the options from `CONFIG`)
|
||||
/// - if it fails, it makes a back-up of the index files, deletes the original ones,
|
||||
/// and recreate the whole index. It removes the backup only if the re-creation
|
||||
/// succeeds.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This function panics if it needs to create a backup and it can't, or if it fails
|
||||
/// to recreate the search index.
|
||||
///
|
||||
/// After that, it can also panic if there are still errors remaining.
|
||||
///
|
||||
/// The panic messages are normally explicit enough for a human to
|
||||
/// understand how to fix the issue when they see it.
|
||||
pub fn new(db_pool: DbPool) -> Self {
|
||||
// We try to open the index a first time
|
||||
let searcher = match Self::open(
|
||||
&CONFIG.search_index,
|
||||
db_pool.clone(),
|
||||
&CONFIG.search_tokenizers,
|
||||
) {
|
||||
// The index may be corrupted, inexistent or use an older format.
|
||||
// In this case, we can easily recover by deleting and re-creating it.
|
||||
Err(Error::Search(SearcherError::InvalidIndexDataError)) => {
|
||||
if Self::create(
|
||||
&CONFIG.search_index,
|
||||
db_pool.clone(),
|
||||
&CONFIG.search_tokenizers,
|
||||
)
|
||||
.is_err()
|
||||
{
|
||||
let current_path = Path::new(&CONFIG.search_index);
|
||||
let backup_path =
|
||||
format!("{}.{}", ¤t_path.display(), Utc::now().timestamp());
|
||||
let backup_path = Path::new(&backup_path);
|
||||
std::fs::rename(current_path, backup_path)
|
||||
.expect("Error while backing up search index directory for re-creation");
|
||||
if Self::create(
|
||||
&CONFIG.search_index,
|
||||
db_pool.clone(),
|
||||
&CONFIG.search_tokenizers,
|
||||
)
|
||||
.is_ok()
|
||||
{
|
||||
if std::fs::remove_dir_all(backup_path).is_err() {
|
||||
eprintln!(
|
||||
"error on removing backup directory: {}. it remains",
|
||||
backup_path.display()
|
||||
);
|
||||
}
|
||||
} else {
|
||||
panic!("Error while re-creating search index in new index format. Remove search index and run `plm search init` manually.");
|
||||
}
|
||||
}
|
||||
Self::open(&CONFIG.search_index, db_pool, &CONFIG.search_tokenizers)
|
||||
}
|
||||
// If it opened successfully or if it was another kind of
|
||||
// error (that we don't know how to handle), don't do anything more
|
||||
other => other,
|
||||
};
|
||||
|
||||
// At this point, if there are still errors, we just panic
|
||||
#[allow(clippy::match_wild_err_arm)]
|
||||
match searcher {
|
||||
Err(Error::Search(e)) => match e {
|
||||
SearcherError::WriteLockAcquisitionError => panic!(
|
||||
r#"
|
||||
Your search index is locked. Plume can't start. To fix this issue
|
||||
make sure no other Plume instance is started, and run:
|
||||
|
||||
plm search unlock
|
||||
|
||||
Then try to restart Plume.
|
||||
"#
|
||||
),
|
||||
SearcherError::IndexOpeningError => panic!(
|
||||
r#"
|
||||
Plume was unable to open the search index. If you created the index
|
||||
before, make sure to run Plume in the same directory it was created in, or
|
||||
to set SEARCH_INDEX accordingly. If you did not yet create the search
|
||||
index, run this command:
|
||||
|
||||
plm search init
|
||||
|
||||
Then try to restart Plume
|
||||
"#
|
||||
),
|
||||
e => Err(e).unwrap(),
|
||||
},
|
||||
Err(_) => panic!("Unexpected error while opening search index"),
|
||||
Ok(s) => s,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn schema() -> Schema {
|
||||
let tag_indexing = TextOptions::default().set_indexing_options(
|
||||
TextFieldIndexing::default()
|
||||
|
@ -67,7 +169,11 @@ impl Searcher {
|
|||
schema_builder.build()
|
||||
}
|
||||
|
||||
pub fn create(path: &dyn AsRef<Path>, tokenizers: &SearchTokenizerConfig) -> Result<Self> {
|
||||
pub fn create(
|
||||
path: &dyn AsRef<Path>,
|
||||
dbpool: DbPool,
|
||||
tokenizers: &SearchTokenizerConfig,
|
||||
) -> Result<Self> {
|
||||
let schema = Self::schema();
|
||||
|
||||
create_dir_all(path).map_err(|_| SearcherError::IndexCreationError)?;
|
||||
|
@ -95,10 +201,15 @@ impl Searcher {
|
|||
.try_into()
|
||||
.map_err(|_| SearcherError::IndexCreationError)?,
|
||||
index,
|
||||
dbpool,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn open(path: &dyn AsRef<Path>, tokenizers: &SearchTokenizerConfig) -> Result<Self> {
|
||||
pub fn open(
|
||||
path: &dyn AsRef<Path>,
|
||||
dbpool: DbPool,
|
||||
tokenizers: &SearchTokenizerConfig,
|
||||
) -> Result<Self> {
|
||||
let mut index =
|
||||
Index::open(MmapDirectory::open(path).map_err(|_| SearcherError::IndexOpeningError)?)
|
||||
.map_err(|_| SearcherError::IndexOpeningError)?;
|
||||
|
@ -150,10 +261,11 @@ impl Searcher {
|
|||
}
|
||||
})?,
|
||||
index,
|
||||
dbpool,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn add_document(&self, conn: &Connection, post: &Post) -> Result<()> {
|
||||
pub fn add_document(&self, post: &Post) -> Result<()> {
|
||||
if !post.published {
|
||||
return Ok(());
|
||||
}
|
||||
|
@ -175,15 +287,19 @@ impl Searcher {
|
|||
let lang = schema.get_field("lang").unwrap();
|
||||
let license = schema.get_field("license").unwrap();
|
||||
|
||||
let conn = match self.dbpool.get() {
|
||||
Ok(c) => c,
|
||||
Err(_) => return Err(Error::DbPool),
|
||||
};
|
||||
let mut writer = self.writer.lock().unwrap();
|
||||
let writer = writer.as_mut().unwrap();
|
||||
writer.add_document(doc!(
|
||||
post_id => i64::from(post.id),
|
||||
author => post.get_authors(conn)?.into_iter().map(|u| u.fqn).join(" "),
|
||||
author => post.get_authors(&conn)?.into_iter().map(|u| u.fqn).join(" "),
|
||||
creation_date => i64::from(post.creation_date.num_days_from_ce()),
|
||||
instance => Instance::get(conn, post.get_blog(conn)?.instance_id)?.public_domain,
|
||||
tag => Tag::for_post(conn, post.id)?.into_iter().map(|t| t.tag).join(" "),
|
||||
blog_name => post.get_blog(conn)?.title,
|
||||
instance => Instance::get(&conn, post.get_blog(&conn)?.instance_id)?.public_domain,
|
||||
tag => Tag::for_post(&conn, post.id)?.into_iter().map(|t| t.tag).join(" "),
|
||||
blog_name => post.get_blog(&conn)?.title,
|
||||
content => post.content.get().clone(),
|
||||
subtitle => post.subtitle.clone(),
|
||||
title => post.title.clone(),
|
||||
|
@ -203,17 +319,12 @@ impl Searcher {
|
|||
writer.delete_term(doc_id);
|
||||
}
|
||||
|
||||
pub fn update_document(&self, conn: &Connection, post: &Post) -> Result<()> {
|
||||
pub fn update_document(&self, post: &Post) -> Result<()> {
|
||||
self.delete_document(post);
|
||||
self.add_document(conn, post)
|
||||
self.add_document(post)
|
||||
}
|
||||
|
||||
pub fn search_document(
|
||||
&self,
|
||||
conn: &Connection,
|
||||
query: PlumeQuery,
|
||||
(min, max): (i32, i32),
|
||||
) -> Vec<Post> {
|
||||
pub fn search_document(&self, query: PlumeQuery, (min, max): (i32, i32)) -> Vec<Post> {
|
||||
let schema = self.index.schema();
|
||||
let post_id = schema.get_field("post_id").unwrap();
|
||||
|
||||
|
@ -222,24 +333,33 @@ impl Searcher {
|
|||
let searcher = self.reader.searcher();
|
||||
let res = searcher.search(&query.into_query(), &collector).unwrap();
|
||||
|
||||
let conn = match self.dbpool.get() {
|
||||
Ok(c) => c,
|
||||
Err(_) => return Vec::new(),
|
||||
};
|
||||
|
||||
res.get(min as usize..)
|
||||
.unwrap_or(&[])
|
||||
.iter()
|
||||
.filter_map(|(_, doc_add)| {
|
||||
let doc = searcher.doc(*doc_add).ok()?;
|
||||
let id = doc.get_first(post_id)?;
|
||||
Post::get(conn, id.i64_value() as i32).ok()
|
||||
Post::get(&conn, id.i64_value() as i32).ok()
|
||||
//borrow checker don't want me to use filter_map or and_then here
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn fill(&self, conn: &Connection) -> Result<()> {
|
||||
pub fn fill(&self) -> Result<()> {
|
||||
let conn = match self.dbpool.get() {
|
||||
Ok(c) => c,
|
||||
Err(_) => return Err(Error::DbPool),
|
||||
};
|
||||
for post in posts::table
|
||||
.filter(posts::published.eq(true))
|
||||
.load::<Post>(conn)?
|
||||
.load::<Post>(&conn)?
|
||||
{
|
||||
self.update_document(conn, &post)?
|
||||
self.update_document(&post)?
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
@ -254,3 +374,12 @@ impl Searcher {
|
|||
self.writer.lock().unwrap().take();
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, 'r> FromRequest<'a, 'r> for Searcher {
|
||||
type Error = ();
|
||||
|
||||
fn from_request(request: &'a Request<'r>) -> request::Outcome<Searcher, Self::Error> {
|
||||
let searcher = request.guard::<State<'_, Searcher>>()?;
|
||||
Outcome::Success(*searcher.inner())
|
||||
}
|
||||
}
|
||||
|
|
93
src/main.rs
93
src/main.rs
|
@ -10,20 +10,10 @@ extern crate serde_json;
|
|||
#[macro_use]
|
||||
extern crate validator_derive;
|
||||
|
||||
use chrono::Utc;
|
||||
use clap::App;
|
||||
use diesel::r2d2::ConnectionManager;
|
||||
use plume_models::{
|
||||
db_conn::{DbPool, PragmaForeignKey},
|
||||
instance::Instance,
|
||||
migrations::IMPORTED_MIGRATIONS,
|
||||
search::{Searcher as UnmanagedSearcher, SearcherError},
|
||||
Connection, Error, CONFIG,
|
||||
};
|
||||
use plume_models::{db_conn::init_pool, migrations::IMPORTED_MIGRATIONS, search::Searcher, CONFIG};
|
||||
use rocket_csrf::CsrfFairingBuilder;
|
||||
use scheduled_thread_pool::ScheduledThreadPool;
|
||||
use std::fs;
|
||||
use std::path::Path;
|
||||
use std::process::exit;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
|
@ -48,26 +38,6 @@ include!(concat!(env!("OUT_DIR"), "/templates.rs"));
|
|||
|
||||
compile_i18n!();
|
||||
|
||||
/// Initializes a database pool.
|
||||
fn init_pool() -> Option<DbPool> {
|
||||
match dotenv::dotenv() {
|
||||
Ok(path) => println!("Configuration read from {}", path.display()),
|
||||
Err(ref e) if e.not_found() => eprintln!("no .env was found"),
|
||||
e => e.map(|_| ()).unwrap(),
|
||||
}
|
||||
|
||||
let manager = ConnectionManager::<Connection>::new(CONFIG.database_url.as_str());
|
||||
let mut builder = DbPool::builder()
|
||||
.connection_customizer(Box::new(PragmaForeignKey))
|
||||
.min_idle(CONFIG.db_min_idle);
|
||||
if let Some(max_size) = CONFIG.db_max_size {
|
||||
builder = builder.max_size(max_size);
|
||||
};
|
||||
let pool = builder.build(manager).ok()?;
|
||||
Instance::cache_local(&pool.get().unwrap());
|
||||
Some(pool)
|
||||
}
|
||||
|
||||
fn main() {
|
||||
App::new("Plume")
|
||||
.bin_name("plume")
|
||||
|
@ -82,6 +52,13 @@ and https://docs.joinplu.me/installation/init for more info.
|
|||
"#,
|
||||
)
|
||||
.get_matches();
|
||||
|
||||
match dotenv::dotenv() {
|
||||
Ok(path) => println!("Configuration read from {}", path.display()),
|
||||
Err(ref e) if e.not_found() => eprintln!("no .env was found"),
|
||||
e => e.map(|_| ()).unwrap(),
|
||||
}
|
||||
|
||||
let dbpool = init_pool().expect("main: database pool initialization error");
|
||||
if IMPORTED_MIGRATIONS
|
||||
.is_pending(&dbpool.get().unwrap())
|
||||
|
@ -100,60 +77,8 @@ Then try to restart Plume.
|
|||
)
|
||||
}
|
||||
let workpool = ScheduledThreadPool::with_name("worker {}", num_cpus::get());
|
||||
// we want a fast exit here, so
|
||||
let mut open_searcher =
|
||||
UnmanagedSearcher::open(&CONFIG.search_index, &CONFIG.search_tokenizers);
|
||||
if let Err(Error::Search(SearcherError::InvalidIndexDataError)) = open_searcher {
|
||||
if UnmanagedSearcher::create(&CONFIG.search_index, &CONFIG.search_tokenizers).is_err() {
|
||||
let current_path = Path::new(&CONFIG.search_index);
|
||||
let backup_path = format!("{}.{}", ¤t_path.display(), Utc::now().timestamp());
|
||||
let backup_path = Path::new(&backup_path);
|
||||
fs::rename(current_path, backup_path)
|
||||
.expect("main: error on backing up search index directory for recreating");
|
||||
if UnmanagedSearcher::create(&CONFIG.search_index, &CONFIG.search_tokenizers).is_ok() {
|
||||
if fs::remove_dir_all(backup_path).is_err() {
|
||||
eprintln!(
|
||||
"error on removing backup directory: {}. it remains",
|
||||
backup_path.display()
|
||||
);
|
||||
}
|
||||
} else {
|
||||
panic!("main: error on recreating search index in new index format. remove search index and run `plm search init` manually");
|
||||
}
|
||||
}
|
||||
open_searcher = UnmanagedSearcher::open(&CONFIG.search_index, &CONFIG.search_tokenizers);
|
||||
}
|
||||
#[allow(clippy::match_wild_err_arm)]
|
||||
let searcher = match open_searcher {
|
||||
Err(Error::Search(e)) => match e {
|
||||
SearcherError::WriteLockAcquisitionError => panic!(
|
||||
r#"
|
||||
Your search index is locked. Plume can't start. To fix this issue
|
||||
make sure no other Plume instance is started, and run:
|
||||
|
||||
plm search unlock
|
||||
|
||||
Then try to restart Plume.
|
||||
"#
|
||||
),
|
||||
SearcherError::IndexOpeningError => panic!(
|
||||
r#"
|
||||
Plume was unable to open the search index. If you created the index
|
||||
before, make sure to run Plume in the same directory it was created in, or
|
||||
to set SEARCH_INDEX accordingly. If you did not yet create the search
|
||||
index, run this command:
|
||||
|
||||
plm search init
|
||||
|
||||
Then try to restart Plume
|
||||
"#
|
||||
),
|
||||
e => Err(e).unwrap(),
|
||||
},
|
||||
Err(_) => panic!("Unexpected error while opening search index"),
|
||||
Ok(s) => Arc::new(s),
|
||||
};
|
||||
|
||||
let searcher = Arc::new(Searcher::new(dbpool.clone()));
|
||||
let commiter = searcher.clone();
|
||||
workpool.execute_with_fixed_delay(
|
||||
Duration::from_secs(5),
|
||||
|
|
|
@ -51,7 +51,6 @@ macro_rules! param_to_query {
|
|||
|
||||
#[get("/search?<query..>")]
|
||||
pub fn search(query: Option<Form<SearchQuery>>, rockets: PlumeRocket) -> Ructe {
|
||||
let conn = &*rockets.conn;
|
||||
let query = query.map(Form::into_inner).unwrap_or_default();
|
||||
let page = query.page.unwrap_or_default();
|
||||
let mut parsed_query =
|
||||
|
@ -72,7 +71,7 @@ pub fn search(query: Option<Form<SearchQuery>>, rockets: PlumeRocket) -> Ructe {
|
|||
} else {
|
||||
let res = rockets
|
||||
.searcher
|
||||
.search_document(&conn, parsed_query, page.limits());
|
||||
.search_document(parsed_query, page.limits());
|
||||
let next_page = if res.is_empty() { 0 } else { page.0 + 1 };
|
||||
render!(search::result(
|
||||
&rockets.to_context(),
|
||||
|
|
Ładowanie…
Dodaj tabelę
Odniesienie w nowym zgłoszeniu