opened window | removed ros1 wrappers

Signed-off-by: Jakub Delicat <delicat.kuba@gmail.com>
This commit is contained in:
Jakub Delicat 2023-08-16 12:38:35 +02:00
parent ce405ab4f1
commit 2a755bf328
4 changed files with 74 additions and 66 deletions

View File

@ -1,15 +1,15 @@
<library path="scripts"> <library path="scripts">
<class name="safety_user_plugin" type="user_plugin.UserPlugin" base_class_type="rqt_gui_py::Plugin"> <class name="safety_user_plugin" type="safety_user_plugin.user_plugin.UserPlugin" base_class_type="rqt_gui_py::Plugin">
<description> <description>
Safety system user plugin Safety system user plugin
</description> </description>
<qtgui> <qtgui>
<group> <group>
<label>Safety system</label> <label>L1.5 Safety system</label>
<icon type="theme">folder</icon> <icon type="theme">folder</icon>
</group> </group>
<label>User</label> <label>User</label>
<icon type="theme">system-help</icon> <icon type="theme">system-help</icon>
</qtgui> </qtgui>
</class> </class>
</library> </library>

View File

@ -6,6 +6,9 @@ import os
import math import math
import datetime import datetime
from ament_index_python.packages import get_package_share_directory
from safety_user_plugin.enums.clutch_state import ClutchState as CS from safety_user_plugin.enums.clutch_state import ClutchState as CS
from safety_user_plugin.enums.stop_state import StopState as SS from safety_user_plugin.enums.stop_state import StopState as SS
@ -39,7 +42,7 @@ class QtWrapper:
self.clear_robot_info() self.clear_robot_info()
ui_directory_path = "{0}/ui".format( ui_directory_path = "{0}/ui".format(
rospkg.RosPack().get_path("safety_user_plugin") get_package_share_directory("safety_user_plugin")
) )
self.distance_pixmap = QPixmap("{0}/Distance.png".format(ui_directory_path)) self.distance_pixmap = QPixmap("{0}/Distance.png".format(ui_directory_path))

View File

