// src/server/csv_import_export.rs

//! Lock-free module for importing and exporting data in CSV format.
//!
//! Core functions:
//! 1. Import CSV files into database collections
//! 2. Export collections to CSV files
//! 3. Track import progress with atomic counters
//! 4. Buffer operations for higher throughput
//!
//! Highlights:
//! - Atomic, lock-free operations via DashMap
//! - Automatic detection of field data types
//! - Batched processing of large files
//! - Real-time progress tracking
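//!
//! # Example
//!
//! A minimal usage sketch (not part of the original module docs). It assumes
//! a `Database` handle and a populated `CsvConfig` are already available;
//! only `import_dir`, `export_dir`, and `max_file_size` are known fields.
//!
//! ```ignore
//! let manager = CsvManager::new(database.clone(), csv_config);
//!
//! // Import a file from the configured import directory.
//! let path = manager.get_import_file_path("users.csv");
//! let imported = manager.import_csv("users", &path)?;
//! println!("imported {} rows, progress {:.0}%",
//!     imported, manager.get_import_progress("users"));
//!
//! // Export the collection back out as CSV.
//! let out = manager.get_export_file_path("users_backup.csv");
//! manager.export_csv("users", &out)?;
//! ```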

use std::fs::File;
use std::io::{BufReader, BufWriter};
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use csv::{Reader, Writer};
use dashmap::DashMap;
use serde_json::Value;
use tokio::sync::RwLock;

use crate::common::Result;
use crate::common::config::CsvConfig;
use crate::server::database::Database;

/// Lock-free hash map tracking per-collection import progress.
/// Uses DashMap for atomic access to the progress data.
struct LockFreeProgressMap {
    /// Progress per collection, as a percentage in `0.0..=100.0`.
    progress_data: DashMap<String, f64>,
    /// Number of imports currently in flight.
    active_imports: AtomicUsize,
}

impl LockFreeProgressMap {
    fn new() -> Self {
        Self {
            progress_data: DashMap::new(),
            active_imports: AtomicUsize::new(0),
        }
    }

    /// Records progress for `key`, keeping the active-import counter in
    /// sync: 0.0 marks the start of an import, 100.0 marks its completion.
    fn insert(&self, key: String, value: f64) -> Option<f64> {
        if value == 100.0 {
            // Import finished: remove it from the active count.
            self.active_imports.fetch_sub(1, Ordering::SeqCst);
        } else if value == 0.0 {
            // A new import is starting.
            self.active_imports.fetch_add(1, Ordering::SeqCst);
        }

        self.progress_data.insert(key, value)
    }

    fn get(&self, key: &str) -> Option<f64> {
        self.progress_data.get(key).map(|entry| *entry)
    }

    fn remove(&self, key: &str) -> Option<f64> {
        self.progress_data.remove(key).map(|(_, v)| v)
    }

    fn len(&self) -> usize {
        self.progress_data.len()
    }

    fn active_imports(&self) -> usize {
        self.active_imports.load(Ordering::SeqCst)
    }
}

/// Manager for CSV operations with a lock-free architecture.
#[derive(Clone)]
pub struct CsvManager {
    database: Arc<Database>,
    config: CsvConfig,
    import_progress: Arc<LockFreeProgressMap>,
    // The RwLock only guards the buffer map itself; individual entries are
    // still accessed lock-free through the DashMap.
    import_buffer: Arc<RwLock<DashMap<String, Vec<Value>>>>,
}

impl CsvManager {
    pub fn new(database: Arc<Database>, config: CsvConfig) -> Self {
        // Create the import and export directories if they do not exist yet.
        let _ = std::fs::create_dir_all(&config.import_dir);
        let _ = std::fs::create_dir_all(&config.export_dir);

        Self {
            database,
            config,
            import_progress: Arc::new(LockFreeProgressMap::new()),
            import_buffer: Arc::new(RwLock::new(DashMap::new())),
        }
    }

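    /// Imports a CSV file into the named collection, buffering rows into
    /// batches of 100 documents. Hypothetical call sketch (file path and
    /// collection name are illustrative):
    ///
    /// ```ignore
    /// let imported = manager.import_csv("users", "import/users.csv")?;
    /// assert_eq!(manager.get_import_progress("users"), 100.0);
    /// ```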
    pub fn import_csv(&self, collection_name: &str, file_path: &str) -> Result<usize> {
        // Reject files above the configured size limit.
        let metadata = std::fs::metadata(file_path)
            .map_err(crate::common::FutriixError::IoError)?;

        if metadata.len() > self.config.max_file_size {
            return Err(crate::common::FutriixError::CsvError(format!(
                "File size {} exceeds maximum allowed size {}",
                metadata.len(),
                self.config.max_file_size
            )));
        }

        println!("Importing CSV file '{}' into collection '{}'", file_path, collection_name);

        // Open the file for buffered reading.
        let file = File::open(file_path)
            .map_err(crate::common::FutriixError::IoError)?;

        let mut reader = Reader::from_reader(BufReader::new(file));

        // Read the header row.
        let headers: Vec<String> = reader
            .headers()
            .map_err(|e| crate::common::FutriixError::CsvError(e.to_string()))?
            .iter()
            .map(|s| s.to_string())
            .collect();

        let mut record_count = 0;
        let mut error_count = 0;

        // Mark the import as started (registers it as active).
        self.import_progress.insert(collection_name.to_string(), 0.0);

        // Buffer records for batched insertion.
        let mut buffer = Vec::new();

        for result in reader.records() {
            let record = match result {
                Ok(r) => r,
                Err(e) => {
                    error_count += 1;
                    eprintln!("Failed to parse CSV record: {}", e);
                    continue;
                }
            };

            let mut document = serde_json::Map::new();

            for (i, field) in record.iter().enumerate() {
                // Use the header name when present; rows longer than the
                // header fall back to a synthetic column name. An owned
                // String is needed here: `&format!(...)` would borrow a
                // temporary that is dropped immediately.
                let header = headers
                    .get(i)
                    .cloned()
                    .unwrap_or_else(|| format!("field_{}", i));

                let value = Self::parse_field_value(field);
                document.insert(header, value);
            }

            buffer.push(Value::Object(document));

            if buffer.len() >= 100 {
                // Flush a full batch of records.
                match self.insert_batch(collection_name, &buffer) {
                    Ok(inserted) => {
                        record_count += inserted;
                    }
                    Err(e) => {
                        error_count += buffer.len();
                        eprintln!("Failed to insert batch: {}", e);
                    }
                }
                buffer.clear();

                if record_count % 100 == 0 {
                    println!("Imported {} records...", record_count);

                    // Progress is estimated against a nominal 1000 records
                    // and capped at 100%.
                    let progress = (record_count as f64 / 1000.0) * 100.0;
                    self.import_progress
                        .insert(collection_name.to_string(), progress.min(100.0));
                }
            }
        }

        // Flush any remaining buffered records.
        if !buffer.is_empty() {
            match self.insert_batch(collection_name, &buffer) {
                Ok(inserted) => {
                    record_count += inserted;
                }
                Err(e) => {
                    error_count += buffer.len();
                    eprintln!("Failed to insert final batch: {}", e);
                }
            }
        }

        // Mark the import as finished (drops it from the active count).
        self.import_progress.insert(collection_name.to_string(), 100.0);

        if error_count > 0 {
            println!(
                "Import completed with {} successful records and {} errors",
                record_count, error_count
            );
        } else {
            println!(
                "Successfully imported {} records into collection '{}'",
                record_count, collection_name
            );
        }

        Ok(record_count)
    }

    /// Inserts a batch of documents; each document is applied through its
    /// own `Create` command, so individual failures are logged and skipped
    /// rather than aborting the whole batch.
    fn insert_batch(&self, collection_name: &str, documents: &[Value]) -> Result<usize> {
        let mut inserted = 0;

        for document in documents {
            let json_string = match serde_json::to_string(document) {
                Ok(s) => s,
                Err(e) => {
                    eprintln!("Failed to serialize document: {}", e);
                    continue;
                }
            };

            let command = crate::common::protocol::Command::Create {
                collection: collection_name.to_string(),
                document: json_string.into_bytes(),
            };

            match self.database.execute_command(command) {
                Ok(_) => {
                    inserted += 1;
                }
                Err(e) => {
                    eprintln!("Failed to import record: {}", e);
                }
            }
        }

        Ok(inserted)
    }

    /// Parses a string field value into the matching JSON type: empty
    /// string -> `Null`, then integer, float, boolean, and finally a plain
    /// string as the fallback.
    fn parse_field_value(field: &str) -> Value {
        if field.is_empty() {
            return Value::Null;
        }

        // Try to parse as an integer.
        if let Ok(int_val) = field.parse::<i64>() {
            return Value::Number(int_val.into());
        }

        // Try to parse as a floating-point number; NaN and infinity have
        // no JSON representation, so `from_f64` can reject them.
        if let Ok(float_val) = field.parse::<f64>() {
            if let Some(num) = serde_json::Number::from_f64(float_val) {
                return Value::Number(num);
            }
        }

        // Check for boolean values (case-insensitive).
        match field.to_lowercase().as_str() {
            "true" => return Value::Bool(true),
            "false" => return Value::Bool(false),
            _ => {}
        }

        // Fall back to a plain string.
        Value::String(field.to_string())
    }

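    /// Exports a whole collection to a CSV file. Hypothetical call sketch
    /// (collection and file names are illustrative):
    ///
    /// ```ignore
    /// let out = manager.get_export_file_path("users.csv");
    /// let exported = manager.export_csv("users", &out)?;
    /// println!("wrote {} rows to {}", exported, out);
    /// ```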
    pub fn export_csv(&self, collection_name: &str, file_path: &str) -> Result<usize> {
        println!("Exporting collection '{}' to CSV file '{}'", collection_name, file_path);

        // Create the output file for buffered writing.
        let file = File::create(file_path)
            .map_err(crate::common::FutriixError::IoError)?;

        let mut writer = Writer::from_writer(BufWriter::new(file));

        // Query every document in the collection (empty filter).
        let command = crate::common::protocol::Command::Query {
            collection: collection_name.to_string(),
            filter: vec![],
        };

        let response = self.database.execute_command(command)?;

        let documents = match response {
            crate::common::protocol::Response::Success(data) => {
                serde_json::from_slice::<Vec<Value>>(&data)
                    .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?
            }
            crate::common::protocol::Response::Error(e) => {
                return Err(crate::common::FutriixError::DatabaseError(e));
            }
        };

        if documents.is_empty() {
            println!("Collection '{}' is empty", collection_name);
            return Ok(0);
        }

        // Collect the union of keys across all documents as CSV headers.
        let mut all_headers = std::collections::HashSet::new();
        for document in &documents {
            if let Value::Object(obj) = document {
                for key in obj.keys() {
                    all_headers.insert(key.clone());
                }
            }
        }

        // Sort headers for a deterministic column order.
        let mut headers: Vec<String> = all_headers.into_iter().collect();
        headers.sort();

        // Write the header row.
        writer
            .write_record(&headers)
            .map_err(|e| crate::common::FutriixError::CsvError(e.to_string()))?;

        let mut record_count = 0;

        // Write one CSV row per document, filling missing keys with Null.
        for document in documents {
            if let Value::Object(obj) = document {
                let mut record = Vec::new();

                for header in &headers {
                    let value = obj.get(header).unwrap_or(&Value::Null);
                    record.push(Self::value_to_string(value));
                }

                writer
                    .write_record(&record)
                    .map_err(|e| crate::common::FutriixError::CsvError(e.to_string()))?;
                record_count += 1;

                if record_count % 100 == 0 {
                    println!("Exported {} records...", record_count);
                }
            }
        }

        writer
            .flush()
            .map_err(|e| crate::common::FutriixError::CsvError(e.to_string()))?;

        println!("Successfully exported {} records to '{}'", record_count, file_path);
        Ok(record_count)
    }

    /// Converts a JSON value into its CSV cell representation.
    fn value_to_string(value: &Value) -> String {
        match value {
            Value::String(s) => s.clone(),
            Value::Number(n) => n.to_string(),
            Value::Bool(b) => b.to_string(),
            Value::Null => "".to_string(),
            Value::Array(arr) => {
                // Arrays are rendered as "[a,b,c]".
                let items: Vec<String> = arr.iter().map(Self::value_to_string).collect();
                format!("[{}]", items.join(","))
            }
            // Nested objects are serialized as their JSON string.
            Value::Object(_) => value.to_string(),
        }
    }

    pub fn get_import_progress(&self, collection_name: &str) -> f64 {
        self.import_progress.get(collection_name).unwrap_or(0.0)
    }

    pub fn list_csv_files(&self) -> Result<Vec<String>> {
        let csv_dir = &self.config.import_dir;
        let mut csv_files = Vec::new();

        if let Ok(entries) = std::fs::read_dir(csv_dir) {
            for entry in entries.flatten() {
                let path = entry.path();
                if path.is_file() {
                    if let Some(extension) = path.extension() {
                        // Match the extension case-insensitively ("csv", "CSV", ...).
                        if extension.eq_ignore_ascii_case("csv") {
                            if let Some(file_name) = path.file_name() {
                                csv_files.push(file_name.to_string_lossy().to_string());
                            }
                        }
                    }
                }
            }
        }

        Ok(csv_files)
    }

    pub fn get_import_file_path(&self, file_name: &str) -> String {
        Path::new(&self.config.import_dir)
            .join(file_name)
            .to_string_lossy()
            .to_string()
    }

    pub fn get_export_file_path(&self, file_name: &str) -> String {
        Path::new(&self.config.export_dir)
            .join(file_name)
            .to_string_lossy()
            .to_string()
    }

    pub fn file_exists(&self, file_path: &str) -> bool {
        Path::new(file_path).exists()
    }

    pub fn active_imports_count(&self) -> usize {
        self.import_progress.active_imports()
    }
}
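
// A small test module, added as a sketch: it exercises only the pure
// helpers (`parse_field_value`, `value_to_string`) and the progress map's
// active-import counter, since constructing a real `Database` is out of
// scope here.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parse_field_value_detects_types() {
        assert_eq!(CsvManager::parse_field_value(""), Value::Null);
        assert_eq!(CsvManager::parse_field_value("42"), Value::Number(42.into()));
        assert_eq!(
            CsvManager::parse_field_value("3.5"),
            Value::Number(serde_json::Number::from_f64(3.5).unwrap())
        );
        assert_eq!(CsvManager::parse_field_value("TRUE"), Value::Bool(true));
        assert_eq!(
            CsvManager::parse_field_value("hello"),
            Value::String("hello".to_string())
        );
    }

    #[test]
    fn value_to_string_renders_cells() {
        assert_eq!(CsvManager::value_to_string(&Value::Null), "");
        assert_eq!(CsvManager::value_to_string(&Value::Bool(false)), "false");
        assert_eq!(
            CsvManager::value_to_string(&serde_json::json!([1, "a"])),
            "[1,a]"
        );
    }

    #[test]
    fn progress_map_tracks_active_imports() {
        let map = LockFreeProgressMap::new();
        map.insert("users".to_string(), 0.0); // start -> active = 1
        assert_eq!(map.active_imports(), 1);
        map.insert("users".to_string(), 100.0); // finish -> active = 0
        assert_eq!(map.active_imports(), 0);
        assert_eq!(map.get("users"), Some(100.0));
    }
}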