#!/usr/bin/env python import rospy import rospkg from std_msgs.msg import Bool from safety_user_plugin.msg import RobotInfoMsg from safety_user_plugin.msg import RestrictionsMsg from robot_info import RobotInfo from restrictions import Restrictions from enums.clutch_state import ClutchState as CS from enums.stop_state import StopState as SS class ROSWrapper: def __init__(self): self.robots_list_update_callback = None self.selected_robot_info_update_callback = None self.master_stop_update_callback = None self.restrictions_update_callback = None self.get_user_stop_state = None self.get_clutch_state = None self.robot_info_subscriber = None self.master_stop_subscriber = None self.restrictions_subscriber = None self.user_stop_publisher = None self.clutch_publisher = None self.periodic_timer = None def setup_node(self): rospy.init_node('safety_user_plugin', anonymous=True) def setup_subscribers_and_publishers(self,robot_id): robot_name = 'PIONIER' + str(robot_id) self.robot_info_subscriber = rospy.Subscriber('/RosAria/'+robot_name+'/robot_info', RobotInfoMsg, self.handle_selected_robot_info_update) self.master_stop_subscriber = rospy.Subscriber('/RosAria/master_stop', Bool, self.handle_master_stop_update) self.restrictions_subscriber = rospy.Subscriber('/RosAria/restrictions', RestrictionsMsg, self.handle_restrictions_update) self.user_stop_publisher = rospy.Publisher('/RosAria/'+robot_name+'/user_stop',Bool,queue_size = 1) self.clutch_publisher = rospy.Publisher('/RosAria/'+robot_name+'/clutch',Bool,queue_size = 1) self.periodic_timer = rospy.Timer(rospy.Duration(0.2),self.publish_periodic_update) def unsubscribe(self,subscriber): if subscriber != None: subscriber.unsubscribe() def unregister_publisher(self,publisher): if publisher != None: publisher.unregister() def shutdown_timer(self,timer): if timer != None: timer.shutdown() def publish_periodic_update(self,event): stop_state = self.get_user_stop_state() clutch_state = self.get_clutch_state() if stop_state == SS.RUNNING: self.user_started() elif stop_state == SS.STOPPED: self.user_stopped() else: # TODO Raise wrong value error raise NotImplementedError if clutch_state == CS.ENGAGED: self.engage_clutch() elif clutch_state == CS.DISENGAGED: self.disengage_clutch() else: # TODO Raise wrong value error raise NotImplementedError def cancel_subscribers_and_publishers(self): self.shutdown_timer(self.periodic_timer) self.unsubscribe(self.robot_info_subscriber) self.unsubscribe(self.master_stop_subscriber) self.unsubscribe(self.restrictions_subscriber) self.unregister_publisher(self.user_stop_publisher) self.unregister_publisher(self.clutch_publisher) self.periodic_timer = None self.robot_info_subscriber = None self.master_stop_subscriber = None self.restrictions_subscriber = None self.user_stop_publisher = None self.clutch_publisher = None def unregister_node(self): self.cancel_subscribers_and_publishers() rospy.signal_shutdown('Closing safety user plugin') def select_robot(self,robot_id): self.setup_subscribers_and_publishers(robot_id) def setup_get_user_stop_state_function(self,function): self.get_user_stop_state = function def setup_get_clutch_state_function(self,function): self.get_clutch_state = function def scan_for_robots(self): robots_list = [] published_topics_list = rospy.get_published_topics(namespace='/') published_topics = [] for list_ in published_topics_list: published_topics.append(list_[0]) for topic in published_topics: if topic.find('RosAria') ==-1 or topic.find('robot_state') == -1: pass else: robot_number = topic.split('/') robot_number = robot_number[1] robot_number = robot_number[7:] if len(robot_number) > 0: robot_number = int(robot_number) robots_list.append(robot_number) return robots_list # ROSWrapper Callbacks def get_robots_list(self): robots_id_list = [] published_topics_list = rospy.get_published_topics(namespace='/') published_topics = [] for list_ in published_topics_list: published_topics.append(list_[0]) for topic in published_topics: if topic_find('RosAria') and topic_find('robot_state'): robot_number = topic.split('/') robot_number = robot_number[1] robot_number = robot_number[7:] if len(robot_number) > 0: robot_number = int(robot_number) robots_id_list.append(robot_number) self.robots_list_update_callback(robots_id_list) def handle_selected_robot_info_update(self,msg): _robot_info = RobotInfo(msg) self.selected_robot_info_update_callback(_robot_info) def handle_master_stop_update(self,msg): master_stop_state = SS.UNKNOWN if msg.data == True: master_stop_state = SS.RUNNING else: master_stop_state = SS.STOPPED self.master_stop_update_callback(master_stop_state) def handle_restrictions_update(self,msg): restrictions = Restrictions(msg) restrictions_update_callback(restrictions) # UserPlugin Callbacks def set_robots_list_update_callback(self,callback_function): self.robots_list_update_callback = callback_function def set_selected_robot_info_update_callback(self,callback_function): self.selected_robot_info_update_callback = callback_function def set_master_stop_update_callback(self,callback_function): self.master_stop_update_callback = callback_function def set_restrictions_update_callback(self,callback_function): self.restrictions_update_callback = callback_function def engage_clutch(self): msg = Bool() msg.data = True self.clutch_publisher.Publish(msg) def disengage_clutch(self): msg = Bool() msg.data = False self.clutch_publisher.Publish(msg) def user_started(): msg = Bool() msg.data = True self.user_stop_publisher.Publish(msg) def user_stopped(): msg = Bool() msg.data = False self.user_stop_publisher.Publish(msg)