# Real CANopen data manager for Endurance Test Bench HMI import canopen import time class EnduranceDataManager: def __init__(self): self.motor_data = {} # {node_id: {setpoint, feedback}} self.pu_data = {"setpoint": {}, "feedback": {}, "flowmeter": {}, "pressure": {}, "pump": {}} self.connected = False self.network = None self.nodes = {} self.serial_numbers = {} def connect_to_can(self): try: self.network = canopen.Network() self.network.connect(bustype='pcan', channel='PCAN_USBBUS1', bitrate=250000) self._init_pu_node() self._init_motor_nodes() self.connected = True except Exception as e: print(f"[CAN ERROR] Could not connect to CAN: {e}") self.connected = False def disconnect(self): if self.network: self.network.disconnect() self.connected = False self.motor_data.clear() self.pu_data = {"setpoint": {}, "feedback": {}, "flowmeter": {}, "pressure": {}, "pump": {}} self.nodes.clear() def _init_pu_node(self): try: node = canopen.RemoteNode(1, r'C:\Users\vineetagupta\Documents\NorthStar_Bitbucket\EnduranceTestBench\EnduranceTestBench\coappl\enduranceTestBench.eds') self.network.add_node(node) node.nmt.state = 'OPERATIONAL' node.tpdo.read() self.nodes[1] = node # Enable and hook TPDOs for valve and system data tpdo_cobs = { 0x284: 'setpoint', 0x285: 'setpoint', 0x286: 'setpoint', 0x281: 'flowmeter', 0x282: 'pressure', 0x283: 'pump' } for cob_id, key in tpdo_cobs.items(): tpdo_num = self._get_tpdo_number_by_cob_id(node, cob_id) if tpdo_num is not None: node.tpdo[tpdo_num].enabled = True def make_pu_cb(k): def cb(map_obj): for var in map_obj: sid = f"0x{var.subindex:02X}" self.pu_data[k][sid] = var.raw return cb node.tpdo[tpdo_num].add_callback(make_pu_cb(key)) except Exception as e: print(f"PU node error: {e}") def _get_tpdo_number_by_cob_id(self, node, cob_id): for i in range(1, 9): if i in node.tpdo: if node.tpdo[i].cob_id == cob_id: return i return None def get_valve_data(self): result = {} for i in range(5, 25): sid = f"0x{i - 4:02X}" setpoint = self.pu_data["setpoint"].get(sid, 0) feedback = self.motor_data.get(i, {}).get("feedback", 0) drift = abs(setpoint - feedback) serial = self.serial_numbers.get(i) status = "UNKNOWN" try: if i in self.nodes: status = self.nodes[i].nmt.state or "UNKNOWN" except: pass # Drift symbol if drift >= 5 and drift <= 10: drift_display = f"⚠️ { drift}" elif drift >= 10: drift_display = f"❌ { drift}" else: drift_display = f"{drift}" result[i] = { "node_id": f"{i}", "setpoint": setpoint, "feedback": feedback, "drift": drift_display, "status": status, "serial": serial } return result def _init_motor_nodes(self): for node_id in range(5, 25): # Motor nodes try: motor_node = canopen.RemoteNode( node_id, r'C:\Users\vineetagupta\Documents\NorthStar_Bitbucket\MotorBoard\MotorValveBoard\coappl\motorcontrollervalve.eds' ) self.network.add_node(motor_node) motor_node.nmt.state = 'OPERATIONAL' motor_node.tpdo.read() motor_node.tpdo[1].enabled = True # === Add callback to read feedback === def make_cb(nid): def cb(map_obj): for var in map_obj: if var.index == 0x2004 and var.subindex == 0x0: self.motor_data[nid] = {"feedback": var.raw} return cb motor_node.tpdo[1].add_callback(make_cb(node_id)) self.nodes[node_id] = motor_node # === Serial number read (only once) === if not hasattr(self, 'serial_numbers'): self.serial_numbers = {} if node_id not in self.serial_numbers: try: serial = motor_node.sdo[0x1018][4].raw self.serial_numbers[node_id] = serial except Exception as e: print(f"[Serial Read Error] Node {node_id}: {e}") self.serial_numbers[node_id] = "Unknown" except Exception as e: print(f"[Motor Node {node_id}] Error: {e}") def get_flow_data(self): return self.pu_data["flowmeter"] def get_pressure_data(self): return self.pu_data["pressure"] def get_pump_rpm(self): return self.pu_data["pump"].get("0x00", 0)