safety_master_plugin/safety_master_plugin/master_plugin.py
Jakub Delicat 1d7d2c1269 added node destruction
Signed-off-by: Jakub Delicat <jakub.delicat@pwr.edu.pl>
2023-09-07 16:23:58 +02:00

128 lines
4.7 KiB
Python
Executable File

from qt_gui.plugin import Plugin
from python_qt_binding.QtCore import QTimer
from safety_master_plugin.master_restrictions import Restrictions
from safety_master_plugin.master_info import MasterInfo
from safety_master_plugin.master_widget import MasterWidget
from safety_master_plugin.stop_state import StopState as SS
from safety_master_plugin.master_callback_handler import CallbackHandler
from safety_master_plugin.ros_master_wrapper import ROSWrapper
from safety_master_plugin.qt_master_wrapper import QtWrapper
class MasterPlugin(Plugin):
def __init__(self, context):
super(MasterPlugin, self).__init__(context)
self.setObjectName("Master Plugin - L1.5 safety system")
self._widget = MasterWidget(context)
self._ros_wrapper = ROSWrapper()
self._qt_wrapper = QtWrapper(self._widget)
self.set_Qt_callback_functions()
self._master_info = MasterInfo()
self.cbhandler = CallbackHandler()
self.cbhandler.connect_robot_info_signal(self._qt_wrapper.update_robot_info)
self.cbhandler.connect_add_robot_signal(self._qt_wrapper.add_robot)
self.cbhandler.connect_remove_robot_signal(self._qt_wrapper.remove_robot)
self._qt_wrapper.connect_signals()
self._qt_wrapper.set_sliders_to_initial_values()
# Setup functions to get _master_info states from _ros_wrapper
self._ros_wrapper.setup_get_master_stop_state_function(
self._master_info.get_master_stop_state
)
self._ros_wrapper.setup_get_restrictions_function(
self._master_info.get_restrictions
)
self.set_callback_functions()
# timer to consecutively send twist messages
self._update_parameter_timer = QTimer(self)
self._update_parameter_timer.timeout.connect(
self._ros_wrapper.qt_timer_callback
)
self._update_parameter_timer.start(100)
# Stopping master at the beginning
self.disable_robots()
def set_callback_functions(self):
self.set_ROS_callback_functions()
self.set_Qt_callback_functions()
def set_ROS_callback_functions(self):
self._ros_wrapper.set_robots_list_update_callback(
self.handle_robots_list_update
)
self._ros_wrapper.set_robot_info_update_callback(self.handle_robot_info_update)
def set_Qt_callback_functions(self):
self._qt_wrapper.set_master_stop_state_updated_callback(
self.handle_master_stop_state_updated
)
self._qt_wrapper.set_restrictions_updated_callback(
self.handle_restrictions_updated
)
def handle_robots_list_update(self, robots_id_list):
self.update_robots_list(robots_id_list)
def handle_robot_info_update(self, robot_info):
self.update_robot_info(robot_info)
def handle_master_stop_state_updated(self):
"""Handles master stop state when it changes e. g. on button click and confirmed."""
if self._master_info.master_stop_state == SS.STOPPED:
self.enable_robots()
elif self._master_info.master_stop_state == SS.RUNNING:
self.disable_robots()
else:
raise ValueError("master_stop_state value undefined")
def handle_restrictions_updated(self, restrictions):
self.restrictions_changed(restrictions)
def update_robots_list(self, robots_id_list):
for robot_id in robots_id_list:
if robot_id not in self._master_info.robots_id_list:
self.new_robot_registered(robot_id)
for robot_id in self._master_info.robots_id_list:
if robot_id not in robots_id_list:
self.robot_unregistered(robot_id)
self._master_info.robots_id_list = robots_id_list
def new_robot_registered(self, robot_id):
self._ros_wrapper.add_robot_subscriber(robot_id)
self.cbhandler.add_robot_signal.emit(robot_id)
def robot_unregistered(self, robot_id):
self._ros_wrapper.remove_robot_subscriber(robot_id)
self.cbhandler.remove_robot_signal.emit(robot_id)
def update_robot_info(self, robot_info):
self.cbhandler.robot_info_signal.emit(robot_info)
def enable_robots(self):
self._master_info.master_stop_state = SS.RUNNING
self._ros_wrapper.when_robots_enabled()
self._qt_wrapper.when_robots_enabled()
def disable_robots(self):
self._master_info.master_stop_state = SS.STOPPED
self._ros_wrapper.when_robots_disabled()
self._qt_wrapper.when_robots_disabled()
def restrictions_changed(self, restrictions):
self._master_info.restrictions = restrictions
self._ros_wrapper.restrictions_changed(restrictions)
def shutdown_plugin(self):
self._ros_wrapper.unregister_node()