import os
import secrets
import signal

import psutil
import requests

from vnc import run_vnc, save_password

API_SERVER = "http://127.0.0.1:8000"
REGISTER_URL = API_SERVER + "/app/rwa/api/register/"

# Upper bound (seconds) for the registration request so that a stalled API
# server cannot block session creation forever.
REGISTER_TIMEOUT_SECONDS = 10


class Session:
    """A password-protected VNC session registered with the RWA API server.

    Creating an instance generates a random password, starts the VNC and
    websockify processes through ``run_vnc`` and registers the session by
    POSTing to ``REGISTER_URL``. Call :meth:`stop` to terminate the
    processes and remove the password file.
    """

    def __init__(self) -> None:
        """Generate a password, start VNC and register the session.

        Raises:
            requests.RequestException: if the registration request fails,
                times out or returns an HTTP error status.
            Exception: anything raised by ``run_vnc`` or while reading the
                registration response. In every failure case the resources
                created so far are cleaned up before the error propagates.
        """
        self._generate_password()
        try:
            self._start_vnc()
            self._register_session()
        except Exception:
            # Do not leak spawned processes or the password file when the
            # session could not be fully set up.
            self.stop()
            raise

    @property
    def pid(self) -> int:
        """PID of the VNC server process."""
        return self.vnc_pid

    @property
    def port(self) -> int:
        """Port reported for the websockify process."""
        return self.ws_port

    def _generate_password(self) -> None:
        """Generate password for x11vnc and save it."""
        self.password = secrets.token_urlsafe(20)
        # The value returned by save_password is later treated as the path
        # of the password file (passed to run_vnc, removed in stop()).
        self.pw_filename = save_password(self.password)

    def _start_vnc(self) -> None:
        """Start x11vnc server.

        ``run_vnc`` is expected to return a mapping with ``"vnc"`` and
        ``"ws"`` entries, each providing ``"pid"`` and ``"port"``.
        """
        process_info = run_vnc(self.pw_filename)
        vnc_info = process_info["vnc"]
        ws_info = process_info["ws"]
        self.vnc_pid = vnc_info["pid"]
        self.vnc_port = vnc_info["port"]
        self.ws_pid = ws_info["pid"]
        self.ws_port = ws_info["port"]

    def _register_session(self) -> None:
        """Register session in RWA.

        Stores the decoded JSON response in ``self.meta`` and copies its
        ``session_id``, ``url``, ``token`` and ``pin`` entries onto the
        instance.

        Raises:
            requests.RequestException: on connection failure, timeout or an
                HTTP error status.
            KeyError: if the response lacks one of the expected keys.
        """
        r = requests.post(
            REGISTER_URL,
            json={
                "port": self.ws_port,
                "password": self.password,
                "pid": self.vnc_pid,
            },
            timeout=REGISTER_TIMEOUT_SECONDS,
        )
        print(r)
        # Fail loudly on 4xx/5xx instead of trying to parse an error page.
        r.raise_for_status()
        self.meta = r.json()
        self.session_id = self.meta["session_id"]
        self.web_url = self.meta["url"]
        self.api_token = self.meta["token"]
        self.pin = self.meta["pin"]

    @staticmethod
    def _terminate(pid, message: str) -> None:
        """Send SIGTERM to *pid* if it exists, printing *message* first."""
        if pid is None or not psutil.pid_exists(pid):
            return
        print(message)
        try:
            os.kill(pid, signal.SIGTERM)
        except ProcessLookupError:
            # The process exited between the existence check and the kill.
            pass

    def stop(self) -> None:
        """Stop session and clean up.

        Terminates the VNC and websockify processes (if still present) and
        deletes the password file. Safe to call more than once and on a
        partially initialised session.
        """
        # getattr keeps stop() usable when start-up failed part-way through.
        self._terminate(getattr(self, "vnc_pid", None), "Kill VNC.")
        self._terminate(getattr(self, "ws_pid", None), "Kill websockify.")

        pw_filename = getattr(self, "pw_filename", None)
        if pw_filename is not None:
            try:
                os.remove(pw_filename)
            except FileNotFoundError:
                # Already removed; nothing left to clean up.
                pass
            else:
                print("Delete password file")

    @property
    def vnc_process_running(self) -> bool:
        """Check if the VNC process is still running.

        Returns ``False`` when the process does not exist (or vanishes
        while being inspected) or is a zombie, ``True`` otherwise.
        """
        try:
            status = psutil.Process(self.vnc_pid).status()
        except psutil.NoSuchProcess:
            return False
        return status != psutil.STATUS_ZOMBIE

    @property
    def client_meta(self) -> dict:
        """Return the session details handed to a client: id, url and pin."""
        return {"id": self.pid, "url": self.web_url, "pin": self.pin}