#!/usr/bin/env python import rospy import rospkg import rostopic from std_msgs.msg import Bool from rosaria_msgs.msg import RobotInfoMsg from rosaria_msgs.msg import RestrictionsMsg from robot_info import RobotInfo from restrictions import Restrictions from enums.clutch_state import ClutchState as CS from enums.stop_state import StopState as SS class ROSWrapper: def __init__(self): self.robots_list_update_callback = None self.selected_robot_info_update_callback = None self.master_stop_update_callback = None self.restrictions_update_callback = None self.get_user_stop_state = None self.get_clutch_state = None self.robot_info_subscriber = None self.master_stop_subscriber = None self.restrictions_subscriber = None self.user_stop_publisher = None self.clutch_publisher = None self.periodic_timer = None self.robots_list_timer = None self.connection_timer = None self.master_connection_timer = None self.hz_update_timer = rospy.Timer(rospy.Duration(0.2),self.update_hz_values) self.robots_hz_subscribers_dict = {} self.rostopics_hz_dict = {} self.robots_hz_value = {} # def setup_node(self): # rospy.init_node('safety_user_plugin', anonymous=True) def setup_subscribers_and_publishers(self,robot_id): robot_name = 'PIONIER' + str(robot_id) self.robot_info_subscriber = rospy.Subscriber('/{0}/RosAria/robot_info'.format(robot_name), RobotInfoMsg, self.handle_selected_robot_info_update) self.master_stop_subscriber = rospy.Subscriber('/PIONIER/master_stop', Bool, self.handle_master_stop_update) self.restrictions_subscriber = rospy.Subscriber('/PIONIER/restrictions', RestrictionsMsg, self.handle_restrictions_update) self.user_stop_publisher = rospy.Publisher('/{0}/RosAria/user_stop'.format(robot_name),Bool,queue_size = 1) self.clutch_publisher = rospy.Publisher('/{0}/RosAria/clutch'.format(robot_name),Bool,queue_size = 1) if self.periodic_timer != None: self.shutdown_timer(self.periodic_timer) self.periodic_timer = rospy.Timer(rospy.Duration(0.1),self.publish_periodic_update) print("NEW") if self.connection_timer != None: self.shutdown_timer(self.connection_timer) self.connection_timer = rospy.Timer(rospy.Duration(5),self.handle_robot_connection_lost) if self.master_connection_timer != None: self.shutdown_timer(self.master_connection_timer) self.master_connection_timer = rospy.Timer(rospy.Duration(1.5),self.handle_master_connection_lost) if self.hz_update_timer != None: self.shutdown_timer(self.hz_update_timer) self.hz_update_timer = rospy.Timer(rospy.Duration(0.2),self.update_hz_values) def unsubscribe(self,subscriber): if subscriber != None: subscriber.unregister() def unregister_publisher(self,publisher): if publisher != None: publisher.unregister() def shutdown_timer(self,timer): if timer != None: timer.shutdown() def publish_periodic_update(self,event): stop_state = self.get_user_stop_state() # clutch_state = self.get_clutch_state() if stop_state == SS.RUNNING: self.user_started() elif stop_state == SS.STOPPED: self.user_stopped() else: raise ValueError('stop_state UNKNOWN when attempting to publish periodic update') # if clutch_state == CS.ENGAGED: # self.engage_clutch() # elif clutch_state == CS.DISENGAGED: # self.disengage_clutch() # else: # raise ValueError('clutch_state UNKNOWN when attempting to publish periodic update') def update_hz_values(self,event): for key in self.rostopics_hz_dict: self.robots_hz_value[key] = self.rostopics_hz_dict[key].get_hz('/PIONIER{0}/RosAria/user_stop'.format(key)) def handle_robot_connection_lost(self,event): self.shutdown_timer(self.connection_timer) self.robot_connection_lost_callback() def handle_master_connection_lost(self,event): self.shutdown_timer(self.master_connection_timer) self.master_connection_lost_callback() def cancel_subscribers_and_publishers(self): self.shutdown_timer(self.periodic_timer) self.unsubscribe(self.robot_info_subscriber) self.unsubscribe(self.master_stop_subscriber) self.unsubscribe(self.restrictions_subscriber) self.unregister_publisher(self.user_stop_publisher) self.unregister_publisher(self.clutch_publisher) self.periodic_timer = None self.robot_info_subscriber = None self.master_stop_subscriber = None self.restrictions_subscriber = None self.user_stop_publisher = None self.clutch_publisher = None def restart_connection_timer(self): if self.connection_timer != None: self.connection_timer.shutdown() self.connection_timer = rospy.Timer(rospy.Duration(1.5),self.handle_robot_connection_lost) else: raise RuntimeError('Attempting to restart connection_timer when it is not initialized') def restart_master_connection_timer(self): if self.master_connection_timer != None: self.master_connection_timer.shutdown() self.master_connection_timer = rospy.Timer(rospy.Duration(1.5),self.handle_master_connection_lost) else: raise RuntimeError('Attempting to restart master_connection_timer when it is not initialized') def unregister_node(self): self.cancel_subscribers_and_publishers() rospy.signal_shutdown('Closing safety user plugin') def select_robot(self,robot_id): self.setup_subscribers_and_publishers(robot_id) def setup_get_user_stop_state_function(self,function): self.get_user_stop_state = function def setup_get_clutch_state_function(self,function): self.get_clutch_state = function # ROSWrapper Callbacks def get_robots_list(self,event): robots_id_list = [] published_topics_list = rospy.get_published_topics(namespace='/') published_topics = [] for list_ in published_topics_list: published_topics.append(list_[0]) for topic in published_topics: if topic.find('RosAria') ==-1 or topic.find('robot_info') == -1: pass else: robot_number = topic.split('/') robot_number = robot_number[1] robot_number = robot_number[7:] if len(robot_number) > 0: robot_number = int(robot_number) robots_id_list.append(robot_number) # Adding new hz subscribers for robot_id in robots_id_list: if robot_id not in self.robots_hz_subscribers_dict: # Add hz subscriber self.rostopics_hz_dict[robot_id] = rostopic.ROSTopicHz(-1) self.robots_hz_subscribers_dict[robot_id] = rospy.Subscriber('/PIONIER{0}/RosAria/user_stop'.format(robot_id),Bool, self.rostopics_hz_dict[robot_id].callback_hz,callback_args='/PIONIER{0}/RosAria/user_stop'.format(robot_id)) # Removing old hz subscribers for robot_key in self.robots_hz_subscribers_dict: if robot_key not in robots_id_list: # Remove old subscriber self.unsubscribe(self.robots_hz_subscribers_dict[robot_id]) self.robots_hz_subscribers_dict[robot_id] = None self.rostopics_hz_dict[robot_id] = None self.robots_list_update_callback(robots_id_list) def release_robot(self): self.unsubscribe(self.robot_info_subscriber) self.shutdown_timer(self.periodic_timer) self.shutdown_timer(self.connection_timer) self.shutdown_timer(self.master_connection_timer) def handle_selected_robot_info_update(self,msg): # Restarting connection timer to avoid raising robot_connection_lost_callback self.restart_connection_timer() _robot_info = RobotInfo(0) _robot_info.update_robot_info(msg) self.selected_robot_info_update_callback(_robot_info) def handle_master_stop_update(self,msg): # Restarting master connection timer to avoid raising master_connection_lost_callback self.restart_master_connection_timer() master_stop_state = SS.UNKNOWN if msg.data == True: master_stop_state = SS.RUNNING else: master_stop_state = SS.STOPPED self.master_stop_update_callback(master_stop_state) def handle_restrictions_update(self,msg): restrictions = Restrictions(msg) self.restrictions_update_callback(restrictions) # UserPlugin Callbacks def set_robots_list_update_callback(self,callback_function): self.robots_list_update_callback = callback_function self.robots_list_timer = rospy.Timer(rospy.Duration(0.5),self.get_robots_list) def set_selected_robot_info_update_callback(self,callback_function): self.selected_robot_info_update_callback = callback_function def set_master_stop_update_callback(self,callback_function): self.master_stop_update_callback = callback_function def set_restrictions_update_callback(self,callback_function): self.restrictions_update_callback = callback_function def set_robot_connection_lost_callback(self,callback_function): self.robot_connection_lost_callback = callback_function def set_master_connection_lost_callback(self,callback_function): self.master_connection_lost_callback = callback_function def engage_clutch(self): msg = Bool() msg.data = True self.clutch_publisher.publish(msg) def disengage_clutch(self): msg = Bool() msg.data = False self.clutch_publisher.publish(msg) def user_started(self): msg = Bool() msg.data = True self.user_stop_publisher.publish(msg) def user_stopped(self): msg = Bool() msg.data = False self.user_stop_publisher.publish(msg)