// src/server/csv_import_export.rs //! Модуль для импорта/экспорта данных в формате CSV //! //! Обеспечивает lock-free операции импорта CSV в базу данных //! и экспорта коллекций в CSV формат. use std::sync::Arc; use std::fs::File; use std::io::{BufReader, BufWriter}; use csv::{Reader, Writer}; use serde_json::Value; use dashmap::DashMap; use crate::common::Result; use crate::common::config::CsvConfig; use crate::server::database::Database; /// Менеджер CSV операций #[derive(Clone)] pub struct CsvManager { database: Arc, config: CsvConfig, import_progress: Arc>, // Lock-free отслеживание прогресса } impl CsvManager { /// Создание нового менеджера CSV pub fn new(database: Arc, config: CsvConfig) -> Self { Self { database, config, import_progress: Arc::new(DashMap::new()), } } /// Импорт CSV файла в коллекцию pub fn import_csv(&self, collection_name: &str, file_path: &str) -> Result { println!("Importing CSV file '{}' into collection '{}'", file_path, collection_name); let file = File::open(file_path) .map_err(|e| crate::common::FutriixError::IoError(e))?; let mut reader = Reader::from_reader(BufReader::new(file)); // Получаем заголовки заранее, чтобы избежать множественных заимствований let headers: Vec = reader.headers() .map_err(|e| crate::common::FutriixError::CsvError(e.to_string()))? .iter() .map(|s| s.to_string()) .collect(); let mut record_count = 0; // ИСПРАВЛЕНИЕ: Получаем общее количество записей для точного расчета прогресса let total_records = reader.records().count(); reader = Reader::from_reader(BufReader::new(File::open(file_path) .map_err(|e| crate::common::FutriixError::IoError(e))?)); // Пропускаем заголовки let _ = reader.headers(); // Устанавливаем начальный прогресс self.import_progress.insert(collection_name.to_string(), 0.0); for result in reader.records() { let record = result .map_err(|e| crate::common::FutriixError::CsvError(e.to_string()))?; // Преобразуем CSV запись в JSON документ let mut document = serde_json::Map::new(); for (i, field) in record.iter().enumerate() { let header = if i < headers.len() { &headers[i] } else { &format!("field_{}", i) }; // Пытаемся определить тип данных let value = if let Ok(num) = field.parse::() { Value::Number(serde_json::Number::from_f64(num).unwrap_or(serde_json::Number::from(0))) } else if field.eq_ignore_ascii_case("true") { Value::Bool(true) } else if field.eq_ignore_ascii_case("false") { Value::Bool(false) } else { Value::String(field.to_string()) }; document.insert(header.to_string(), value); } // Сохраняем документ в базу данных let json_value = Value::Object(document); let json_string = serde_json::to_string(&json_value) .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?; let command = crate::common::protocol::Command::Create { collection: collection_name.to_string(), document: json_string.into_bytes(), }; // ИСПРАВЛЕНИЕ: Обрабатываем ошибки выполнения команды match self.database.execute_command(command) { Ok(_) => { record_count += 1; // Обновляем прогресс if total_records > 0 { let progress = (record_count as f64 / total_records as f64) * 100.0; self.import_progress.insert(collection_name.to_string(), progress); } if record_count % 100 == 0 { println!("Imported {} records...", record_count); } } Err(e) => { eprintln!("Failed to import record {}: {}", record_count, e); // Продолжаем импорт остальных записей } } } // Завершаем импорт self.import_progress.insert(collection_name.to_string(), 100.0); println!("Successfully imported {} records into collection '{}'", record_count, collection_name); Ok(record_count) } /// Экспорт коллекции в CSV файл pub fn export_csv(&self, collection_name: &str, file_path: &str) -> Result { println!("Exporting collection '{}' to CSV file '{}'", collection_name, file_path); let file = File::create(file_path) .map_err(|e| crate::common::FutriixError::IoError(e))?; let mut writer = Writer::from_writer(BufWriter::new(file)); // Получаем все документы из коллекции let command = crate::common::protocol::Command::Query { collection: collection_name.to_string(), filter: vec![], // Без фильтра }; let response = self.database.execute_command(command)?; let documents = match response { crate::common::protocol::Response::Success(data) => { serde_json::from_slice::>(&data) .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))? } crate::common::protocol::Response::Error(e) => { return Err(crate::common::FutriixError::DatabaseError(e)); } }; if documents.is_empty() { println!("Collection '{}' is empty", collection_name); return Ok(0); } // Определяем заголовки из первого документа let first_doc = &documents[0]; let headers: Vec = if let Value::Object(obj) = first_doc { obj.keys().map(|k| k.to_string()).collect() } else { vec!["data".to_string()] }; // Записываем заголовки writer.write_record(&headers) .map_err(|e| crate::common::FutriixError::CsvError(e.to_string()))?; let mut record_count = 0; // Записываем данные for document in documents { if let Value::Object(obj) = document { let mut record = Vec::new(); // Сохраняем порядок полей согласно заголовкам for header in &headers { let value = obj.get(header).unwrap_or(&Value::Null); let value_str = match value { Value::String(s) => s.clone(), Value::Number(n) => n.to_string(), Value::Bool(b) => b.to_string(), Value::Null => "".to_string(), _ => value.to_string(), }; record.push(value_str); } writer.write_record(&record) .map_err(|e| crate::common::FutriixError::CsvError(e.to_string()))?; record_count += 1; } } writer.flush() .map_err(|e| crate::common::FutriixError::CsvError(e.to_string()))?; println!("Successfully exported {} records to '{}'", record_count, file_path); Ok(record_count) } /// Получение прогресса импорта pub fn get_import_progress(&self, collection_name: &str) -> f64 { self.import_progress.get(collection_name) .map(|entry| *entry.value()) .unwrap_or(0.0) } /// Список CSV файлов в директории pub fn list_csv_files(&self) -> Result> { let csv_dir = &self.config.import_dir; let mut csv_files = Vec::new(); if let Ok(entries) = std::fs::read_dir(csv_dir) { for entry in entries.flatten() { let path = entry.path(); if path.is_file() { if let Some(extension) = path.extension() { if extension == "csv" { if let Some(file_name) = path.file_name() { csv_files.push(file_name.to_string_lossy().to_string()); } } } } } } Ok(csv_files) } }