import sys import time import struct import canopen import can from can.listener import Listener from PyQt5.QtWidgets import ( QApplication, QWidget, QTableWidget, QTableWidgetItem, QVBoxLayout, QHBoxLayout, QLabel, QGroupBox, QFormLayout, QPushButton, QMessageBox, QHeaderView ) from PyQt5.QtCore import QTimer, Qt from PyQt5.QtGui import QColor motorBoardEdsPath = r'C:\Users\vineetagupta\Documents\NorthStar-Motor-Valve-Board-Firmware\MotorValveBoard\coappl\motorcontrollervalve.eds' puBoardEdsPath = r'C:\Users\vineetagupta\Documents\NorthStar-Endurance-TestBench\EnduranceTestBench\coappl\enduranceTestBench.eds' class SDOSetpointListener(Listener): def __init__(self, parent): self.parent = parent def on_message_received(self, msg): if 0x600 <= msg.arbitration_id <= 0x61F and len(msg.data) >= 8: node_id = msg.arbitration_id - 0x600 cmd = msg.data[0] & 0xE0 index = msg.data[1] | (msg.data[2] << 8) subindex = msg.data[3] if index == 0x6000 and subindex == 0x01 and cmd == 0x20: setpoint_val = msg.data[4] self.parent.motorBoardTpdoData[node_id]['setpoint'] = setpoint_val class NodeTableWindow(QWidget): def __init__(self, nodes): super().__init__() self.setWindowTitle("Endurance Test Bench") self.nodes = nodes self.motorBoardTpdoData = {} self.PuBoardData = {} self.connected = False self.network = None self.num_nodes = 0 self.init_ui() self.start_timer() def init_ui(self): layout = QVBoxLayout(self) button_row = QHBoxLayout() self.connect_btn = QPushButton("Connect to CAN", self) self.disconnect_btn = QPushButton("Disconnect", self) self.disconnect_btn.setEnabled(False) self.connect_btn.clicked.connect(self.handle_connect) self.disconnect_btn.clicked.connect(self.handle_disconnect) button_row.addWidget(self.connect_btn) button_row.addWidget(self.disconnect_btn) layout.addLayout(button_row) main_layout = QHBoxLayout() self.table_widget = QWidget(self) table_layout = QVBoxLayout() table_heading = QLabel("Valve Details") table_heading.setAlignment(Qt.AlignLeft) table_layout.addWidget(table_heading) self.table = QTableWidget(0, 4) self.table.setHorizontalHeaderLabels(["Node ID", "Setpoint", "Feedback", "Status"]) self.table.horizontalHeader().setStretchLastSection(True) self.table.horizontalHeader().setSectionResizeMode(QHeaderView.Stretch) self.table.verticalHeader().setVisible(False) table_layout.addWidget(self.table) self.table_widget.setLayout(table_layout) self.details_widget = QWidget(self) details_layout = QVBoxLayout(self.details_widget) pressure_group = QGroupBox("Pressure Sensor Details", self) pressure_layout = QFormLayout(pressure_group) self.pressure_labels = [] for i in range(1, 5): label = QLabel("0 bar", self) self.pressure_labels.append(label) pressure_layout.addRow(f"Pressure Sensor {i}", label) details_layout.addWidget(pressure_group) flow_group = QGroupBox("Flow Meter Details", self) flow_layout = QFormLayout(flow_group) self.flow_rate_label = [] for i in range(1, 5): label = QLabel("0 L/h", self) self.flow_rate_label.append(label) flow_layout.addRow(f"Flowmeter {i}", label) details_layout.addWidget(flow_group) pump_group = QGroupBox("Pump Details", self) pump_layout = QFormLayout(pump_group) self.pump_value_label = QLabel("0 RPM", self) pump_layout.addRow("Speed", self.pump_value_label) details_layout.addWidget(pump_group) self.details_widget.setLayout(details_layout) main_layout.addWidget(self.table_widget) main_layout.addWidget(self.details_widget) layout.addLayout(main_layout) screen = QApplication.primaryScreen() screen_geometry = screen.availableGeometry() width = int(screen_geometry.width() * 0.4) height = int(screen_geometry.height() * 0.7) x = int((screen_geometry.width() - width) / 2) y = int((screen_geometry.height() - height) / 2) self.setGeometry(x, y, width, height) def start_timer(self): self.timer = QTimer() self.timer.setInterval(500) self.timer.timeout.connect(self.update_table) self.timer.start() def handle_connect(self): try: self.network = canopen.Network() self.network.connect(channel='PCAN_USBBUS1', bustype='pcan', bitrate=250000) self.nodes = {} self.motorBoardTpdoData = {} self.PuBoardData = {} self.init_motor_nodes() self.init_pu_node() self.listener = SDOSetpointListener(self) self.notifier = can.Notifier(self.network.bus, [self.listener]) self.connected = True self.num_nodes = len(self.nodes) self.table.setRowCount(self.num_nodes) for row, node_id in enumerate(sorted(self.nodes.keys())): item = QTableWidgetItem(str(node_id)) item.setTextAlignment(Qt.AlignCenter) self.table.setItem(row, 0, item) self.connect_btn.setStyleSheet("background-color: lightgreen;") self.disconnect_btn.setEnabled(True) QMessageBox.information(self, "Connected", "Successfully connected to CAN and initialized nodes.") except Exception as e: QMessageBox.critical(self, "Connection Error", f"Failed to connect:\n{e}") self.connected = False def handle_disconnect(self): try: if hasattr(self, 'notifier'): self.notifier.stop() if self.network: self.network.disconnect() self.connected = False self.nodes.clear() self.motorBoardTpdoData.clear() self.PuBoardData.clear() self.table.clearContents() self.table.setRowCount(0) self.connect_btn.setEnabled(True) self.disconnect_btn.setEnabled(False) self.connect_btn.setStyleSheet("") QMessageBox.information(self, "Disconnected", "CAN network disconnected.") except Exception as e: QMessageBox.warning(self, "Disconnect Error", f"Error while disconnecting:\n{e}") def init_motor_nodes(self): for node_id in range(5, 27): try: node = canopen.RemoteNode(node_id, motorBoardEdsPath) self.network.add_node(node) time.sleep(0.1) self.nodes[node_id] = node self.motorBoardTpdoData[node_id] = {'setpoint': '-', 'feedback': '-'} if node.nmt.state == 'OPERATIONAL': node.tpdo.read() node.tpdo[1].enabled = True def make_callback(nid): def cb(map_obj): for var in map_obj: if var.index == 0x6002: self.motorBoardTpdoData[nid]['feedback'] = var.raw return cb node.tpdo[1].add_callback(make_callback(node_id)) except Exception as e: print(f"Motor node {node_id} error: {e}") def init_pu_node(self): try: node = canopen.RemoteNode(0x1, puBoardEdsPath) self.network.add_node(node) time.sleep(0.1) self.nodes[1] = node self.PuBoardData = {'flowmeter': {}, 'pressure': {}, 'pump': {}} if node.nmt.state == 'OPERATIONAL': node.tpdo.read() # Enable all relevant TPDOs for tpdo_id in range(4, 9): # TPDO4 to TPDO8 if tpdo_id in node.tpdo: node.tpdo[tpdo_id].enabled = True def make_callback(): def cb(map_obj): for var in map_obj: if var.index == 0x6004: self.PuBoardData['flowmeter'][f'0x{var.subindex:02X}'] = var.raw elif var.index == 0x6005: self.PuBoardData['pressure'][f'0x{var.subindex:02X}'] = var.raw elif var.index == 0x6006: self.PuBoardData['pump'][f'0x{var.subindex:02X}'] = var.raw return cb node.tpdo[tpdo_id].add_callback(make_callback()) except Exception as e: print(f"PU node error: {e}") def update_table(self): if not self.connected: return for row, node_id in enumerate(sorted(self.nodes.keys())): node = self.nodes[node_id] try: status = node.nmt.state except: status = "UNKNOWN" node_item = self.table.item(row, 0) if node_item: node_item.setBackground( QColor("green") if status == "OPERATIONAL" else QColor("yellow") if status == "PRE-OPERATIONAL" else QColor("red") ) set_val = str(self.motorBoardTpdoData.get(node_id, {}).get('setpoint', '-')) setpoint_item = QTableWidgetItem(set_val) setpoint_item.setTextAlignment(Qt.AlignCenter) self.table.setItem(row, 1, setpoint_item) fb_val = str(self.motorBoardTpdoData.get(node_id, {}).get('feedback', '-')) feedback_item = QTableWidgetItem(fb_val) feedback_item.setTextAlignment(Qt.AlignCenter) self.table.setItem(row, 2, feedback_item) status_item = QTableWidgetItem(status) status_item.setTextAlignment(Qt.AlignCenter) self.table.setItem(row, 3, status_item) # Update flowmeter values for i in range(1, 5): value = self.PuBoardData.get("flowmeter", {}).get(f"0x{i:02X}", 0) self.flow_rate_label[i-1].setText(f"{value} L/h") # Update pressure sensor values for i in range(1, 5): value = self.PuBoardData.get("pressure", {}).get(f"0x{i:02X}", 0) self.pressure_labels[i-1].setText(f"{value} bar") # Update pump speed value = self.PuBoardData.get("pump", {}).get("0x00", 0) self.pump_value_label.setText(f"{value} RPM") def main(): app = QApplication(sys.argv) window = NodeTableWindow({}) window.show() sys.exit(app.exec_()) if __name__ == "__main__": main()