NorthStar-Endurance-TestBench/gui.py

264 lines
11 KiB
Python

import sys
import time
import struct
import canopen
import can
from can.listener import Listener
from PyQt5.QtWidgets import (
QApplication, QWidget, QTableWidget, QTableWidgetItem, QVBoxLayout,
QHBoxLayout, QLabel, QGroupBox, QFormLayout, QPushButton, QMessageBox, QHeaderView
)
from PyQt5.QtCore import QTimer, Qt
from PyQt5.QtGui import QColor
motorBoardEdsPath = r'C:\Users\vineetagupta\Documents\NorthStar-Motor-Valve-Board-Firmware\MotorValveBoard\coappl\motorcontrollervalve.eds'
puBoardEdsPath = r'C:\Users\vineetagupta\Documents\NorthStar-Endurance-TestBench\EnduranceTestBench\coappl\enduranceTestBench.eds'
class SDOSetpointListener(Listener):
def __init__(self, parent):
self.parent = parent
def on_message_received(self, msg):
if 0x600 <= msg.arbitration_id <= 0x61F and len(msg.data) >= 8:
node_id = msg.arbitration_id - 0x600
cmd = msg.data[0] & 0xE0
index = msg.data[1] | (msg.data[2] << 8)
subindex = msg.data[3]
if index == 0x6000 and subindex == 0x01 and cmd == 0x20:
setpoint_val = msg.data[4]
self.parent.motorBoardTpdoData[node_id]['setpoint'] = setpoint_val
class NodeTableWindow(QWidget):
def __init__(self, nodes):
super().__init__()
self.setWindowTitle("Endurance Test Bench")
self.nodes = nodes
self.motorBoardTpdoData = {}
self.PuBoardData = {}
self.connected = False
self.network = None
self.num_nodes = 0
self.init_ui()
self.start_timer()
def init_ui(self):
layout = QVBoxLayout(self)
button_row = QHBoxLayout()
self.connect_btn = QPushButton("Connect to CAN", self)
self.disconnect_btn = QPushButton("Disconnect", self)
self.disconnect_btn.setEnabled(False)
self.connect_btn.clicked.connect(self.handle_connect)
self.disconnect_btn.clicked.connect(self.handle_disconnect)
button_row.addWidget(self.connect_btn)
button_row.addWidget(self.disconnect_btn)
layout.addLayout(button_row)
main_layout = QHBoxLayout()
self.table_widget = QWidget(self)
table_layout = QVBoxLayout()
table_heading = QLabel("Valve Details")
table_heading.setAlignment(Qt.AlignLeft)
table_layout.addWidget(table_heading)
self.table = QTableWidget(0, 4)
self.table.setHorizontalHeaderLabels(["Node ID", "Setpoint", "Feedback", "Status"])
self.table.horizontalHeader().setStretchLastSection(True)
self.table.horizontalHeader().setSectionResizeMode(QHeaderView.Stretch)
self.table.verticalHeader().setVisible(False)
table_layout.addWidget(self.table)
self.table_widget.setLayout(table_layout)
self.details_widget = QWidget(self)
details_layout = QVBoxLayout(self.details_widget)
pressure_group = QGroupBox("Pressure Sensor Details", self)
pressure_layout = QFormLayout(pressure_group)
self.pressure_labels = []
for i in range(1, 5):
label = QLabel("0 bar", self)
self.pressure_labels.append(label)
pressure_layout.addRow(f"Pressure Sensor {i}", label)
details_layout.addWidget(pressure_group)
flow_group = QGroupBox("Flow Meter Details", self)
flow_layout = QFormLayout(flow_group)
self.flow_rate_label = []
for i in range(1, 5):
label = QLabel("0 L/h", self)
self.flow_rate_label.append(label)
flow_layout.addRow(f"Flowmeter {i}", label)
details_layout.addWidget(flow_group)
pump_group = QGroupBox("Pump Details", self)
pump_layout = QFormLayout(pump_group)
self.pump_value_label = QLabel("0 RPM", self)
pump_layout.addRow("Speed", self.pump_value_label)
details_layout.addWidget(pump_group)
self.details_widget.setLayout(details_layout)
main_layout.addWidget(self.table_widget)
main_layout.addWidget(self.details_widget)
layout.addLayout(main_layout)
screen = QApplication.primaryScreen()
screen_geometry = screen.availableGeometry()
width = int(screen_geometry.width() * 0.4)
height = int(screen_geometry.height() * 0.7)
x = int((screen_geometry.width() - width) / 2)
y = int((screen_geometry.height() - height) / 2)
self.setGeometry(x, y, width, height)
def start_timer(self):
self.timer = QTimer()
self.timer.setInterval(500)
self.timer.timeout.connect(self.update_table)
self.timer.start()
def handle_connect(self):
try:
self.network = canopen.Network()
self.network.connect(channel='PCAN_USBBUS1', bustype='pcan', bitrate=250000)
self.nodes = {}
self.motorBoardTpdoData = {}
self.PuBoardData = {}
self.init_motor_nodes()
self.init_pu_node()
self.listener = SDOSetpointListener(self)
self.notifier = can.Notifier(self.network.bus, [self.listener])
self.connected = True
self.num_nodes = len(self.nodes)
self.table.setRowCount(self.num_nodes)
for row, node_id in enumerate(sorted(self.nodes.keys())):
item = QTableWidgetItem(str(node_id))
item.setTextAlignment(Qt.AlignCenter)
self.table.setItem(row, 0, item)
self.connect_btn.setStyleSheet("background-color: lightgreen;")
self.disconnect_btn.setEnabled(True)
QMessageBox.information(self, "Connected", "Successfully connected to CAN and initialized nodes.")
except Exception as e:
QMessageBox.critical(self, "Connection Error", f"Failed to connect:\n{e}")
self.connected = False
def handle_disconnect(self):
try:
if hasattr(self, 'notifier'):
self.notifier.stop()
if self.network:
self.network.disconnect()
self.connected = False
self.nodes.clear()
self.motorBoardTpdoData.clear()
self.PuBoardData.clear()
self.table.clearContents()
self.table.setRowCount(0)
self.connect_btn.setEnabled(True)
self.disconnect_btn.setEnabled(False)
self.connect_btn.setStyleSheet("")
QMessageBox.information(self, "Disconnected", "CAN network disconnected.")
except Exception as e:
QMessageBox.warning(self, "Disconnect Error", f"Error while disconnecting:\n{e}")
def init_motor_nodes(self):
for node_id in range(5, 27):
try:
node = canopen.RemoteNode(node_id, motorBoardEdsPath)
self.network.add_node(node)
time.sleep(0.1)
self.nodes[node_id] = node
self.motorBoardTpdoData[node_id] = {'setpoint': '-', 'feedback': '-'}
if node.nmt.state == 'OPERATIONAL':
node.tpdo.read()
node.tpdo[1].enabled = True
def make_callback(nid):
def cb(map_obj):
for var in map_obj:
if var.index == 0x6002:
self.motorBoardTpdoData[nid]['feedback'] = var.raw
return cb
node.tpdo[1].add_callback(make_callback(node_id))
except Exception as e:
print(f"Motor node {node_id} error: {e}")
def init_pu_node(self):
try:
node = canopen.RemoteNode(0x1, puBoardEdsPath)
self.network.add_node(node)
time.sleep(0.1)
self.nodes[1] = node
self.PuBoardData = {'flowmeter': {}, 'pressure': {}, 'pump': {}}
if node.nmt.state == 'OPERATIONAL':
node.tpdo.read()
# Enable all relevant TPDOs
for tpdo_id in range(4, 9): # TPDO4 to TPDO8
if tpdo_id in node.tpdo:
node.tpdo[tpdo_id].enabled = True
def make_callback():
def cb(map_obj):
for var in map_obj:
if var.index == 0x6004:
self.PuBoardData['flowmeter'][f'0x{var.subindex:02X}'] = var.raw
elif var.index == 0x6005:
self.PuBoardData['pressure'][f'0x{var.subindex:02X}'] = var.raw
elif var.index == 0x6006:
self.PuBoardData['pump'][f'0x{var.subindex:02X}'] = var.raw
return cb
node.tpdo[tpdo_id].add_callback(make_callback())
except Exception as e:
print(f"PU node error: {e}")
def update_table(self):
if not self.connected:
return
for row, node_id in enumerate(sorted(self.nodes.keys())):
node = self.nodes[node_id]
try:
status = node.nmt.state
except:
status = "UNKNOWN"
node_item = self.table.item(row, 0)
if node_item:
node_item.setBackground(
QColor("green") if status == "OPERATIONAL" else
QColor("yellow") if status == "PRE-OPERATIONAL" else
QColor("red")
)
set_val = str(self.motorBoardTpdoData.get(node_id, {}).get('setpoint', '-'))
setpoint_item = QTableWidgetItem(set_val)
setpoint_item.setTextAlignment(Qt.AlignCenter)
self.table.setItem(row, 1, setpoint_item)
fb_val = str(self.motorBoardTpdoData.get(node_id, {}).get('feedback', '-'))
feedback_item = QTableWidgetItem(fb_val)
feedback_item.setTextAlignment(Qt.AlignCenter)
self.table.setItem(row, 2, feedback_item)
status_item = QTableWidgetItem(status)
status_item.setTextAlignment(Qt.AlignCenter)
self.table.setItem(row, 3, status_item)
# Update flowmeter values
for i in range(1, 5):
value = self.PuBoardData.get("flowmeter", {}).get(f"0x{i:02X}", 0)
self.flow_rate_label[i-1].setText(f"{value} L/h")
# Update pressure sensor values
for i in range(1, 5):
value = self.PuBoardData.get("pressure", {}).get(f"0x{i:02X}", 0)
self.pressure_labels[i-1].setText(f"{value} bar")
# Update pump speed
value = self.PuBoardData.get("pump", {}).get("0x00", 0)
self.pump_value_label.setText(f"{value} RPM")
def main():
app = QApplication(sys.argv)
window = NodeTableWindow({})
window.show()
sys.exit(app.exec_())
if __name__ == "__main__":
main()