You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

314 lines
12 KiB

use crate::application::errors::RepositoryError;
use crate::application::traits::DictRepository;
use crate::core::entities::{Dict, DictEntry, DictEntryId};
use sqlx::{Row, SqlitePool, sqlite::SqliteConnectOptions};
use std::collections::HashMap;
use std::str::FromStr;
#[derive(Clone)]
pub struct SqliteDictRepository {
pool: SqlitePool,
}
impl SqliteDictRepository {
pub async fn new(database_url: &str) -> Result<Self, RepositoryError> {
let options = SqliteConnectOptions::from_str(database_url)
.map_err(|_| RepositoryError::ConnectionFailed)?
.create_if_missing(true);
let pool = SqlitePool::connect_with(options)
.await
.map_err(|_| RepositoryError::ConnectionFailed)?;
// Run migrations
sqlx::migrate!("./migrations")
.run(&pool)
.await
.map_err(|e| RepositoryError::Unexpected(format!("Failed to run migrations: {}", e)))?;
Ok(Self { pool })
}
async fn ensure_dict_tables(&self, dict_name: &str) -> Result<(), RepositoryError> {
// Create dict table if not exists
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS dicts (
name TEXT PRIMARY KEY,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
"#,
)
.execute(&self.pool)
.await
.map_err(|_| RepositoryError::ConnectionFailed)?;
// Insert dict if not exists
sqlx::query("INSERT OR IGNORE INTO dicts (name) VALUES (?)")
.bind(dict_name)
.execute(&self.pool)
.await
.map_err(|_| RepositoryError::ConnectionFailed)?;
// Create entries table for this dict
let table_name = format!("dict_entries_{}", dict_name);
let create_table_sql = format!(
r#"
CREATE TABLE IF NOT EXISTS {} (
id INTEGER PRIMARY KEY,
text TEXT NOT NULL UNIQUE,
metadata TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
"#,
table_name
);
sqlx::query(&create_table_sql)
.execute(&self.pool)
.await
.map_err(|_| RepositoryError::ConnectionFailed)?;
Ok(())
}
pub fn get_next_id(&self, dict_name: &str) -> Result<u32, RepositoryError> {
let pool = self.pool.clone();
let dict_name = dict_name.to_string();
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async move {
let table_name = format!("dict_entries_{}", dict_name);
let result: Option<i64> =
sqlx::query_scalar(&format!("SELECT MAX(id) FROM {}", table_name))
.fetch_one(&pool)
.await
.map_err(|_| RepositoryError::ConnectionFailed)?;
Ok(result.map(|id| id as u32 + 1).unwrap_or(1))
})
})
}
fn find_id_by_text(&self, dict_name: &str, text: &str) -> Result<Option<u32>, RepositoryError> {
let pool = self.pool.clone();
let dict_name = dict_name.to_string();
let text = text.to_string();
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async move {
let table_name = format!("dict_entries_{}", dict_name);
let result: Option<i64> =
sqlx::query_scalar(&format!("SELECT id FROM {} WHERE text = ?", table_name))
.bind(&text)
.fetch_one(&pool)
.await
.map_err(|_| RepositoryError::ConnectionFailed)?;
Ok(result.map(|id| id as u32))
})
})
}
}
impl DictRepository for SqliteDictRepository {
fn create(&self, name: &str) -> Result<(), RepositoryError> {
// This is a synchronous method, but we need to run async operations
// In a real application, you might want to make the trait async or use a blocking executor
let pool = self.pool.clone();
let name = name.to_string();
// Use tokio's block_in_place to run async code in sync context
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async move {
let mut tx = pool.begin().await.map_err(|e| {
RepositoryError::Unexpected(format!("Failed to begin transaction: {}", e))
})?;
// Create dict table
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS dicts (
name TEXT PRIMARY KEY,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
"#,
)
.execute(&mut *tx)
.await
.map_err(|e| {
RepositoryError::Unexpected(format!("Failed to create dicts table: {}", e))
})?;
// Insert dict
sqlx::query("INSERT OR IGNORE INTO dicts (name) VALUES (?)")
.bind(&name)
.execute(&mut *tx)
.await
.map_err(|e| {
RepositoryError::Unexpected(format!("Failed to insert dict: {}", e))
})?;
// Create entries table for this dict
let table_name = format!("dict_entries_{}", name);
let create_table_sql = format!(
r#"
CREATE TABLE IF NOT EXISTS {} (
id INTEGER PRIMARY KEY,
text TEXT NOT NULL UNIQUE,
metadata TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
"#,
table_name
);
sqlx::query(&create_table_sql)
.execute(&mut *tx)
.await
.map_err(|e| {
RepositoryError::Unexpected(format!(
"Failed to create entries table: {}",
e
))
})?;
tx.commit().await.map_err(|e| {
RepositoryError::Unexpected(format!("Failed to commit transaction: {}", e))
})?;
Ok(())
})
})
}
fn save_entries(&self, dict_name: &str, entries: &[DictEntry]) -> Result<(), RepositoryError> {
let pool = self.pool.clone();
let dict_name = dict_name.to_string();
let entries = entries.to_vec();
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async move {
let table_name = format!("dict_entries_{}", dict_name);
for entry in entries {
let metadata_json = serde_json::to_string(&entry.metadata)
.map_err(|e| RepositoryError::InvalidData(e.to_string()))?;
// Check if entry with this text already exists
let existing_id: Option<i64> = sqlx::query_scalar(&format!(
"SELECT id FROM {} WHERE text = ?",
table_name
))
.bind(&entry.text)
.fetch_optional(&pool)
.await
.map_err(|e| RepositoryError::Unexpected(format!("Failed to check existing entry: {}", e)))?;
if let Some(id) = existing_id {
// Update existing entry
sqlx::query(&format!(
"UPDATE {} SET metadata = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?",
table_name
))
.bind(metadata_json)
.bind(id)
.execute(&pool)
.await
.map_err(|e| RepositoryError::Unexpected(format!("Failed to update entry: {}", e)))?;
} else {
// Insert new entry
sqlx::query(&format!(
"INSERT INTO {} (id, text, metadata) VALUES (?, ?, ?)",
table_name
))
.bind(entry.id as i64)
.bind(&entry.text)
.bind(metadata_json)
.execute(&pool)
.await
.map_err(|e| RepositoryError::Unexpected(format!("Failed to insert entry: {}", e)))?;
}
}
Ok(())
})
})
}
fn get_next_id(&self, dict_name: &str) -> Result<u32, RepositoryError> {
self.get_next_id(dict_name)
}
fn fetch_many(
&self,
name: &str,
limit: Option<u32>,
offset: Option<u32>,
) -> Result<Dict, RepositoryError> {
let pool = self.pool.clone();
let name = name.to_string();
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async move {
let table_name = format!("dict_entries_{}", name);
// Check if dict exists
let dict_exists: bool =
sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM dicts WHERE name = ?)")
.bind(&name)
.fetch_one(&pool)
.await
.map_err(|e| {
RepositoryError::Unexpected(format!(
"Failed to check dict exists: {}",
e
))
})?;
if !dict_exists {
return Err(RepositoryError::NotFound);
}
let mut query = format!("SELECT id, text, metadata FROM {}", table_name);
if let Some(offset_val) = offset {
query.push_str(&format!(" LIMIT {}", limit.unwrap_or(1000)));
query.push_str(&format!(" OFFSET {}", offset_val));
} else if let Some(limit_val) = limit {
query.push_str(&format!(" LIMIT {}", limit_val));
}
let rows = sqlx::query(&query).fetch_all(&pool).await.map_err(|e| {
RepositoryError::Unexpected(format!("Failed to fetch entries: {}", e))
})?;
let mut entries = HashMap::new();
for row in rows {
let id: i64 = row.get("id");
let text: String = row.get("text");
let metadata_json: Option<String> = row.get("metadata");
let metadata = if let Some(json) = metadata_json {
serde_json::from_str(&json)
.map_err(|e| RepositoryError::InvalidData(e.to_string()))?
} else {
HashMap::new()
};
let entry = DictEntry {
id: id as DictEntryId,
text,
metadata,
};
entries.insert(entry.id, entry);
}
Ok(Dict { name, entries })
})
})
}
}