//! Search index for posts in Plume, a federated blogging application
//! (https://joinplu.me), built on the Tantivy full-text search engine.
  1. use crate::{
  2. config::SearchTokenizerConfig, db_conn::DbPool, instance::Instance, posts::Post, schema::posts,
  3. search::query::PlumeQuery, tags::Tag, Error, Result, CONFIG,
  4. };
  5. use chrono::{Datelike, Utc};
  6. use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl};
  7. use itertools::Itertools;
  8. use riker::actors::*;
  9. use std::{cmp, fs::create_dir_all, io, path::Path, sync::Arc, sync::Mutex};
  10. use tantivy::{
  11. collector::TopDocs, directory::MmapDirectory, schema::*, Index, IndexReader, IndexWriter,
  12. ReloadPolicy, TantivyError, Term,
  13. };
  14. use whatlang::{detect as detect_lang, Lang};
/// Errors that can occur while creating, opening, or editing the
/// Tantivy search index.
#[derive(Debug)]
pub enum SearcherError {
    /// The index directory or the index itself could not be created.
    IndexCreationError,
    /// The index writer lock could not be acquired — typically another
    /// Plume process already holds it (see `plm search unlock`).
    WriteLockAcquisitionError,
    /// The index could not be opened at the configured path.
    IndexOpeningError,
    /// The index could not be modified (e.g. garbage collection failed).
    IndexEditionError,
    /// The on-disk index data is invalid — e.g. it was written by an
    /// older, incompatible Tantivy format.
    InvalidIndexDataError,
}
/// Full-text search engine for Plume posts, backed by a Tantivy index.
pub struct Searcher {
    /// The underlying Tantivy index.
    index: Index,
    /// Reader used to run queries. Built with `ReloadPolicy::Manual`,
    /// so it only sees new documents after `commit` reloads it.
    reader: IndexReader,
    /// Writer used to add/delete documents. The `Mutex` serializes
    /// writes; the `Option` allows `drop_writer` to release the writer
    /// (and its lock) without consuming the `Searcher`.
    writer: Mutex<Option<IndexWriter>>,
    /// Database connection pool, used to load posts and related data.
    dbpool: DbPool,
}
/// Actor message: add a post to the search index.
#[derive(Clone, Debug)]
pub struct AddDocument(Post);

/// Actor message: re-index an existing post (delete then add).
#[derive(Clone, Debug)]
pub struct UpdateDocument(pub Post);

/// Actor message: remove a post from the search index.
/// NOTE(review): not listed in `#[actor(...)]` on `SearcherActor`, so
/// no `Receive<DeleteDocument>` handler is generated here — confirm it
/// is handled (or intentionally unused) elsewhere.
#[derive(Clone, Debug)]
pub struct DeleteDocument(Post);
/// Riker actor that owns a `Searcher` and processes `AddDocument` and
/// `UpdateDocument` messages. The `#[actor]` attribute generates the
/// `SearcherActorMsg` enum wrapping those message types.
#[actor(AddDocument, UpdateDocument)]
pub struct SearcherActor(Searcher);
impl Actor for SearcherActor {
    // `SearcherActorMsg` is generated by the `#[actor(...)]` attribute
    // on the struct declaration above.
    type Msg = SearcherActorMsg;

    // forwards Msg to the respective Receive<T> implementation
    fn recv(&mut self, ctx: &Context<Self::Msg>, msg: Self::Msg, sender: Sender) {
        self.receive(ctx, msg, sender);
    }
}
impl Receive<AddDocument> for SearcherActor {
    type Msg = SearcherActorMsg;

    /// Indexes the post carried by the message. Indexing errors are
    /// deliberately ignored (`let _ =`): search is best-effort and must
    /// not crash the actor.
    fn receive(&mut self, _ctx: &Context<Self::Msg>, msg: AddDocument, _sender: Sender) {
        let _ = self.0.add_document(&msg.0);
    }
}
impl Receive<UpdateDocument> for SearcherActor {
    type Msg = SearcherActorMsg;

