Files
arcade/bot.py
lionel 24992cfb86
Some checks failed
Flask CI/CD Pipeline / format-and-auto-fix (push) Successful in 17s
Flask CI/CD Pipeline / test-and-verify (push) Successful in 32s
Flask CI/CD Pipeline / build-scan-and-push-image (push) Failing after 2m22s
Flask CI/CD Pipeline / deploy-updated-container (push) Has been skipped
Update bot.py
2026-04-16 17:08:02 +00:00

165 lines
5.3 KiB
Python

import os
import re
import subprocess
import time

import requests
from dotenv import load_dotenv
# Load variables from a .env file (python-dotenv) before the environment is
# read below.
load_dotenv()

# Telegram credentials. The bot token is mandatory.
BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
if not BOT_TOKEN:
    raise RuntimeError("TELEGRAM_BOT_TOKEN ontbreekt")

# Read the user id as text first: calling int() directly on a missing variable
# raises TypeError before the explicit "ontbreekt" check could ever run.
_allowed_user_id_raw = os.getenv("TELEGRAM_ALLOWED_USER_ID")
if not _allowed_user_id_raw:
    raise RuntimeError("TELEGRAM_ALLOWED_USER_ID ontbreekt")
try:
    ALLOWED_USER_ID = int(_allowed_user_id_raw)
except ValueError as exc:
    raise RuntimeError("TELEGRAM_ALLOWED_USER_ID moet een geheel getal zijn") from exc
if not ALLOWED_USER_ID:
    # An id of 0 is treated as "not configured".
    raise RuntimeError("TELEGRAM_ALLOWED_USER_ID ontbreekt")

# Kubernetes / registry targets, each overridable through the environment.
NAMESPACE = os.getenv("NAMESPACE", "devsecops")
DEPLOYMENT = os.getenv("DEPLOYMENT", "arcade")
CONTAINER = os.getenv("CONTAINER", "arcade")
IMAGE_REPO = os.getenv("IMAGE_REPO", "git.onlionel.com/gitea/arcade")

# Base URL for all Telegram Bot API calls made by this module.
BASE_URL = f"https://api.telegram.org/bot{BOT_TOKEN}"

# Next update id to request from getUpdates; None until the first update
# has been seen.
offset = None
def send_message(chat_id: int, text: str) -> None:
    """Post ``text`` to the Telegram chat ``chat_id`` via ``sendMessage``.

    Only the first 4000 characters of ``text`` are sent. The HTTP response is
    not inspected; exceptions raised by ``requests`` propagate to the caller.
    """
    endpoint = f"{BASE_URL}/sendMessage"
    payload = {"chat_id": chat_id, "text": text[:4000]}
    requests.post(endpoint, data=payload, timeout=20)
def run_cmd(cmd: str, timeout: int = 120) -> str:
    """Run ``cmd`` through ``bash -lc`` and return its combined output.

    Args:
        cmd: Shell command line handed to bash as a single string.
        timeout: Seconds to wait before ``subprocess.run`` gives up.

    Returns:
        stdout followed by stderr, joined by a newline and stripped.

    Raises:
        RuntimeError: If the command exits with a non-zero status. The
            message is the captured output, or a generic text naming the
            command when nothing was printed.
        subprocess.TimeoutExpired: If the command exceeds ``timeout``.
    """
    completed = subprocess.run(
        ["bash", "-lc", cmd],
        capture_output=True,
        text=True,
        timeout=timeout,
    )
    streams = (completed.stdout or "", completed.stderr or "")
    combined = "\n".join(streams).strip()
    if completed.returncode == 0:
        return combined
    raise RuntimeError(combined or f"Commando faalde: {cmd}")
def k(cmd: str, timeout: int = 120) -> str:
    """Run a ``kubectl`` subcommand in the configured namespace.

    ``cmd`` is everything that follows ``kubectl -n <NAMESPACE>``; the
    assembled command line is executed by ``run_cmd`` and its output returned.
    """
    kubectl_line = "kubectl -n {} {}".format(NAMESPACE, cmd)
    return run_cmd(kubectl_line, timeout=timeout)
# Docker image tag grammar: ASCII letters, digits, "_", "." and "-", not
# starting with "." or "-", at most 128 characters. Restricting the
# /deploy_sha argument to this set keeps shell metacharacters out of the
# command line that k() passes to bash.
_IMAGE_TAG_RE = re.compile(r"[A-Za-z0-9_][A-Za-z0-9_.-]{0,127}")


def _current_image() -> str:
    """Return the image of the first container in the deployment's pod template."""
    return k(
        f"get deployment {DEPLOYMENT} "
        f"-o jsonpath='{{.spec.template.spec.containers[0].image}}'",
        timeout=30,
    )


