Compare commits

...

10 Commits

6 changed files with 1282 additions and 185 deletions

View File

@ -1,5 +1,5 @@
TG_KEY=
GITEA_KEY=
GITEA_URL=https://git.blek.codes
CHAT_ID=
POLL_TIME=2000
WH_SECRET=
WH_URL=0.0.0.0:80

1151
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -6,10 +6,13 @@ edition = "2021"
[dependencies]
dotenvy_macro = "0.15.7"
femme = "2.2.1"
lazy_static = "1.4.0"
log = "0.4.19"
reqwest = "0.11.18"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.104"
teloxide = "0.12.2"
teloxide-core = "0.9.1"
tide = "0.16.0"
tokio = { version = "1.29.1", features = [ "full" ] }
async-std = { version = "1", features = ["attributes", "tokio1"] }

View File

@ -1,108 +1,19 @@
use crate::{GITEA_KEY, gitea::{User, Repo, Pull}};
macro_rules! gitea_api {
($path: expr) => {
format!("{}{}?token={}", dotenvy_macro::dotenv!("GITEA_URL", "Gitea url is not set!"), $path, GITEA_KEY)
};
}
pub fn get_reviewers_pings_inner(author_login: String) -> Vec<String> {
let mut reviewers: Vec<String> = vec![
"@bleki42".into(),
"@x3paerz".into(),
"@balistiktw".into()
];
pub async fn get_user() -> User {
let user = reqwest::get(gitea_api!("/api/v1/user")).await.unwrap();
if user.status() != 200 {
panic!("Request failed with code {}", user.status());
}
let user = user.bytes().await.unwrap();
let user = String::from_utf8(user.to_vec()).unwrap();
serde_json::from_str::<User>(user.as_str()).unwrap()
}
pub async fn get_org_repos(org: String) -> Vec<Repo> {
let repos = reqwest::get(gitea_api!(format!("/api/v1/orgs/{org}/repos"))).await.unwrap();
if repos.status() != 200 {
panic!("Request failed with code {}", repos.status());
}
let repos = repos.bytes().await.unwrap();
let repos = String::from_utf8(repos.to_vec()).unwrap();
serde_json::from_str::<Vec<Repo>>(repos.as_str()).unwrap()
}
pub async fn get_repo_pulls(org: String, repo: String) -> Vec<Pull> {
let pulls = reqwest::get(gitea_api!(format!("/api/v1/repos/{org}/{repo}/pulls"))).await.unwrap();
if pulls.status() != 200 {
panic!("Request failed with code {}", pulls.status());
}
let pulls = pulls.bytes().await.unwrap();
let pulls = String::from_utf8(pulls.to_vec()).unwrap();
let parsed = serde_json::from_str(pulls.as_str());
if parsed.is_err() {
println!("{pulls}");
}
parsed.unwrap()
}
pub async fn get_all_prs(repos: Vec<Repo>) -> Vec<Pull> {
let mut out: Vec<Pull> = vec![];
let futures: Vec<_> =
repos.iter()
.map(|x| get_repo_pulls(x.owner.login.clone(), x.name.clone()))
.collect();
let map: Vec<String> = vec!["blek".into(), "balistik".into(), "xepaerz".into()];
let mut set = tokio::task::JoinSet::new();
for future in futures {
set.spawn(future);
if ! map.contains(&author_login) {
return reviewers
}
while let Some(res) = set.join_next().await {
let res = res.unwrap();
for pull in res.iter() {
out.push(pull.clone());
}
}
reviewers.remove(map.iter().position(|x| **x == author_login).unwrap());
out
}
/**
The first one is the never seen before PRs and the 2nd is just the data
*/
pub async fn get_new_prs(old: Vec<Pull>, repos: Vec<Repo>) -> ( Vec<Pull>, Vec<Pull> ) {
log::debug!("Getting all PRs now");
let data = get_all_prs(repos.clone()).await;
let mut new_pulls: Vec<Pull> = vec![];
for pull in data.iter() {
let mut alr_exists = false;
for old_pull in old.iter() {
if pull.id == old_pull.id {
alr_exists = true
}
}
if ! alr_exists {
new_pulls.push(pull.clone());
}
}
(new_pulls, data)
}
pub fn filter_repos(repos: Vec<Repo>) -> Vec<Repo> {
repos.iter()
.filter(|x| (!x.archived) && x.has_pull_requests)
.cloned()
.collect()
reviewers
}

View File

@ -36,8 +36,18 @@ pub struct Pull {
pub body: String,
pub state: String,
pub mergeable: bool,
pub merge_base: String,
pub merged: bool,
pub base: Branch,
pub head: Branch,
pub merge_base: String,
pub user: User
pub user: User,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct PullWh {
pub action: String,
pub number: i64,
pub sender: User,
pub repository: Repo,
pub pull_request: Pull
}

View File

@ -1,20 +1,139 @@
#![forbid(unsafe_code)]
use std::{path::Path, thread, time::Duration};
use log;
use teloxide_core::{prelude::*, types::Recipient};
use teloxide_core::{prelude::*, types::{Recipient, Chat}};
use teloxide::Bot;
use tide::{Server, Request, Response};
use tokio::fs;
mod gitea;
mod api;
const POLL_TIME: &str = dotenvy_macro::dotenv!("POLL_TIME", "Poll time is not set!");
const CHAT_ID: &str = dotenvy_macro::dotenv!("CHAT_ID", "Chat ID is not set!");
const TG_KEY: &str = dotenvy_macro::dotenv!("TG_KEY", "Telegram key is not set!");
const GITEA_KEY: &str = dotenvy_macro::dotenv!("GITEA_KEY", "Gitea key is not set!");
const GITEA_URL: &str = dotenvy_macro::dotenv!("GITEA_URL", "Gitea url is not set!");
const WH_SECRET: &str = dotenvy_macro::dotenv!("WH_SECRET", "Webhook secret is not set!");
const WH_LISTEN_URL: &str = dotenvy_macro::dotenv!("WH_URL", "Webhook listen URL is not set!");
#[derive(Debug, Clone)]
struct SharedState {
bot: Bot,
chat: Chat
}
async fn start_tg() -> (Bot, Chat) {
log::info!("Starting up telegram bot...");
let bot = Bot::new(TG_KEY);
let me = bot.get_me().await.unwrap();
log::info!("Logged in to telegram as \"{}\"", me.first_name);
let chat = bot.get_chat(Recipient::Id(ChatId(CHAT_ID.parse::<i64>().unwrap()))).await.unwrap();
log::info!("Using chat {}", {
if chat.is_group() {
chat.title().unwrap()
} else {
chat.first_name().unwrap()
}
});
(bot, chat)
}
async fn start_http(state: SharedState) -> Server<SharedState> {
let mut app = tide::with_state(state);
app.at("/pull_wh").post(webhook);
app
}
async fn webhook(mut req: Request<SharedState>) -> tide::Result {
let body = req.body_string().await.unwrap();
let pr = serde_json::from_str::<gitea::PullWh>(body.as_str());
let event_type = req.header("X-Gitea-Event-Type");
if event_type.is_none() {
return Ok(
Response::builder(400)
.body("{\"error\":\"no event type (X-Gitea-Event-Type)\"}")
.content_type("application/json")
.build()
)
}
let event_type = event_type.unwrap().get(0).unwrap().to_string();
if event_type != "pull_request" {
return Ok(
Response::builder(200)
.body("{\"status\":\"ignoring non-pr event\"}")
.content_type("application/json")
.build()
)
}
if pr.is_err() {
return Ok(format!("Bad serialization: {}", pr.unwrap_err().to_string()).into());
}
let pr = pr.unwrap();
let secret = req.header("Authorization");
if secret.is_none() {
return Ok(
Response::builder(401)
// { error: "bad auth" }
.body("{\"error\":\"bad auth\"}")
.content_type("application/json")
.build()
)
}
let secret = secret.unwrap().get(0).unwrap().to_string();
if secret != "Bearer ".to_string() + WH_SECRET {
println!("{} != {}", secret, "Bearer ".to_string() + WH_SECRET);
return Ok(
Response::builder(401)
.body("{\"error\":\"bad auth\"}")
.content_type("application/json")
.build()
)
}
if ! fs::try_exists(".pr-cache").await.unwrap() {
fs::write(".pr-cache", "[]").await.unwrap();
}
let cache = fs::read_to_string(".pr-cache").await.unwrap();
let cache = serde_json::from_str::<Vec<u64>>(&cache).unwrap();
if cache.iter().find(|x| **x == pr.pull_request.id as u64).is_some() {
return Ok(
Response::builder(200)
.body("{\"status\":\"ignoring known PR\"}")
.content_type("application/json")
.build()
)
} else {
let mut cache = cache;
cache.push(pr.pull_request.id as u64);
fs::write(".pr-cache", serde_json::to_string(&cache).unwrap()).await.unwrap();
}
let state = req.state().clone();
state.bot.send_message(state.chat.id, format!("New PR\n{} ({}#{}) by {}\n{}", pr.pull_request.title, pr.pull_request.head.repo.name, pr.pull_request.number, pr.pull_request.user.login, pr.pull_request.url)).await.unwrap();
state.bot.send_message(
state.chat.id,
format!("апрувните пж{}", api::get_reviewers_pings_inner(pr.sender.login).iter().map(|x| " ".to_string() + x).collect::<Vec<String>>().concat())
).await.unwrap();
Ok(
Response::builder(200)
.body("{ \"status\": \"sent\" }")
.content_type("application/json")
.build()
)
}
#[tokio::main]
async fn main() {
@ -27,54 +146,11 @@ async fn main() {
log::info!("Running in production");
}
log::info!("Gitea URL: {}", GITEA_URL);
let (bot, chat) = start_tg().await;
let http = start_http(SharedState { bot, chat }).await;
let user = api::get_user().await;
log::info!("Logged into gitea as user \"{}\"", user.full_name);
log::info!("Listening for webhooks on {}", WH_LISTEN_URL);
let repos = api::get_org_repos("Tochka".into()).await;
log::info!("Found {} repositories", repos.len());
let repos = api::filter_repos(repos);
log::info!("Only {} repositories are useful", repos.len());
if !Path::new(".pr-cache").exists() {
let prs = api::get_all_prs(repos.clone()).await;
let data = serde_json::to_string_pretty(&prs).unwrap();
fs::write(".pr-cache", data).await.unwrap();
}
let mut prs = serde_json::from_str::<Vec<gitea::Pull>>(String::from_utf8(fs::read(".pr-cache").await.unwrap()).unwrap().as_str()).unwrap();
log::info!("Got {} PRs", prs.len());
log::info!("Starting up telegram bot...");
let bot = Bot::new(TG_KEY);
let me = bot.get_me().await.unwrap();
log::info!("Logged in to telegram as \"{}\"", me.first_name);
let chat = bot.get_chat(Recipient::Id(ChatId(CHAT_ID.parse::<i64>().unwrap()))).await.unwrap();
log::info!("Using chat {}", chat.title().unwrap());
let time = POLL_TIME.parse::<u64>().unwrap();
log::info!("Starting polling SCM for updates ({})", time);
loop {
let (new_prs, new_data) = api::get_new_prs(prs.clone(), repos.clone()).await;
log::debug!("Found {} new PRs", new_prs.len());
thread::sleep(Duration::from_millis(time));
if new_prs.len() != 0 {
prs = new_data;
let data = serde_json::to_string_pretty(&prs).unwrap();
fs::write(".pr-cache", data).await.unwrap();
for pr in new_prs.iter() {
log::info!("Sending message about {} ({})", pr.title, pr.url);
bot.send_message(chat.id, format!("New PR\n{} ({}#{}) by {}\n{}", pr.title, pr.head.repo.name, pr.number, pr.user.login, pr.url)).await.unwrap();
bot.send_message(chat.id, "апрувните пж @bleki42 @balistiktw @x3paerz").await.unwrap();
}
}
}
http.listen(WH_LISTEN_URL.clone()).await.unwrap();
}