160 lines
5.6 KiB
Python
160 lines
5.6 KiB
Python
|
|
# Real CANopen data manager for Endurance Test Bench HMI
|
|
|
|
import canopen
|
|
import time
|
|
|
|
class EnduranceDataManager:
|
|
def __init__(self):
|
|
self.motor_data = {} # {node_id: {setpoint, feedback}}
|
|
self.pu_data = {"setpoint": {}, "feedback": {}, "flowmeter": {}, "pressure": {}, "pump": {}}
|
|
self.connected = False
|
|
self.network = None
|
|
self.nodes = {}
|
|
self.serial_numbers = {}
|
|
|
|
def connect_to_can(self):
|
|
try:
|
|
self.network = canopen.Network()
|
|
self.network.connect(bustype='pcan', channel='PCAN_USBBUS1', bitrate=250000)
|
|
self._init_pu_node()
|
|
self._init_motor_nodes()
|
|
self.connected = True
|
|
except Exception as e:
|
|
print(f"[CAN ERROR] Could not connect to CAN: {e}")
|
|
self.connected = False
|
|
|
|
def disconnect(self):
|
|
if self.network:
|
|
self.network.disconnect()
|
|
self.connected = False
|
|
self.motor_data.clear()
|
|
self.pu_data = {"setpoint": {}, "feedback": {}, "flowmeter": {}, "pressure": {}, "pump": {}}
|
|
self.nodes.clear()
|
|
|
|
def send_test_command(self, value: int):
|
|
try:
|
|
if self.valve_node:
|
|
self.valve_node.sdo[0x2007].raw = value
|
|
print(f"[TEST COMMAND] Sent value {value} to 0x2007")
|
|
else:
|
|
print("[TEST COMMAND] Valve node not initialized")
|
|
except Exception as e:
|
|
print(f"[TEST COMMAND ERROR] {e}")
|
|
|
|
def _init_pu_node(self):
|
|
try:
|
|
node = canopen.RemoteNode(1, r'C:\Users\vineetagupta\Documents\NorthStar_Bitbucket\EnduranceTestBench\EnduranceTestBench\coappl\enduranceTestBench.eds')
|
|
self.network.add_node(node)
|
|
node.nmt.state = 'OPERATIONAL'
|
|
node.tpdo.read()
|
|
self.nodes[1] = node
|
|
|
|
tpdo_cobs = {
|
|
0x284: 'setpoint',
|
|
0x285: 'setpoint',
|
|
0x286: 'setpoint',
|
|
0x281: 'flowmeter',
|
|
0x282: 'pressure',
|
|
0x283: 'pump'
|
|
}
|
|
|
|
for cob_id, key in tpdo_cobs.items():
|
|
tpdo_num = self._get_tpdo_number_by_cob_id(node, cob_id)
|
|
if tpdo_num is not None:
|
|
node.tpdo[tpdo_num].enabled = True
|
|
|
|
def make_pu_cb(k):
|
|
def cb(map_obj):
|
|
for var in map_obj:
|
|
sid = f"0x{var.subindex:02X}"
|
|
self.pu_data[k][sid] = var.raw
|
|
return cb
|
|
|
|
node.tpdo[tpdo_num].add_callback(make_pu_cb(key))
|
|
|
|
except Exception as e:
|
|
print(f"PU node error: {e}")
|
|
|
|
def _get_tpdo_number_by_cob_id(self, node, cob_id):
|
|
for i in range(1, 9):
|
|
if i in node.tpdo:
|
|
if node.tpdo[i].cob_id == cob_id:
|
|
return i
|
|
return None
|
|
|
|
def get_valve_data(self):
|
|
result = {}
|
|
for i in range(5, 25):
|
|
sid = f"0x{i - 4:02X}"
|
|
setpoint = self.pu_data["setpoint"].get(sid, 0)
|
|
feedback = self.motor_data.get(i, {}).get("feedback", 0)
|
|
drift = abs(setpoint - feedback)
|
|
serial = self.serial_numbers.get(i)
|
|
status = "UNKNOWN"
|
|
try:
|
|
if i in self.nodes:
|
|
status = self.nodes[i].nmt.state or "UNKNOWN"
|
|
except:
|
|
pass
|
|
if drift >= 5 and drift <= 10:
|
|
drift_display = f"⚠️ { drift}"
|
|
elif drift >= 10:
|
|
drift_display = f"❌ { drift}"
|
|
else:
|
|
drift_display = f"{drift}"
|
|
result[i] = {
|
|
"node_id": f"{i}",
|
|
"setpoint": setpoint,
|
|
"feedback": feedback,
|
|
"drift": drift_display,
|
|
"status": status,
|
|
"serial": serial
|
|
}
|
|
return result
|
|
|
|
def _init_motor_nodes(self):
|
|
for node_id in range(5, 6):
|
|
try:
|
|
motor_node = canopen.RemoteNode(
|
|
node_id,
|
|
r'C:\Users\vineetagupta\Documents\NorthStar_Bitbucket\MotorBoard\MotorValveBoard\coappl\motorcontrollervalve.eds'
|
|
)
|
|
self.network.add_node(motor_node)
|
|
motor_node.nmt.state = 'OPERATIONAL'
|
|
motor_node.tpdo.read()
|
|
motor_node.tpdo[1].enabled = True
|
|
|
|
def make_cb(nid):
|
|
def cb(map_obj):
|
|
for var in map_obj:
|
|
if var.index == 0x2004 and var.subindex == 0x0:
|
|
self.motor_data[nid] = {"feedback": var.raw}
|
|
return cb
|
|
|
|
motor_node.tpdo[1].add_callback(make_cb(node_id))
|
|
self.nodes[node_id] = motor_node
|
|
|
|
if not hasattr(self, 'serial_numbers'):
|
|
self.serial_numbers = {}
|
|
|
|
if node_id not in self.serial_numbers:
|
|
try:
|
|
serial = motor_node.sdo[0x1018][4].raw
|
|
self.serial_numbers[node_id] = serial
|
|
except Exception as e:
|
|
print(f"[Serial Read Error] Node {node_id}: {e}")
|
|
self.serial_numbers[node_id] = "Unknown"
|
|
|
|
except Exception as e:
|
|
print(f"[Motor Node {node_id}] Error: {e}")
|
|
|
|
def get_flow_data(self):
|
|
return self.pu_data["flowmeter"]
|
|
|
|
def get_pressure_data(self):
|
|
return self.pu_data["pressure"]
|
|
|
|
def get_pump_rpm(self):
|
|
return self.pu_data["pump"].get("0x00", 0)
|