"""Poll manager HTTP endpoints on a schedule and record per-endpoint state diffs.

Relies on a module-level ``config`` dict (loaded from ./config.json in the
``__main__`` block) with keys: "reports_dir", "managers", "endpoints",
"timeout".
"""


def do_get(url, method) -> str:
    """HTTP GET ``url + method``; return the body, raise ValueError on non-200."""
    import requests

    resp = requests.get(f"{url}{method}")
    if resp.status_code != 200:
        raise ValueError(f"unexpected http error {resp.status_code}")
    return resp.text


def do_post(url, method, data) -> str:
    """HTTP POST ``data`` to ``url + method``; return the body, raise ValueError on non-200."""
    import requests

    resp = requests.post(f"{url}{method}", data=data)
    if resp.status_code != 200:
        raise ValueError(f"unexpected http error {resp.status_code}")
    return resp.text


def format_manager_directory(mgr_name) -> str:
    """Return the report directory path for manager ``mgr_name``."""
    import os

    return os.path.join(config["reports_dir"], mgr_name)


def make_manager_directory(mgr_name):
    """Create the manager's report directory if it does not exist yet."""
    import os

    os.makedirs(format_manager_directory(mgr_name), exist_ok=True)


def get_arrays_diff(old, cur) -> dict:
    """Diff two sequences of items (typically lines of a state file).

    Returns a dict with keys:
      - "not_changed": items of ``cur`` that were also present in ``old``
      - "new":         items of ``cur`` absent from ``old``
      - "removed":     items of ``old`` absent from ``cur``

    Duplicates in ``old`` collapse (set-based); an item of ``cur`` matches an
    ``old`` entry at most once.
    """
    remaining = set(old)
    not_changed = []
    new = []
    for item in cur:
        if item in remaining:
            remaining.remove(item)
            not_changed.append(item)
        else:
            new.append(item)
    return {
        "not_changed": not_changed,
        "new": new,
        "removed": list(remaining),
    }


def manager_write_state(mgr, method, new_state):
    """Persist ``new_state`` for (mgr, method) and append diff records.

    Files under the manager directory:
      - <method>.state       : latest state, one item per line (overwritten)
      - <method>.diff        : CSV summary (time + change counts), appended
      - <method>.diff.detail : human-readable +/- listing, appended
    """
    import os
    from datetime import datetime

    mgr_dir = format_manager_directory(mgr)
    state_file = os.path.join(mgr_dir, f"{method}.state")
    diff_file = os.path.join(mgr_dir, f"{method}.diff")

    # First collection for this endpoint: create an empty state file and
    # write the CSV header of the diff log.
    if not os.path.exists(state_file):
        os.close(os.open(state_file, os.O_CREAT))
        with open(diff_file, "w") as f:
            f.write("time,not_changed,new,removed\n")

    with open(state_file, "r") as f:
        old_state = [line.strip() for line in f.readlines()]

    diff = get_arrays_diff(old_state, new_state)

    with open(state_file, "w") as f:
        f.write("\n".join(new_state))

    time = datetime.now().isoformat()
    with open(diff_file, "a") as f:
        f.write(
            f"{time},{len(diff['not_changed'])},{len(diff['new'])},{len(diff['removed'])}\n"
        )

    with open(f"{diff_file}.detail", "a") as f:
        f.write("Time: {}\n\n".format(time))
        for line in diff["new"]:
            f.write(f"+ {line}\n")
        for line in diff["removed"]:
            f.write(f"- {line}\n")
        f.write("===\n")


def parse_metrics(mgr, method, raw_main_page) -> str:
    """Extract the first 'list_table' of the manager main page as two CSV lines.

    The first <td> of each table row is treated as a column header, the
    remaining cells as data.  If a previous state file exists, its first line
    is reused as the header so the column set stays stable across runs.
    Returns "" when the page has no such table.
    """
    import os
    from bs4 import BeautifulSoup

    soup = BeautifulSoup(raw_main_page, "html.parser")
    body = soup.find("body")
    tables = body.find_all("table", class_=["list_table"])[0:1]
    if len(tables) == 0:
        return ""
    table = tables[0]

    header_row = []
    data_row = []
    for row in table.find_all("tr"):
        is_header = True
        for elem in row.find_all("td"):
            if is_header:
                header_row.append(elem.text.strip())
                is_header = False
            else:
                data_row.append(elem.text.strip())

    state_file = os.path.join(format_manager_directory(mgr), f"{method}.state")
    if not os.path.exists(state_file):
        header = ",".join(header_row)
    else:
        # Reuse only the first line (the header) of the previous state.
        # Reading the whole file here would fold the previous data row into
        # the new state, making the state file grow on every cycle.
        with open(state_file, "r") as f:
            header = f.readline().strip()

    data = ",".join(data_row)
    return f"{header}\n{data}"


