Browse Source

WIP: import/export

develop-refactor
chodak166 5 months ago
parent
commit
0be1288638
  1. 2
      app/src/app.rs
  2. 2
      lib/Cargo.toml
  3. 48
      lib/src/application/errors.rs
  4. 12
      lib/src/application/services.rs
  5. 39
      lib/src/application/traits.rs
  6. 8
      lib/src/core/entities.rs
  7. 12
      lib/src/infrastructure/errors.rs
  8. 4
      lib/src/infrastructure/json_file_dict_source.rs
  9. 367
      lib/src/infrastructure/sqlite_dict_repository.rs
  10. 2
      lib/src/presentation/cli/commands/import_dict.rs

2
app/src/app.rs

@ -19,7 +19,9 @@ impl Application {
let config = AppConfig::build(&args.global, &args.command)?;
tracing_subscriber::fmt()
.compact()
.with_env_filter(&config.log_level)
.with_target(false)
.init();
debug!("Bootstrapping application...");

2
lib/Cargo.toml

@ -17,4 +17,4 @@ chrono = { version = "0.4", features = ["serde"] }
thiserror = "1.0"
async-trait = "0.1"
parking_lot = "0.12"
sqlx = { version = "0.7", features = ["runtime-tokio-rustls", "sqlite", "chrono", "migrate"] }
sqlx = { version = "0.7", features = ["runtime-tokio", "sqlite", "chrono", "migrate"] }

48
lib/src/application/errors.rs

@ -1,15 +1,39 @@
#[derive(Debug)]
// #[derive(Debug)]
// pub enum RepositoryError {
// NotFound,
// ConnectionFailed,
// InvalidData(String),
// Unexpected(String),
// }
// impl std::fmt::Display for RepositoryError {
// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// write!(f, "{:?}", self)
// }
// }
// impl std::error::Error for RepositoryError {}
use thiserror::Error;
// #[derive(Error, Debug)]
// pub enum RepositoryError {
// #[error("Database connection failed")]
// ConnectionFailed(#[source] sqlx::Error),
// #[error("Database query failed: {0}")]
// QueryFailed(#[source] sqlx::Error), //TODO: sqlx id infrastructure
// #[error("Dictionary '{0}' not found")]
// NotFound(String),
// #[error("Invalid data encountered")]
// InvalidData,
// }
#[derive(Error, Debug)]
pub enum RepositoryError {
NotFound,
#[error("Database connection failed")]
ConnectionFailed,
InvalidData(String),
Unexpected(String),
}
impl std::fmt::Display for RepositoryError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
}
#[error("Dictionary '{0}' not found")]
NotFound(String),
#[error("Storage error: {0}")]
StorageError(String),
}
impl std::error::Error for RepositoryError {}

12
lib/src/application/services.rs

