"""Telegram bot exposing a small set of kubectl rollout operations.

The bot long-polls the Telegram Bot API for updates and, for messages sent
by the single allowed user, runs ``kubectl`` against one deployment:
status, current image, rollout history, rollback, and deploying an image
tag.

Configuration is read from the environment (optionally via a ``.env``
file): ``TELEGRAM_BOT_TOKEN`` and ``TELEGRAM_ALLOWED_USER_ID`` are required;
``NAMESPACE``, ``DEPLOYMENT``, ``CONTAINER`` and ``IMAGE_REPO`` have defaults.
"""

import os
import re
import shlex
import subprocess
import time

import requests
from dotenv import load_dotenv

load_dotenv()

BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
# Fall back to 0 when the variable is unset or empty so the explicit check
# below reports the problem instead of int(None) raising a TypeError first.
ALLOWED_USER_ID = int(os.getenv("TELEGRAM_ALLOWED_USER_ID") or 0)

NAMESPACE = os.getenv("NAMESPACE", "devsecops")
DEPLOYMENT = os.getenv("DEPLOYMENT", "arcade")
CONTAINER = os.getenv("CONTAINER", "arcade")
IMAGE_REPO = os.getenv("IMAGE_REPO", "git.onlionel.com/gitea/arcade")

if not BOT_TOKEN:
    raise RuntimeError("TELEGRAM_BOT_TOKEN ontbreekt")
if not ALLOWED_USER_ID:
    raise RuntimeError("TELEGRAM_ALLOWED_USER_ID ontbreekt")

BASE_URL = f"https://api.telegram.org/bot{BOT_TOKEN}"

# Whitelist for the user-supplied image tag: it must start with an
# alphanumeric or underscore, followed by at most 127 characters from
# [A-Za-z0-9_.-]. Anything else (including '/', ':' and shell
# metacharacters) is rejected before it can reach a command line.
_TAG_RE = re.compile(r"[A-Za-z0-9_][A-Za-z0-9_.-]{0,127}")


def send_message(chat_id: int, text: str) -> None:
    """Send *text* to the Telegram chat *chat_id*.

    The text is truncated to 4000 characters (presumably to stay below
    Telegram's message size limit). Network errors raised by ``requests``
    propagate to the caller; a non-success HTTP response is only reported
    on stdout so that a rejected message never aborts a running operation.
    """
    response = requests.post(
        f"{BASE_URL}/sendMessage",
        data={"chat_id": chat_id, "text": text[:4000]},
        timeout=20,
    )
    if not response.ok:
        print(f"sendMessage failed: HTTP {response.status_code}")


def run_cmd(cmd: str, timeout: int = 120) -> str:
    """Run *cmd* through ``bash -lc`` and return its combined output.

    Args:
        cmd: Shell command line. It is interpreted by bash, so callers must
            quote any value that is not a trusted literal.
        timeout: Maximum run time in seconds.

    Returns:
        stdout and stderr joined by a newline, with surrounding whitespace
        stripped.

    Raises:
        RuntimeError: The command exited with a non-zero status; the message
            is the command output, or a generic text when there was none.
        subprocess.TimeoutExpired: The command exceeded *timeout*.
    """
    result = subprocess.run(
        ["bash", "-lc", cmd],
        capture_output=True,
        text=True,
        timeout=timeout,
    )
    output = ((result.stdout or "") + "\n" + (result.stderr or "")).strip()
    if result.returncode != 0:
        raise RuntimeError(output or f"Commando faalde: {cmd}")
    return output


def k(cmd: str, timeout: int = 120) -> str:
    """Run ``kubectl -n NAMESPACE <cmd>`` and return its output.

    *cmd* is appended verbatim to the command line; see :func:`run_cmd` for
    the return value and the exceptions raised.
    """
    return run_cmd(f"kubectl -n {shlex.quote(NAMESPACE)} {cmd}", timeout=timeout)


def _deployment_ref() -> str:
    """Return the shell-quoted ``deployment/<name>`` resource reference."""
    return shlex.quote(f"deployment/{DEPLOYMENT}")


def _current_image() -> str:
    """Return the image of the first container in the deployment's pod spec."""
    # NOTE(review): this reads containers[0], not the container named by
    # CONTAINER -- confirm the deployment has a single container or that the
    # target container is listed first.
    return k(
        f"get deployment {shlex.quote(DEPLOYMENT)} "
        "-o jsonpath='{.spec.template.spec.containers[0].image}'",
        timeout=30,
    )


def _cmd_start(chat_id: int, parts: list) -> None:
    """Reply with the list of available commands."""
    send_message(
        chat_id,
        "Bot actief.\nBeschikbare commando's:\n"
        "/status\n"
        "/current\n"
        "/history\n"
        "/rollback\n"
        "/deploy_sha <sha>",
    )


def _cmd_status(chat_id: int, parts: list) -> None:
    """Reply with the rollout status and the pods of the deployment."""
    try:
        rollout = k(f"rollout status {_deployment_ref()} --timeout=60s", timeout=70)
        selector = shlex.quote(f"app={DEPLOYMENT}")
        pods = k(f"get pods -l {selector} -o wide", timeout=30)
        send_message(chat_id, f"Rollout status:\n{rollout}\n\nPods:\n{pods}")
    except Exception as e:
        send_message(chat_id, f"Status ophalen mislukt:\n{e}")


