NorthStar-Endurance-TestBench/FASTapi/endurance_can.py

160 lines
5.6 KiB
Python

# Real CANopen data manager for Endurance Test Bench HMI
import canopen
import time
class EnduranceDataManager:
def __init__(self):
self.motor_data = {} # {node_id: {setpoint, feedback}}
self.pu_data = {"setpoint": {}, "feedback": {}, "flowmeter": {}, "pressure": {}, "pump": {}}
self.connected = False
self.network = None
self.nodes = {}
self.serial_numbers = {}
def connect_to_can(self):
try:
self.network = canopen.Network()
self.network.connect(bustype='pcan', channel='PCAN_USBBUS1', bitrate=250000)
self._init_pu_node()
self._init_motor_nodes()
self.connected = True
except Exception as e:
print(f"[CAN ERROR] Could not connect to CAN: {e}")
self.connected = False
def disconnect(self):
if self.network:
self.network.disconnect()
self.connected = False
self.motor_data.clear()
self.pu_data = {"setpoint": {}, "feedback": {}, "flowmeter": {}, "pressure": {}, "pump": {}}
self.nodes.clear()
def send_test_command(self, value: int):
try:
if self.valve_node:
self.valve_node.sdo[0x2007].raw = value
print(f"[TEST COMMAND] Sent value {value} to 0x2007")
else:
print("[TEST COMMAND] Valve node not initialized")
except Exception as e:
print(f"[TEST COMMAND ERROR] {e}")
def _init_pu_node(self):
try:
node = canopen.RemoteNode(1, r'C:\Users\vineetagupta\Documents\NorthStar_Bitbucket\EnduranceTestBench\EnduranceTestBench\coappl\enduranceTestBench.eds')
self.network.add_node(node)
node.nmt.state = 'OPERATIONAL'
node.tpdo.read()
self.nodes[1] = node
tpdo_cobs = {
0x284: 'setpoint',
0x285: 'setpoint',
0x286: 'setpoint',
0x281: 'flowmeter',
0x282: 'pressure',
0x283: 'pump'
}
for cob_id, key in tpdo_cobs.items():
tpdo_num = self._get_tpdo_number_by_cob_id(node, cob_id)
if tpdo_num is not None:
node.tpdo[tpdo_num].enabled = True
def make_pu_cb(k):
def cb(map_obj):
for var in map_obj:
sid = f"0x{var.subindex:02X}"
self.pu_data[k][sid] = var.raw
return cb
node.tpdo[tpdo_num].add_callback(make_pu_cb(key))
except Exception as e:
print(f"PU node error: {e}")
def _get_tpdo_number_by_cob_id(self, node, cob_id):
for i in range(1, 9):
if i in node.tpdo:
if node.tpdo[i].cob_id == cob_id:
return i
return None
def get_valve_data(self):
result = {}
for i in range(5, 25):
sid = f"0x{i - 4:02X}"
setpoint = self.pu_data["setpoint"].get(sid, 0)
feedback = self.motor_data.get(i, {}).get("feedback", 0)
drift = abs(setpoint - feedback)
serial = self.serial_numbers.get(i)
status = "UNKNOWN"
try:
if i in self.nodes:
status = self.nodes[i].nmt.state or "UNKNOWN"
except:
pass
if drift >= 5 and drift <= 10:
drift_display = f"⚠️ { drift}"
elif drift >= 10:
drift_display = f"{ drift}"
else:
drift_display = f"{drift}"
result[i] = {
"node_id": f"{i}",
"setpoint": setpoint,
"feedback": feedback,
"drift": drift_display,
"status": status,
"serial": serial
}
return result
def _init_motor_nodes(self):
for node_id in range(5, 6):
try:
motor_node = canopen.RemoteNode(
node_id,
r'C:\Users\vineetagupta\Documents\NorthStar_Bitbucket\MotorBoard\MotorValveBoard\coappl\motorcontrollervalve.eds'
)
self.network.add_node(motor_node)
motor_node.nmt.state = 'OPERATIONAL'
motor_node.tpdo.read()
motor_node.tpdo[1].enabled = True
def make_cb(nid):
def cb(map_obj):
for var in map_obj:
if var.index == 0x2004 and var.subindex == 0x0:
self.motor_data[nid] = {"feedback": var.raw}
return cb
motor_node.tpdo[1].add_callback(make_cb(node_id))
self.nodes[node_id] = motor_node
if not hasattr(self, 'serial_numbers'):
self.serial_numbers = {}
if node_id not in self.serial_numbers:
try:
serial = motor_node.sdo[0x1018][4].raw
self.serial_numbers[node_id] = serial
except Exception as e:
print(f"[Serial Read Error] Node {node_id}: {e}")
self.serial_numbers[node_id] = "Unknown"
except Exception as e:
print(f"[Motor Node {node_id}] Error: {e}")
def get_flow_data(self):
return self.pu_data["flowmeter"]
def get_pressure_data(self):
return self.pu_data["pressure"]
def get_pump_rpm(self):
return self.pu_data["pump"].get("0x00", 0)