@ -13,9 +13,13 @@ impl<'a, R: DictRepository> DictImporter<'a, R> {
}
}
pub fn import(&self, name: &str, mut source: impl DictSource) -> Result<(), anyhow::Error> {
pub async fn import(
&self,
name: &str,
mut source: impl DictSource,
) -> Result<(), anyhow::Error> {
// 1. Ensure Dict exists (Logic: Create if new, or maybe clear existing?)
self.repo.create(name)?;
self.repo.create(name).await?;
let mut batch = Vec::with_capacity(self.batch_size);
@ -30,7 +34,7 @@ impl<'a, R: DictRepository> DictImporter<'a, R> {
// 3. Batch Write
if batch.len() >= self.batch_size {
self.repo.save_entries(name, &batch)?;
self.repo.save_entries(name, &batch).await?;
batch.clear();
}
}
@ -44,7 +48,7 @@ impl<'a, R: DictRepository> DictImporter<'a, R> {
// 4. Flush remaining
if !batch.is_empty() {
self.repo.save_entries(name, &batch)?;
self.repo.save_entries(name, &batch).await?;
}
Ok(())

39
lib/src/application/traits.rs

@ -3,20 +3,43 @@ use crate::{
core::entities::{Dict, DictEntry},
};
pub trait DictRepository {
fn create(&self, name: &str) -> Result<(), RepositoryError>;
// Batch saving is usually much faster than 1-by-1 for SQL
fn save_entries(&self, dict_name: &str, entries: &[DictEntry]) -> Result<(), RepositoryError>;
// pub trait DictRepository {
// fn create(&self, name: &str) -> Result<(), RepositoryError>;
// // Batch saving is usually much faster than 1-by-1 for SQL
// fn save_entries(&self, dict_name: &str, entries: &[DictEntry]) -> Result<(), RepositoryError>;
fn fetch_many(
// fn fetch_many(
// &self,
// name: &str,
// limit: Option<u32>,
// offset: Option<u32>,
// ) -> Result<Dict, RepositoryError>;
// // Get the next available ID for a dictionary
// fn get_next_id(&self, dict_name: &str) -> Result<u32, RepositoryError>;
// }
#[async_trait::async_trait]
pub trait DictRepository: Send + Sync {
async fn create(&self, name: &str) -> Result<(), RepositoryError>;
/// "Upsert" logic:
/// - If entry exists (by text), update metadata.
/// - If not, insert new.
/// - IDs are handled by the Database.
async fn save_entries(
&self,
dict_name: &str,
entries: &[DictEntry],
) -> Result<(), RepositoryError>;
/// Fetch a page of entries.
async fn fetch_many(
&self,
name: &str,
limit: Option<u32>,
offset: Option<u32>,
) -> Result<Dict, RepositoryError>;
// Get the next available ID for a dictionary
fn get_next_id(&self, dict_name: &str) -> Result<u32, RepositoryError>;
}
pub trait DictSource {

8
lib/src/core/entities.rs

@ -1,16 +1,16 @@
use std::collections::HashMap;
pub type DictEntryId = u32;
pub type DictEntryId = u64;
#[derive(Debug, Clone, PartialEq)]
pub struct DictEntry {
pub id: DictEntryId,
pub id: Option<DictEntryId>,
pub text: String,
pub metadata: HashMap<String, String>,
}
impl DictEntry {
pub fn new(id: DictEntryId, text: String) -> Self {
pub fn new(id: Option<DictEntryId>, text: String) -> Self {
DictEntry {
id,
text,
@ -33,6 +33,6 @@ impl Dict {
}
pub fn add_entry(&mut self, entry: DictEntry) {
self.entries.insert(entry.id, entry);
self.entries.insert(entry.id.unwrap(), entry);
}
}

12
lib/src/infrastructure/errors.rs

@ -1 +1,13 @@
// use thiserror::Error;
// #[derive(Error, Debug)]
// pub enum RepositoryError {
// #[error("Database connection failed")]
// ConnectionFailed(#[source] sqlx::Error),
// #[error("Database query failed: {0}")]
// QueryFailed(#[source] sqlx::Error),
// #[error("Dictionary '{0}' not found")]
// NotFound(String),
// #[error("Invalid data encountered")]
// InvalidData,
// }

4
lib/src/infrastructure/json_file_dict_source.rs

@ -31,7 +31,7 @@ impl JsonFileDictSource {
// Convert to DictEntry with auto-generated IDs
let mut entries = Vec::new();
for (index, json_entry) in json_entries.into_iter().enumerate() {
let id = (index + 1) as u32; // Auto-generate ID starting from 1
let id = (index + 1) as u64; // Auto-generate ID starting from 1
// Convert metadata from serde_json::Value to HashMap<String, String>
let metadata = if let Some(meta) = json_entry.metadata {
@ -51,7 +51,7 @@ impl JsonFileDictSource {
};
entries.push(DictEntry {
id,
id: Some(id),
text: json_entry.word,
metadata,
});

367
lib/src/infrastructure/sqlite_dict_repository.rs

@ -5,6 +5,30 @@ use sqlx::{Row, SqlitePool, sqlite::SqliteConnectOptions};
use std::collections::HashMap;
use std::str::FromStr;
// --- DTO: Data Transfer Object ---
// This struct exists ONLY to talk to the database.
#[derive(sqlx::FromRow)]
struct SqliteEntryDto {
id: i64,
text: String,
// sqlx reads the DB column into this specific wrapper
metadata: sqlx::types::Json<HashMap<String, String>>,
}
// Mapper: DTO -> Domain Entity
impl From<SqliteEntryDto> for DictEntry {
fn from(dto: SqliteEntryDto) -> Self {
Self {
id: Some(dto.id as u64),
text: dto.text,
// Unwrap the sqlx wrapper to get the inner HashMap
metadata: dto.metadata.0,
}
}
}
// --- REPOSITORY IMPLEMENTATION ---
#[derive(Clone)]
pub struct SqliteDictRepository {
pool: SqlitePool,
@ -20,295 +44,154 @@ impl SqliteDictRepository {
.await
.map_err(|_| RepositoryError::ConnectionFailed)?;
// Run migrations
sqlx::migrate!("./migrations")
.run(&pool)
.await
.map_err(|e| RepositoryError::Unexpected(format!("Failed to run migrations: {}", e)))?;
Ok(Self { pool })
}
async fn ensure_dict_tables(&self, dict_name: &str) -> Result<(), RepositoryError> {
// Create dict table if not exists
// Ensure tables exist with proper Normalization and Constraints
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS dicts (
name TEXT PRIMARY KEY,
CREATE TABLE IF NOT EXISTS dictionaries (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL UNIQUE,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
"#,
)
.execute(&self.pool)
.await
.map_err(|_| RepositoryError::ConnectionFailed)?;
// Insert dict if not exists
sqlx::query("INSERT OR IGNORE INTO dicts (name) VALUES (?)")
.bind(dict_name)
.execute(&self.pool)
.await
.map_err(|_| RepositoryError::ConnectionFailed)?;
);
// Create entries table for this dict
let table_name = format!("dict_entries_{}", dict_name);
let create_table_sql = format!(
r#"
CREATE TABLE IF NOT EXISTS {} (
CREATE TABLE IF NOT EXISTS entries (
id INTEGER PRIMARY KEY,
text TEXT NOT NULL UNIQUE,
dictionary_id INTEGER NOT NULL,
text TEXT NOT NULL,
metadata TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
"#,
table_name
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY(dictionary_id) REFERENCES dictionaries(id) ON DELETE CASCADE,
-- This constraint allows us to update existing words instead of duplicating them
UNIQUE(dictionary_id, text)
);
sqlx::query(&create_table_sql)
.execute(&self.pool)
"#,
)
.execute(&pool)
.await
.map_err(|_| RepositoryError::ConnectionFailed)?;
.map_err(|e| RepositoryError::StorageError(e.to_string()))?;
Ok(())
Ok(Self { pool })
}
pub fn get_next_id(&self, dict_name: &str) -> Result<u32, RepositoryError> {
let pool = self.pool.clone();
let dict_name = dict_name.to_string();
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async move {
let table_name = format!("dict_entries_{}", dict_name);
let result: Option<i64> =
sqlx::query_scalar(&format!("SELECT MAX(id) FROM {}", table_name))
.fetch_one(&pool)
// Helper: Resolve dictionary name to ID
async fn get_dict_id(&self, name: &str) -> Result<i64, RepositoryError> {
let row = sqlx::query("SELECT id FROM dictionaries WHERE name = ?")
.bind(name)
.fetch_optional(&self.pool)
.await
.map_err(|_| RepositoryError::ConnectionFailed)?;
.map_err(|e| RepositoryError::StorageError(e.to_string()))?;
Ok(result.map(|id| id as u32 + 1).unwrap_or(1))
})
})
match row {
Some(r) => Ok(r.get("id")),
None => Err(RepositoryError::NotFound(name.to_string())),
}
fn find_id_by_text(&self, dict_name: &str, text: &str) -> Result<Option<u32>, RepositoryError> {
let pool = self.pool.clone();
let dict_name = dict_name.to_string();
let text = text.to_string();
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async move {
let table_name = format!("dict_entries_{}", dict_name);
let result: Option<i64> =
sqlx::query_scalar(&format!("SELECT id FROM {} WHERE text = ?", table_name))
.bind(&text)
.fetch_one(&pool)
.await
.map_err(|_| RepositoryError::ConnectionFailed)?;
Ok(result.map(|id| id as u32))
})
})
}
}
#[async_trait::async_trait]
impl DictRepository for SqliteDictRepository {
fn create(&self, name: &str) -> Result<(), RepositoryError> {
// This is a synchronous method, but we need to run async operations
// In a real application, you might want to make the trait async or use a blocking executor
let pool = self.pool.clone();
let name = name.to_string();
// Use tokio's block_in_place to run async code in sync context
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async move {
let mut tx = pool.begin().await.map_err(|e| {
RepositoryError::Unexpected(format!("Failed to begin transaction: {}", e))
})?;
// Create dict table
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS dicts (
name TEXT PRIMARY KEY,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
"#,
)
.execute(&mut *tx)
async fn create(&self, name: &str) -> Result<(), RepositoryError> {
sqlx::query("INSERT OR IGNORE INTO dictionaries (name) VALUES (?)")
.bind(name)
.execute(&self.pool)
.await
.map_err(|e| {
RepositoryError::Unexpected(format!("Failed to create dicts table: {}", e))
})?;
.map_err(|e| RepositoryError::StorageError(e.to_string()))?;
Ok(())
}
// Insert dict
sqlx::query("INSERT OR IGNORE INTO dicts (name) VALUES (?)")
.bind(&name)
.execute(&mut *tx)
async fn save_entries(
&self,
dict_name: &str,
entries: &[DictEntry],
) -> Result<(), RepositoryError> {
let mut tx = self
.pool
.begin()
.await
.map_err(|e| {
RepositoryError::Unexpected(format!("Failed to insert dict: {}", e))
})?;
// Create entries table for this dict
let table_name = format!("dict_entries_{}", name);
let create_table_sql = format!(
r#"
CREATE TABLE IF NOT EXISTS {} (
id INTEGER PRIMARY KEY,
text TEXT NOT NULL UNIQUE,
metadata TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
"#,
table_name
);
.map_err(|_| RepositoryError::ConnectionFailed)?;
sqlx::query(&create_table_sql)
.execute(&mut *tx)
// 1. Get Dict ID
let dict_id_row = sqlx::query("SELECT id FROM dictionaries WHERE name = ?")
.bind(dict_name)
.fetch_optional(&mut *tx)
.await
.map_err(|e| {
RepositoryError::Unexpected(format!(
"Failed to create entries table: {}",
e
))
})?;
.map_err(|e| RepositoryError::StorageError(e.to_string()))?;
tx.commit().await.map_err(|e| {
RepositoryError::Unexpected(format!("Failed to commit transaction: {}", e))
})?;
Ok(())
})
})
}
fn save_entries(&self, dict_name: &str, entries: &[DictEntry]) -> Result<(), RepositoryError> {
let pool = self.pool.clone();
let dict_name = dict_name.to_string();
let entries = entries.to_vec();
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async move {
let table_name = format!("dict_entries_{}", dict_name);
let dict_id: i64 = match dict_id_row {
Some(row) => row.get("id"),
None => return Err(RepositoryError::NotFound(dict_name.to_string())),
};
// 2. Batch Upsert
for entry in entries {
let metadata_json = serde_json::to_string(&entry.metadata)
.map_err(|e| RepositoryError::InvalidData(e.to_string()))?;
// Check if entry with this text already exists
let existing_id: Option<i64> = sqlx::query_scalar(&format!(
"SELECT id FROM {} WHERE text = ?",
table_name
))
.bind(&entry.text)
.fetch_optional(&pool)
.await
.map_err(|e| RepositoryError::Unexpected(format!("Failed to check existing entry: {}", e)))?;
// We must wrap the HashMap in sqlx::types::Json so SQLx knows how to serialize it
let meta_json = sqlx::types::Json(&entry.metadata);
if let Some(id) = existing_id {
// Update existing entry
sqlx::query(&format!(
"UPDATE {} SET metadata = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?",
table_name
))
.bind(metadata_json)
.bind(id)
.execute(&pool)
.await
.map_err(|e| RepositoryError::Unexpected(format!("Failed to update entry: {}", e)))?;
} else {
// Insert new entry
sqlx::query(&format!(
"INSERT INTO {} (id, text, metadata) VALUES (?, ?, ?)",
table_name
))
.bind(entry.id as i64)
sqlx::query(
r#"
INSERT INTO entries (dictionary_id, text, metadata)
VALUES (?, ?, ?)
ON CONFLICT(dictionary_id, text) DO UPDATE SET
metadata = excluded.metadata,
updated_at = CURRENT_TIMESTAMP
"#,
)
.bind(dict_id)
.bind(&entry.text)
.bind(metadata_json)
.execute(&pool)
.bind(meta_json)
.execute(&mut *tx)
.await
.map_err(|e| RepositoryError::Unexpected(format!("Failed to insert entry: {}", e)))?;
}
.map_err(|e| RepositoryError::StorageError(e.to_string()))?;
}
tx.commit()
.await
.map_err(|e| RepositoryError::StorageError(e.to_string()))?;
Ok(())
})
})
}
fn get_next_id(&self, dict_name: &str) -> Result<u32, RepositoryError> {
self.get_next_id(dict_name)
}
fn fetch_many(
async fn fetch_many(
&self,
name: &str,
limit: Option<u32>,
offset: Option<u32>,
) -> Result<Dict, RepositoryError> {
let pool = self.pool.clone();
let name = name.to_string();
// 1. Get Dict ID
let dict_id = self.get_dict_id(name).await?;
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async move {
let table_name = format!("dict_entries_{}", name);
// 2. Prepare Limits
let limit_val = limit.unwrap_or(1000);
let offset_val = offset.unwrap_or(0);
// Check if dict exists
let dict_exists: bool =
sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM dicts WHERE name = ?)")
.bind(&name)
.fetch_one(&pool)
// 3. Query (Reading into the DTO)
let dtos = sqlx::query_as::<_, SqliteEntryDto>(
r#"
SELECT id, text, metadata
FROM entries
WHERE dictionary_id = ?
LIMIT ? OFFSET ?
"#,
)
.bind(dict_id)
.bind(limit_val)
.bind(offset_val)
.fetch_all(&self.pool)
.await
.map_err(|e| {
RepositoryError::Unexpected(format!(
"Failed to check dict exists: {}",
e
))
})?;
.map_err(|e| RepositoryError::StorageError(e.to_string()))?;
if !dict_exists {
return Err(RepositoryError::NotFound);
}
// 4. Convert DTOs to Domain Dict
let mut entries_map = HashMap::new();
for dto in dtos {
let entry: DictEntry = dto.into(); // Converts DTO -> Entity
let mut query = format!("SELECT id, text, metadata FROM {}", table_name);
if let Some(offset_val) = offset {
query.push_str(&format!(" LIMIT {}", limit.unwrap_or(1000)));
query.push_str(&format!(" OFFSET {}", offset_val));
} else if let Some(limit_val) = limit {
query.push_str(&format!(" LIMIT {}", limit_val));
// We safely unwrap because the DB guarantees an ID exists
if let Some(id) = entry.id {
entries_map.insert(id, entry);
}
let rows = sqlx::query(&query).fetch_all(&pool).await.map_err(|e| {
RepositoryError::Unexpected(format!("Failed to fetch entries: {}", e))
})?;
let mut entries = HashMap::new();
for row in rows {
let id: i64 = row.get("id");
let text: String = row.get("text");
let metadata_json: Option<String> = row.get("metadata");
let metadata = if let Some(json) = metadata_json {
serde_json::from_str(&json)
.map_err(|e| RepositoryError::InvalidData(e.to_string()))?
} else {
HashMap::new()
};
let entry = DictEntry {
id: id as DictEntryId,
text,
metadata,
};
entries.insert(entry.id, entry);
}
Ok(Dict { name, entries })
})
Ok(Dict {
name: name.to_string(),
entries: entries_map,
})
}
}

2
lib/src/presentation/cli/commands/import_dict.rs

@ -21,7 +21,7 @@ pub async fn run<R: DictRepository>(
let importer = DictImporter::new(&repository);
// Perform the import (this will call create() first)
match importer.import(&config.name, source) {
match importer.import(&config.name, source).await {
Ok(()) => {
info!("Successfully imported dictionary '{}'", config.name);
Ok(())

Loading…
Cancel
Save