def _cmd_current(chat_id: int, parts: list) -> None:
    """Reply with the image the deployment currently references."""
    try:
        send_message(chat_id, f"Huidige image:\n{_current_image()}")
    except Exception as e:
        send_message(chat_id, f"Huidige image ophalen mislukt:\n{e}")


def _cmd_history(chat_id: int, parts: list) -> None:
    """Reply with the rollout history of the deployment."""
    try:
        history = k(f"rollout history {_deployment_ref()}", timeout=30)
        send_message(chat_id, f"Rollout history:\n{history}")
    except Exception as e:
        send_message(chat_id, f"History ophalen mislukt:\n{e}")


def _cmd_rollback(chat_id: int, parts: list) -> None:
    """Undo the latest rollout, wait for it to settle and report the image."""
    send_message(chat_id, "Rollback gestart...")
    try:
        undo = k(f"rollout undo {_deployment_ref()}", timeout=60)
        status = k(f"rollout status {_deployment_ref()} --timeout=180s", timeout=190)
        image = _current_image()
        send_message(
            chat_id,
            f"Rollback voltooid.\n\n{undo}\n\n{status}\n\nNieuwe image:\n{image}",
        )
    except Exception as e:
        send_message(chat_id, f"Rollback mislukt:\n{e}")


def _cmd_deploy_sha(chat_id: int, parts: list) -> None:
    """Set the container image to ``IMAGE_REPO:<tag>`` and wait for rollout.

    The tag is the first argument after the command. It comes straight from
    a chat message and ends up on a shell command line, so it is checked
    against a strict whitelist and additionally shell-quoted.
    """
    if len(parts) < 2:
        send_message(chat_id, "Gebruik: /deploy_sha <sha>")
        return

    sha = parts[1].strip()
    if not _TAG_RE.fullmatch(sha):
        send_message(chat_id, "Ongeldige sha/tag.")
        return

    image = f"{IMAGE_REPO}:{sha}"
    send_message(chat_id, f"Deploy gestart naar:\n{image}")
    try:
        assignment = shlex.quote(f"{CONTAINER}={image}")
        set_image = k(f"set image {_deployment_ref()} {assignment}", timeout=60)
        status = k(f"rollout status {_deployment_ref()} --timeout=180s", timeout=190)
        send_message(chat_id, f"Deploy voltooid.\n\n{set_image}\n\n{status}")
    except Exception as e:
        send_message(chat_id, f"Deploy mislukt:\n{e}")


# Maps a command name (without any "@botname" suffix) to its handler.
_COMMANDS = {
    "/start": _cmd_start,
    "/status": _cmd_status,
    "/current": _cmd_current,
    "/history": _cmd_history,
    "/rollback": _cmd_rollback,
    "/deploy_sha": _cmd_deploy_sha,
}


def handle_command(chat_id: int, user_id: int, text: str) -> None:
    """Execute the bot command contained in *text* and reply in *chat_id*.

    Args:
        chat_id: Chat the reply is sent to.
        user_id: Sender of the message; anyone other than ALLOWED_USER_ID is
            refused before the command is even looked at.
        text: Raw message text. The first whitespace-separated token is the
            command, with an optional ``@botname`` suffix removed; remaining
            tokens are its arguments.
    """
    parts = text.strip().split()
    command = parts[0].split("@")[0] if parts else ""

    if user_id != ALLOWED_USER_ID:
        send_message(chat_id, "Niet toegestaan.")
        return

    handler = _COMMANDS.get(command)
    if handler is None:
        send_message(
            chat_id,
            "Onbekend commando. Gebruik /status, /current, /history, "
            "/rollback of /deploy_sha <sha>.",
        )
        return

    handler(chat_id, parts)


def main() -> None:
    """Long-poll Telegram for updates forever and dispatch text messages."""
    offset = None
    while True:
        try:
            params = {"timeout": 30}
            if offset is not None:
                params["offset"] = offset

            # The HTTP timeout is longer than the long-poll timeout so the
            # server gets the chance to answer an idle poll itself.
            response = requests.get(f"{BASE_URL}/getUpdates", params=params, timeout=40)
            response.raise_for_status()

            for update in response.json().get("result", []):
                # Advance the offset before handling, so an update whose
                # handling fails is not fetched and executed again.
                offset = update["update_id"] + 1

                message = update.get("message", {})
                msg_text = message.get("text", "").strip()
                chat_id = message.get("chat", {}).get("id")
                user_id = message.get("from", {}).get("id")

                if msg_text and chat_id and user_id:
                    handle_command(chat_id, user_id, msg_text)
        except Exception as e:
            # requests exceptions embed the request URL, which contains the
            # bot token; keep the secret out of the log output.
            print(f"Loop error: {str(e).replace(BOT_TOKEN, '***')}")
            # Back off briefly so a persistent failure does not spin.
            time.sleep(5)


if __name__ == "__main__":
    main()