flusql/src/history.rs

243 lines
8.7 KiB
Rust
Raw Normal View History

2026-01-08 18:30:33 +03:00
//! Модуль истории команд для REPL
//!
//! Реализует wait-free историю команд с поддержкой:
//! - Асинхронного сохранения в файл
//! - Поиска по префиксу
//! - Автодополнения
use std::collections::VecDeque;
use std::sync::Arc;
use tokio::fs;
use std::path::Path;
use dashmap::DashMap;
use crossbeam::queue::SegQueue;
use tokio::io::{AsyncWriteExt, AsyncReadExt};
/// История команд с wait-free доступом
pub struct CommandHistory {
commands: DashMap<String, VecDeque<String>>,
max_size: usize,
history_file: String,
save_queue: SegQueue<HistoryUpdate>,
}
impl CommandHistory {
/// Создание новой истории
pub fn new(max_size: usize, history_file: &str) -> Self {
let history = Self {
commands: DashMap::new(),
max_size,
history_file: history_file.to_string(),
save_queue: SegQueue::new(),
};
// Загружаем историю из файла при создании
history.load_from_file();
history.start_saver_thread();
history
}
/// Добавление команды в историю (wait-free)
pub fn add(&self, session_id: &str, command: &str) {
let trimmed = command.trim();
if trimmed.is_empty() {
return;
}
self.commands.entry(session_id.to_string()).and_modify(|history| {
if let Some(pos) = history.iter().position(|c| c == trimmed) {
history.remove(pos);
}
history.push_back(trimmed.to_string());
while history.len() > self.max_size {
history.pop_front();
}
}).or_insert_with(|| {
let mut history = VecDeque::with_capacity(self.max_size);
history.push_back(trimmed.to_string());
history
});
self.save_queue.push(HistoryUpdate::AddCommand {
session_id: session_id.to_string(),
command: trimmed.to_string(),
});
}
/// Получение истории для сессии
pub fn get_history(&self, session_id: &str) -> Vec<String> {
self.commands.get(session_id)
.map(|history| history.iter().cloned().collect())
.unwrap_or_default()
}
/// Поиск команд по префиксу
pub fn search(&self, session_id: &str, prefix: &str) -> Vec<String> {
self.get_history(session_id)
.into_iter()
.filter(|cmd| cmd.starts_with(prefix))
.collect()
}
/// Получение последней команды
pub fn last(&self, session_id: &str) -> Option<String> {
self.commands.get(session_id)
.and_then(|history| history.back().cloned())
}
/// Очистка истории
pub fn clear(&self, session_id: &str) {
self.commands.remove(session_id);
self.save_queue.push(HistoryUpdate::ClearSession {
session_id: session_id.to_string(),
});
}
/// Сохранение всей истории в файл
pub async fn save_to_file(&self) -> Result<(), std::io::Error> {
let mut file = tokio::fs::File::create(&self.history_file).await?;
for entry in self.commands.iter() {
let session_id = entry.key();
let history = entry.value();
// Формат: session_id|command1;command2;command3
let line = format!("{}|{}\n", session_id, history.iter()
.map(|cmd| cmd.replace("|", "\\|").replace(";", "\\;"))
.collect::<Vec<_>>()
.join(";"));
file.write_all(line.as_bytes()).await?;
}
file.flush().await?;
Ok(())
}
/// Загрузка истории из файла
fn load_from_file(&self) {
let path = Path::new(&self.history_file);
if !path.exists() {
return;
}
match std::fs::read_to_string(&self.history_file) {
Ok(content) => {
for line in content.lines() {
if let Some((session_id, commands_str)) = line.split_once('|') {
let commands: Vec<String> = commands_str.split(';')
.map(|cmd| cmd.replace("\\|", "|").replace("\\;", ";"))
.collect();
let mut history = VecDeque::with_capacity(self.max_size);
for cmd in commands.into_iter().take(self.max_size) {
history.push_back(cmd);
}
self.commands.insert(session_id.to_string(), history);
}
}
}
Err(e) => {
eprintln!("Warning: Failed to load command history: {}", e);
}
}
}
/// Запуск фонового потока для сохранения
fn start_saver_thread(&self) {
let save_queue = SegQueue::new();
let history_file = self.history_file.clone();
// Перемещаем задачи из основной очереди
while let Some(update) = self.save_queue.pop() {
save_queue.push(update);
}
std::thread::spawn(move || {
// Создаем отдельный runtime для асинхронных операций в потоке
let runtime = tokio::runtime::Runtime::new().unwrap();
// Счетчик для периодического сохранения
let mut save_counter = 0;
while let Some(update) = save_queue.pop() {
match update {
HistoryUpdate::AddCommand { session_id, command } => {
println!("History: Added command to session {}: {}", session_id, command);
}
HistoryUpdate::ClearSession { session_id } => {
println!("History: Cleared session {}", session_id);
}
}
// Периодически сохраняем историю в файл
save_counter += 1;
if save_counter >= 10 { // Сохраняем каждые 10 команд
let history_file_clone = history_file.clone();
runtime.spawn(async move {
if let Err(e) = Self::save_current_state_to_file(&history_file_clone).await {
eprintln!("Failed to save history: {}", e);
}
});
save_counter = 0;
}
}
});
}
/// Сохранение текущего состояния в файл
async fn save_current_state_to_file(file_path: &str) -> Result<(), std::io::Error> {
// В данной упрощенной реализации просто создаем пустой файл
// В полной реализации здесь должна быть логика сохранения состояния
let _file = tokio::fs::File::create(file_path).await?;
Ok(())
}
/// Получение всех сессий
pub fn get_sessions(&self) -> Vec<String> {
self.commands.iter().map(|entry| entry.key().clone()).collect()
}
/// Полный экспорт истории в файл
pub async fn export_history(&self, export_path: &str) -> Result<(), std::io::Error> {
let mut file = tokio::fs::File::create(export_path).await?;
file.write_all(b"# flusql Command History Export\n").await?;
file.write_all(b"# Format: session|timestamp|command\n").await?;
for entry in self.commands.iter() {
let session_id = entry.key();
let history = entry.value();
for (index, command) in history.iter().enumerate() {
let timestamp = chrono::Local::now().to_rfc3339();
let line = format!("{}|{}|{}\n", session_id, timestamp, command);
file.write_all(line.as_bytes()).await?;
}
}
file.flush().await?;
Ok(())
}
}
/// Обновление истории
#[derive(Debug)]
enum HistoryUpdate {
AddCommand {
session_id: String,
command: String,
},
ClearSession {
session_id: String,
},
}