    /// Re-indexes the post carried by the message. Errors are
    /// deliberately ignored (`let _ =`): search is best-effort and must
    /// not crash the actor.
    fn receive(&mut self, _ctx: &Context<Self::Msg>, msg: UpdateDocument, _sender: Sender) {
        let _ = self.0.update_document(&msg.0);
    }
}
  56. impl ActorFactoryArgs<Arc<Searcher>> for SearcherActor {
  57. fn create_args(searcher: Arc<Searcher>) -> Self {
  58. SearcherActor(Arc::try_unwrap(searcher).ok().unwrap())
  59. }
  60. }
impl Searcher {
    /// Initializes a new `Searcher`, ready to be used by
    /// Plume.
    ///
    /// The main task of this function is to try everything
    /// to get a valid `Searcher`:
    ///
    /// - first it tries to open the search index normally (using the options from `CONFIG`)
    /// - if it fails, it makes a back-up of the index files, deletes the original ones,
    ///   and recreate the whole index. It removes the backup only if the re-creation
    ///   succeeds.
    ///
    /// # Panics
    ///
    /// This function panics if it needs to create a backup and it can't, or if it fails
    /// to recreate the search index.
    ///
    /// After that, it can also panic if there are still errors remaining.
    ///
    /// The panic messages are normally explicit enough for a human to
    /// understand how to fix the issue when they see it.
    pub fn new(db_pool: DbPool) -> Self {
        // We try to open the index a first time
        let searcher = match Self::open(
            &CONFIG.search_index,
            db_pool.clone(),
            &CONFIG.search_tokenizers,
        ) {
            // The index may be corrupted, inexistent or use an older format.
            // In this case, we can easily recover by deleting and re-creating it.
            Err(Error::Search(SearcherError::InvalidIndexDataError)) => {
                // First try to re-create the index in place; only if that
                // fails do we move the old files aside and retry.
                if Self::create(
                    &CONFIG.search_index,
                    db_pool.clone(),
                    &CONFIG.search_tokenizers,
                )
                .is_err()
                {
                    // Back up the existing index directory under a
                    // timestamp-suffixed name before re-creating from scratch.
                    let current_path = Path::new(&CONFIG.search_index);
                    let backup_path =
                        format!("{}.{}", &current_path.display(), Utc::now().timestamp());
                    let backup_path = Path::new(&backup_path);
                    std::fs::rename(current_path, backup_path)
                        .expect("Error while backing up search index directory for re-creation");
                    if Self::create(
                        &CONFIG.search_index,
                        db_pool.clone(),
                        &CONFIG.search_tokenizers,
                    )
                    .is_ok()
                    {
                        // Re-creation succeeded; the backup is no longer
                        // needed. Failing to delete it is non-fatal.
                        if std::fs::remove_dir_all(backup_path).is_err() {
                            eprintln!(
                                "error on removing backup directory: {}. it remains",
                                backup_path.display()
                            );
                        }
                    } else {
                        panic!("Error while re-creating search index in new index format. Remove search index and run `plm search init` manually.");
                    }
                }
                // The index was just (re-)created, so this open is expected
                // to succeed; any remaining error is handled below.
                Self::open(&CONFIG.search_index, db_pool, &CONFIG.search_tokenizers)
            }
            // If it opened successfully or if it was another kind of
            // error (that we don't know how to handle), don't do anything more
            other => other,
        };
        // At this point, if there are still errors, we just panic
        #[allow(clippy::match_wild_err_arm)]
        match searcher {
            Err(Error::Search(e)) => match e {
                SearcherError::WriteLockAcquisitionError => panic!(
                    r#"
Your search index is locked. Plume can't start. To fix this issue
make sure no other Plume instance is started, and run:
plm search unlock
Then try to restart Plume.
"#
                ),
                SearcherError::IndexOpeningError => panic!(
                    r#"
Plume was unable to open the search index. If you created the index
before, make sure to run Plume in the same directory it was created in, or
to set SEARCH_INDEX accordingly. If you did not yet create the search
index, run this command:
plm search init
Then try to restart Plume
"#
                ),
                // Any other search error: Debug-print it and abort
                // (`SearcherError` derives `Debug`).
                e => Err(e).unwrap(),
            },
            Err(_) => panic!("Unexpected error while opening search index"),
            Ok(s) => s,
        }
    }

