safety_user_plugin/safety_user_plugin/user_plugin.py
Jakub Delicat 1c621bf684 Working with one robot
Signed-off-by: Jakub Delicat <delicat.kuba@gmail.com>
2023-08-23 19:43:24 +02:00

281 lines
10 KiB
Python
Executable File
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
import os
from qt_gui.plugin import Plugin
from python_qt_binding.QtCore import QObject
from python_qt_binding.QtCore import pyqtSignal
from python_qt_binding.QtCore import QTimer
from safety_user_plugin.qt_wrapper import QtWrapper
from safety_user_plugin.ros_wrapper import ROSWrapper
from safety_user_plugin.user_widget import UserWidget
from safety_user_plugin.user_info import UserInfo
from safety_user_plugin.enums.clutch_state import ClutchState as CS
from safety_user_plugin.enums.stop_state import StopState as SS
class CallbackHandler(QObject):
signal = pyqtSignal(str)
signal_with_list_argument = pyqtSignal(str, list)
def __init__(self):
super(CallbackHandler, self).__init__()
def connect(self, slot):
self.signal.connect(slot)
def list_connect(self, slot):
self.signal_with_list_argument.connect(slot)
class UserPlugin(Plugin):
def __init__(self, context):
super(UserPlugin, self).__init__(context)
self.setObjectName("User Plugin - L1.5 safety system")
self.cbhandler = CallbackHandler()
self._widget = UserWidget(context)
self._qt_wrapper = QtWrapper(self._widget)
self._ros_wrapper = ROSWrapper()
self._user_info = UserInfo()
self._user_info.user_stop_state = SS.STOPPED
self._user_info.master_stop_state = SS.UNKNOWN
self._user_info.clutch_state = CS.UNKNOWN
# Setup functions to get _user_info states from _ros_wrapper
self._ros_wrapper.setup_get_user_stop_state_function(
self._user_info.get_user_stop_state
)
self._ros_wrapper.setup_get_clutch_state_function(
self._user_info.get_clutch_state
)
self.set_callback_functions()
# At the end!
self._qt_wrapper.connect_signals()
self._ros_wrapper.set_robots_list_update_callback(
self.handle_robots_list_update
)
self.cbhandler.signal.connect(self._qt_wrapper.handle_emitted_signal)
self.cbhandler.signal_with_list_argument.connect(
self._qt_wrapper.handle_emitted_signal_with_list_argument
)
# timer to consecutively send twist messages
self._update_parameter_timer = QTimer(self)
self._update_parameter_timer.timeout.connect(
self._ros_wrapper.qt_timer_callback
)
self._update_parameter_timer.start(100)
def call_qt_wrapper_method(self, method_name):
self.cbhandler.signal.emit(method_name)
def call_qt_wrapper_method_with_argument(self, method_name, argument):
self.cbhandler.signal_with_list_argument.emit(method_name, argument)
def shutdown_plugin(self):
self._ros_wrapper.unregister_node()
def set_callback_functions(self):
self.set_Qt_callback_functions()
self.set_ROS_callback_functions()
def set_ROS_callback_functions(self):
self._ros_wrapper.set_selected_robot_info_update_callback(
self.handle_selected_robot_info_update
)
self._ros_wrapper.set_master_stop_update_callback(
self.handle_master_stop_update
)
self._ros_wrapper.set_restrictions_update_callback(
self.handle_restrictions_update
)
self._ros_wrapper.set_robot_connection_lost_callback(
self.handle_robot_connection_lost
)
self._ros_wrapper.set_master_connection_lost_callback(
self.handle_master_connection_lost
)
def set_Qt_callback_functions(self):
self._qt_wrapper.set_robot_selection_callback(self.handle_robot_selection)
self._qt_wrapper.set_robot_release_callback(self.handle_robot_release)
self._qt_wrapper.set_user_stop_state_updated_callback(
self.handle_user_stop_state_updated
)
self._qt_wrapper.set_clutch_switched_callback(self.handle_clutch_switched)
# ROS callback functions
def handle_robots_list_update(self, robots_id_list):
self._user_info.update_robots_id_list(robots_id_list)
self._qt_wrapper.update_robots_list_gui(robots_id_list)
def handle_selected_robot_info_update(self, robot_info):
if robot_info is not None:
self.update_selected_robot_info(robot_info)
else:
raise RuntimeError("Updated robot_info is NoneType object")
def handle_master_stop_update(self, master_stop_state):
if master_stop_state != self._user_info.master_stop_state:
self._user_info.master_stop_state = master_stop_state
if master_stop_state == SS.STOPPED:
self.master_stopped()
elif master_stop_state == SS.RUNNING:
self.master_started()
else:
raise ValueError("Undefined master_stop_state received")
def handle_restrictions_update(self, restrictions):
self._qt_wrapper.update_restrictions_gui(restrictions)
# Qt callback functions
def handle_robot_selection(self, robot_id):
if self._user_info.selected_robot is None:
# TODO: hz subskrypcje służą do sprawdzaNIA czy robot jest już zajęty
# if self._ros_wrapper.robots_hz_value[robot_id] == None:
self.select_robot(robot_id)
# else:
# self._qt_wrapper.robot_selected_by_another_group_notify()
else:
raise RuntimeError("User already selected robot")
def handle_robot_release(self):
if self._user_info.selected_robot != None:
self.release_robot()
else:
raise RuntimeError("Cannot release. No robot selected")
def handle_user_stop_state_updated(self):
if self._user_info.selected_robot.obstacle_detected == True:
self._qt_wrapper.obstacle_is_detected_notify()
else:
if self._user_info.master_stop_state == SS.STOPPED:
self._qt_wrapper.master_is_stopped_notify()
elif self._user_info.master_stop_state == SS.RUNNING:
if self._user_info.user_stop_state == SS.RUNNING:
self.user_stopped()
elif self._user_info.user_stop_state == SS.STOPPED:
self.user_started()
else:
self._qt_wrapper.log_master_inactive()
else:
self._qt_wrapper.log_master_inactive()
def handle_clutch_switched(self):
if self._user_info.clutch_state == CS.ENGAGED:
self.disengage_clutch()
elif self._user_info.clutch_state == CS.DISENGAGED:
self.engage_clutch()
else:
raise ValueError("clutch_state value undefined")
def handle_robot_connection_lost(self):
if self._user_info.selected_robot != None:
lost_robot_id = self._user_info.selected_robot.robot_id
self.connection_to_robot_lost(lost_robot_id)
else:
raise RuntimeError(
"Connection lost callback received when no robot was selected"
)
def handle_master_connection_lost(self):
self.connection_to_master_lost()
# Operations
def master_stopped(self):
self._user_info.master_stop_state = SS.STOPPED
if self._user_info.selected_robot != None:
self.user_stopped()
self.call_qt_wrapper_method("master_stopped")
def master_started(self):
self._user_info.master_stop_state = SS.RUNNING
if self._user_info.selected_robot != None:
self.call_qt_wrapper_method("master_started")
def user_stopped(self):
self._user_info.user_stop_state = SS.STOPPED
self._ros_wrapper.user_robot_disabled()
self.call_qt_wrapper_method("user_stopped")
def user_started(self):
self._user_info.user_stop_state = SS.RUNNING
self._ros_wrapper.user_robot_enabled()
self.call_qt_wrapper_method("user_started")
def engage_clutch(self):
self._user_info.clutch_state = CS.ENGAGED
self._ros_wrapper.engage_clutch()
self.call_qt_wrapper_method("engage_clutch")
def disengage_clutch(self):
self._user_info.clutch_state = CS.DISENGAGED
self._ros_wrapper.disengage_clutch()
self.call_qt_wrapper_method("disengage_clutch")
def select_robot(self, robot_id):
self._ros_wrapper.select_robot(robot_id)
self.call_qt_wrapper_method_with_argument("select_robot", [robot_id])
self._user_info.select_robot(robot_id)
# Stop robot and engage clutch after taking control over it to put it in starting state
self.user_stopped()
self.engage_clutch()
if self._user_info.master_stop_state == SS.RUNNING:
self.master_started()
elif self._user_info.master_stop_state == SS.STOPPED:
self.master_stopped()
def release_robot(self):
self._ros_wrapper.release_robot()
self.call_qt_wrapper_method("release_robot")
self._user_info.release_robot()
self._user_info.master_stop_state = SS.UNKNOWN
def connection_to_robot_lost(self, lost_robot_id):
self._ros_wrapper.release_robot()
self._user_info.release_robot()
self.call_qt_wrapper_method_with_argument(
"connection_to_robot_lost", [lost_robot_id]
)
def connection_to_master_lost(self):
if self._user_info.selected_robot is not None:
self.master_stopped()
self.call_qt_wrapper_method("connection_to_master_lost")
def obstacle_detected(self):
self.call_qt_wrapper_method("obstacle_detected")
self.user_stopped()
def obstacle_removed(self):
self.call_qt_wrapper_method("obstacle_removed")
def update_selected_robot_info(self, robot_info):
if self._user_info.selected_robot is not None:
if (
robot_info.obstacle_detected is True
and self._user_info.selected_robot.obstacle_detected is False
):
self.obstacle_detected()
elif (
robot_info.obstacle_detected is False
and self._user_info.selected_robot.obstacle_detected is True
):
self.obstacle_removed()
self._user_info.update_selected_robot_info(robot_info)
self.call_qt_wrapper_method_with_argument(
"update_selected_robot_info", [robot_info]
)