safety_master_plugin/scripts/master_plugin.py

110 lines
4.0 KiB
Python
Raw Permalink Normal View History

2021-08-13 14:38:22 +02:00
from ros_master_wrapper import ROSWrapper
from qt_master_wrapper import QtWrapper
from master_restrictions import Restrictions
from master_info import MasterInfo
from master_widget import MasterWidget
from qt_gui.plugin import Plugin
from enums.stop_state import StopState as SS
from master_callback_handler import CallbackHandler
class MasterPlugin(Plugin):
def __init__(self,context):
super(MasterPlugin, self).__init__(context)
self.setObjectName('Master Plugin - L1.5 safety system')
self._widget = MasterWidget(context)
self._ros_wrapper = ROSWrapper()
self._qt_wrapper = QtWrapper(self._widget)
self._master_info = MasterInfo()
self.cbhandler = CallbackHandler()
self.cbhandler.connect_robot_info_signal(self._qt_wrapper.update_robot_info)
self.cbhandler.connect_add_robot_signal(self._qt_wrapper.add_robot)
self.cbhandler.connect_remove_robot_signal(self._qt_wrapper.remove_robot)
# self._qt_wrapper.connect_signals()
# self._qt_wrapper.set_sliders_to_initial_values()
# Setup functions to get _master_info states from _ros_wrapper
self._ros_wrapper.setup_get_master_stop_state_function(self._master_info.get_master_stop_state)
self._ros_wrapper.setup_get_restrictions_function(self._master_info.get_restrictions)
self.set_callback_functions()
self._qt_wrapper.connect_signals()
# Stopping master at the beginning
self.master_stopped()
def set_callback_functions(self):
self.set_ROS_callback_functions()
self.set_Qt_callback_functions()
def set_ROS_callback_functions(self):
self._ros_wrapper.set_robots_list_update_callback(self.handle_robots_list_update)
self._ros_wrapper.set_robot_info_update_callback(self.handle_robot_info_update)
def set_Qt_callback_functions(self):
self._qt_wrapper.set_master_stop_state_updated_callback(self.handle_master_stop_state_updated)
self._qt_wrapper.set_restrictions_updated_callback(self.handle_restrictions_updated)
def handle_robots_list_update(self,robots_id_list):
self.update_robots_list(robots_id_list)
def handle_robot_info_update(self,robot_info):
self.update_robot_info(robot_info)
def handle_master_stop_state_updated(self):
if self._master_info.master_stop_state == SS.RUNNING:
self.master_stopped()
elif self._master_info.master_stop_state == SS.STOPPED:
self.master_started()
else:
raise ValueError('master_stop_state value undefined')
def handle_restrictions_updated(self,restrictions):
self.restrictions_changed(restrictions)
def update_robots_list(self,robots_id_list):
for robot_id in robots_id_list:
if robot_id not in self._master_info.robots_id_list:
self.new_robot_registered(robot_id)
for robot_id in self._master_info.robots_id_list:
if robot_id not in robots_id_list:
self.robot_unregistered(robot_id)
self._master_info.robots_id_list = robots_id_list
def new_robot_registered(self,robot_id):
self._ros_wrapper.add_robot_subscriber(robot_id)
self.cbhandler.add_robot_signal.emit(robot_id)
def robot_unregistered(self,robot_id):
self._ros_wrapper.remove_robot_subscriber(robot_id)
self.cbhandler.remove_robot_signal.emit(robot_id)
def update_robot_info(self,robot_info):
self.cbhandler.robot_info_signal.emit(robot_info)
def master_started(self):
self._master_info.master_stop_state = SS.RUNNING
self._ros_wrapper.master_started()
self._qt_wrapper.master_started()
def master_stopped(self):
self._master_info.master_stop_state = SS.STOPPED
self._ros_wrapper.master_stopped()
self._qt_wrapper.master_stopped()
def restrictions_changed(self,restrictions):
self._master_info.restrictions = restrictions
self._ros_wrapper.restrictions_changed(restrictions)