ros2aria/pioneer_status.py

137 lines
5.1 KiB
Python
Raw Normal View History

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState
from nav_msgs.msg import Odometry
from sensor_msgs.msg import LaserScan
import time
import sys
from PyQt5.QtWidgets import QApplication, QWidget, QLabel, QGridLayout, QVBoxLayout, QMainWindow
from PyQt5.QtCore import QTimer, Qt
class PioneerStatusNode(Node):
def __init__(self, gui):
super().__init__('pioneer_status_node')
self.gui = gui # Reference to the GUI
self.topic_data = {}
# Subscribing to topics for pioneers 1-6
for i in range(1, 7):
for topic_type, msg_type in [("odom", Odometry), ("joint_states", JointState), ("scan", LaserScan)]:
topic_name = f'/pioneer{i}/{topic_type}'
self.topic_data[topic_name] = {"last_message_time": 0, "message_count": 0}
self.create_subscription(msg_type, topic_name, lambda msg, topic=topic_name: self.message_callback(msg, topic), 10)
def message_callback(self, msg, topic_name):
# Update the last received time and message count for each topic
self.topic_data[topic_name]["last_message_time"] = time.time()
self.topic_data[topic_name]["message_count"] += 1
def update_gui(self):
current_time = time.time()
for topic_name, data in self.topic_data.items():
# Calculate frequency
time_diff = current_time - data["last_message_time"]
frequency = data["message_count"] / time_diff if time_diff > 0 else 0
# Determine status (active if received within last 2 seconds)
status = "Active" if time_diff < 2 else "Inactive"
# Update the GUI labels for this topic
self.gui.update_status(topic_name, frequency, status)
# Reset message count to calculate frequency anew in the next interval
data["message_count"] = 0
class PioneerStatusGUI(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("Pioneer Status Monitor")
self.setGeometry(100, 100, 600, 400)
# Main layout and widget setup
central_widget = QWidget()
self.setCentralWidget(central_widget)
layout = QVBoxLayout()
central_widget.setLayout(layout)
self.grid = QGridLayout()
layout.addLayout(self.grid)
# Header labels
self.grid.addWidget(QLabel("Topic"), 0, 0, alignment=Qt.AlignCenter)
self.grid.addWidget(QLabel("Frequency (Hz)"), 0, 1, alignment=Qt.AlignCenter)
self.grid.addWidget(QLabel("Status"), 0, 2, alignment=Qt.AlignCenter)
# Dictionary to store labels for each topic's frequency and status
self.topic_labels = {}
# Populate rows with placeholders for each topic
row = 1
for i in range(1, 7):
for topic_type in ["odom", "joint_states", "scan"]:
topic_name = f'/pioneer{i}/{topic_type}'
# Topic name label
self.grid.addWidget(QLabel(topic_name), row, 0)
# Frequency label
freq_label = QLabel("0.00")
freq_label.setAlignment(Qt.AlignCenter)
self.grid.addWidget(freq_label, row, 1)
# Status label
status_label = QLabel("Inactive")
status_label.setAlignment(Qt.AlignCenter)
status_label.setStyleSheet("color: red;")
self.grid.addWidget(status_label, row, 2)
# Store references to labels for updates
self.topic_labels[topic_name] = {
"frequency": freq_label,
"status": status_label
}
row += 1
def update_status(self, topic_name, frequency, status):
# Update frequency label
if topic_name in self.topic_labels:
self.topic_labels[topic_name]["frequency"].setText(f"{frequency:.2f}")
# Update status label color and text based on status
status_label = self.topic_labels[topic_name]["status"]
status_label.setText(status)
status_label.setStyleSheet("color: green;" if status == "Active" else "color: red;")
def main(args=None):
rclpy.init(args=args)
# PyQt5 application setup
app = QApplication(sys.argv)
gui = PioneerStatusGUI()
gui.show()
# Create the ROS 2 node
node = PioneerStatusNode(gui)
# QTimer to call rclpy.spin_once periodically
timer = QTimer()
timer.timeout.connect(lambda: rclpy.spin_once(node, timeout_sec=0.1))
timer.start(100) # Check for ROS messages every 100 ms
# QTimer to update the GUI every second
update_timer = QTimer()
update_timer.timeout.connect(node.update_gui)
update_timer.start(500) # Update GUI every 1 second
# Run the PyQt application
try:
sys.exit(app.exec_())
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()