def handle_command(chat_id: int, user_id: int, text: str) -> None:
    """Execute one bot command and report the result back to the chat.

    Supported commands: ``/start``, ``/status``, ``/current``, ``/history``,
    ``/rollback`` and ``/deploy_sha <sha>``. A ``@botname`` suffix on the
    command word is ignored. Failures of the underlying kubectl calls are
    caught and sent to the chat as an error message.

    Args:
        chat_id: Chat that receives every reply.
        user_id: Sender of the message; must equal ``ALLOWED_USER_ID``.
        text: Raw message text.
    """
    parts = text.strip().split()
    command = parts[0].split("@")[0] if parts else ""

    # Only the single configured user may operate the bot.
    if user_id != ALLOWED_USER_ID:
        send_message(chat_id, "Niet toegestaan.")
        return

    if command == "/start":
        send_message(
            chat_id,
            "Bot actief.\nBeschikbare commando's:\n"
            "/status\n"
            "/current\n"
            "/history\n"
            "/rollback\n"
            "/deploy_sha <sha>"
        )
        return

    if command == "/status":
        try:
            rollout = k(f"rollout status deployment/{DEPLOYMENT} --timeout=60s", timeout=70)
            pods = k(f"get pods -l app={DEPLOYMENT} -o wide", timeout=30)
            send_message(chat_id, f"Rollout status:\n{rollout}\n\nPods:\n{pods}")
        except Exception as e:
            send_message(chat_id, f"Status ophalen mislukt:\n{e}")
        return

    if command == "/current":
        try:
            image = _current_image()
            send_message(chat_id, f"Huidige image:\n{image}")
        except Exception as e:
            send_message(chat_id, f"Huidige image ophalen mislukt:\n{e}")
        return

    if command == "/history":
        try:
            history = k(f"rollout history deployment/{DEPLOYMENT}", timeout=30)
            send_message(chat_id, f"Rollout history:\n{history}")
        except Exception as e:
            send_message(chat_id, f"History ophalen mislukt:\n{e}")
        return

    if command == "/rollback":
        send_message(chat_id, "Rollback gestart...")
        try:
            undo = k(f"rollout undo deployment/{DEPLOYMENT}", timeout=60)
            # The subprocess timeout is slightly longer than kubectl's own
            # --timeout so kubectl gets the chance to report the failure itself.
            status = k(f"rollout status deployment/{DEPLOYMENT} --timeout=180s", timeout=190)
            image = _current_image()
            send_message(chat_id, f"Rollback voltooid.\n\n{undo}\n\n{status}\n\nNieuwe image:\n{image}")
        except Exception as e:
            send_message(chat_id, f"Rollback mislukt:\n{e}")
        return

    if command == "/deploy_sha":
        if len(parts) < 2:
            send_message(chat_id, "Gebruik: /deploy_sha <sha>")
            return
        sha = parts[1].strip()
        # The tag ends up inside a string executed by bash, so it must match
        # the tag grammar exactly; this also rejects "/" and ":".
        if not _IMAGE_TAG_RE.fullmatch(sha):
            send_message(chat_id, "Ongeldige sha/tag.")
            return
        image = f"{IMAGE_REPO}:{sha}"
        send_message(chat_id, f"Deploy gestart naar:\n{image}")
        try:
            set_image = k(
                f"set image deployment/{DEPLOYMENT} "
                f"{CONTAINER}={image}",
                timeout=60,
            )
            status = k(f"rollout status deployment/{DEPLOYMENT} --timeout=180s", timeout=190)
            send_message(chat_id, f"Deploy voltooid.\n\n{set_image}\n\n{status}")
        except Exception as e:
            send_message(chat_id, f"Deploy mislukt:\n{e}")
        return

    send_message(chat_id, "Onbekend commando. Gebruik /status, /current, /history, /rollback of /deploy_sha <sha>.")
# Long-poll Telegram for updates forever and dispatch every text message to
# handle_command. Any error in a cycle is printed and followed by a
# 5-second pause before polling resumes.
while True:
    try:
        query = {"timeout": 30}
        if offset is not None:
            query["offset"] = offset
        reply = requests.get(f"{BASE_URL}/getUpdates", params=query, timeout=40)
        reply.raise_for_status()
        for update in reply.json().get("result", []):
            # Advance the offset first so this update is not requested again.
            offset = update["update_id"] + 1
            message = update.get("message", {})
            msg_text = message.get("text", "").strip()
            chat_id = message.get("chat", {}).get("id")
            user_id = message.get("from", {}).get("id")
            # Skip updates without text, chat id or sender id.
            if not (msg_text and chat_id and user_id):
                continue
            handle_command(chat_id, user_id, msg_text)
    except Exception as e:
        print(f"Loop error: {e}")
        time.sleep(5)