keep filesystem too

This commit is contained in:
blek 2023-10-12 21:11:09 +10:00
parent ea7d285b7a
commit 655fe8a1d5
Signed by: blek
GPG Key ID: 14546221E3595D0C
2 changed files with 113 additions and 24 deletions

View File

@ -5,6 +5,7 @@ use std::{error::Error, path::Path};
use crate::{state::State, file::File}; use crate::{state::State, file::File};
// check the files recorded in database
async fn check_key(key: String, mut client: redis::Client) -> bool { async fn check_key(key: String, mut client: redis::Client) -> bool {
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
@ -41,6 +42,62 @@ async fn check_key(key: String, mut client: redis::Client) -> bool {
false false
} }
pub async fn check_file(file: String, keys: Vec<String>, prefix: String) -> bool {
if ! keys.iter().find(|x| x.chars().skip(prefix.len() + 4 + 2).collect::<String>() == file).is_none() {
#[cfg(debug_assertions)] {
log::debug!("File {file} is marked for deletion because it exists in the filesystem, but is not in the database");
}
let _ = tokio::fs::remove_file(file).await;
return true
}
false
}
// check that all files in filesystem exist in the database
pub async fn fskeep(state: State) -> Result<(), Box<dyn Error>> {
let mut redis = state.redis.clone();
let keys: Vec<String> = redis.keys(format!("{}*", state.env.redis.prefix))?;
let objects = keys.len();
let mut files_s = tokio::fs::read_dir(state.env.usercont_dir).await?;
let mut files: Vec<String> = vec![];
while let Some(f) = files_s.next_entry().await? {
files.push(
f.path().into_os_string().into_string()
.map_err(|x| format!("Couldnt parse non-utf8 encoded path: {:?}", x))?
);
}
#[cfg(debug_assertions)]
log::debug!("Got {} objects", objects);
let mut set: JoinSet<bool> = JoinSet::new();
for file in files {
set.spawn(check_file(file, keys.clone(), state.env.redis.prefix.clone()));
}
#[cfg(debug_assertions)]
let mut del_count = 0_u32;
while let Some(_deleted) = set.join_next().await {
#[cfg(debug_assertions)] {
if _deleted.is_ok() {
if _deleted.unwrap() {
del_count += 1;
}
}
}
}
#[cfg(debug_assertions)]
log::debug!("Deleted {} stray files", del_count);
Ok(())
}
pub async fn clean(state: State) -> Result<(), Box<dyn Error>> { pub async fn clean(state: State) -> Result<(), Box<dyn Error>> {
let mut redis = state.redis.clone(); let mut redis = state.redis.clone();
@ -56,7 +113,7 @@ pub async fn clean(state: State) -> Result<(), Box<dyn Error>> {
} }
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
let mut del_count: u32 = 0; let mut del_count = 0_u32;
while let Some(_deleted) = set.join_next().await { while let Some(_deleted) = set.join_next().await {
@ -70,8 +127,12 @@ pub async fn clean(state: State) -> Result<(), Box<dyn Error>> {
} }
#[cfg(debug_assertions)] #[cfg(debug_assertions)] {
log::debug!("Deleted {} objects", del_count); log::debug!("Deleted {} objects", del_count);
log::debug!("Finished checking the DB, checking the filesystem...");
}
fskeep(state).await?;
Ok(()) Ok(())
} }

View File

@ -1,3 +1,5 @@
use state::State;
mod clean; mod clean;
mod state; mod state;
@ -9,6 +11,39 @@ pub fn redis_conn(env: env::Env) -> Result<redis::Client, redis::RedisError> {
redis::Client::open(format!("redis://:{}@{}:{}/", env.redis.pass, env.redis.host, env.redis.port)) redis::Client::open(format!("redis://:{}@{}:{}/", env.redis.pass, env.redis.host, env.redis.port))
} }
#[derive(Debug, Clone)]
enum CleanResult {
Skip,
#[allow(dead_code)]
Break,
Ok
}
async fn clean(env: env::Env, state: State) -> CleanResult {
#[cfg(debug_assertions)]
log::debug!("Initiating clean process");
let envy = env.clone();
let res = clean::clean(state.clone()).await;
if res.is_err() {
log::error!("Error while cleaning: {}", res.unwrap_err());
log::error!("Retrying in {}", std::env::var("CLEAN_ERRDEL").unwrap());
log::debug!("Next clean will run at {}", chrono::Local::now() + env.clean_errdel);
tokio::time::sleep(envy.clean_errdel).await;
return CleanResult::Skip;
}
#[cfg(debug_assertions)] {
log::debug!("Cleaned successfully");
log::debug!("Next clean is scheduled in {}", std::env::var("CLEAN_DEL").unwrap());
log::debug!("Next clean will run at {}", chrono::Local::now() + env.clean_del);
}
tokio::time::sleep(envy.clean_errdel).await;
CleanResult::Ok
}
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
#[cfg(debug_assertions)] { #[cfg(debug_assertions)] {
@ -21,32 +56,25 @@ async fn main() {
dotenvy::dotenv().unwrap(); dotenvy::dotenv().unwrap();
let env = crate::env::Env::load().unwrap(); let env = crate::env::Env::load().unwrap();
let statee = crate::state::State { let state = crate::state::State {
redis: redis_conn(env.clone()).unwrap(), redis: redis_conn(env.clone()).unwrap(),
env: env.clone() env: env.clone()
}; };
log::info!("Initalizing initial clean");
let cl = clean(env.clone(), state.clone()).await;
log::info!("Initial clean exited with status {:?}", cl);
loop { loop {
let res = clean(env.clone(), state.clone()).await;
#[cfg(debug_assertions)] match res {
log::debug!("Initiating clean process"); CleanResult::Break => {
break
let envy = env.clone(); },
let res = clean::clean(statee.clone()).await; _ => {}
if res.is_err() {
log::error!("Error while cleaning: {}", res.unwrap_err());
log::error!("Retrying in {}", std::env::var("CLEAN_ERRDEL").unwrap());
log::debug!("Next clean will run at {}", chrono::Local::now() + env.clean_errdel);
tokio::time::sleep(envy.clean_errdel).await;
continue;
} }
#[cfg(debug_assertions)] {
log::debug!("Cleaned successfully");
log::debug!("Next clean is scheduled in {}", std::env::var("CLEAN_DEL").unwrap());
log::debug!("Next clean will run at {}", chrono::Local::now() + env.clean_del);
}
tokio::time::sleep(envy.clean_errdel).await;
} }
log::info!("Main loop broke");
} }