#!/usr/bin/env python import rospy import rospkg from std_msgs.msg import Bool from safety_user_plugin.msg import RobotInfoMsg from safety_user_plugin.msg import RestrictionsMsg from robot_info import RobotInfo from master_restrictions import Restrictions from enums.stop_state import StopState as SS class ROSWrapper: def __init__(self): self.robots_list_update_callback = None self.robot_info_update_callback = None self.robot_info_subscribers = {} self.master_stop_publisher = None self.restrictions_publisher = None self.robots_list_timer = None self.periodic_timer = None self.setup_subscribers_and_publishers() def setup_subscribers_and_publishers(self): self.master_stop_publisher = rospy.Publisher('/RosAria/master_stop',Bool,queue_size = 1) self.restrictions_publisher = rospy.Publisher('/RosAria/restrictions',RestrictionsMsg,queue_size = 1) self.periodic_timer = rospy.Timer(rospy.Duration(0.5),self.publish_periodic_update) def add_robot_subscriber(self,robot_id): if robot_id not in self.robot_info_subscribers: topic_name = '/RosAria/PIONIER{0}/robot_info'.format(robot_id) new_sub = rospy.Subscriber(topic_name, RobotInfoMsg, self.handle_robot_info_update) self.robot_info_subscribers[robot_id] = new_sub else: raise RuntimeError('Subscriber for PIONIER{0} already in dictionary'.format(robot_id)) def remove_robot_subscriber(self,robot_id): if robot_id in self.robot_info_subscribers: self.unsubscribe(self.robot_info_subscribers[robot_id]) del self.robot_info_subscribers[robot_id] else: raise RuntimeError('Subscriber for PIONIER{0} not in dictionary. Cannot be removed'.format(robot_id)) def unsubscribe(self,subscriber): if subscriber != None: subscriber.unregister() def unregister_publisher(self,publisher): if publisher != None: publisher.unregister() def shutdown_timer(self,timer): if timer != None: timer.shutdown() def publish_periodic_update(self,event): restrictions = self.get_restrictions() restrictions_msg = RestrictionsMsg() restrictions_msg.distance.data = restrictions.distance restrictions_msg.linear_velocity.data = restrictions.linear_velocity restrictions_msg.angular_velocity.data = restrictions.angular_velocity stop_state = self.get_master_stop_state() stop_msg = Bool() if stop_state == SS.RUNNING: stop_msg.data = True elif stop_state == SS.STOPPED: stop_msg.data = False else: stop_msg.data = False raise ValueError('stop_state UNKNOWN when attempting to publish periodic update') self.restrictions_publisher.publish(restrictions_msg) self.master_stop_publisher.publish(stop_msg) def cancel_subscribers_and_publishers(self): self.shutdown_timer(self.robots_list_timer) self.shutdown_timer(self.periodic_timer) self.unregister_publisher(self.master_stop_publisher) self.unregister_publisher(self.restrictions_publisher) for key, value in self.robot_info_subscribers.iteritems(): self.unsubscribe(value) def unregister_node(self): self.cancel_subscribers_and_publishers() rospy.signal_shutdown('Closing safety master plugin') def setup_get_master_stop_state_function(self,function): self.get_master_stop_state = function def setup_get_restrictions_function(self,function): self.get_restrictions = function # ROSWrapper Callbacks def get_robots_list(self,event): robots_id_list = [] published_topics_list = rospy.get_published_topics(namespace='/') published_topics = [] for list_ in published_topics_list: published_topics.append(list_[0]) for topic in published_topics: if topic.find('RosAria') ==-1 or topic.find('robot_info') == -1: pass else: robot_number = topic.split('/') robot_number = robot_number[2] robot_number = robot_number[7:] if len(robot_number) > 0: robot_number = int(robot_number) robots_id_list.append(robot_number) self.robots_list_update_callback(robots_id_list) def handle_robot_info_update(self,msg): _robot_info = RobotInfo(0) _robot_info.update_robot_info(msg) self.robot_info_update_callback(_robot_info) # MasterPlugin Callbacks def set_robots_list_update_callback(self,callback_function): self.robots_list_update_callback = callback_function self.robots_list_timer = rospy.Timer(rospy.Duration(0.5),self.get_robots_list) def set_robot_info_update_callback(self,callback_function): self.robot_info_update_callback = callback_function def master_started(self): msg = Bool() msg.data = True self.master_stop_publisher.publish(msg) def master_stopped(self): msg = Bool() msg.data = False self.master_stop_publisher.publish(msg) def restrictions_changed(self,restrictions): msg = RestrictionsMsg() msg.distance.data = restrictions.distance msg.linear_velocity.data = restrictions.linear_velocity msg.angular_velocity.data = restrictions.angular_velocity self.restrictions_publisher.publish(msg)