futriix/src/server/csv_import_export.rs

425 lines
16 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// src/server/csv_import_export.rs
//! Lock-free модуль для импорта/экспорта данных в формате CSV
//!
//! Основные функции:
//! 1. Импорт CSV файлов в коллекции базы данных
//! 2. Экспорт коллекций в CSV файлы
//! 3. Отслеживание прогресса импорта с атомарными счетчиками
//! 4. Буферизация операций для повышения производительности
//!
//! Особенности:
//! - Атомарные операции без блокировок через DashMap
//! - Автоматическое определение типов данных
//! - Обработка больших файлов с пагинацией
//! - Отслеживание прогресса в реальном времени
use std::sync::Arc;
use std::fs::File;
use std::io::{BufReader, BufWriter};
use std::path::Path;
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering};
use csv::{Reader, Writer};
use serde_json::Value;
use dashmap::DashMap;
use tokio::sync::RwLock;
use crate::common::Result;
use crate::common::config::CsvConfig;
use crate::server::database::Database;
/// Lock-free хэш-таблица для прогресса импорта
/// Использует DashMap для атомарного доступа к данным прогресса
struct LockFreeProgressMap {
progress_data: DashMap<String, f64>, // Данные прогресса по коллекциям
active_imports: AtomicUsize, // Количество активных импортов
}
impl LockFreeProgressMap {
fn new() -> Self {
Self {
progress_data: DashMap::new(),
active_imports: AtomicUsize::new(0),
}
}
fn insert(&self, key: String, value: f64) -> Option<f64> {
if value == 100.0 {
// Импорт завершен, удаляем из активных
self.active_imports.fetch_sub(1, Ordering::SeqCst);
} else if value == 0.0 {
// Начинаем новый импорт
self.active_imports.fetch_add(1, Ordering::SeqCst);
}
self.progress_data.insert(key, value)
}
fn get(&self, key: &str) -> Option<f64> {
self.progress_data.get(key).map(|entry| *entry)
}
fn remove(&self, key: &str) -> Option<f64> {
self.progress_data.remove(key).map(|(_, v)| v)
}
fn len(&self) -> usize {
self.progress_data.len()
}
fn active_imports(&self) -> usize {
self.active_imports.load(Ordering::SeqCst)
}
}
/// Менеджер CSV операций с lock-free архитектурой
#[derive(Clone)]
pub struct CsvManager {
database: Arc<Database>, // Ссылка на базу данных
config: CsvConfig, // Конфигурация CSV операций
import_progress: Arc<LockFreeProgressMap>, // Прогресс импорта
import_buffer: Arc<RwLock<DashMap<String, Vec<Value>>>>, // Буфер для импорта
}
impl CsvManager {
/// Создает новый менеджер CSV операций
pub fn new(database: Arc<Database>, config: CsvConfig) -> Self {
// Создаем директории для импорта и экспорта, если они не существуют
let _ = std::fs::create_dir_all(&config.import_dir);
let _ = std::fs::create_dir_all(&config.export_dir);
Self {
database,
config,
import_progress: Arc::new(LockFreeProgressMap::new()),
import_buffer: Arc::new(RwLock::new(DashMap::new())),
}
}
/// Импортирует CSV файл в указанную коллекцию
pub fn import_csv(&self, collection_name: &str, file_path: &str) -> Result<usize> {
// Проверяем размер файла
let metadata = std::fs::metadata(file_path)
.map_err(|e| crate::common::FutriixError::IoError(e))?;
if metadata.len() > self.config.max_file_size {
return Err(crate::common::FutriixError::CsvError(
format!("File size {} exceeds maximum allowed size {}",
metadata.len(), self.config.max_file_size)
));
}
println!("Importing CSV file '{}' into collection '{}'", file_path, collection_name);
// Открываем файл для чтения
let file = File::open(file_path)
.map_err(|e| crate::common::FutriixError::IoError(e))?;
let mut reader = Reader::from_reader(BufReader::new(file));
// Читаем заголовки
let headers: Vec<String> = reader.headers()
.map_err(|e| crate::common::FutriixError::CsvError(e.to_string()))?
.iter()
.map(|s| s.to_string())
.collect();
let mut record_count = 0;
let mut error_count = 0;
// Начинаем импорт
self.import_progress.insert(collection_name.to_string(), 0.0);
// Буферизируем записи для batch вставки
let mut buffer = Vec::new();
for result in reader.records() {
let record = match result {
Ok(r) => r,
Err(e) => {
error_count += 1;
eprintln!("Failed to parse CSV record: {}", e);
continue;
}
};
let mut document = serde_json::Map::new();
// Преобразуем каждое поле CSV в JSON значение
for (i, field) in record.iter().enumerate() {
let header = if i < headers.len() {
&headers[i]
} else {
&format!("field_{}", i)
};
let value = Self::parse_field_value(field);
document.insert(header.to_string(), value);
}
let json_value = Value::Object(document);
buffer.push(json_value);
// Вставляем пачку записей при достижении лимита
if buffer.len() >= 100 {
match self.insert_batch(collection_name, &buffer) {
Ok(inserted) => {
record_count += inserted;
}
Err(e) => {
error_count += buffer.len();
eprintln!("Failed to insert batch: {}", e);
}
}
buffer.clear();
// Обновляем прогресс каждые 100 записей
if record_count % 100 == 0 {
println!("Imported {} records...", record_count);
let progress = (record_count as f64 / 1000.0) * 100.0;
self.import_progress.insert(collection_name.to_string(), progress.min(100.0));
}
}
}
// Вставляем оставшиеся записи
if !buffer.is_empty() {
match self.insert_batch(collection_name, &buffer) {
Ok(inserted) => {
record_count += inserted;
}
Err(e) => {
error_count += buffer.len();
eprintln!("Failed to insert final batch: {}", e);
}
}
}
// Завершаем импорт
self.import_progress.insert(collection_name.to_string(), 100.0);
if error_count > 0 {
println!("Import completed with {} successful records and {} errors",
record_count, error_count);
} else {
println!("Successfully imported {} records into collection '{}'",
record_count, collection_name);
}
Ok(record_count)
}
/// Вставляет пачку документов атомарно
fn insert_batch(&self, collection_name: &str, documents: &[Value]) -> Result<usize> {
let mut inserted = 0;
for document in documents {
let json_string = match serde_json::to_string(document) {
Ok(s) => s,
Err(e) => {
eprintln!("Failed to serialize document: {}", e);
continue;
}
};
let command = crate::common::protocol::Command::Create {
collection: collection_name.to_string(),
document: json_string.into_bytes(),
};
match self.database.execute_command(command) {
Ok(_) => {
inserted += 1;
}
Err(e) => {
eprintln!("Failed to import record: {}", e);
}
}
}
Ok(inserted)
}
/// Парсит строковое значение поля в соответствующий тип JSON
fn parse_field_value(field: &str) -> Value {
if field.is_empty() {
return Value::Null;
}
// Пытаемся распарсить как целое число
if let Ok(int_val) = field.parse::<i64>() {
return Value::Number(int_val.into());
}
// Пытаемся распарсить как число с плавающей точкой
if let Ok(float_val) = field.parse::<f64>() {
if let Some(num) = serde_json::Number::from_f64(float_val) {
return Value::Number(num);
}
}
// Проверяем булевые значения
match field.to_lowercase().as_str() {
"true" => return Value::Bool(true),
"false" => return Value::Bool(false),
_ => {}
}
// По умолчанию возвращаем как строку
Value::String(field.to_string())
}
/// Экспортирует коллекцию в CSV файл
pub fn export_csv(&self, collection_name: &str, file_path: &str) -> Result<usize> {
println!("Exporting collection '{}' to CSV file '{}'", collection_name, file_path);
// Создаем файл для записи
let file = File::create(file_path)
.map_err(|e| crate::common::FutriixError::IoError(e))?;
let mut writer = Writer::from_writer(BufWriter::new(file));
// Запрашиваем все документы из коллекции
let command = crate::common::protocol::Command::Query {
collection: collection_name.to_string(),
filter: vec![],
};
let response = self.database.execute_command(command)?;
let documents = match response {
crate::common::protocol::Response::Success(data) => {
serde_json::from_slice::<Vec<Value>>(&data)
.map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?
}
crate::common::protocol::Response::Error(e) => {
return Err(crate::common::FutriixError::DatabaseError(e));
}
};
if documents.is_empty() {
println!("Collection '{}' is empty", collection_name);
return Ok(0);
}
// Собираем все уникальные заголовки из документов
let mut all_headers = std::collections::HashSet::new();
for document in &documents {
if let Value::Object(obj) = document {
for key in obj.keys() {
all_headers.insert(key.clone());
}
}
}
let mut headers: Vec<String> = all_headers.into_iter().collect();
headers.sort();
// Записываем заголовки
writer.write_record(&headers)
.map_err(|e| crate::common::FutriixError::CsvError(e.to_string()))?;
let mut record_count = 0;
// Экспортируем документы
for document in documents {
if let Value::Object(obj) = document {
let mut record = Vec::new();
for header in &headers {
let value = obj.get(header).unwrap_or(&Value::Null);
let value_str = Self::value_to_string(value);
record.push(value_str);
}
writer.write_record(&record)
.map_err(|e| crate::common::FutriixError::CsvError(e.to_string()))?;
record_count += 1;
// Отображаем прогресс каждые 100 записей
if record_count % 100 == 0 {
println!("Exported {} records...", record_count);
}
}
}
writer.flush()
.map_err(|e| crate::common::FutriixError::CsvError(e.to_string()))?;
println!("Successfully exported {} records to '{}'", record_count, file_path);
Ok(record_count)
}
/// Конвертирует JSON значение в строку для CSV
fn value_to_string(value: &Value) -> String {
match value {
Value::String(s) => s.clone(),
Value::Number(n) => n.to_string(),
Value::Bool(b) => b.to_string(),
Value::Null => "".to_string(),
Value::Array(arr) => {
let items: Vec<String> = arr.iter().map(Self::value_to_string).collect();
format!("[{}]", items.join(","))
}
Value::Object(_) => {
// Для объектов используем JSON строку
value.to_string()
}
}
}
/// Получает прогресс импорта для указанной коллекции
pub fn get_import_progress(&self, collection_name: &str) -> f64 {
self.import_progress.get(collection_name)
.unwrap_or(0.0)
}
/// Список доступных CSV файлов в директории импорта
pub fn list_csv_files(&self) -> Result<Vec<String>> {
let csv_dir = &self.config.import_dir;
let mut csv_files = Vec::new();
if let Ok(entries) = std::fs::read_dir(csv_dir) {
for entry in entries.flatten() {
let path = entry.path();
if path.is_file() {
if let Some(extension) = path.extension() {
if extension == "csv" || extension == "CSV" {
if let Some(file_name) = path.file_name() {
csv_files.push(file_name.to_string_lossy().to_string());
}
}
}
}
}
}
Ok(csv_files)
}
/// Полный путь к файлу импорта
pub fn get_import_file_path(&self, file_name: &str) -> String {
Path::new(&self.config.import_dir)
.join(file_name)
.to_string_lossy()
.to_string()
}
/// Полный путь к файлу экспорта
pub fn get_export_file_path(&self, file_name: &str) -> String {
Path::new(&self.config.export_dir)
.join(file_name)
.to_string_lossy()
.to_string()
}
/// Проверяет существование файла
pub fn file_exists(&self, file_path: &str) -> bool {
Path::new(file_path).exists()
}
/// Количество активных импортов
pub fn active_imports_count(&self) -> usize {
self.import_progress.active_imports()
}
}