#!/usr/bin/env python import os from qt_gui.plugin import Plugin from python_qt_binding.QtCore import QObject from python_qt_binding.QtCore import pyqtSignal from qt_wrapper import QtWrapper from ros_wrapper import ROSWrapper from user_widget import UserWidget from user_info import UserInfo from enums.clutch_state import ClutchState as CS from enums.stop_state import StopState as SS class CallbackHandler(QObject): signal = pyqtSignal(str) signal_with_list_argument = pyqtSignal(str,list) def __init__(self): super(CallbackHandler, self).__init__() def connect(self,slot): self.signal.connect(slot) def list_connect(self,slot): self.signal_with_list_argument.connect(slot) class UserPlugin(Plugin): def __init__(self,context): super(UserPlugin, self).__init__(context) self.setObjectName('User Plugin - L1.5 safety system') # setStyleSheet("background-color:black;") self.cbhandler = CallbackHandler() self._widget = UserWidget(context) self._qt_wrapper = QtWrapper(self._widget) self._ros_wrapper = ROSWrapper() self._user_info = UserInfo() self._user_info.user_stop_state = SS.STOPPED self._user_info.master_stop_state = SS.UNKNOWN self._user_info.clutch_state = CS.UNKNOWN # Setup functions to get _user_info states from _ros_wrapper self._ros_wrapper.setup_get_user_stop_state_function(self._user_info.get_user_stop_state) self._ros_wrapper.setup_get_clutch_state_function(self._user_info.get_clutch_state) # self._ros_wrapper.setup_node() self.set_callback_functions() # At the end! self._qt_wrapper.connect_signals() self._ros_wrapper.set_robots_list_update_callback(self.handle_robots_list_update) self.cbhandler.signal.connect(self._qt_wrapper.handle_emitted_signal) self.cbhandler.signal_with_list_argument.connect(self._qt_wrapper.handle_emitted_signal_with_list_argument) def call_qt_wrapper_method(self,method_name): self.cbhandler.signal.emit(method_name) def call_qt_wrapper_method_with_argument(self,method_name,argument): self.cbhandler.signal_with_list_argument.emit(method_name,argument) def shutdown_plugin(self): self._ros_wrapper.unregister_node() def set_callback_functions(self): self.set_Qt_callback_functions() self.set_ROS_callback_functions() def set_ROS_callback_functions(self): self._ros_wrapper.set_selected_robot_info_update_callback(self.handle_selected_robot_info_update) self._ros_wrapper.set_master_stop_update_callback(self.handle_master_stop_update) self._ros_wrapper.set_restrictions_update_callback(self.handle_restrictions_update) self._ros_wrapper.set_robot_connection_lost_callback(self.handle_robot_connection_lost) self._ros_wrapper.set_master_connection_lost_callback(self.handle_master_connection_lost) def set_Qt_callback_functions(self): self._qt_wrapper.set_robot_selection_callback(self.handle_robot_selection) self._qt_wrapper.set_robot_release_callback(self.handle_robot_release) self._qt_wrapper.set_user_stop_state_updated_callback(self.handle_user_stop_state_updated) self._qt_wrapper.set_clutch_switched_callback(self.handle_clutch_switched) # ROS callback functions def handle_robots_list_update(self,robots_id_list): self._user_info.update_robots_id_list(robots_id_list) self._qt_wrapper.update_robots_list_gui(robots_id_list) def handle_selected_robot_info_update(self,robot_info): if robot_info != None: self.update_selected_robot_info(robot_info) else: raise RuntimeError('Updated robot_info is NoneType object') def handle_master_stop_update(self,master_stop_state): if (master_stop_state != self._user_info.master_stop_state): self._user_info.master_stop_state = master_stop_state if master_stop_state == SS.STOPPED: self.master_stopped() elif master_stop_state == SS.RUNNING: self.master_started() else: raise ValueError('Undefined master_stop_state received') def handle_restrictions_update(self,restrictions): self._qt_wrapper.update_restrictions_gui(restrictions) # Qt callback functions def handle_robot_selection(self,robot_id): if self._user_info.selected_robot == None: if self._ros_wrapper.robots_hz_value[robot_id] == None: self.select_robot(robot_id) else: self._qt_wrapper.robot_selected_by_another_group_notify() else: raise RuntimeError('User already selected robot') def handle_robot_release(self): if self._user_info.selected_robot != None: self.release_robot() else: raise RuntimeError('Cannot release. No robot selected') def handle_user_stop_state_updated(self): if self._user_info.selected_robot.obstacle_detected == True: self._qt_wrapper.obstacle_is_detected_notify() else: if self._user_info.master_stop_state == SS.STOPPED: self._qt_wrapper.master_is_stopped_notify() elif self._user_info.master_stop_state == SS.RUNNING: if self._user_info.user_stop_state == SS.RUNNING: self.user_stopped() elif self._user_info.user_stop_state == SS.STOPPED: self.user_started() else: raise ValueError('user_stop_state value undefined') else: raise ValueError('master_stop_state value undefined') def handle_clutch_switched(self): if self._user_info.clutch_state == CS.ENGAGED: self.disengage_clutch() elif self._user_info.clutch_state == CS.DISENGAGED: self.engage_clutch() else: raise ValueError('clutch_state value undefined') def handle_robot_connection_lost(self): if self._user_info.selected_robot != None: lost_robot_id = self._user_info.selected_robot.robot_id self.connection_to_robot_lost(lost_robot_id) else: raise RuntimeError('Connection lost callback received when no robot was selected') def handle_master_connection_lost(self): self.connection_to_master_lost() # Operations def master_stopped(self): self._user_info.master_stop_state = SS.STOPPED if self._user_info.selected_robot != None: self.user_stopped() self.call_qt_wrapper_method('master_stopped') # self._qt_wrapper.master_stopped() def master_started(self): self._user_info.master_stop_state = SS.RUNNING if self._user_info.selected_robot != None: self.call_qt_wrapper_method('master_started') # self._qt_wrapper.master_started() def user_stopped(self): self._user_info.user_stop_state = SS.STOPPED self._ros_wrapper.user_stopped() self.call_qt_wrapper_method('user_stopped') # self._qt_wrapper.user_stopped() def user_started(self): self._user_info.user_stop_state = SS.RUNNING self._ros_wrapper.user_started() self.call_qt_wrapper_method('user_started') # self._qt_wrapper.user_started() def engage_clutch(self): self._user_info.clutch_state = CS.ENGAGED self._ros_wrapper.engage_clutch() self.call_qt_wrapper_method('engage_clutch') # self._qt_wrapper.engage_clutch() def disengage_clutch(self): self._user_info.clutch_state = CS.DISENGAGED self._ros_wrapper.disengage_clutch() self.call_qt_wrapper_method('disengage_clutch') # self._qt_wrapper.disengage_clutch() def select_robot(self,robot_id): self._ros_wrapper.select_robot(robot_id) # self._qt_wrapper.select_robot(robot_id) self.call_qt_wrapper_method_with_argument('select_robot',[robot_id]) self._user_info.select_robot(robot_id) # Stop robot and engage clutch after taking control over it to put it in starting state self.user_stopped() self.engage_clutch() if self._user_info.master_stop_state == SS.RUNNING: self.master_started() elif self._user_info.master_stop_state == SS.STOPPED: self.master_stopped() def release_robot(self): self._ros_wrapper.release_robot() self.call_qt_wrapper_method('release_robot') # self._qt_wrapper.release_robot() self._user_info.release_robot() self._user_info.master_stop_state = SS.UNKNOWN def connection_to_robot_lost(self,lost_robot_id): # pass self._ros_wrapper.release_robot() self._user_info.release_robot() self.call_qt_wrapper_method_with_argument('connection_to_robot_lost',[lost_robot_id]) # self._qt_wrapper.connection_to_robot_lost(lost_robot_id) def connection_to_master_lost(self): if self._user_info.selected_robot != None: self.master_stopped() self.call_qt_wrapper_method('connection_to_master_lost') def obstacle_detected(self): self.call_qt_wrapper_method('obstacle_detected') self.user_stopped() def obstacle_removed(self): self.call_qt_wrapper_method('obstacle_removed') def update_selected_robot_info(self,robot_info): if self._user_info.selected_robot != None: if robot_info.obstacle_detected == True and self._user_info.selected_robot.obstacle_detected == False: self.obstacle_detected() elif robot_info.obstacle_detected == False and self._user_info.selected_robot.obstacle_detected == True: self.obstacle_removed() self._user_info.update_selected_robot_info(robot_info) self.call_qt_wrapper_method_with_argument('update_selected_robot_info',[robot_info]) # self._qt_wrapper.update_selected_robot_info(robot_info)