From 303dee1af031273954087097cdc703cbe554cfa0 Mon Sep 17 00:00:00 2001
From: Grigory Bazilevich
Date: Thu, 6 Mar 2025 23:07:54 +0300
Subject: feat: async task execution + improved metrics handling

---
 main.py | 157 ++++++++++++++++++++++++++++++++++++++++++++++------------------
 1 file changed, 113 insertions(+), 44 deletions(-)

(limited to 'main.py')

diff --git a/main.py b/main.py
index 41decc6..2f0af0c 100644
--- a/main.py
+++ b/main.py
@@ -8,6 +8,16 @@ def do_get(url, method) -> str:
     return resp.text
 
 
+def do_post(url, method, data) -> str:
+    import requests
+
+    resp = requests.post(f"{url}{method}", data=data)
+    if resp.status_code != 200:
+        raise ValueError(f"unexpected http error {resp.status_code}")
+
+    return resp.text
+
+
 def format_manager_directory(mgr_name) -> str:
     import os
 
@@ -80,30 +90,45 @@ def manager_write_state(mgr, method, new_state):
         f.write("===\n")
 
 
-def parse_metrics(mgr, method, raw_metrics) -> str:
+def parse_metrics(mgr, method, raw_main_page) -> str:
     import os
+    from bs4 import BeautifulSoup
     from datetime import datetime
 
     metrics = {"time": datetime.now().isoformat()}
-    for line in raw_metrics.strip().split("\n"):
-        if "#" in line:
-            continue
-        name, value = line.split()
-        metrics[name] = value
+    soup = BeautifulSoup(raw_main_page, "html.parser")
+    body = soup.find("body")
+
+    metrics = body.find_all("table", class_=["list_table"])[0:1]
+    if len(metrics) == 0:
+        return ""
+    metrics = metrics[0]
+
+    header_row = []
+    data_row = []
+    for row in metrics.find_all("tr"):
+
+        is_header = True
+        for elem in row.find_all("td"):
+            if is_header:
+                header_row.append(elem.text.strip())
+                is_header = False
+            else:
+                data_row.append(elem.text.strip())
 
     dir = format_manager_directory(mgr)
     file = os.path.join(dir, f"{method}.state")
 
     if not os.path.exists(file):
-        header = ",".join(metrics.keys())
+        header = ",".join(header_row)
     else:
         with open(file, "r") as f:
             header = "".join(f.readlines())
 
-    data_row = ",".join(metrics.values())
+    data = ",".join(data_row)
 
-    return f"{header}\n{data_row}"
+    return f"{header}\n{data}"
 
 
 def parse_crashes(mgr, raw_main_page) -> str:
@@ -132,51 +157,95 @@ def parse_crashes(mgr, raw_main_page) -> str:
     return "\n".join(result)
 
 
-def main():
-    import time
+def check_and_setup_expert_mode(manager):
+    from bs4 import BeautifulSoup
+
+    raw_main_page = do_get(manager["http_url"], "/")
+    soup = BeautifulSoup(raw_main_page, "html.parser")
+    form = soup.find("form")
+
+    buttons_clicked = form.find_all("button", {"class": "action_button_selected"})[0:1]
+    if len(buttons_clicked) == 1 and buttons_clicked[0]["value"] == "expert":
+        return
+
+    do_post(manager["http_url"], "/action", {"toggle": "expert", "url": "/"})
+    print(f"enabled expert mode for {manager['name']}")
+
+
+async def collect_manager_information(manager, endpoint):
+    from datetime import datetime
+
+    print(f"{datetime.now()}: collecting info from {manager['name']}, {endpoint['name']}")
+
+    try:
+        data = do_get(manager["http_url"], endpoint["http_uri"])
+    except Exception as e:
+        print(
+            f"Failed to get information from manager {manager['name']} using endpoint {endpoint['name']}"
+        )
+        print(e)
+        return
+
+    try:
+        if endpoint.get("metrics", False):
+            data = parse_metrics(manager["name"], endpoint["name"], data)
+        elif endpoint.get("crashes", False):
+            data = parse_crashes(manager["name"], data)
+        state = data.split("\n")
+    except:
+        print(
+            f"Failed to parse information from manager {manager['name']}, endpoint: {endpoint['name']}"
+        )
+        return
+
+    try:
+        manager_write_state(manager["name"], endpoint["name"], state)
+    except:
+        print(
+            f"Failed to log manager {manager['name']} state, endpoint: {endpoint['name']}"
+        )
+
+
+async def schedule_information_gathering():
+    import asyncio
+
+    for manager in config["managers"]:
+        if manager["need_expert_mode"]:
+            check_and_setup_expert_mode(manager)
+
+        for endpoint in config["endpoints"]:
+            asyncio.create_task(collect_manager_information(manager, endpoint))
+
+
+async def main():
+    import asyncio
 
     for manager in config["managers"]:
         make_manager_directory(manager["name"])
 
     while True:
-        for manager in config["managers"]:
-            for endpoint in config["endpoints"]:
-                try:
-                    data = do_get(manager["http_url"], endpoint["http_uri"])
-                except Exception as e:
-                    print(
-                        f"Failed to get information from manager {manager['name']} using endpoint {endpoint['name']}"
-                    )
-                    print(e)
-                    continue
-
-                try:
-                    if endpoint.get("metrics", False):
-                        data = parse_metrics(manager["name"], endpoint["name"], data)
-                    elif endpoint.get("crashes", False):
-                        data = parse_crashes(manager["name"], data)
-                    state = data.split("\n")
-                except:
-                    print(
-                        f"Failed to parse information from manager {manager['name']}, endpoint: {endpoint['name']}"
-                    )
-                    continue
-
-                try:
-                    manager_write_state(manager["name"], endpoint["name"], state)
-                except:
-                    print(
-                        f"Failed to log manager {manager['name']} state, endpoint: {endpoint['name']}"
-                    )
-                    continue
-
-        time.sleep(config["timeout"])
+        await asyncio.gather(
+            schedule_information_gathering(),
+            asyncio.sleep(config["timeout"]),
+        )
 
 
 if __name__ == "__main__":
     import json
+    import asyncio
+    import signal
+
     with open("./config.json", "r") as f:
         config = json.load(f)
 
-    main()
+    aio_loop = asyncio.get_event_loop()
+    main_task = asyncio.ensure_future(main())
+
+    for sig in [signal.SIGINT, signal.SIGTERM]:
+        aio_loop.add_signal_handler(sig, main_task.cancel)
+
+    try:
+        aio_loop.run_until_complete(main_task)
+    finally:
+        aio_loop.close()
--
cgit mrf-deployment