#!/usr/bin/env python import rospy import rospkg from std_msgs.msg import Bool from safety_user_plugin.msg import RobotInfoMsg from safety_user_plugin.msg import RestrictionsMsg from robot_info import RobotInfo from restrictions import Restrictions from enums.clutch_state import ClutchState as CS from enums.stop_state import StopState as SS class ROSWrapper: def __init__(self): self.robots_list_update_callback = None self.selected_robot_info_update_callback = None self.master_stop_update_callback = None self.restrictions_update_callback = None self.get_user_stop_state = None self.get_clutch_state = None self.robot_info_subscriber = None self.master_stop_subscriber = None self.restrictions_subscriber = None self.user_stop_publisher = None self.clutch_publisher = None self.periodic_timer = None self.robots_list_timer = None self.connection_timer = None self.master_connection_timer = None # def setup_node(self): # rospy.init_node('safety_user_plugin', anonymous=True) def setup_subscribers_and_publishers(self,robot_id): robot_name = 'PIONIER' + str(robot_id) self.robot_info_subscriber = rospy.Subscriber('/RosAria/'+robot_name+'/robot_info', RobotInfoMsg, self.handle_selected_robot_info_update) self.master_stop_subscriber = rospy.Subscriber('/RosAria/master_stop', Bool, self.handle_master_stop_update) self.restrictions_subscriber = rospy.Subscriber('/RosAria/restrictions', RestrictionsMsg, self.handle_restrictions_update) self.user_stop_publisher = rospy.Publisher('/RosAria/'+robot_name+'/user_stop',Bool,queue_size = 1) self.clutch_publisher = rospy.Publisher('/RosAria/'+robot_name+'/clutch',Bool,queue_size = 1) self.periodic_timer = rospy.Timer(rospy.Duration(0.2),self.publish_periodic_update) self.connection_timer = rospy.Timer(rospy.Duration(1.5),self.handle_robot_connection_lost) self.master_connection_timer = rospy.Timer(rospy.Duration(1.5),self.handle_master_connection_lost) def unsubscribe(self,subscriber): if subscriber != None: subscriber.unregister() def unregister_publisher(self,publisher): if publisher != None: publisher.unregister() def shutdown_timer(self,timer): if timer != None: timer.shutdown() def publish_periodic_update(self,event): stop_state = self.get_user_stop_state() clutch_state = self.get_clutch_state() if stop_state == SS.RUNNING: self.user_started() elif stop_state == SS.STOPPED: self.user_stopped() else: raise ValueError('stop_state UNKNOWN when attempting to publish periodic update') if clutch_state == CS.ENGAGED: self.engage_clutch() elif clutch_state == CS.DISENGAGED: self.disengage_clutch() else: raise ValueError('clutch_state UNKNOWN when attempting to publish periodic update') def handle_robot_connection_lost(self,event): self.shutdown_timer(self.connection_timer) self.robot_connection_lost_callback() def handle_master_connection_lost(self,event): self.shutdown_timer(self.master_connection_timer) self.master_connection_lost_callback() def cancel_subscribers_and_publishers(self): self.shutdown_timer(self.periodic_timer) self.unsubscribe(self.robot_info_subscriber) self.unsubscribe(self.master_stop_subscriber) self.unsubscribe(self.restrictions_subscriber) self.unregister_publisher(self.user_stop_publisher) self.unregister_publisher(self.clutch_publisher) self.periodic_timer = None self.robot_info_subscriber = None self.master_stop_subscriber = None self.restrictions_subscriber = None self.user_stop_publisher = None self.clutch_publisher = None def restart_connection_timer(self): if self.connection_timer != None: self.connection_timer.shutdown() self.connection_timer = rospy.Timer(rospy.Duration(1.5),self.handle_robot_connection_lost) else: raise RuntimeError('Attempting to restart connection_timer when it is not initialized') def restart_master_connection_timer(self): if self.master_connection_timer != None: self.master_connection_timer.shutdown() self.master_connection_timer = rospy.Timer(rospy.Duration(1.5),self.handle_master_connection_lost) else: raise RuntimeError('Attempting to restart master_connection_timer when it is not initialized') def unregister_node(self): self.cancel_subscribers_and_publishers() rospy.signal_shutdown('Closing safety user plugin') def select_robot(self,robot_id): self.setup_subscribers_and_publishers(robot_id) def setup_get_user_stop_state_function(self,function): self.get_user_stop_state = function def setup_get_clutch_state_function(self,function): self.get_clutch_state = function # ROSWrapper Callbacks def get_robots_list(self,event): robots_id_list = [] published_topics_list = rospy.get_published_topics(namespace='/') published_topics = [] for list_ in published_topics_list: published_topics.append(list_[0]) for topic in published_topics: if topic.find('RosAria') ==-1 or topic.find('robot_info') == -1: pass else: robot_number = topic.split('/') robot_number = robot_number[2] robot_number = robot_number[7:] if len(robot_number) > 0: robot_number = int(robot_number) robots_id_list.append(robot_number) self.robots_list_update_callback(robots_id_list) def release_robot(self): self.unsubscribe(self.robot_info_subscriber) self.shutdown_timer(self.periodic_timer) self.shutdown_timer(self.connection_timer) def handle_selected_robot_info_update(self,msg): # Restarting connection timer to avoid raising robot_connection_lost_callback self.restart_connection_timer() _robot_info = RobotInfo(0) _robot_info.update_robot_info(msg) self.selected_robot_info_update_callback(_robot_info) def handle_master_stop_update(self,msg): # Restarting master connection timer to avoid raising master_connection_lost_callback self.restart_master_connection_timer() master_stop_state = SS.UNKNOWN if msg.data == True: master_stop_state = SS.RUNNING else: master_stop_state = SS.STOPPED self.master_stop_update_callback(master_stop_state) def handle_restrictions_update(self,msg): restrictions = Restrictions(msg) self.restrictions_update_callback(restrictions) # UserPlugin Callbacks def set_robots_list_update_callback(self,callback_function): self.robots_list_update_callback = callback_function self.robots_list_timer = rospy.Timer(rospy.Duration(0.5),self.get_robots_list) def set_selected_robot_info_update_callback(self,callback_function): self.selected_robot_info_update_callback = callback_function def set_master_stop_update_callback(self,callback_function): self.master_stop_update_callback = callback_function def set_restrictions_update_callback(self,callback_function): self.restrictions_update_callback = callback_function def set_robot_connection_lost_callback(self,callback_function): self.robot_connection_lost_callback = callback_function def set_master_connection_lost_callback(self,callback_function): self.master_connection_lost_callback = callback_function def engage_clutch(self): msg = Bool() msg.data = True self.clutch_publisher.publish(msg) def disengage_clutch(self): msg = Bool() msg.data = False self.clutch_publisher.publish(msg) def user_started(self): msg = Bool() msg.data = True self.user_stop_publisher.publish(msg) def user_stopped(self): msg = Bool() msg.data = False self.user_stop_publisher.publish(msg)