diff --git a/src/server/csv_import_export.rs b/src/server/csv_import_export.rs new file mode 100644 index 0000000..26d1fae --- /dev/null +++ b/src/server/csv_import_export.rs @@ -0,0 +1,414 @@ +// src/server/csv_import_export.rs +//! Lock-free модуль для импорта/экспорта данных в формате CSV +//! +//! Основные функции: +//! 1. Импорт CSV файлов в коллекции базы данных +//! 2. Экспорт коллекций в CSV файлы +//! 3. Отслеживание прогресса импорта с атомарными счетчиками +//! 4. Буферизация операций для повышения производительности +//! +//! Особенности: +//! - Атомарные операции без блокировок через DashMap +//! - Автоматическое определение типов данных +//! - Обработка больших файлов с пагинацией +//! - Отслеживание прогресса в реальном времени + +use std::sync::Arc; +use std::fs::File; +use std::io::{BufReader, BufWriter}; +use std::path::Path; +use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering}; +use csv::{Reader, Writer}; +use serde_json::Value; +use dashmap::DashMap; +use tokio::sync::RwLock; + +use crate::common::Result; +use crate::common::config::CsvConfig; +use crate::server::database::Database; + +/// Lock-free хэш-таблица для прогресса импорта +/// Использует DashMap для атомарного доступа к данным прогресса +struct LockFreeProgressMap { + progress_data: DashMap, + active_imports: AtomicUsize, +} + +impl LockFreeProgressMap { + fn new() -> Self { + Self { + progress_data: DashMap::new(), + active_imports: AtomicUsize::new(0), + } + } + + fn insert(&self, key: String, value: f64) -> Option { + if value == 100.0 { + // Импорт завершен, удаляем из активных + self.active_imports.fetch_sub(1, Ordering::SeqCst); + } else if value == 0.0 { + // Начинаем новый импорт + self.active_imports.fetch_add(1, Ordering::SeqCst); + } + + self.progress_data.insert(key, value) + } + + fn get(&self, key: &str) -> Option { + self.progress_data.get(key).map(|entry| *entry) + } + + fn remove(&self, key: &str) -> Option { + self.progress_data.remove(key).map(|(_, v)| v) + } + + fn len(&self) -> usize { + self.progress_data.len() + } + + fn active_imports(&self) -> usize { + self.active_imports.load(Ordering::SeqCst) + } +} + +/// Менеджер CSV операций с lock-free архитектурой +#[derive(Clone)] +pub struct CsvManager { + database: Arc, + config: CsvConfig, + import_progress: Arc, + // Используем RwLock для буферизации, но без блокировок на операции + import_buffer: Arc>>>, +} + +impl CsvManager { + pub fn new(database: Arc, config: CsvConfig) -> Self { + // Создаем директории для импорта и экспорта, если они не существуют + let _ = std::fs::create_dir_all(&config.import_dir); + let _ = std::fs::create_dir_all(&config.export_dir); + + Self { + database, + config, + import_progress: Arc::new(LockFreeProgressMap::new()), + import_buffer: Arc::new(RwLock::new(DashMap::new())), + } + } + + pub fn import_csv(&self, collection_name: &str, file_path: &str) -> Result { + // Проверяем размер файла + let metadata = std::fs::metadata(file_path) + .map_err(|e| crate::common::FutriixError::IoError(e))?; + + if metadata.len() > self.config.max_file_size { + return Err(crate::common::FutriixError::CsvError( + format!("File size {} exceeds maximum allowed size {}", + metadata.len(), self.config.max_file_size) + )); + } + + println!("Importing CSV file '{}' into collection '{}'", file_path, collection_name); + + // Открываем файл для чтения + let file = File::open(file_path) + .map_err(|e| crate::common::FutriixError::IoError(e))?; + + let mut reader = Reader::from_reader(BufReader::new(file)); + + // Читаем заголовки + let headers: Vec = reader.headers() + .map_err(|e| crate::common::FutriixError::CsvError(e.to_string()))? + .iter() + .map(|s| s.to_string()) + .collect(); + + let mut record_count = 0; + let mut error_count = 0; + + // Начинаем импорт + self.import_progress.insert(collection_name.to_string(), 0.0); + + // Буферизируем записи для batch вставки + let mut buffer = Vec::new(); + + for result in reader.records() { + let record = match result { + Ok(r) => r, + Err(e) => { + error_count += 1; + eprintln!("Failed to parse CSV record: {}", e); + continue; + } + }; + + let mut document = serde_json::Map::new(); + + for (i, field) in record.iter().enumerate() { + let header = if i < headers.len() { + &headers[i] + } else { + &format!("field_{}", i) + }; + + let value = Self::parse_field_value(field); + document.insert(header.to_string(), value); + } + + let json_value = Value::Object(document); + buffer.push(json_value); + + if buffer.len() >= 100 { + // Вставляем пачку записей + match self.insert_batch(collection_name, &buffer) { + Ok(inserted) => { + record_count += inserted; + } + Err(e) => { + error_count += buffer.len(); + eprintln!("Failed to insert batch: {}", e); + } + } + buffer.clear(); + + if record_count % 100 == 0 { + println!("Imported {} records...", record_count); + + let progress = (record_count as f64 / 1000.0) * 100.0; + self.import_progress.insert(collection_name.to_string(), progress.min(100.0)); + } + } + } + + // Вставляем оставшиеся записи + if !buffer.is_empty() { + match self.insert_batch(collection_name, &buffer) { + Ok(inserted) => { + record_count += inserted; + } + Err(e) => { + error_count += buffer.len(); + eprintln!("Failed to insert final batch: {}", e); + } + } + } + + // Завершаем импорт + self.import_progress.insert(collection_name.to_string(), 100.0); + + if error_count > 0 { + println!("Import completed with {} successful records and {} errors", + record_count, error_count); + } else { + println!("Successfully imported {} records into collection '{}'", + record_count, collection_name); + } + + Ok(record_count) + } + + /// Вставляет пачку документов атомарно + fn insert_batch(&self, collection_name: &str, documents: &[Value]) -> Result { + let mut inserted = 0; + + for document in documents { + let json_string = match serde_json::to_string(document) { + Ok(s) => s, + Err(e) => { + eprintln!("Failed to serialize document: {}", e); + continue; + } + }; + + let command = crate::common::protocol::Command::Create { + collection: collection_name.to_string(), + document: json_string.into_bytes(), + }; + + match self.database.execute_command(command) { + Ok(_) => { + inserted += 1; + } + Err(e) => { + eprintln!("Failed to import record: {}", e); + } + } + } + + Ok(inserted) + } + + /// Парсит строковое значение поля в соответствующий тип JSON + fn parse_field_value(field: &str) -> Value { + if field.is_empty() { + return Value::Null; + } + + // Пытаемся распарсить как целое число + if let Ok(int_val) = field.parse::() { + return Value::Number(int_val.into()); + } + + // Пытаемся распарсить как число с плавающей точкой + if let Ok(float_val) = field.parse::() { + if let Some(num) = serde_json::Number::from_f64(float_val) { + return Value::Number(num); + } + } + + // Проверяем булевые значения + match field.to_lowercase().as_str() { + "true" => return Value::Bool(true), + "false" => return Value::Bool(false), + _ => {} + } + + // По умолчанию возвращаем как строку + Value::String(field.to_string()) + } + + pub fn export_csv(&self, collection_name: &str, file_path: &str) -> Result { + println!("Exporting collection '{}' to CSV file '{}'", collection_name, file_path); + + // Создаем файл для записи + let file = File::create(file_path) + .map_err(|e| crate::common::FutriixError::IoError(e))?; + + let mut writer = Writer::from_writer(BufWriter::new(file)); + + // Запрашиваем все документы из коллекции + let command = crate::common::protocol::Command::Query { + collection: collection_name.to_string(), + filter: vec![], + }; + + let response = self.database.execute_command(command)?; + + let documents = match response { + crate::common::protocol::Response::Success(data) => { + serde_json::from_slice::>(&data) + .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))? + } + crate::common::protocol::Response::Error(e) => { + return Err(crate::common::FutriixError::DatabaseError(e)); + } + }; + + if documents.is_empty() { + println!("Collection '{}' is empty", collection_name); + return Ok(0); + } + + // Собираем все уникальные заголовки из документов + let mut all_headers = std::collections::HashSet::new(); + for document in &documents { + if let Value::Object(obj) = document { + for key in obj.keys() { + all_headers.insert(key.clone()); + } + } + } + + let mut headers: Vec = all_headers.into_iter().collect(); + headers.sort(); + + // Записываем заголовки + writer.write_record(&headers) + .map_err(|e| crate::common::FutriixError::CsvError(e.to_string()))?; + + let mut record_count = 0; + + // Экспортируем документы + for document in documents { + if let Value::Object(obj) = document { + let mut record = Vec::new(); + + for header in &headers { + let value = obj.get(header).unwrap_or(&Value::Null); + let value_str = Self::value_to_string(value); + record.push(value_str); + } + + writer.write_record(&record) + .map_err(|e| crate::common::FutriixError::CsvError(e.to_string()))?; + record_count += 1; + + if record_count % 100 == 0 { + println!("Exported {} records...", record_count); + } + } + } + + writer.flush() + .map_err(|e| crate::common::FutriixError::CsvError(e.to_string()))?; + + println!("Successfully exported {} records to '{}'", record_count, file_path); + Ok(record_count) + } + + /// Конвертирует JSON значение в строку для CSV + fn value_to_string(value: &Value) -> String { + match value { + Value::String(s) => s.clone(), + Value::Number(n) => n.to_string(), + Value::Bool(b) => b.to_string(), + Value::Null => "".to_string(), + Value::Array(arr) => { + let items: Vec = arr.iter().map(Self::value_to_string).collect(); + format!("[{}]", items.join(",")) + } + Value::Object(_) => { + // Для объектов используем JSON строку + value.to_string() + } + } + } + + pub fn get_import_progress(&self, collection_name: &str) -> f64 { + self.import_progress.get(collection_name) + .unwrap_or(0.0) + } + + pub fn list_csv_files(&self) -> Result> { + let csv_dir = &self.config.import_dir; + let mut csv_files = Vec::new(); + + if let Ok(entries) = std::fs::read_dir(csv_dir) { + for entry in entries.flatten() { + let path = entry.path(); + if path.is_file() { + if let Some(extension) = path.extension() { + if extension == "csv" || extension == "CSV" { + if let Some(file_name) = path.file_name() { + csv_files.push(file_name.to_string_lossy().to_string()); + } + } + } + } + } + } + + Ok(csv_files) + } + + pub fn get_import_file_path(&self, file_name: &str) -> String { + Path::new(&self.config.import_dir) + .join(file_name) + .to_string_lossy() + .to_string() + } + + pub fn get_export_file_path(&self, file_name: &str) -> String { + Path::new(&self.config.export_dir) + .join(file_name) + .to_string_lossy() + .to_string() + } + + pub fn file_exists(&self, file_path: &str) -> bool { + Path::new(file_path).exists() + } + + pub fn active_imports_count(&self) -> usize { + self.import_progress.active_imports() + } +}