    /// Builds the Tantivy schema for the post index.
    ///
    /// Three indexing profiles are declared, each bound to a tokenizer
    /// name that `create`/`open` register from `SearchTokenizerConfig`:
    /// - `tag_tokenizer` (instance/author/tag): basic indexing, no
    ///   term frequencies or positions
    /// - `content_tokenizer` (blog/content/subtitle/title): frequencies
    ///   and positions recorded
    /// - `property_tokenizer` (lang/license): frequencies and positions
    ///
    /// `post_id` is the only STORED field — it is what search results
    /// give back so the post can be reloaded from the database.
    pub fn schema() -> Schema {
        let tag_indexing = TextOptions::default().set_indexing_options(
            TextFieldIndexing::default()
                .set_tokenizer("tag_tokenizer")
                .set_index_option(IndexRecordOption::Basic),
        );
        let content_indexing = TextOptions::default().set_indexing_options(
            TextFieldIndexing::default()
                .set_tokenizer("content_tokenizer")
                .set_index_option(IndexRecordOption::WithFreqsAndPositions),
        );
        let property_indexing = TextOptions::default().set_indexing_options(
            TextFieldIndexing::default()
                .set_tokenizer("property_tokenizer")
                .set_index_option(IndexRecordOption::WithFreqsAndPositions),
        );
        let mut schema_builder = SchemaBuilder::default();
        schema_builder.add_i64_field("post_id", STORED | INDEXED);
        schema_builder.add_i64_field("creation_date", INDEXED);
        schema_builder.add_text_field("instance", tag_indexing.clone());
        schema_builder.add_text_field("author", tag_indexing.clone());
        schema_builder.add_text_field("tag", tag_indexing);
        schema_builder.add_text_field("blog", content_indexing.clone());
        schema_builder.add_text_field("content", content_indexing.clone());
        schema_builder.add_text_field("subtitle", content_indexing.clone());
        schema_builder.add_text_field("title", content_indexing);
        schema_builder.add_text_field("lang", property_indexing.clone());
        schema_builder.add_text_field("license", property_indexing);
        schema_builder.build()
    }

    /// Creates a brand-new index at `path` and returns a `Searcher`
    /// over it.
    ///
    /// Creates the directory if needed, registers the custom
    /// tokenizers, and sets up a writer (50 MB indexing memory budget,
    /// per Tantivy's `Index::writer`) and a manually-reloaded reader.
    ///
    /// # Errors
    ///
    /// - `IndexCreationError` if the directory, index, or reader can't
    ///   be created
    /// - `WriteLockAcquisitionError` if the writer lock can't be taken
    pub fn create(
        path: &dyn AsRef<Path>,
        dbpool: DbPool,
        tokenizers: &SearchTokenizerConfig,
    ) -> Result<Self> {
        let schema = Self::schema();
        create_dir_all(path).map_err(|_| SearcherError::IndexCreationError)?;
        let index = Index::create(
            MmapDirectory::open(path).map_err(|_| SearcherError::IndexCreationError)?,
            schema,
        )
        .map_err(|_| SearcherError::IndexCreationError)?;
        {
            // Register the tokenizers the schema refers to by name.
            let tokenizer_manager = index.tokenizers();
            tokenizer_manager.register("tag_tokenizer", tokenizers.tag_tokenizer);
            tokenizer_manager.register("content_tokenizer", tokenizers.content_tokenizer);
            tokenizer_manager.register("property_tokenizer", tokenizers.property_tokenizer);
        } //to please the borrow checker
        Ok(Self {
            writer: Mutex::new(Some(
                index
                    .writer(50_000_000)
                    .map_err(|_| SearcherError::WriteLockAcquisitionError)?,
            )),
            reader: index
                .reader_builder()
                .reload_policy(ReloadPolicy::Manual)
                .try_into()
                .map_err(|_| SearcherError::IndexCreationError)?,
            index,
            dbpool,
        })
    }

