import rclpy from rclpy.node import Node from sensor_msgs.msg import JointState from nav_msgs.msg import Odometry from sensor_msgs.msg import LaserScan import time import sys from PyQt5.QtWidgets import QApplication, QWidget, QLabel, QGridLayout, QVBoxLayout, QMainWindow from PyQt5.QtCore import QTimer, Qt class PioneerStatusNode(Node): def __init__(self, gui): super().__init__('pioneer_status_node') self.gui = gui # Reference to the GUI self.topic_data = {} # Subscribing to topics for pioneers 1-6 for i in range(1, 7): for topic_type, msg_type in [("odom", Odometry), ("joint_states", JointState), ("scan", LaserScan)]: topic_name = f'/pioneer{i}/{topic_type}' self.topic_data[topic_name] = {"last_message_time": 0, "message_count": 0} self.create_subscription(msg_type, topic_name, lambda msg, topic=topic_name: self.message_callback(msg, topic), 10) def message_callback(self, msg, topic_name): # Update the last received time and message count for each topic self.topic_data[topic_name]["last_message_time"] = time.time() self.topic_data[topic_name]["message_count"] += 1 def update_gui(self): current_time = time.time() for topic_name, data in self.topic_data.items(): # Calculate frequency time_diff = current_time - data["last_message_time"] frequency = data["message_count"] / time_diff if time_diff > 0 else 0 # Determine status (active if received within last 2 seconds) status = "Active" if time_diff < 2 else "Inactive" # Update the GUI labels for this topic self.gui.update_status(topic_name, frequency, status) # Reset message count to calculate frequency anew in the next interval data["message_count"] = 0 class PioneerStatusGUI(QMainWindow): def __init__(self): super().__init__() self.setWindowTitle("Pioneer Status Monitor") self.setGeometry(100, 100, 600, 400) # Main layout and widget setup central_widget = QWidget() self.setCentralWidget(central_widget) layout = QVBoxLayout() central_widget.setLayout(layout) self.grid = QGridLayout() layout.addLayout(self.grid) # Header labels self.grid.addWidget(QLabel("Topic"), 0, 0, alignment=Qt.AlignCenter) self.grid.addWidget(QLabel("Frequency (Hz)"), 0, 1, alignment=Qt.AlignCenter) self.grid.addWidget(QLabel("Status"), 0, 2, alignment=Qt.AlignCenter) # Dictionary to store labels for each topic's frequency and status self.topic_labels = {} # Populate rows with placeholders for each topic row = 1 for i in range(1, 7): for topic_type in ["odom", "joint_states", "scan"]: topic_name = f'/pioneer{i}/{topic_type}' # Topic name label self.grid.addWidget(QLabel(topic_name), row, 0) # Frequency label freq_label = QLabel("0.00") freq_label.setAlignment(Qt.AlignCenter) self.grid.addWidget(freq_label, row, 1) # Status label status_label = QLabel("Inactive") status_label.setAlignment(Qt.AlignCenter) status_label.setStyleSheet("color: red;") self.grid.addWidget(status_label, row, 2) # Store references to labels for updates self.topic_labels[topic_name] = { "frequency": freq_label, "status": status_label } row += 1 def update_status(self, topic_name, frequency, status): # Update frequency label if topic_name in self.topic_labels: self.topic_labels[topic_name]["frequency"].setText(f"{frequency:.2f}") # Update status label color and text based on status status_label = self.topic_labels[topic_name]["status"] status_label.setText(status) status_label.setStyleSheet("color: green;" if status == "Active" else "color: red;") def main(args=None): rclpy.init(args=args) # PyQt5 application setup app = QApplication(sys.argv) gui = PioneerStatusGUI() gui.show() # Create the ROS 2 node node = PioneerStatusNode(gui) # QTimer to call rclpy.spin_once periodically timer = QTimer() timer.timeout.connect(lambda: rclpy.spin_once(node, timeout_sec=0.1)) timer.start(100) # Check for ROS messages every 100 ms # QTimer to update the GUI every second update_timer = QTimer() update_timer.timeout.connect(node.update_gui) update_timer.start(500) # Update GUI every 1 second # Run the PyQt application try: sys.exit(app.exec_()) finally: node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()