diff --git a/src/server/csv_import_export.rs b/src/server/csv_import_export.rs new file mode 100644 index 0000000..8f4387e --- /dev/null +++ b/src/server/csv_import_export.rs @@ -0,0 +1,238 @@ +// src/server/csv_import_export.rs +//! Модуль для импорта/экспорта данных в формате CSV +//! +//! Обеспечивает lock-free операции импорта CSV в базу данных +//! и экспорта коллекций в CSV формат. + +use std::sync::Arc; +use std::fs::File; +use std::io::{BufReader, BufWriter}; +use csv::{Reader, Writer}; +use serde_json::Value; +use dashmap::DashMap; + +use crate::common::Result; +use crate::common::config::CsvConfig; +use crate::server::database::Database; + +/// Менеджер CSV операций +#[derive(Clone)] +pub struct CsvManager { + database: Arc, + config: CsvConfig, + import_progress: Arc>, // Lock-free отслеживание прогресса +} + +impl CsvManager { + /// Создание нового менеджера CSV + pub fn new(database: Arc, config: CsvConfig) -> Self { + Self { + database, + config, + import_progress: Arc::new(DashMap::new()), + } + } + + /// Импорт CSV файла в коллекцию + pub fn import_csv(&self, collection_name: &str, file_path: &str) -> Result { + println!("Importing CSV file '{}' into collection '{}'", file_path, collection_name); + + let file = File::open(file_path) + .map_err(|e| crate::common::FutriixError::IoError(e))?; + + let mut reader = Reader::from_reader(BufReader::new(file)); + + // Получаем заголовки заранее, чтобы избежать множественных заимствований + let headers: Vec = reader.headers() + .map_err(|e| crate::common::FutriixError::CsvError(e.to_string()))? + .iter() + .map(|s| s.to_string()) + .collect(); + + let mut record_count = 0; + + // ИСПРАВЛЕНИЕ: Получаем общее количество записей для точного расчета прогресса + let total_records = reader.records().count(); + reader = Reader::from_reader(BufReader::new(File::open(file_path) + .map_err(|e| crate::common::FutriixError::IoError(e))?)); + + // Пропускаем заголовки + let _ = reader.headers(); + + // Устанавливаем начальный прогресс + self.import_progress.insert(collection_name.to_string(), 0.0); + + for result in reader.records() { + let record = result + .map_err(|e| crate::common::FutriixError::CsvError(e.to_string()))?; + + // Преобразуем CSV запись в JSON документ + let mut document = serde_json::Map::new(); + + for (i, field) in record.iter().enumerate() { + let header = if i < headers.len() { + &headers[i] + } else { + &format!("field_{}", i) + }; + + // Пытаемся определить тип данных + let value = if let Ok(num) = field.parse::() { + Value::Number(serde_json::Number::from_f64(num).unwrap_or(serde_json::Number::from(0))) + } else if field.eq_ignore_ascii_case("true") { + Value::Bool(true) + } else if field.eq_ignore_ascii_case("false") { + Value::Bool(false) + } else { + Value::String(field.to_string()) + }; + + document.insert(header.to_string(), value); + } + + // Сохраняем документ в базу данных + let json_value = Value::Object(document); + let json_string = serde_json::to_string(&json_value) + .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?; + + let command = crate::common::protocol::Command::Create { + collection: collection_name.to_string(), + document: json_string.into_bytes(), + }; + + // ИСПРАВЛЕНИЕ: Обрабатываем ошибки выполнения команды + match self.database.execute_command(command) { + Ok(_) => { + record_count += 1; + + // Обновляем прогресс + if total_records > 0 { + let progress = (record_count as f64 / total_records as f64) * 100.0; + self.import_progress.insert(collection_name.to_string(), progress); + } + + if record_count % 100 == 0 { + println!("Imported {} records...", record_count); + } + } + Err(e) => { + eprintln!("Failed to import record {}: {}", record_count, e); + // Продолжаем импорт остальных записей + } + } + } + + // Завершаем импорт + self.import_progress.insert(collection_name.to_string(), 100.0); + println!("Successfully imported {} records into collection '{}'", record_count, collection_name); + + Ok(record_count) + } + + /// Экспорт коллекции в CSV файл + pub fn export_csv(&self, collection_name: &str, file_path: &str) -> Result { + println!("Exporting collection '{}' to CSV file '{}'", collection_name, file_path); + + let file = File::create(file_path) + .map_err(|e| crate::common::FutriixError::IoError(e))?; + + let mut writer = Writer::from_writer(BufWriter::new(file)); + + // Получаем все документы из коллекции + let command = crate::common::protocol::Command::Query { + collection: collection_name.to_string(), + filter: vec![], // Без фильтра + }; + + let response = self.database.execute_command(command)?; + + let documents = match response { + crate::common::protocol::Response::Success(data) => { + serde_json::from_slice::>(&data) + .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))? + } + crate::common::protocol::Response::Error(e) => { + return Err(crate::common::FutriixError::DatabaseError(e)); + } + }; + + if documents.is_empty() { + println!("Collection '{}' is empty", collection_name); + return Ok(0); + } + + // Определяем заголовки из первого документа + let first_doc = &documents[0]; + let headers: Vec = if let Value::Object(obj) = first_doc { + obj.keys().map(|k| k.to_string()).collect() + } else { + vec!["data".to_string()] + }; + + // Записываем заголовки + writer.write_record(&headers) + .map_err(|e| crate::common::FutriixError::CsvError(e.to_string()))?; + + let mut record_count = 0; + + // Записываем данные + for document in documents { + if let Value::Object(obj) = document { + let mut record = Vec::new(); + + // Сохраняем порядок полей согласно заголовкам + for header in &headers { + let value = obj.get(header).unwrap_or(&Value::Null); + let value_str = match value { + Value::String(s) => s.clone(), + Value::Number(n) => n.to_string(), + Value::Bool(b) => b.to_string(), + Value::Null => "".to_string(), + _ => value.to_string(), + }; + record.push(value_str); + } + + writer.write_record(&record) + .map_err(|e| crate::common::FutriixError::CsvError(e.to_string()))?; + record_count += 1; + } + } + + writer.flush() + .map_err(|e| crate::common::FutriixError::CsvError(e.to_string()))?; + + println!("Successfully exported {} records to '{}'", record_count, file_path); + Ok(record_count) + } + + /// Получение прогресса импорта + pub fn get_import_progress(&self, collection_name: &str) -> f64 { + self.import_progress.get(collection_name) + .map(|entry| *entry.value()) + .unwrap_or(0.0) + } + + /// Список CSV файлов в директории + pub fn list_csv_files(&self) -> Result> { + let csv_dir = &self.config.import_dir; + let mut csv_files = Vec::new(); + + if let Ok(entries) = std::fs::read_dir(csv_dir) { + for entry in entries.flatten() { + let path = entry.path(); + if path.is_file() { + if let Some(extension) = path.extension() { + if extension == "csv" { + if let Some(file_name) = path.file_name() { + csv_files.push(file_name.to_string_lossy().to_string()); + } + } + } + } + } + } + + Ok(csv_files) + } +}