|
|
|
|
@ -1,4 +1,9 @@
|
|
|
|
|
use crate::core::{DictRepository, SystemDecoder, entities::DecodedLength, errors::CodecError}; |
|
|
|
|
use crate::core::{ |
|
|
|
|
DictRepository, SystemDecoder, |
|
|
|
|
entities::DecodedLength, |
|
|
|
|
errors::{CodecError, RepositoryError}, |
|
|
|
|
}; |
|
|
|
|
use futures::{Stream, StreamExt, TryStreamExt}; |
|
|
|
|
use std::{collections::HashMap, hash::Hash, num::ParseIntError}; |
|
|
|
|
use thiserror::Error; |
|
|
|
|
|
|
|
|
|
@ -26,6 +31,9 @@ pub enum LenValueMapError {
|
|
|
|
|
#[error(transparent)] |
|
|
|
|
Codec(#[from] CodecError), |
|
|
|
|
|
|
|
|
|
#[error(transparent)] |
|
|
|
|
Repository(#[from] RepositoryError), |
|
|
|
|
|
|
|
|
|
#[error("unable to build encoder data: {0}")] |
|
|
|
|
Build(String), |
|
|
|
|
} |
|
|
|
|
@ -61,6 +69,10 @@ impl LenValueMap {
|
|
|
|
|
self |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn get(&self, len: u8, num: DecodedNumber) -> Option<&Vec<String>> { |
|
|
|
|
self.data.get(&DecodedLength::from(len))?.get(&num) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn insert_words<I>( |
|
|
|
|
&mut self, |
|
|
|
|
words: I, |
|
|
|
|
@ -89,38 +101,28 @@ impl LenValueMap {
|
|
|
|
|
Self { data: data } |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub async fn from_dict( |
|
|
|
|
decoder: &impl SystemDecoder, |
|
|
|
|
repo: &impl DictRepository, |
|
|
|
|
) -> Result<Self, LenValueMapError> { |
|
|
|
|
Self::build(decoder, repo, DEFAULT_DICT_BATCH_SIZE).await |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
async fn build( |
|
|
|
|
decoder: &impl SystemDecoder, |
|
|
|
|
repo: &impl DictRepository, |
|
|
|
|
batch_size: usize, |
|
|
|
|
) -> Result<Self, LenValueMapError> { |
|
|
|
|
pub async fn from_stream<S>( |
|
|
|
|
mut stream: S, |
|
|
|
|
decoder: &dyn SystemDecoder, |
|
|
|
|
) -> Result<Self, LenValueMapError> |
|
|
|
|
where |
|
|
|
|
// S is a stream of "Result<Vec<String>, Error>"
|
|
|
|
|
S: Stream<Item = Result<Vec<String>, crate::core::errors::RepositoryError>> + Unpin, |
|
|
|
|
{ |
|
|
|
|
let mut map = LenValueMap::new(); |
|
|
|
|
let mut offset = 0; |
|
|
|
|
const MAX_OFFSET: usize = 10_000_000; |
|
|
|
|
|
|
|
|
|
loop { |
|
|
|
|
let dict = repo |
|
|
|
|
.fetch_many(batch_size, offset) |
|
|
|
|
.await |
|
|
|
|
.map_err(|e| LenValueMapError::Build(e.to_string()))?; |
|
|
|
|
|
|
|
|
|
if dict.entries.is_empty() { |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
let words: Vec<String> = dict.entries.into_iter().map(|entry| entry.1.text).collect(); |
|
|
|
|
map.insert_words(words, decoder)?; |
|
|
|
|
|
|
|
|
|
offset += batch_size; |
|
|
|
|
if offset >= MAX_OFFSET { |
|
|
|
|
break; |
|
|
|
|
// We stream the batches one by one.
|
|
|
|
|
// This ensures only one batch is in memory at a time.
|
|
|
|
|
while let Some(batch_result) = stream.next().await { |
|
|
|
|
match batch_result { |
|
|
|
|
Ok(batch) => { |
|
|
|
|
// We delegate to the synchronous logic for the heavy lifting
|
|
|
|
|
map.insert_words(batch, decoder)?; |
|
|
|
|
} |
|
|
|
|
Err(e) => { |
|
|
|
|
// Convert RepositoryError to LenValueMapError::Build
|
|
|
|
|
return Err(e.into()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -133,6 +135,8 @@ mod tests {
|
|
|
|
|
use super::*; |
|
|
|
|
use crate::core::{entities::*, errors::*}; |
|
|
|
|
use async_trait::async_trait; |
|
|
|
|
use futures::stream; |
|
|
|
|
|
|
|
|
|
use std::collections::HashMap; |
|
|
|
|
|
|
|
|
|
use mockall::{Sequence, automock}; |
|
|
|
|
@ -260,22 +264,6 @@ mod tests {
|
|
|
|
|
|
|
|
|
|
// --- build ---
|
|
|
|
|
|
|
|
|
|
mock! { |
|
|
|
|
pub Repo {} |
|
|
|
|
|
|
|
|
|
#[async_trait] |
|
|
|
|
impl DictRepository for Repo { |
|
|
|
|
async fn create_dict(&self) -> Result<(), RepositoryError>; |
|
|
|
|
fn use_dict(&mut self, _name: &str); |
|
|
|
|
async fn save_entries(&self, _entry: &[DictEntry]) -> Result<(), RepositoryError>; |
|
|
|
|
async fn fetch_many( |
|
|
|
|
&self, |
|
|
|
|
limit: usize, |
|
|
|
|
offset: usize, |
|
|
|
|
) -> Result<Dict, RepositoryError>; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn dict_with_words(words: &[&str]) -> Dict { |
|
|
|
|
let mut dict = Dict::new("default".into()); |
|
|
|
|
|
|
|
|
|
@ -287,91 +275,94 @@ mod tests {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[tokio::test] |
|
|
|
|
async fn test_build_single_batch() { |
|
|
|
|
let mut repo = MockRepo::new(); |
|
|
|
|
async fn test_from_stream_success() { |
|
|
|
|
// 1. Setup Mocks (Same as before)
|
|
|
|
|
let mut decoder = MockDecoder::new(); |
|
|
|
|
let mut seq = Sequence::new(); |
|
|
|
|
|
|
|
|
|
decoder |
|
|
|
|
.expect_decode() |
|
|
|
|
.returning(|word| mock_decoding(word)); |
|
|
|
|
|
|
|
|
|
// FIRST CALL expectation (will be called first)
|
|
|
|
|
repo.expect_fetch_many() |
|
|
|
|
.times(1) // Explicitly expect 1 call
|
|
|
|
|
.in_sequence(&mut seq) // Enforce order
|
|
|
|
|
.returning(|_, _| Ok(dict_with_words(&[TEST_WORD_1]))); |
|
|
|
|
// 2. Prepare Data
|
|
|
|
|
// We wrap the inner Vecs in Ok() because the stream expects Result<Vec<String>, RepositoryError>
|
|
|
|
|
let batches = vec![ |
|
|
|
|
Ok(vec![TEST_WORD_1.into(), TEST_WORD_2.into()]), |
|
|
|
|
Ok(vec![TEST_WORD_3.into(), TEST_WORD_4.into()]), |
|
|
|
|
]; |
|
|
|
|
|
|
|
|
|
// SECOND CALL expectation (will be called second)
|
|
|
|
|
repo.expect_fetch_many() |
|
|
|
|
.times(1) |
|
|
|
|
.in_sequence(&mut seq) |
|
|
|
|
.returning(|_, _| Ok(Dict::new("default_dict".into()))); |
|
|
|
|
// 3. Create a Stream from the Vec
|
|
|
|
|
// stream::iter converts an IntoIterator into a Stream
|
|
|
|
|
let stream = stream::iter(batches); |
|
|
|
|
|
|
|
|
|
let data = LenValueMap::build(&decoder, &repo, 1) |
|
|
|
|
// 4. Inject the stream (Dependency Injection)
|
|
|
|
|
let map = LenValueMap::from_stream(stream, &decoder) |
|
|
|
|
.await |
|
|
|
|
.unwrap() |
|
|
|
|
.into_data(); |
|
|
|
|
.expect("Should build map successfully"); |
|
|
|
|
|
|
|
|
|
assert_eq!(data.len(), 1); |
|
|
|
|
// 5. Assertions
|
|
|
|
|
let data = map.into_data(); |
|
|
|
|
assert_eq!(data.len(), 2); |
|
|
|
|
assert!(data.contains_key(&TEST_NUM_1_LEN)); |
|
|
|
|
assert!(data.contains_key(&TEST_NUM_3_LEN)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[tokio::test] |
|
|
|
|
async fn test_build_multiple_batches() { |
|
|
|
|
let mut repo = MockRepo::new(); |
|
|
|
|
async fn test_from_stream_failure() { |
|
|
|
|
let mut decoder = MockDecoder::new(); |
|
|
|
|
let mut seq = Sequence::new(); |
|
|
|
|
|
|
|
|
|
decoder |
|
|
|
|
.expect_decode() |
|
|
|
|
.returning(|word| mock_decoding(word)); |
|
|
|
|
|
|
|
|
|
repo.expect_fetch_many() |
|
|
|
|
.times(1) // Explicitly expect 1 call
|
|
|
|
|
.in_sequence(&mut seq) // Enforce order
|
|
|
|
|
.returning(|_, _| Ok(dict_with_words(&[TEST_WORD_1]))); |
|
|
|
|
let batches = vec![ |
|
|
|
|
Ok(vec![TEST_WORD_1.into()]), |
|
|
|
|
Err(RepositoryError::ConnectionFailed), |
|
|
|
|
Ok(vec![TEST_WORD_3.into()]), |
|
|
|
|
]; |
|
|
|
|
|
|
|
|
|
repo.expect_fetch_many() |
|
|
|
|
.times(1) // Explicitly expect 1 call
|
|
|
|
|
.in_sequence(&mut seq) // Enforce order
|
|
|
|
|
.returning(|_, _| Ok(dict_with_words(&[TEST_WORD_3]))); // word with different decoded length
|
|
|
|
|
let stream = stream::iter(batches); |
|
|
|
|
let result = LenValueMap::from_stream(stream, &decoder).await; |
|
|
|
|
|
|
|
|
|
repo.expect_fetch_many() |
|
|
|
|
.times(1) |
|
|
|
|
.in_sequence(&mut seq) |
|
|
|
|
.returning(|_, _| Ok(Dict::new("default_dict".into()))); |
|
|
|
|
|
|
|
|
|
let data = LenValueMap::build(&decoder, &repo, 1) |
|
|
|
|
.await |
|
|
|
|
.unwrap() |
|
|
|
|
.into_data(); |
|
|
|
|
|
|
|
|
|
assert_eq!(data.len(), 2); |
|
|
|
|
match result { |
|
|
|
|
// We match specifically on the Repository variant and the ConnectionFailed inner error
|
|
|
|
|
Err(LenValueMapError::Repository(RepositoryError::ConnectionFailed)) => { |
|
|
|
|
// Success! The correct error type propagated up.
|
|
|
|
|
} |
|
|
|
|
// If it's any other error (including a stringified one), we fail
|
|
|
|
|
_ => panic!( |
|
|
|
|
"Expected LenValueMapError::Repository(ConnectionFailed), got {:?}", |
|
|
|
|
result |
|
|
|
|
), |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// #[tokio::test]
|
|
|
|
|
// async fn test_build_fetches_multiple_batches() {
|
|
|
|
|
// async fn test_build_multiple_batches() {
|
|
|
|
|
// let mut repo = MockRepo::new();
|
|
|
|
|
// let mut decoder = MockDecoder::new();
|
|
|
|
|
// let mut seq = Sequence::new();
|
|
|
|
|
|
|
|
|
|
// decoder
|
|
|
|
|
// .expect_decode()
|
|
|
|
|
// .returning(|word| mock_decoding(word));
|
|
|
|
|
|
|
|
|
|
// repo.expect_fetch_many()
|
|
|
|
|
// .with(eq("default"), eq(Some(2)), eq(Some(0)))
|
|
|
|
|
// .return_once(|_, _, _| Ok(dict_with_words(&[TEST_WORD_1, TEST_WORD_2])));
|
|
|
|
|
// .times(1) // Explicitly expect 1 call
|
|
|
|
|
// .in_sequence(&mut seq) // Enforce order
|
|
|
|
|
// .returning(|_, _| Ok(dict_with_words(&[TEST_WORD_1])));
|
|
|
|
|
|
|
|
|
|
// repo.expect_fetch_many()
|
|
|
|
|
// .with(eq("default"), eq(Some(2)), eq(Some(2)))
|
|
|
|
|
// .return_once(|_, _, _| Ok(dict_with_words(&[TEST_WORD_3, TEST_WORD_4])));
|
|
|
|
|
// .times(1) // Explicitly expect 1 call
|
|
|
|
|
// .in_sequence(&mut seq) // Enforce order
|
|
|
|
|
// .returning(|_, _| Ok(dict_with_words(&[TEST_WORD_3]))); // word with different decoded length
|
|
|
|
|
|
|
|
|
|
// repo.expect_fetch_many()
|
|
|
|
|
// .with(eq("default"), eq(Some(2)), eq(Some(4)))
|
|
|
|
|
// .return_once(|_, _, _| Ok(Dict::new("default".into())));
|
|
|
|
|
|
|
|
|
|
// let map = LenValueMap::build(&decoder, &repo, 2).await;
|
|
|
|
|
// let data = map.into_data();
|
|
|
|
|
// .times(1)
|
|
|
|
|
// .in_sequence(&mut seq)
|
|
|
|
|
// .returning(|_, _| Ok(Dict::new("default_dict".into())));
|
|
|
|
|
|
|
|
|
|
// let data = LenValueMap::build(&decoder, &repo, 1)
|
|
|
|
|
// .await
|
|
|
|
|
// .unwrap()
|
|
|
|
|
// .into_data();
|
|
|
|
|
|
|
|
|
|
// assert_eq!(data.len(), 2);
|
|
|
|
|
// }
|
|
|
|
|
|