diff --git a/janitor/src/clean.rs b/janitor/src/clean.rs index 89f9bae..8e6716b 100644 --- a/janitor/src/clean.rs +++ b/janitor/src/clean.rs @@ -5,6 +5,7 @@ use std::{error::Error, path::Path}; use crate::{state::State, file::File}; +// check the files recorded in database async fn check_key(key: String, mut client: redis::Client) -> bool { #[cfg(debug_assertions)] @@ -41,6 +42,62 @@ async fn check_key(key: String, mut client: redis::Client) -> bool { false } +pub async fn check_file(file: String, keys: Vec, prefix: String) -> bool { + if ! keys.iter().find(|x| x.chars().skip(prefix.len() + 4 + 2).collect::() == file).is_none() { + #[cfg(debug_assertions)] { + log::debug!("File {file} is marked for deletion because it exists in the filesystem, but is not in the database"); + } + let _ = tokio::fs::remove_file(file).await; + return true + } + + + false +} + +// check that all files in filesystem exist in the database +pub async fn fskeep(state: State) -> Result<(), Box> { + let mut redis = state.redis.clone(); + let keys: Vec = redis.keys(format!("{}*", state.env.redis.prefix))?; + let objects = keys.len(); + + let mut files_s = tokio::fs::read_dir(state.env.usercont_dir).await?; + let mut files: Vec = vec![]; + while let Some(f) = files_s.next_entry().await? { + files.push( + f.path().into_os_string().into_string() + .map_err(|x| format!("Couldnt parse non-utf8 encoded path: {:?}", x))? + ); + } + + #[cfg(debug_assertions)] + log::debug!("Got {} objects", objects); + + let mut set: JoinSet = JoinSet::new(); + + for file in files { + set.spawn(check_file(file, keys.clone(), state.env.redis.prefix.clone())); + } + + #[cfg(debug_assertions)] + let mut del_count = 0_u32; + + while let Some(_deleted) = set.join_next().await { + #[cfg(debug_assertions)] { + if _deleted.is_ok() { + if _deleted.unwrap() { + del_count += 1; + } + } + } + } + + #[cfg(debug_assertions)] + log::debug!("Deleted {} stray files", del_count); + + Ok(()) +} + pub async fn clean(state: State) -> Result<(), Box> { let mut redis = state.redis.clone(); @@ -56,7 +113,7 @@ pub async fn clean(state: State) -> Result<(), Box> { } #[cfg(debug_assertions)] - let mut del_count: u32 = 0; + let mut del_count = 0_u32; while let Some(_deleted) = set.join_next().await { @@ -70,8 +127,12 @@ pub async fn clean(state: State) -> Result<(), Box> { } - #[cfg(debug_assertions)] - log::debug!("Deleted {} objects", del_count); + #[cfg(debug_assertions)] { + log::debug!("Deleted {} objects", del_count); + log::debug!("Finished checking the DB, checking the filesystem..."); + } + + fskeep(state).await?; Ok(()) } \ No newline at end of file diff --git a/janitor/src/main.rs b/janitor/src/main.rs index debf2fb..db8d37d 100644 --- a/janitor/src/main.rs +++ b/janitor/src/main.rs @@ -1,3 +1,5 @@ +use state::State; + mod clean; mod state; @@ -9,6 +11,39 @@ pub fn redis_conn(env: env::Env) -> Result { redis::Client::open(format!("redis://:{}@{}:{}/", env.redis.pass, env.redis.host, env.redis.port)) } +#[derive(Debug, Clone)] +enum CleanResult { + Skip, + #[allow(dead_code)] + Break, + Ok +} + +async fn clean(env: env::Env, state: State) -> CleanResult { + #[cfg(debug_assertions)] + log::debug!("Initiating clean process"); + + let envy = env.clone(); + let res = clean::clean(state.clone()).await; + if res.is_err() { + log::error!("Error while cleaning: {}", res.unwrap_err()); + log::error!("Retrying in {}", std::env::var("CLEAN_ERRDEL").unwrap()); + log::debug!("Next clean will run at {}", chrono::Local::now() + env.clean_errdel); + tokio::time::sleep(envy.clean_errdel).await; + return CleanResult::Skip; + } + + #[cfg(debug_assertions)] { + log::debug!("Cleaned successfully"); + log::debug!("Next clean is scheduled in {}", std::env::var("CLEAN_DEL").unwrap()); + log::debug!("Next clean will run at {}", chrono::Local::now() + env.clean_del); + } + + tokio::time::sleep(envy.clean_errdel).await; + + CleanResult::Ok +} + #[tokio::main] async fn main() { #[cfg(debug_assertions)] { @@ -21,32 +56,25 @@ async fn main() { dotenvy::dotenv().unwrap(); let env = crate::env::Env::load().unwrap(); - let statee = crate::state::State { + let state = crate::state::State { redis: redis_conn(env.clone()).unwrap(), env: env.clone() }; + log::info!("Initalizing initial clean"); + let cl = clean(env.clone(), state.clone()).await; + log::info!("Initial clean exited with status {:?}", cl); + loop { - - #[cfg(debug_assertions)] - log::debug!("Initiating clean process"); - - let envy = env.clone(); - let res = clean::clean(statee.clone()).await; - if res.is_err() { - log::error!("Error while cleaning: {}", res.unwrap_err()); - log::error!("Retrying in {}", std::env::var("CLEAN_ERRDEL").unwrap()); - log::debug!("Next clean will run at {}", chrono::Local::now() + env.clean_errdel); - tokio::time::sleep(envy.clean_errdel).await; - continue; + let res = clean(env.clone(), state.clone()).await; + + match res { + CleanResult::Break => { + break + }, + _ => {} } - - #[cfg(debug_assertions)] { - log::debug!("Cleaned successfully"); - log::debug!("Next clean is scheduled in {}", std::env::var("CLEAN_DEL").unwrap()); - log::debug!("Next clean will run at {}", chrono::Local::now() + env.clean_del); - } - - tokio::time::sleep(envy.clean_errdel).await; } + + log::info!("Main loop broke"); }