From 2e70694485179bf7329e94b2f9a109c18538b274 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=93=D1=80=D0=B8=D0=B3=D0=BE=D1=80=D0=B8=D0=B9=20=D0=A1?= =?UTF-8?q?=D0=B0=D1=84=D1=80=D0=BE=D0=BD=D0=BE=D0=B2?= Date: Tue, 20 Jan 2026 20:39:19 +0000 Subject: [PATCH] Delete src/server/csv_import_export.rs --- src/server/csv_import_export.rs | 383 -------------------------------- 1 file changed, 383 deletions(-) delete mode 100644 src/server/csv_import_export.rs diff --git a/src/server/csv_import_export.rs b/src/server/csv_import_export.rs deleted file mode 100644 index b44241a..0000000 --- a/src/server/csv_import_export.rs +++ /dev/null @@ -1,383 +0,0 @@ -// src/server/csv_import_export.rs -//! Lock-free модуль для импорта/экспорта данных в формате CSV -//! -//! Основные функции: -//! 1. Импорт CSV файлов в коллекции базы данных -//! 2. Экспорт коллекций в CSV файлы -//! 3. Отслеживание прогресса импорта с атомарными счетчиками -//! 4. Буферизация операций для повышения производительности -//! -//! Особенности: -//! - Атомарные операции без блокировок через DashMap -//! - Автоматическое определение типов данных -//! - Обработка больших файлов с пагинацией -//! - Отслеживание прогресса в реальном времени - -use std::sync::Arc; -use std::fs::File; -use std::io::{BufReader, BufWriter}; -use std::path::Path; -use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering}; -use csv::{Reader, Writer}; -use serde_json::Value; -use dashmap::DashMap; -use tokio::sync::RwLock; - -use crate::common::Result; -use crate::common::config::CsvConfig; -use crate::server::database::Database; - -/// Lock-free хэш-таблица для прогресса импорта -/// Использует DashMap для атомарного доступа к данным прогресса -struct LockFreeProgressMap { - progress_data: DashMap, - active_imports: AtomicUsize, -} - -impl LockFreeProgressMap { - fn new() -> Self { - Self { - progress_data: DashMap::new(), - active_imports: AtomicUsize::new(0), - } - } - - fn insert(&self, key: String, value: f64) -> Option { - if value == 100.0 { - // Импорт завершен, удаляем из активных - self.active_imports.fetch_sub(1, Ordering::SeqCst); - } else if value == 0.0 { - // Начинаем новый импорт - self.active_imports.fetch_add(1, Ordering::SeqCst); - } - - self.progress_data.insert(key, value) - } - - fn get(&self, key: &str) -> Option { - self.progress_data.get(key).map(|entry| *entry) - } - - fn remove(&self, key: &str) -> Option { - self.progress_data.remove(key).map(|(_, v)| v) - } - - fn len(&self) -> usize { - self.progress_data.len() - } - - fn active_imports(&self) -> usize { - self.active_imports.load(Ordering::SeqCst) - } -} - -/// Менеджер CSV операций с lock-free архитектурой -#[derive(Clone)] -pub struct CsvManager { - database: Arc, - config: CsvConfig, - import_progress: Arc, - // Используем RwLock для буферизации, но без блокировок на операции - import_buffer: Arc>>>, -} - -impl CsvManager { - pub fn new(database: Arc, config: CsvConfig) -> Self { - // Создаем директории для импорта и экспорта, если они не существуют - let _ = std::fs::create_dir_all(&config.import_dir); - let _ = std::fs::create_dir_all(&config.export_dir); - - Self { - database, - config, - import_progress: Arc::new(LockFreeProgressMap::new()), - import_buffer: Arc::new(RwLock::new(DashMap::new())), - } - } - - pub fn import_csv(&self, collection_name: &str, file_path: &str) -> Result { - // Проверяем размер файла - let metadata = std::fs::metadata(file_path) - .map_err(|e| crate::common::FutriixError::IoError(e))?; - - if metadata.len() > self.config.max_file_size { - return Err(crate::common::FutriixError::CsvError( - format!("File size {} exceeds maximum allowed size {}", - metadata.len(), self.config.max_file_size) - )); - } - - println!("Importing CSV file '{}' into collection '{}'", file_path, collection_name); - - // Открываем файл для чтения - let file = File::open(file_path) - .map_err(|e| crate::common::FutriixError::IoError(e))?; - - let mut reader = Reader::from_reader(BufReader::new(file)); - - // Читаем заголовки - let headers: Vec = reader.headers()? - .iter() - .map(|s| s.to_string()) - .collect(); - - let mut record_count = 0; - // Убираем mut, так как переменная не изменяется - let error_count = 0; - - // Начинаем импорт - self.import_progress.insert(collection_name.to_string(), 0.0); - - // Буферизируем записи для batch вставки - let mut buffer = Vec::new(); - - for result in reader.records() { - let record = result?; - - let mut document = serde_json::Map::new(); - - for (i, field) in record.iter().enumerate() { - let header = if i < headers.len() { - &headers[i] - } else { - &format!("field_{}", i) - }; - - let value = Self::parse_field_value(field); - document.insert(header.to_string(), value); - } - - let json_value = Value::Object(document); - buffer.push(json_value); - - if buffer.len() >= 100 { - // Вставляем пачку записей - let inserted = self.insert_batch(collection_name, &buffer)?; - record_count += inserted; - buffer.clear(); - - if record_count % 100 == 0 { - println!("Imported {} records...", record_count); - - let progress = (record_count as f64 / 1000.0) * 100.0; - self.import_progress.insert(collection_name.to_string(), progress); - } - } - } - - // Вставляем оставшиеся записи - if !buffer.is_empty() { - let inserted = self.insert_batch(collection_name, &buffer)?; - record_count += inserted; - } - - // Завершаем импорт - self.import_progress.insert(collection_name.to_string(), 100.0); - - if error_count > 0 { - println!("Import completed with {} successful records and {} errors", - record_count, error_count); - } else { - println!("Successfully imported {} records into collection '{}'", - record_count, collection_name); - } - - Ok(record_count) - } - - /// Вставляет пачку документов атомарно - fn insert_batch(&self, collection_name: &str, documents: &[Value]) -> Result { - let mut inserted = 0; - - for document in documents { - let json_string = serde_json::to_string(document)?; - - let command = crate::common::protocol::Command::Create { - collection: collection_name.to_string(), - document: json_string.into_bytes(), - }; - - match self.database.execute_command(command) { - Ok(_) => { - inserted += 1; - } - Err(e) => { - eprintln!("Failed to import record: {}", e); - } - } - } - - Ok(inserted) - } - - /// Парсит строковое значение поля в соответствующий тип JSON - fn parse_field_value(field: &str) -> Value { - if field.is_empty() { - return Value::Null; - } - - // Пытаемся распарсить как целое число - if let Ok(int_val) = field.parse::() { - return Value::Number(int_val.into()); - } - - // Пытаемся распарсить как число с плавающей точкой - if let Ok(float_val) = field.parse::() { - if let Some(num) = serde_json::Number::from_f64(float_val) { - return Value::Number(num); - } - } - - // Проверяем булевые значения - match field.to_lowercase().as_str() { - "true" => return Value::Bool(true), - "false" => return Value::Bool(false), - _ => {} - } - - // По умолчанию возвращаем как строку - Value::String(field.to_string()) - } - - pub fn export_csv(&self, collection_name: &str, file_path: &str) -> Result { - println!("Exporting collection '{}' to CSV file '{}'", collection_name, file_path); - - // Создаем файл для записи - let file = File::create(file_path) - .map_err(|e| crate::common::FutriixError::IoError(e))?; - - let mut writer = Writer::from_writer(BufWriter::new(file)); - - // Запрашиваем все документы из коллекции - let command = crate::common::protocol::Command::Query { - collection: collection_name.to_string(), - filter: vec![], - }; - - let response = self.database.execute_command(command)?; - - let documents = match response { - crate::common::protocol::Response::Success(data) => { - serde_json::from_slice::>(&data)? - } - crate::common::protocol::Response::Error(e) => { - return Err(crate::common::FutriixError::DatabaseError(e)); - } - }; - - if documents.is_empty() { - println!("Collection '{}' is empty", collection_name); - return Ok(0); - } - - // Собираем все уникальные заголовки из документов - // Убираем mut, так как DashSet не требует изменяемости для итерации - let all_headers = dashmap::DashSet::new(); - for document in &documents { - if let Value::Object(obj) = document { - for key in obj.keys() { - all_headers.insert(key.clone()); - } - } - } - - let headers: Vec = all_headers.into_iter().collect(); - - // Записываем заголовки - writer.write_record(&headers)?; - - let mut record_count = 0; - - // Экспортируем документы - for document in documents { - if let Value::Object(obj) = document { - let mut record = Vec::new(); - - for header in &headers { - let value = obj.get(header).unwrap_or(&Value::Null); - let value_str = Self::value_to_string(value); - record.push(value_str); - } - - writer.write_record(&record)?; - record_count += 1; - - if record_count % 100 == 0 { - println!("Exported {} records...", record_count); - } - } - } - - writer.flush()?; - - println!("Successfully exported {} records to '{}'", record_count, file_path); - Ok(record_count) - } - - /// Конвертирует JSON значение в строку для CSV - fn value_to_string(value: &Value) -> String { - match value { - Value::String(s) => s.clone(), - Value::Number(n) => n.to_string(), - Value::Bool(b) => b.to_string(), - Value::Null => "".to_string(), - Value::Array(arr) => { - let items: Vec = arr.iter().map(Self::value_to_string).collect(); - format!("[{}]", items.join(",")) - } - Value::Object(_) => { - // Для объектов используем JSON строку - value.to_string() - } - } - } - - pub fn get_import_progress(&self, collection_name: &str) -> f64 { - self.import_progress.get(collection_name) - .unwrap_or(0.0) - } - - pub fn list_csv_files(&self) -> Result> { - let csv_dir = &self.config.import_dir; - let mut csv_files = Vec::new(); - - if let Ok(entries) = std::fs::read_dir(csv_dir) { - for entry in entries.flatten() { - let path = entry.path(); - if path.is_file() { - if let Some(extension) = path.extension() { - if extension == "csv" || extension == "CSV" { - if let Some(file_name) = path.file_name() { - csv_files.push(file_name.to_string_lossy().to_string()); - } - } - } - } - } - } - - Ok(csv_files) - } - - pub fn get_import_file_path(&self, file_name: &str) -> String { - Path::new(&self.config.import_dir) - .join(file_name) - .to_string_lossy() - .to_string() - } - - pub fn get_export_file_path(&self, file_name: &str) -> String { - Path::new(&self.config.export_dir) - .join(file_name) - .to_string_lossy() - .to_string() - } - - pub fn file_exists(&self, file_path: &str) -> bool { - Path::new(file_path).exists() - } - - pub fn active_imports_count(&self) -> usize { - self.import_progress.active_imports() - } -}