use crate::common::entities::{Dict, DictEntry}; use crate::common::errors::RepositoryError; use crate::common::traits::DictRepository; use futures::TryStreamExt; use futures::stream::BoxStream; use sqlx::{Row, SqlitePool, sqlite::SqliteConnectOptions}; use std::collections::HashMap; use std::str::FromStr; #[derive(sqlx::FromRow)] struct SqliteEntryDto { id: i64, text: String, // sqlx reads the DB column into this specific wrapper metadata: sqlx::types::Json>, } // Mapper: DTO -> Domain Entity impl From for DictEntry { fn from(dto: SqliteEntryDto) -> Self { Self { id: Some(dto.id as u64), text: dto.text, // Unwrap the sqlx wrapper to get the inner HashMap metadata: dto.metadata.0, } } } // --- REPOSITORY IMPLEMENTATION --- #[derive(Clone)] pub struct SqliteDictRepository { pool: SqlitePool, dict_name: String, } impl SqliteDictRepository { pub async fn new(database_url: &str) -> Result { let options = SqliteConnectOptions::from_str(database_url) .map_err(|_| RepositoryError::ConnectionFailed)? .create_if_missing(true); let pool = SqlitePool::connect_with(options) .await .map_err(|_| RepositoryError::ConnectionFailed)?; // Ensure tables exist with proper Normalization and Constraints sqlx::query( r#" CREATE TABLE IF NOT EXISTS dictionaries ( id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS entries ( id INTEGER PRIMARY KEY, dictionary_id INTEGER NOT NULL, text TEXT NOT NULL, metadata TEXT, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY(dictionary_id) REFERENCES dictionaries(id) ON DELETE CASCADE, -- This constraint allows us to update existing words instead of duplicating them UNIQUE(dictionary_id, text) ); "#, ) .execute(&pool) .await .map_err(|e| RepositoryError::StorageError(e.to_string()))?; Ok(Self { pool: pool, dict_name: "default_dict".into(), }) } // Helper: Resolve dictionary name to ID async fn get_dict_id(&self) -> Result { let row = sqlx::query("SELECT id FROM dictionaries WHERE name = ?") .bind(&self.dict_name) .fetch_optional(&self.pool) .await .map_err(|e| RepositoryError::StorageError(e.to_string()))?; match row { Some(r) => Ok(r.get("id")), None => Err(RepositoryError::NotFound(self.dict_name.clone())), } } } #[async_trait::async_trait] impl DictRepository for SqliteDictRepository { async fn create_dict(&self) -> Result<(), RepositoryError> { sqlx::query("INSERT OR IGNORE INTO dictionaries (name) VALUES (?)") .bind(&self.dict_name) .execute(&self.pool) .await .map_err(|e| RepositoryError::StorageError(e.to_string()))?; Ok(()) } fn use_dict(&mut self, name: &str) { self.dict_name = name.to_string(); } async fn save_entries(&self, entries: &[DictEntry]) -> Result<(), RepositoryError> { let mut tx = self .pool .begin() .await .map_err(|_| RepositoryError::ConnectionFailed)?; // 1. Get Dict ID let dict_id_row = sqlx::query("SELECT id FROM dictionaries WHERE name = ?") .bind(&self.dict_name) .fetch_optional(&mut *tx) .await .map_err(|e| RepositoryError::StorageError(e.to_string()))?; let dict_id: i64 = match dict_id_row { Some(row) => row.get("id"), None => return Err(RepositoryError::NotFound(self.dict_name.clone())), }; // 2. Batch Upsert for entry in entries { // We must wrap the HashMap in sqlx::types::Json so SQLx knows how to serialize it let meta_json = sqlx::types::Json(&entry.metadata); sqlx::query( r#" INSERT INTO entries (dictionary_id, text, metadata) VALUES (?, ?, ?) ON CONFLICT(dictionary_id, text) DO UPDATE SET metadata = excluded.metadata, updated_at = CURRENT_TIMESTAMP "#, ) .bind(dict_id) .bind(&entry.text) .bind(meta_json) .execute(&mut *tx) .await .map_err(|e| RepositoryError::StorageError(e.to_string()))?; } tx.commit() .await .map_err(|e| RepositoryError::StorageError(e.to_string()))?; Ok(()) } async fn fetch_many(&self, limit: usize, offset: usize) -> Result { // Get Dict ID let dict_id = self.get_dict_id().await?; // Query (Reading into the DTO) let dtos = sqlx::query_as::<_, SqliteEntryDto>( r#" SELECT id, text, metadata FROM entries WHERE dictionary_id = ? LIMIT ? OFFSET ? "#, ) .bind(dict_id) .bind(limit as u32) .bind(offset as u32) .fetch_all(&self.pool) .await .map_err(|e| RepositoryError::StorageError(e.to_string()))?; // 4. Convert DTOs to Domain Dict let mut entries_map = HashMap::new(); for dto in dtos { let entry: DictEntry = dto.into(); // Converts DTO -> Entity // We safely unwrap because the DB guarantees an ID exists if let Some(id) = entry.id { entries_map.insert(id, entry); } } Ok(Dict { name: self.dict_name.clone(), entries: entries_map, }) } async fn stream_batches( &self, batch_size: usize, ) -> Result, RepositoryError>>, RepositoryError> { // 1. Resolve ID first let dict_id = self.get_dict_id().await?; // 2. Create the base query stream. // We do NOT use limit/offset. We let the DB stream rows via a cursor. let query_stream = sqlx::query("SELECT text FROM entries WHERE dictionary_id = ?") .bind(dict_id) .fetch(&self.pool); // 3. Transform the stream using Functional combinators let stream = query_stream // Map SQLx errors to Domain errors .map_err(|e| RepositoryError::StorageError(e.to_string())) // Extract the String from the Row .and_then(|row| async move { // 'text' is the column name let text: String = row .try_get("text") .map_err(|e| RepositoryError::StorageError(e.to_string()))?; Ok(text) }) // Group items into vectors of size `batch_size` .try_chunks(batch_size) // try_chunks returns a specific error type on failure, map it back .map_err(|e| { // logic to handle leftover elements if error occurs, // but for simplicity, we treat stream errors as fatal here RepositoryError::StorageError(e.to_string()) }); // 4. Box the stream to erase the complex iterator type (Type Erasure) Ok(Box::pin(stream)) } }