Federated blogging application, thanks to ActivityPub https://joinplu.me
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

286 lines
10 KiB

  1. use crate::{
  2. config::SearchTokenizerConfig, db_conn::DbPool, instance::Instance, posts::Post, schema::posts,
  3. search::query::PlumeQuery, tags::Tag, Connection, Result,
  4. };
  5. use chrono::Datelike;
  6. use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl};
  7. use itertools::Itertools;
  8. use riker::actors::*;
  9. use std::{cmp, fs::create_dir_all, io, path::Path, sync::Mutex};
  10. use tantivy::{
  11. collector::TopDocs, directory::MmapDirectory, schema::*, Index, IndexReader, IndexWriter,
  12. ReloadPolicy, TantivyError, Term,
  13. };
  14. use whatlang::{detect as detect_lang, Lang};
  /// Errors raised while creating, opening or maintaining the post search index.
  ///
  /// The variants carry no payload, so the type is cheap to copy and can be
  /// compared directly (e.g. `assert_eq!` in tests, or `==` in callers).
  #[derive(Clone, Copy, Debug, PartialEq, Eq)]
  pub enum SearcherError {
      /// The index directory, the index itself, or its reader could not be set up.
      IndexCreationError,
      /// The index writer could not be obtained.
      WriteLockAcquisitionError,
      /// The index directory or the existing index could not be opened.
      IndexOpeningError,
      /// Cleaning up stale files of an existing index failed.
      IndexEditionError,
      /// The reader reported invalid data, presumably because the index was written
      /// in an older, incompatible Tantivy format.
      InvalidIndexDataError,
  }
  23. pub struct Searcher {
  24. index: Index,
  25. reader: IndexReader,
  26. writer: Mutex<Option<IndexWriter>>,
  27. }
  28. impl Searcher {
  29. pub fn schema() -> Schema {
  30. let tag_indexing = TextOptions::default().set_indexing_options(
  31. TextFieldIndexing::default()
  32. .set_tokenizer("tag_tokenizer")
  33. .set_index_option(IndexRecordOption::Basic),
  34. );
  35. let content_indexing = TextOptions::default().set_indexing_options(
  36. TextFieldIndexing::default()
  37. .set_tokenizer("content_tokenizer")
  38. .set_index_option(IndexRecordOption::WithFreqsAndPositions),
  39. );
  40. let property_indexing = TextOptions::default().set_indexing_options(
  41. TextFieldIndexing::default()
  42. .set_tokenizer("property_tokenizer")
  43. .set_index_option(IndexRecordOption::WithFreqsAndPositions),
  44. );
  45. let mut schema_builder = SchemaBuilder::default();
  46. schema_builder.add_i64_field("post_id", STORED | INDEXED);
  47. schema_builder.add_i64_field("creation_date", INDEXED);
  48. schema_builder.add_text_field("instance", tag_indexing.clone());
  49. schema_builder.add_text_field("author", tag_indexing.clone());
  50. schema_builder.add_text_field("tag", tag_indexing);
  51. schema_builder.add_text_field("blog", content_indexing.clone());
  52. schema_builder.add_text_field("content", content_indexing.clone());
  53. schema_builder.add_text_field("subtitle", content_indexing.clone());
  54. schema_builder.add_text_field("title", content_indexing);
  55. schema_builder.add_text_field("lang", property_indexing.clone());
  56. schema_builder.add_text_field("license", property_indexing);
  57. schema_builder.build()
  58. }
  59. pub fn create(path: &dyn AsRef<Path>, tokenizers: &SearchTokenizerConfig) -> Result<Self> {
  60. let schema = Self::schema();
  61. create_dir_all(path).map_err(|_| SearcherError::IndexCreationError)?;
  62. let index = Index::create(
  63. MmapDirectory::open(path).map_err(|_| SearcherError::IndexCreationError)?,
  64. schema,
  65. )
  66. .map_err(|_| SearcherError::IndexCreationError)?;
  67. {
  68. let tokenizer_manager = index.tokenizers();
  69. tokenizer_manager.register("tag_tokenizer", tokenizers.tag_tokenizer);
  70. tokenizer_manager.register("content_tokenizer", tokenizers.content_tokenizer);
  71. tokenizer_manager.register("property_tokenizer", tokenizers.property_tokenizer);
  72. } //to please the borrow checker
  73. Ok(Self {
  74. writer: Mutex::new(Some(
  75. index
  76. .writer(50_000_000)
  77. .map_err(|_| SearcherError::WriteLockAcquisitionError)?,
  78. )),
  79. reader: index
  80. .reader_builder()
  81. .reload_policy(ReloadPolicy::Manual)
  82. .try_into()
  83. .map_err(|_| SearcherError::IndexCreationError)?,
  84. index,
  85. })
  86. }
  87. pub fn open(path: &dyn AsRef<Path>, tokenizers: &SearchTokenizerConfig) -> Result<Self> {
  88. let mut index =
  89. Index::open(MmapDirectory::open(path).map_err(|_| SearcherError::IndexOpeningError)?)
  90. .map_err(|_| SearcherError::IndexOpeningError)?;
  91. {
  92. let tokenizer_manager = index.tokenizers();
  93. tokenizer_manager.register("tag_tokenizer", tokenizers.tag_tokenizer);
  94. tokenizer_manager.register("content_tokenizer", tokenizers.content_tokenizer);
  95. tokenizer_manager.register("property_tokenizer", tokenizers.property_tokenizer);
  96. } //to please the borrow checker
  97. let writer = index
  98. .writer(50_000_000)
  99. .map_err(|_| SearcherError::WriteLockAcquisitionError)?;
  100. // Since Tantivy v0.12.0, IndexWriter::garbage_collect_files() returns Future.
  101. // To avoid conflict with Plume async project, we don't introduce async now.
  102. // After async is introduced to Plume, we can use garbage_collect_files() again.
  103. // Algorithm stolen from Tantivy's SegmentUpdater::list_files()
  104. use std::collections::HashSet;
  105. use std::path::PathBuf;
  106. let mut files: HashSet<PathBuf> = index
  107. .list_all_segment_metas()
  108. .into_iter()
  109. .flat_map(|segment_meta| segment_meta.list_files())
  110. .collect();
  111. files.insert(Path::new("meta.json").to_path_buf());
  112. index
  113. .directory_mut()
  114. .garbage_collect(|| files)
  115. .map_err(|_| SearcherError::IndexEditionError)?;
  116. Ok(Self {
  117. writer: Mutex::new(Some(writer)),
  118. reader: index
  119. .reader_builder()
  120. .reload_policy(ReloadPolicy::Manual)
  121. .try_into()
  122. .map_err(|e| {
  123. if let TantivyError::IOError(err) = e {
  124. let err: io::Error = err.into();
  125. if err.kind() == io::ErrorKind::InvalidData {
  126. // Search index was created in older Tantivy format.
  127. SearcherError::InvalidIndexDataError
  128. } else {
  129. SearcherError::IndexCreationError
  130. }
  131. } else {
  132. SearcherError::IndexCreationError
  133. }
  134. })?,
  135. index,
  136. })
  137. }
  138. pub fn add_document(&self, conn: &Connection, post: &Post) -> Result<()> {
  139. if !post.published {
  140. return Ok(());
  141. }
  142. let schema = self.index.schema();
  143. let post_id = schema.get_field("post_id").unwrap();
  144. let creation_date = schema.get_field("creation_date").unwrap();
  145. let instance = schema.get_field("instance").unwrap();
  146. let author = schema.get_field("author").unwrap();
  147. let tag = schema.get_field("tag").unwrap();
  148. let blog_name = schema.get_field("blog").unwrap();
  149. let content = schema.get_field("content").unwrap();
  150. let subtitle = schema.get_field("subtitle").unwrap();
  151. let title = schema.get_field("title").unwrap();
  152. let lang = schema.get_field("lang").unwrap();
  153. let license = schema.get_field("license").unwrap();
  154. let mut writer = self.writer.lock().unwrap();
  155. let writer = writer.as_mut().unwrap();
  156. writer.add_document(doc!(
  157. post_id => i64::from(post.id),
  158. author => post.get_authors(conn)?.into_iter().map(|u| u.fqn).join(" "),
  159. creation_date => i64::from(post.creation_date.num_days_from_ce()),
  160. instance => Instance::get(conn, post.get_blog(conn)?.instance_id)?.public_domain,
  161. tag => Tag::for_post(conn, post.id)?.into_iter().map(|t| t.tag).join(" "),
  162. blog_name => post.get_blog(conn)?.title,
  163. content => post.content.get().clone(),
  164. subtitle => post.subtitle.clone(),
  165. title => post.title.clone(),
  166. lang => detect_lang(post.content.get()).and_then(|i| if i.is_reliable() { Some(i.lang()) } else {None} ).unwrap_or(Lang::Eng).name(),
  167. license => post.license.clone(),
  168. ));
  169. Ok(())
  170. }
  171. pub fn delete_document(&self, post: &Post) {
  172. let schema = self.index.schema();
  173. let post_id = schema.get_field("post_id").unwrap();
  174. let doc_id = Term::from_field_i64(post_id, i64::from(post.id));
  175. let mut writer = self.writer.lock().unwrap();
  176. let writer = writer.as_mut().unwrap();
  177. writer.delete_term(doc_id);
  178. }
  179. pub fn update_document(&self, conn: &Connection, post: &Post) -> Result<()> {
  180. self.delete_document(post);
  181. self.add_document(conn, post)
  182. }
  183. pub fn search_document(
  184. &self,
  185. conn: &Connection,
  186. query: PlumeQuery,
  187. (min, max): (i32, i32),
  188. ) -> Vec<Post> {
  189. let schema = self.index.schema();
  190. let post_id = schema.get_field("post_id").unwrap();
  191. let collector = TopDocs::with_limit(cmp::max(1, max) as usize);
  192. let searcher = self.reader.searcher();
  193. let res = searcher.search(&query.into_query(), &collector).unwrap();
  194. res.get(min as usize..)
  195. .unwrap_or(&[])
  196. .iter()
  197. .filter_map(|(_, doc_add)| {
  198. let doc = searcher.doc(*doc_add).ok()?;
  199. let id = doc.get_first(post_id)?;
  200. Post::get(conn, id.i64_value() as i32).ok()
  201. //borrow checker don't want me to use filter_map or and_then here
  202. })
  203. .collect()
  204. }
  205. pub fn fill(&self, conn: &Connection) -> Result<()> {
  206. for post in posts::table
  207. .filter(posts::published.eq(true))
  208. .load::<Post>(conn)?
  209. {
  210. self.update_document(conn, &post)?
  211. }
  212. Ok(())
  213. }
  214. pub fn commit(&self) {
  215. let mut writer = self.writer.lock().unwrap();
  216. writer.as_mut().unwrap().commit().unwrap();
  217. self.reader.reload().unwrap();
  218. }
  219. pub fn drop_writer(&self) {
  220. self.writer.lock().unwrap().take();
  221. }
  222. }
  223. #[derive(Clone, Debug)]
  224. pub struct AddDocument(Post);
  225. #[actor(AddDocument)]
  226. pub struct SearcherActor {
  227. searcher: Searcher,
  228. db_pool: DbPool,
  229. }
  230. impl Actor for SearcherActor {
  231. // we used the #[actor] attribute so SearcherActorMsg is the Msg type
  232. type Msg = SearcherActorMsg;
  233. fn recv(&mut self, ctx: &Context<Self::Msg>, msg: Self::Msg, sender: Sender) {
  234. // Use the respective Receive<T> implementation
  235. self.receive(ctx, msg, sender);
  236. }
  237. }
  238. impl Receive<AddDocument> for SearcherActor {
  239. type Msg = SearcherActorMsg;
  240. fn receive(&mut self, _ctx: &Context<Self::Msg>, msg: AddDocument, _sender: Sender) {
  241. let conn = self.db_pool.get().unwrap();
  242. let _ = self.searcher.add_document(&conn, &msg.0);
  243. }
  244. }