aboutsummaryrefslogtreecommitdiff
path: root/rwa/support/sessionservice/service.py
diff options
context:
space:
mode:
Diffstat (limited to 'rwa/support/sessionservice/service.py')
-rwxr-xr-xrwa/support/sessionservice/service.py54
1 files changed, 28 insertions, 26 deletions
diff --git a/rwa/support/sessionservice/service.py b/rwa/support/sessionservice/service.py
index ae634c8..c50da26 100755
--- a/rwa/support/sessionservice/service.py
+++ b/rwa/support/sessionservice/service.py
@@ -28,12 +28,10 @@
import argparse
import json
import logging
-import os
import signal
-import tempfile
import time
from threading import Thread
-from typing import Union, Any
+from typing import Union
import dbus
import dbus.mainloop.glib
@@ -58,9 +56,7 @@ class RWASupportSessionService(dbus.service.Object):
:param mockup_mode: Starts the service in mock up mode
"""
- def __init__(
- self, loop: GLib.MainLoop, mockup_mode: bool = False, one_time: bool = False
- ):
+ def __init__(self, loop: GLib.MainLoop, mockup_mode: bool = False, one_time: bool = False):
self.loop = loop
self.mockup_mode = mockup_mode
self.one_time = one_time
@@ -102,14 +98,15 @@ class RWASupportSessionService(dbus.service.Object):
"""
if ALLOW_ONLY_ONE_SESSION and len(self.sessions.values()) > 0:
logging.warning(
- "There is already one session running and the service is configured to allow only one "
+ "There is already one session running and the service "
+ "is configured to allow only one "
"session, so this session won't be started."
)
return json.dumps({"status": "error", "type": "multiple"})
# Start session
try:
- session = Session(self.trigger_service.port, mockup_mode)
+ session = Session(self.trigger_service.port, self.mockup_mode)
# Add session to sessions list
self.sessions[session.pid] = session
@@ -120,9 +117,7 @@ class RWASupportSessionService(dbus.service.Object):
return_json = session.client_meta
return_json["status"] = "success"
- logging.info(
- f"New session #{session.pid} was started with meta {return_json}."
- )
+ logging.info(f"New session #{session.pid} was started with meta {return_json}.")
return json.dumps(return_json)
except ConnectionError:
@@ -130,7 +125,9 @@ class RWASupportSessionService(dbus.service.Object):
return json.dumps({"status": "error", "type": "connection"})
- @dbus.service.method("org.ArcticaProject.RWASupportSessionService", in_signature="i", out_signature="s")
+ @dbus.service.method(
+ "org.ArcticaProject.RWASupportSessionService", in_signature="i", out_signature="s"
+ )
def status(self, pid: int) -> str:
"""Return the status of a session.
@@ -158,14 +155,17 @@ class RWASupportSessionService(dbus.service.Object):
"""
return self._get_status(pid)
- @dbus.service.method("org.ArcticaProject.RWASupportSessionService", in_signature="i", out_signature="s")
+ @dbus.service.method(
+ "org.ArcticaProject.RWASupportSessionService", in_signature="i", out_signature="s"
+ )
def refresh_status(self, pid: int) -> str:
- """Same as :meth:`status`, but updates status from RWA.WebApp before returning it here.
- """
+ """Update status from WebApp before returning it here like :meth:`status`."""
self._update_session(pid)
return self._get_status(pid)
- @dbus.service.method("org.ArcticaProject.RWASupportSessionService", in_signature="i", out_signature="s")
+ @dbus.service.method(
+ "org.ArcticaProject.RWASupportSessionService", in_signature="i", out_signature="s"
+ )
def stop(self, pid: int) -> str:
"""Stop a remote session.
@@ -200,7 +200,6 @@ class RWASupportSessionService(dbus.service.Object):
def _update_session(self, pid: int):
"""Update the status of a session."""
-
try:
session = self.sessions[pid]
except KeyError:
@@ -238,7 +237,7 @@ class RWASupportSessionService(dbus.service.Object):
if self.one_time:
self._stop_all()
- def _trigger(self, session_id: int, data: dict, method: str = "trigger") -> Union[dict, bool]:
+ def _trigger(self, session_id: int, data: dict, method: str = "trigger") -> Union[dict, bool]:
"""Trigger a specific session via trigger token."""
logging.info(f"Triggered with session ID {session_id} and {data}")
@@ -271,7 +270,10 @@ class RWASupportSessionService(dbus.service.Object):
def str2bool(v: Union[str, bool, int]) -> bool:
- """Return true or false if the given string can be interpreted as a boolean otherwise raise an exception."""
+ """Return true or false if the given string can be interpreted as a boolean.
+
+ If it fails, raise an exception.
+ """
if isinstance(v, bool):
return v
if v.lower() in ("yes", "true", "t", "y", "1", 1):
@@ -281,6 +283,7 @@ def str2bool(v: Union[str, bool, int]) -> bool:
else:
raise argparse.ArgumentTypeError("Boolean value expected.")
+
def main():
# Check for lock file
if is_locked():
@@ -298,7 +301,8 @@ def main():
nargs="?",
const=True,
default=False,
- help="Activates mock up mode. Acts like the real session service but don't do changes or call RWA.",
+ help="Activates mock up mode. Acts like the real session service "
+ "but don't do changes or call RWA.",
)
parser.add_argument(
"-o",
@@ -315,18 +319,16 @@ def main():
one_time = args.one_time
if mockup_mode:
- logging.warning(
- "All API responses are faked and should NOT BE USED IN PRODUCTION!"
- )
+ logging.warning("All API responses are faked and should NOT BE USED IN PRODUCTION!")
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
loop = GLib.MainLoop()
- object = RWASupportSessionService(loop, mockup_mode, one_time)
+ service_object = RWASupportSessionService(loop, mockup_mode, one_time)
def signal_handler(sig, frame):
logging.info("Service was terminated.")
- object._stop_all()
+ service_object._stop_all()
signal.signal(signal.SIGINT, signal_handler)
@@ -337,4 +339,4 @@ def main():
if __name__ == "__main__":
- main() \ No newline at end of file
+ main()