Working with one robot

Signed-off-by: Jakub Delicat <delicat.kuba@gmail.com>
This commit is contained in:
Jakub Delicat 2023-08-23 19:43:04 +02:00
parent 21d327c74f
commit a136c86ba1
3 changed files with 29 additions and 25 deletions

View File

@ -1,5 +1,3 @@
# from .ros_master_wrapper import ROSWrapper
# from .qt_master_wrapper import QtWrapper
from qt_gui.plugin import Plugin from qt_gui.plugin import Plugin
from python_qt_binding.QtCore import QTimer from python_qt_binding.QtCore import QTimer

View File

@ -1,7 +1,5 @@
#!/usr/bin/env python #!/usr/bin/env python
# coding=utf-8 # coding=utf-8
import os
import math import math
import datetime import datetime
from ament_index_python.packages import get_package_share_directory from ament_index_python.packages import get_package_share_directory
@ -38,9 +36,9 @@ class QtWrapper:
get_package_share_directory("safety_master_plugin") get_package_share_directory("safety_master_plugin")
) )
self.distance_pixmap = QPixmap("{0}/Distance.png".format(ui_directory_path)) self.distance_pixmap = QPixmap(f"{ui_directory_path}/Distance.png")
self.angular_pixmap = QPixmap("{0}/Angular.png".format(ui_directory_path)) self.angular_pixmap = QPixmap(f"{ui_directory_path}/Angular.png")
self.linear_pixmap = QPixmap("{0}/Linear.png".format(ui_directory_path)) self.linear_pixmap = QPixmap(f"{ui_directory_path}/Linear.png")
self.widget.distanceImage.setPixmap(self.distance_pixmap) self.widget.distanceImage.setPixmap(self.distance_pixmap)
self.widget.angularImage.setPixmap(self.angular_pixmap) self.widget.angularImage.setPixmap(self.angular_pixmap)
self.widget.linearImage.setPixmap(self.linear_pixmap) self.widget.linearImage.setPixmap(self.linear_pixmap)
@ -150,7 +148,7 @@ class QtWrapper:
def update_robot_info(self, robot_info): def update_robot_info(self, robot_info):
items_to_add = [] items_to_add = []
items_to_add.append(QStandardItem("pioneer{}".format(robot_info.robot_id))) items_to_add.append(QStandardItem(f"pioneer{robot_info.robot_id}"))
items_to_add.append(QStandardItem("{:.2f}".format(robot_info.battery_voltage))) items_to_add.append(QStandardItem("{:.2f}".format(robot_info.battery_voltage)))
if robot_info.state == SS.RUNNING: if robot_info.state == SS.RUNNING:
@ -184,7 +182,7 @@ class QtWrapper:
) )
items_list = self.stdItemModel.findItems( items_list = self.stdItemModel.findItems(
"pioneer{0}".format(robot_info.robot_id), Qt.MatchExactly f"pioneer{robot_info.robot_id}", Qt.MatchExactly
) )
if len(items_list) > 0: if len(items_list) > 0:
for item in items_list: for item in items_list:
@ -216,21 +214,19 @@ class QtWrapper:
self.stdItemModel.setItem(self.stdItemModel.rowCount(), 0, new_robot_info) self.stdItemModel.setItem(self.stdItemModel.rowCount(), 0, new_robot_info)
self.displayed_robots_id_list.append(robot_id) self.displayed_robots_id_list.append(robot_id)
self.log_info("pioneer{0} połączony".format(robot_id)) self.log_info(f"pioneer{robot_id} połączony")
else: else:
raise RuntimeError("Adding robot, which is already in master GUI") raise RuntimeError("Adding robot, which is already in master GUI")
def remove_robot(self, robot_id): def remove_robot(self, robot_id):
items_list = self.stdItemModel.findItems( items_list = self.stdItemModel.findItems(f"pioneer{robot_id}", Qt.MatchExactly)
"pioneer{0}".format(robot_id), Qt.MatchExactly
)
if len(items_list) > 0: if len(items_list) > 0:
for item in items_list: for item in items_list:
row_number = item.row() row_number = item.row()
self.stdItemModel.removeRow(row_number) self.stdItemModel.removeRow(row_number)
self.log_info("pioneer{0} rozłączony".format(robot_id)) self.log_info(f"pioneer{robot_id} rozłączony")
self.displayed_robots_id_list.remove(robot_id) self.displayed_robots_id_list.remove(robot_id)
def log_info(self, info_text): def log_info(self, info_text):
@ -239,9 +235,8 @@ class QtWrapper:
cursor = self.widget.logsBrowser.textCursor() cursor = self.widget.logsBrowser.textCursor()
cursor.movePosition(QTextCursor.End) cursor.movePosition(QTextCursor.End)
self.widget.logsBrowser.setTextCursor(cursor) self.widget.logsBrowser.setTextCursor(cursor)
# self.widget.logsBrowser.insertHtml('<font color="black">' + str(time) + '. ' + info_text + '</font><br>')
self.widget.logsBrowser.insertHtml( self.widget.logsBrowser.insertHtml(
'<font color="black"> {0} {1}</font><br>'.format(time, info_text) f'<font color="black"> {time} {info_text}</font><br>'
) )
self.scroll_to_bottom() self.scroll_to_bottom()