def parse_crashes(mgr, raw_main_page) -> str:
    """Extract the second 'list_table' (crash list) of the main page as CSV.

    Returns "" when the page has no second table; otherwise a CSV with a
    fixed "title,stat,time,repro" header and one line per crash row.
    """
    from bs4 import BeautifulSoup

    soup = BeautifulSoup(raw_main_page, "html.parser")
    body = soup.find("body")
    tables = body.find_all("table", class_=["list_table"])[1:2]
    if len(tables) == 0:
        return ""
    crashes = tables[0]

    result = ["title,stat,time,repro"]
    for row in crashes.find_all("tr"):
        cells = [elem.text.strip() for elem in row.find_all("td")]
        if len(cells) == 0:
            # Header rows use <th>, not <td>; skip them.
            continue
        result.append(",".join(cells))
    return "\n".join(result)


def check_and_setup_expert_mode(manager):
    """Ensure the manager web UI is in expert mode, toggling it via POST if not."""
    from bs4 import BeautifulSoup

    raw_main_page = do_get(manager["http_url"], "/")
    soup = BeautifulSoup(raw_main_page, "html.parser")
    form = soup.find("form")
    selected = form.find_all("button", {"class": "action_button_selected"})[0:1]
    if len(selected) == 1 and selected[0]["value"] == "expert":
        # Already in expert mode - nothing to do.
        return
    do_post(manager["http_url"], "/action", {"toggle": "expert", "url": "/"})
    print(f"enabled expert mode for {manager['name']}")


async def collect_manager_information(manager, endpoint):
    """Fetch one endpoint from one manager, parse it, and record the state.

    Best-effort: each stage (fetch, parse, write) logs its failure and gives
    up rather than killing the collection loop.
    """
    from datetime import datetime

    print(f"{datetime.now()}: collecting info from {manager['name']}, {endpoint['name']}")
    try:
        data = do_get(manager["http_url"], endpoint["http_uri"])
    except Exception as e:
        print(
            f"Failed to get information from manager {manager['name']} using endpoint {endpoint['name']}"
        )
        print(e)
        return

    try:
        if endpoint.get("metrics", False):
            data = parse_metrics(manager["name"], endpoint["name"], data)
        elif endpoint.get("crashes", False):
            data = parse_crashes(manager["name"], data)
        state = data.split("\n")
    except Exception as e:
        # Was a bare `except:`; keep the best-effort behavior but log the cause.
        print(
            f"Failed to parse information from manager {manager['name']}, endpoint: {endpoint['name']}"
        )
        print(e)
        return

    try:
        manager_write_state(manager["name"], endpoint["name"], state)
    except Exception as e:
        print(
            f"Failed to log manager {manager['name']} state, endpoint: {endpoint['name']}"
        )
        print(e)


async def schedule_information_gathering():
    """Spawn one collection task per (manager, endpoint) pair."""
    import asyncio

    for manager in config["managers"]:
        if manager["need_expert_mode"]:
            check_and_setup_expert_mode(manager)
        for endpoint in config["endpoints"]:
            # Fire-and-forget: each task logs its own failures.
            asyncio.create_task(collect_manager_information(manager, endpoint))


async def main():
    """Create report directories, then poll all managers every config["timeout"] seconds."""
    import asyncio

    for manager in config["managers"]:
        make_manager_directory(manager["name"])
    while True:
        await asyncio.gather(
            schedule_information_gathering(),
            asyncio.sleep(config["timeout"]),
        )


if __name__ == "__main__":
    import json
    import asyncio
    import signal

    with open("./config.json", "r") as f:
        config = json.load(f)

    aio_loop = asyncio.get_event_loop()
    main_task = asyncio.ensure_future(main())
    # Cancel the main task cleanly on Ctrl-C / SIGTERM.
    for sig in [signal.SIGINT, signal.SIGTERM]:
        aio_loop.add_signal_handler(sig, main_task.cancel)
    try:
        aio_loop.run_until_complete(main_task)
    finally:
        aio_loop.close()