@ -33,7 +33,7 @@ class ROSWrapper:
self.robots_list_timer = None self.robots_list_timer = None
self.connection_timer = None self.connection_timer = None
self.master_connection_timer = None self.master_connection_timer = None
self.hz_update_timer = rospy.Timer(rospy.Duration(0.2), self.update_hz_values) # self.hz_update_timer = rospy.Timer(rospy.Duration(0.2), self.update_hz_values)
self.robots_hz_subscribers_dict = {} self.robots_hz_subscribers_dict = {}
self.rostopics_hz_dict = {} self.rostopics_hz_dict = {}
@ -45,46 +45,46 @@ class ROSWrapper:
def setup_subscribers_and_publishers(self, robot_id): def setup_subscribers_and_publishers(self, robot_id):
robot_name = "PIONIER" + str(robot_id) robot_name = "PIONIER" + str(robot_id)
self.robot_info_subscriber = rospy.Subscriber( # self.robot_info_subscriber = rospy.Subscriber(
"/{0}/RosAria/robot_info".format(robot_name), # "/{0}/RosAria/robot_info".format(robot_name),
RobotInfoMsg, # RobotInfoMsg,
self.handle_selected_robot_info_update, # self.handle_selected_robot_info_update,
) # )
self.master_stop_subscriber = rospy.Subscriber( # self.master_stop_subscriber = rospy.Subscriber(
"/PIONIER/master_stop", Bool, self.handle_master_stop_update # "/PIONIER/master_stop", Bool, self.handle_master_stop_update
) # )
self.restrictions_subscriber = rospy.Subscriber( # self.restrictions_subscriber = rospy.Subscriber(
"/PIONIER/restrictions", RestrictionsMsg, self.handle_restrictions_update # "/PIONIER/restrictions", RestrictionsMsg, self.handle_restrictions_update
) # )
self.user_stop_publisher = rospy.Publisher( # self.user_stop_publisher = rospy.Publisher(
"/{0}/RosAria/user_stop".format(robot_name), Bool, queue_size=1 # "/{0}/RosAria/user_stop".format(robot_name), Bool, queue_size=1
) # )
self.clutch_publisher = rospy.Publisher( # self.clutch_publisher = rospy.Publisher(
"/{0}/RosAria/clutch".format(robot_name), Bool, queue_size=1 # "/{0}/RosAria/clutch".format(robot_name), Bool, queue_size=1
) # )
if self.periodic_timer != None: # if self.periodic_timer != None:
self.shutdown_timer(self.periodic_timer) # self.shutdown_timer(self.periodic_timer)
self.periodic_timer = rospy.Timer( # self.periodic_timer = rospy.Timer(
rospy.Duration(0.1), self.publish_periodic_update # rospy.Duration(0.1), self.publish_periodic_update
) # )
print("NEW") # print("NEW")
if self.connection_timer != None: # if self.connection_timer != None:
self.shutdown_timer(self.connection_timer) # self.shutdown_timer(self.connection_timer)
self.connection_timer = rospy.Timer( # self.connection_timer = rospy.Timer(
rospy.Duration(5), self.handle_robot_connection_lost # rospy.Duration(5), self.handle_robot_connection_lost
) # )
if self.master_connection_timer != None: # if self.master_connection_timer != None:
self.shutdown_timer(self.master_connection_timer) # self.shutdown_timer(self.master_connection_timer)
self.master_connection_timer = rospy.Timer( # self.master_connection_timer = rospy.Timer(
rospy.Duration(1.5), self.handle_master_connection_lost # rospy.Duration(1.5), self.handle_master_connection_lost
) # )
if self.hz_update_timer != None: # if self.hz_update_timer != None:
self.shutdown_timer(self.hz_update_timer) # self.shutdown_timer(self.hz_update_timer)
self.hz_update_timer = rospy.Timer(rospy.Duration(0.2), self.update_hz_values) # self.hz_update_timer = rospy.Timer(rospy.Duration(0.2), self.update_hz_values)
def unsubscribe(self, subscriber): def unsubscribe(self, subscriber):
if subscriber != None: if subscriber != None:
@ -150,9 +150,9 @@ class ROSWrapper:
def restart_connection_timer(self): def restart_connection_timer(self):
if self.connection_timer != None: if self.connection_timer != None:
self.connection_timer.shutdown() self.connection_timer.shutdown()
self.connection_timer = rospy.Timer( # self.connection_timer = rospy.Timer(
rospy.Duration(1.5), self.handle_robot_connection_lost # rospy.Duration(1.5), self.handle_robot_connection_lost
) # )
else: else:
raise RuntimeError( raise RuntimeError(
"Attempting to restart connection_timer when it is not initialized" "Attempting to restart connection_timer when it is not initialized"
@ -161,9 +161,9 @@ class ROSWrapper:
def restart_master_connection_timer(self): def restart_master_connection_timer(self):
if self.master_connection_timer != None: if self.master_connection_timer != None:
self.master_connection_timer.shutdown() self.master_connection_timer.shutdown()
self.master_connection_timer = rospy.Timer( # self.master_connection_timer = rospy.Timer(
rospy.Duration(1.5), self.handle_master_connection_lost # rospy.Duration(1.5), self.handle_master_connection_lost
) # )
else: else:
raise RuntimeError( raise RuntimeError(
"Attempting to restart master_connection_timer when it is not initialized" "Attempting to restart master_connection_timer when it is not initialized"
@ -171,7 +171,7 @@ class ROSWrapper:
def unregister_node(self): def unregister_node(self):
self.cancel_subscribers_and_publishers() self.cancel_subscribers_and_publishers()
rospy.signal_shutdown("Closing safety user plugin") # rospy.signal_shutdown("Closing safety user plugin")
def select_robot(self, robot_id): def select_robot(self, robot_id):
self.setup_subscribers_and_publishers(robot_id) self.setup_subscribers_and_publishers(robot_id)
@ -186,11 +186,11 @@ class ROSWrapper:
def get_robots_list(self, event): def get_robots_list(self, event):
robots_id_list = [] robots_id_list = []
published_topics_list = rospy.get_published_topics(namespace="/") # published_topics_list = rospy.get_published_topics(namespace="/")
published_topics = [] published_topics = []
for list_ in published_topics_list: # for list_ in published_topics_list:
published_topics.append(list_[0]) # published_topics.append(list_[0])
for topic in published_topics: for topic in published_topics:
if topic.find("RosAria") == -1 or topic.find("robot_info") == -1: if topic.find("RosAria") == -1 or topic.find("robot_info") == -1:
@ -207,13 +207,14 @@ class ROSWrapper:
for robot_id in robots_id_list: for robot_id in robots_id_list:
if robot_id not in self.robots_hz_subscribers_dict: if robot_id not in self.robots_hz_subscribers_dict:
# Add hz subscriber # Add hz subscriber
self.rostopics_hz_dict[robot_id] = rostopic.ROSTopicHz(-1) print("ADDED SUB")
self.robots_hz_subscribers_dict[robot_id] = rospy.Subscriber( # self.rostopics_hz_dict[robot_id] = rostopic.ROSTopicHz(-1)
"/PIONIER{0}/RosAria/user_stop".format(robot_id), # self.robots_hz_subscribers_dict[robot_id] = rospy.Subscriber(
Bool, # "/PIONIER{0}/RosAria/user_stop".format(robot_id),
self.rostopics_hz_dict[robot_id].callback_hz, # Bool,
callback_args="/PIONIER{0}/RosAria/user_stop".format(robot_id), # self.rostopics_hz_dict[robot_id].callback_hz,
) # callback_args="/PIONIER{0}/RosAria/user_stop".format(robot_id),
# )
# Removing old hz subscribers # Removing old hz subscribers
for robot_key in self.robots_hz_subscribers_dict: for robot_key in self.robots_hz_subscribers_dict:
@ -258,7 +259,7 @@ class ROSWrapper:
# UserPlugin Callbacks # UserPlugin Callbacks
def set_robots_list_update_callback(self, callback_function): def set_robots_list_update_callback(self, callback_function):
self.robots_list_update_callback = callback_function self.robots_list_update_callback = callback_function
self.robots_list_timer = rospy.Timer(rospy.Duration(0.5), self.get_robots_list) # self.robots_list_timer = rospy.Timer(rospy.Duration(0.5), self.get_robots_list)
def set_selected_robot_info_update_callback(self, callback_function): def set_selected_robot_info_update_callback(self, callback_function):
self.selected_robot_info_update_callback = callback_function self.selected_robot_info_update_callback = callback_function

View File

@ -1,18 +1,22 @@
#!/usr/bin/env python #!/usr/bin/env python
import os import os
import rospkg from ament_index_python.packages import get_package_share_directory
from python_qt_binding.QtWidgets import QWidget from python_qt_binding.QtWidgets import QWidget
from python_qt_binding import loadUi from python_qt_binding import loadUi
class UserWidget(QWidget):
def __init__(self,context): class UserWidget(QWidget):
def __init__(self, context):
super(UserWidget, self).__init__() super(UserWidget, self).__init__()
ui_file = os.path.join(rospkg.RosPack().get_path('safety_user_plugin'), 'ui', 'user_view.ui') ui_file = os.path.join(
get_package_share_directory("safety_user_plugin"), "ui", "user_view.ui"
)
loadUi(ui_file, self) loadUi(ui_file, self)
self.setObjectName('User Plugin - L1.5 safety system') self.setObjectName("User Plugin - L1.5 safety system")
if context.serial_number() > 1: if context.serial_number() > 1:
self.setWindowTitle(self.windowTitle() + (' (%d)' % context.serial_number())) self.setWindowTitle(
context.add_widget(self) self.windowTitle() + (" (%d)" % context.serial_number())
)
context.add_widget(self)