    /// Opens an existing index at `path`, registers the tokenizers,
    /// garbage-collects stale segment files, and returns a `Searcher`.
    ///
    /// # Errors
    ///
    /// - `IndexOpeningError` if the directory/index can't be opened
    /// - `WriteLockAcquisitionError` if the writer lock can't be taken
    /// - `IndexEditionError` if garbage collection fails
    /// - `InvalidIndexDataError` if the reader fails with an
    ///   `InvalidData` I/O error — i.e. the index was written in an
    ///   older Tantivy format (`new` recovers from this by re-creating)
    pub fn open(
        path: &dyn AsRef<Path>,
        dbpool: DbPool,
        tokenizers: &SearchTokenizerConfig,
    ) -> Result<Self> {
        let mut index =
            Index::open(MmapDirectory::open(path).map_err(|_| SearcherError::IndexOpeningError)?)
                .map_err(|_| SearcherError::IndexOpeningError)?;
        {
            // Register the tokenizers the schema refers to by name.
            let tokenizer_manager = index.tokenizers();
            tokenizer_manager.register("tag_tokenizer", tokenizers.tag_tokenizer);
            tokenizer_manager.register("content_tokenizer", tokenizers.content_tokenizer);
            tokenizer_manager.register("property_tokenizer", tokenizers.property_tokenizer);
        } //to please the borrow checker
        let writer = index
            .writer(50_000_000)
            .map_err(|_| SearcherError::WriteLockAcquisitionError)?;

        // Since Tantivy v0.12.0, IndexWriter::garbage_collect_files() returns Future.
        // To avoid conflict with Plume async project, we don't introduce async now.
        // After async is introduced to Plume, we can use garbage_collect_files() again.
        // Algorithm stolen from Tantivy's SegmentUpdater::list_files()
        use std::collections::HashSet;
        use std::path::PathBuf;
        // Collect every file referenced by a live segment, plus the
        // index metadata file; everything else is garbage-collectable.
        let mut files: HashSet<PathBuf> = index
            .list_all_segment_metas()
            .into_iter()
            .flat_map(|segment_meta| segment_meta.list_files())
            .collect();
        files.insert(Path::new("meta.json").to_path_buf());
        index
            .directory_mut()
            .garbage_collect(|| files)
            .map_err(|_| SearcherError::IndexEditionError)?;

        Ok(Self {
            writer: Mutex::new(Some(writer)),
            reader: index
                .reader_builder()
                .reload_policy(ReloadPolicy::Manual)
                .try_into()
                .map_err(|e| {
                    if let TantivyError::IOError(err) = e {
                        let err: io::Error = err.into();
                        if err.kind() == io::ErrorKind::InvalidData {
                            // Search index was created in older Tantivy format.
                            SearcherError::InvalidIndexDataError
                        } else {
                            SearcherError::IndexCreationError
                        }
                    } else {
                        SearcherError::IndexCreationError
                    }
                })?,
            index,
            dbpool,
        })
    }

    /// Adds `post` to the search index. Unpublished posts are silently
    /// skipped. The document only becomes searchable after `commit`.
    ///
    /// # Errors
    ///
    /// - `Error::DbPool` if no database connection is available
    /// - any error from loading the post's authors, blog, instance,
    ///   or tags
    ///
    /// # Panics
    ///
    /// Panics if the writer mutex is poisoned or the writer was
    /// dropped via `drop_writer`.
    pub fn add_document(&self, post: &Post) -> Result<()> {
        if !post.published {
            return Ok(());
        }
        let schema = self.index.schema();
        // These lookups can't fail: the fields are declared in
        // `Self::schema`, which built this index.
        let post_id = schema.get_field("post_id").unwrap();
        let creation_date = schema.get_field("creation_date").unwrap();
        let instance = schema.get_field("instance").unwrap();
        let author = schema.get_field("author").unwrap();
        let tag = schema.get_field("tag").unwrap();
        let blog_name = schema.get_field("blog").unwrap();
        let content = schema.get_field("content").unwrap();
        let subtitle = schema.get_field("subtitle").unwrap();
        let title = schema.get_field("title").unwrap();
        let lang = schema.get_field("lang").unwrap();
        let license = schema.get_field("license").unwrap();
        let conn = match self.dbpool.get() {
            Ok(c) => c,
            Err(_) => return Err(Error::DbPool),
        };
        let mut writer = self.writer.lock().unwrap();
        let writer = writer.as_mut().unwrap();
        writer.add_document(doc!(
            post_id => i64::from(post.id),
            // Multi-valued fields (authors, tags) are space-joined into
            // one string; the tag tokenizer presumably splits them back
            // apart — TODO confirm against the tokenizer config.
            author => post.get_authors(&conn)?.into_iter().map(|u| u.fqn).join(" "),
            creation_date => i64::from(post.creation_date.num_days_from_ce()),
            instance => Instance::get(&conn, post.get_blog(&conn)?.instance_id)?.public_domain,
            tag => Tag::for_post(&conn, post.id)?.into_iter().map(|t| t.tag).join(" "),
            blog_name => post.get_blog(&conn)?.title,
            content => post.content.get().clone(),
            subtitle => post.subtitle.clone(),
            title => post.title.clone(),
            // Detect the content language; fall back to English when
            // detection is missing or unreliable.
            lang => detect_lang(post.content.get()).and_then(|i| if i.is_reliable() { Some(i.lang()) } else {None} ).unwrap_or(Lang::Eng).name(),
            license => post.license.clone(),
        ));
        Ok(())
    }