View File

@ -30,29 +30,40 @@ class ROSWrapper(Node):
self.restrictions_publisher = None self.restrictions_publisher = None
self.setup_subscribers_and_publishers() self.setup_subscribers_and_publishers()
self.subs = []
self._qos = QoSProfile( def setup_subscribers_and_publishers(self):
publishers_qos = QoSProfile(
reliability=QoSReliabilityPolicy.RELIABLE, reliability=QoSReliabilityPolicy.RELIABLE,
durability=QoSDurabilityPolicy.TRANSIENT_LOCAL, durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
history=QoSHistoryPolicy.SYSTEM_DEFAULT, history=QoSHistoryPolicy.SYSTEM_DEFAULT,
liveliness=QoSLivelinessPolicy.AUTOMATIC, liveliness=QoSLivelinessPolicy.AUTOMATIC,
depth=10, depth=10,
) )
self.subs = []
def setup_subscribers_and_publishers(self):
self.master_stop_publisher = self.create_publisher( self.master_stop_publisher = self.create_publisher(
Bool, "/pioneers/master_stop", 10 Bool, "/pioneers/master_stop", qos_profile=publishers_qos
) )
self.restrictions_publisher = self.create_publisher( self.restrictions_publisher = self.create_publisher(
RestrictionsMsg, "/pioneers/restrictions", 10 RestrictionsMsg, "/pioneers/restrictions", qos_profile=publishers_qos
) )
def add_robot_subscriber(self, robot_id): def add_robot_subscriber(self, robot_id):
if robot_id not in self.robot_info_subscribers: if robot_id not in self.robot_info_subscribers:
topic_name = "/pioneer{0}/robot_info".format(robot_id) topic_name = "/pioneer{0}/robot_info".format(robot_id)
subscription_qos = QoSProfile(
reliability=QoSReliabilityPolicy.RELIABLE,
durability=QoSDurabilityPolicy.VOLATILE,
history=QoSHistoryPolicy.SYSTEM_DEFAULT,
liveliness=QoSLivelinessPolicy.AUTOMATIC,
depth=10,
)
self.robot_info_subscribers[robot_id] = self.create_subscription( self.robot_info_subscribers[robot_id] = self.create_subscription(
RobotInfoMsg, topic_name, self.handle_robot_info_update, 10 RobotInfoMsg,
topic_name,
self.handle_robot_info_update,
qos_profile=subscription_qos,
) )
else: else:
raise RuntimeError( raise RuntimeError(
@ -72,11 +83,11 @@ class ROSWrapper(Node):
def unsubscribe(self, subscriber): def unsubscribe(self, subscriber):
if subscriber is not None: if subscriber is not None:
subscriber.destroy() self.destroy_subscription(subscriber)
def unregister_publisher(self, publisher): def unregister_publisher(self, publisher):
if publisher is not None: if publisher is not None:
publisher.destroy() self.destroy_publisher(publisher)
def publish_periodic_update(self): def publish_periodic_update(self):
restrictions = self.get_restrictions() restrictions = self.get_restrictions()
@ -123,7 +134,7 @@ class ROSWrapper(Node):
robots_id_list = [] robots_id_list = []
# Można to zrobić lepiej np. czekać na publikowaną wiadomość na robot_info # Można to zrobić lepiej np. czekać na publikowaną wiadomość na robot_info
for pioneer_id in range(6): for pioneer_id in range(1, 7):
published_topics_list = [] published_topics_list = []
try: try:
published_topics_list = self.get_publisher_names_and_types_by_node( published_topics_list = self.get_publisher_names_and_types_by_node(