    /// Removes `post` from the index by deleting every document whose
    /// `post_id` term matches. Takes effect at the next `commit`.
    ///
    /// # Panics
    ///
    /// Panics if the writer mutex is poisoned or the writer was
    /// dropped via `drop_writer`.
    pub fn delete_document(&self, post: &Post) {
        let schema = self.index.schema();
        let post_id = schema.get_field("post_id").unwrap();
        let doc_id = Term::from_field_i64(post_id, i64::from(post.id));
        let mut writer = self.writer.lock().unwrap();
        let writer = writer.as_mut().unwrap();
        writer.delete_term(doc_id);
    }

    /// Re-indexes `post`: deletes any existing document for it, then
    /// adds it again (no-op add if the post is unpublished).
    pub fn update_document(&self, post: &Post) -> Result<()> {
        self.delete_document(post);
        self.add_document(post)
    }

    /// Runs `query` against the index and returns matching posts in
    /// the `(min, max)` rank window (for pagination).
    ///
    /// Fetches the top `max` hits (at least 1), skips the first `min`,
    /// and reloads each surviving hit from the database via its stored
    /// `post_id`. Returns an empty `Vec` if no database connection is
    /// available; hits whose post can't be loaded are silently dropped.
    pub fn search_document(&self, query: PlumeQuery, (min, max): (i32, i32)) -> Vec<Post> {
        let schema = self.index.schema();
        let post_id = schema.get_field("post_id").unwrap();
        let collector = TopDocs::with_limit(cmp::max(1, max) as usize);
        let searcher = self.reader.searcher();
        let res = searcher.search(&query.into_query(), &collector).unwrap();
        let conn = match self.dbpool.get() {
            Ok(c) => c,
            Err(_) => return Vec::new(),
        };
        // Slice off the first `min` hits; out-of-range yields no results.
        res.get(min as usize..)
            .unwrap_or(&[])
            .iter()
            .filter_map(|(_, doc_add)| {
                let doc = searcher.doc(*doc_add).ok()?;
                let id = doc.get_first(post_id)?;
                Post::get(&conn, id.i64_value() as i32).ok()
            })
            .collect()
    }

    /// (Re-)indexes every published post from the database, via
    /// `update_document` so it is safe on a non-empty index. Changes
    /// become searchable after the next `commit`.
    ///
    /// # Errors
    ///
    /// - `Error::DbPool` if no database connection is available
    /// - any database error from loading or indexing the posts
    pub fn fill(&self) -> Result<()> {
        let conn = match self.dbpool.get() {
            Ok(c) => c,
            Err(_) => return Err(Error::DbPool),
        };
        for post in posts::table
            .filter(posts::published.eq(true))
            .load::<Post>(&conn)?
        {
            self.update_document(&post)?
        }
        Ok(())
    }

    /// Commits all pending index changes and reloads the reader so
    /// they become visible to searches (the reader uses
    /// `ReloadPolicy::Manual`).
    ///
    /// # Panics
    ///
    /// Panics if the writer was dropped (`drop_writer`), the mutex is
    /// poisoned, or the commit/reload itself fails.
    pub fn commit(&self) {
        let mut writer = self.writer.lock().unwrap();
        writer.as_mut().unwrap().commit().unwrap();
        self.reader.reload().unwrap();
    }

    /// Drops the index writer, leaving `None` in its place —
    /// presumably to release Tantivy's index write lock so another
    /// process can acquire it (NOTE(review): confirm against the
    /// Tantivy `IndexWriter` docs). Any write method called after this
    /// will panic on `as_mut().unwrap()`.
    pub fn drop_writer(&self) {
        self.writer.lock().unwrap().take();
    }
}