import threading import canopen import can import time import os import logging class CANBackend: """ CANBackend handles CANopen communication with two Process Units (PU1 and PU2). It listens for TPDOs, tracks real-time data, and sends SDO control commands such as setting system modes and setpoints. """ def __init__(self, eds_file=None): """ Initialize the CAN backend. :param eds_file: Optional path to the EDS file to use for the master node. """ self.network = None self.master_node = None self.master_node_id = 0x16 # Docking board node ID self.nodes = {} self.connected = False self.lock = threading.Lock() self.latest_data = { 0: {}, #Docking Parameters 1: {}, # PU1 data 2: {}, # PU2 data } # Default EDS file path self.eds_path = eds_file if eds_file else os.path.join(os.path.dirname(__file__), "eds_file", "dockingBoard_0.eds") def connect(self): """ Connects to the CAN network and sets up the master node. :return: True if successful, False otherwise. """ try: self.network = canopen.Network() self.network.connect(channel='can0', bustype='socketcan') self.master_node = canopen.RemoteNode(self.master_node_id, self.eds_path) self.network.add_node(self.master_node) self.master_node.nmt.state = 'OPERATIONAL' self.nodes[0] = self.master_node # Start background listener for TPDOs self.listener_active = True self.bus = can.interface.Bus(channel='can0', bustype='socketcan') self.listener_thread = threading.Thread(target=self._can_listener_loop, daemon=True) self.listener_thread.start() self.connected = True return True except Exception as e: print(f"[CONNECT ERROR] {e}") return False def shutdown(self): """ Cleanly shuts down the CAN backend and listener. """ self.listener_active = False if self.network: self.network.disconnect() if hasattr(self, 'bus'): self.bus.shutdown() self.nodes.clear() self.connected = False def _can_listener_loop(self): """ Background thread to listen for CAN TPDO messages. Updates the internal state for PU1 and PU2 based on COB-ID. """ while self.listener_active: msg = self.bus.recv(1.0) if msg is None: continue try: cob_id = msg.arbitration_id data = msg.data with self.lock: # ========== PU1 COB-IDs ========== if cob_id == 0x2A6 and len(data) >= 8: self.latest_data[1].update({ "FM1": int.from_bytes(data[0:2], 'little') / 100.0 * 60.0, "FM2": int.from_bytes(data[2:4], 'little') / 100.0 * 60.0, "FM3": int.from_bytes(data[4:6], 'little') / 100.0 * 60.0, "FM4": int.from_bytes(data[6:8], 'little') / 100.0 * 60.0, }) elif cob_id == 0x2A7 and len(data) == 6: self.latest_data[1].update({ "PS1": int.from_bytes(data[0:2], 'little') / 1000.0, "PS2": int.from_bytes(data[2:4], 'little') / 1000.0, "PS3": int.from_bytes(data[4:6], 'little') / 1000.0, }) elif cob_id == 0x2A8 and len(data) >= 8: self.latest_data[1].update({ "MV02_sp": int.from_bytes(data[0:2], 'little') / 100.0, "MV03_sp": int.from_bytes(data[2:4], 'little') / 100.0, "MV04_sp": int.from_bytes(data[4:6], 'little') / 100.0, "MV05_sp": int.from_bytes(data[6:8], 'little') / 100.0, }) elif cob_id == 0x2A9 and len(data) >= 8: self.latest_data[1].update({ "MV06_sp": int.from_bytes(data[0:2], 'little') / 100.0, "MV07_sp": int.from_bytes(data[2:4], 'little') / 100.0, "MV08_sp": int.from_bytes(data[4:6], 'little') / 100.0, "Pump_sp": int.from_bytes(data[6:8], 'little') / 100.0, }) elif cob_id == 0x2AA and len(data) >= 7: data = list(data) self.latest_data[1].update({ "MV02": 100 * data[0] / 255, "MV03": 100 * data[1] / 255, "MV04": 100 * data[2] / 255, "MV05": 100 * data[3] / 255, "MV06": 100 * data[4] / 255, "MV07": 100 * data[5] / 255, "MV08": 100 * data[6] / 255, }) elif cob_id == 0x2AB and len(data) >= 7: self.latest_data[1].update({ "PU1_STATE" : data[0], "Conductivity_Feed" : int.from_bytes(data[1:3], 'little') / 100.0, "Conductivity_Permeate": int.from_bytes(data[3:5], 'little') / 100.0, "Conductivity_Product" : int.from_bytes(data[5:7], 'little') / 100.0, }) # ========== PU2 COB-IDs ========== elif cob_id == 0x2AD and len(data) >= 8: self.latest_data[2].update({ "FM1": int.from_bytes(data[0:2], 'little') / 100.0 * 60.0, "FM2": int.from_bytes(data[2:4], 'little') / 100.0 * 60.0, "FM3": int.from_bytes(data[4:6], 'little') / 100.0 * 60.0, "FM4": int.from_bytes(data[6:8], 'little') / 100.0 * 60.0, }) elif cob_id == 0x2AE and len(data) == 6: self.latest_data[2].update({ "PS1": int.from_bytes(data[0:2], 'little') / 1000.0, "PS2": int.from_bytes(data[2:4], 'little') / 1000.0, "PS3": int.from_bytes(data[4:6], 'little') / 1000.0, }) elif cob_id == 0x2AF and len(data) >= 8: self.latest_data[2].update({ "MV02_sp": int.from_bytes(data[0:2], 'little') / 100.0, "MV03_sp": int.from_bytes(data[2:4], 'little') / 100.0, "MV04_sp": int.from_bytes(data[4:6], 'little') / 100.0, "MV05_sp": int.from_bytes(data[6:8], 'little') / 100.0, }) elif cob_id == 0x2B0 and len(data) >= 8: self.latest_data[2].update({ "MV06_sp": int.from_bytes(data[0:2], 'little') / 100.0, "MV07_sp": int.from_bytes(data[2:4], 'little') / 100.0, "MV08_sp": int.from_bytes(data[4:6], 'little') / 100.0, "Pump_sp": int.from_bytes(data[6:8], 'little') / 100.0, }) elif cob_id == 0x2B1 and len(data) >= 7: data = list(data) self.latest_data[2].update({ "MV02": 100 * data[0] / 255, "MV03": 100 * data[1] / 255, "MV04": 100 * data[2] / 255, "MV05": 100 * data[3] / 255, "MV06": 100 * data[4] / 255, "MV07": 100 * data[5] / 255, "MV08": 100 * data[6] / 255, }) # elif cob_id == 0x1B9 and len(data) >= 6: # self.latest_data[1].update({ # "Conductivity_Feed": int.from_bytes(data[0:2], 'little'), # "Conductivity_Permeate": int.from_bytes(data[2:4], 'little'), # "Conductivity_Product": int.from_bytes(data[4:6], 'little'), # }) # elif cob_id == 0x1BA and len(data) >= 6: # self.latest_data[1].update({ # "Temperature_Feed": int.from_bytes(data[0:2], 'little'), # "Temperature_Permeate": int.from_bytes(data[2:4], 'little'), # "Temperature_Product": int.from_bytes(data[4:6], 'little'), # }) # elif cob_id == 0x2B2 and len(data) >= 1: # self.latest_data[2]["PU2_STATE"] = data[0] elif cob_id == 0x2B2 and len(data) >= 7: self.latest_data[2].update({ "PU2_STATE" : data[0], "Conductivity_Feed" : int.from_bytes(data[1:3], 'little') / 100.0, "Conductivity_Permeate": int.from_bytes(data[3:5], 'little') / 100.0, "Conductivity_Product" : int.from_bytes(data[5:7], 'little') / 100.0, }) print("Conductivity_Product",int.from_bytes(data[5:7], 'little') / 100.0) # ========== Docking Parameters ========== elif cob_id == 0x2AC and len(data) >= 8: self.latest_data[0].update({ "Qdrain_sp": int.from_bytes(data[4:6], 'little'), "TankLevel": int.from_bytes(data[6:8], 'little'), }) elif cob_id == 0x2B3 and len(data) >= 8: self.latest_data[0].update({ "Inlet_flow": int.from_bytes(data[0:2], 'little') / 100.0, "Outlet_flow": int.from_bytes(data[2:4], 'little') / 100.0, "Pressure_perm": int.from_bytes(data[4:6], 'little') / 1000.0, "Pressure_ro": int.from_bytes(data[6:8], 'little') / 1000.0, }) # # ========== PU1 DRIFT CHECK ========== # if cob_id in (0x2A6, 0x2A8): # FM1 or MV03_sp updates for PU1 # mv03_sp = self.latest_data[1].get("MV03_sp") # qdrain = self.latest_data[1].get("FM1") # if mv03_sp is not None and qdrain is not None: # if mv03_sp <= 0 or qdrain <= 0: # print(f"🔇 Skipping PU1 drift check (idle) → MV03_sp: {mv03_sp:.2f}, Qdrain: {qdrain:.2f}") # elif detect_mv03_drift(mv03_sp, qdrain): # print(f"⚠️ Drift detected on PU1 → MV03_sp: {mv03_sp:.2f} vs Qdrain: {qdrain:.2f}") # else: # print(f"✅ No drift on PU1 → MV03_sp: {mv03_sp:.2f} vs Qdrain: {qdrain:.2f}") # # ========== PU2 DRIFT CHECK ========== # if cob_id in (0x2AD, 0x2AF): # FM1 or MV03_sp updates for PU2 # mv03_sp = self.latest_data[2].get("MV03_sp") # qdrain = self.latest_data[2].get("FM1") # if mv03_sp is not None and qdrain is not None: # if mv03_sp <= 0 or qdrain <= 0: # print(f"🔇 Skipping PU2 drift check (idle) → MV03_sp: {mv03_sp:.2f}, Qdrain: {qdrain:.2f}") # elif detect_mv03_drift(mv03_sp, qdrain): # print(f"⚠️ Drift detected on PU2 → MV03_sp: {mv03_sp:.2f} vs Qdrain: {qdrain:.2f}") # else: # print(f"✅ No drift on PU2 → MV03_sp: {mv03_sp:.2f} vs Qdrain: {qdrain:.2f}") except Exception as e: print(f"[TPDO PARSE ERROR] {e}") def get_latest_data(self, pu_number: int): """ Retrieve the latest real-time data for the given PU. :param pu_number: 1 or 2 :return: Dictionary of flow, pressure, valve data """ with self.lock: return self.latest_data.get(pu_number, {}).copy() def read_current_state(self, pu_number: int): """ Get the system mode (decoded string) of the given PU. :param pu_number: 1 or 2 :return: State name or "Offline" """ state_val = self.latest_data.get(pu_number, {}).get(f"PU{pu_number}_STATE") return self.decode_state(state_val) if state_val is not None else "Offline" def decode_state(self, state_val: int) -> str: """ Convert system state integer to human-readable label. :param state_val: Integer value from TPDO :return: String state name """ state_map = { 0: "SYSTEM_MODE_INIT", 1: "SYSTEM_MODE_OFF", 2: "SYSTEM_MODE_READY", 3: "SYSTEM_MODE_PRODUCTION", 4: "SYSTEM_MODE_LOW_LOOP_PRESSURE", 5: "SYSTEM_MODE_LOOP_CLEANING", 6: "SYSTEM_MODE_HEATING_RO", 7: "SYSTEM_MODE_RINSING_RO", 8: "SYSTEM_MODE_HEATING_EDI", 9: "SYSTEM_MODE_COOLING_EDI", 10: "SYSTEM_MODE_RO_FLUSH", 11: "SYSTEM_MODE_RO_RINSE", 12: "SYSTEM_MODE_EDI_RINSE", 15: "SYSTEM_MODE_FAIL_SAFE", 16: "SYSTEM_MODE_FIRST_FLUSH", 255: "SYSTEM_MODE_DEFAULT" } return state_map.get(state_val, f"UNKNOWN({state_val})") def send_state_command(self, state: str, pu_number: int, ploop_setpoint: float): """ Send the PU state and pressure loop setpoint to the master node. :param state: State string (e.g., "PRODUCTION") :param pu_number: PU1 or PU2 :param ploop_setpoint: Float setpoint in bar (will be scaled) """ if not self.connected: raise RuntimeError("CAN not connected") state_map = { "IDLE": 1, "PRE-PRODUCTION": 2, "PRODUCTION": 3, "MAINTENANCE": 8, "EMERGENCY_STOP": 9, "FIRST_START": 10 } if state not in state_map: raise ValueError(f"Invalid state: {state}") try: master_node = self.nodes.get(0) if master_node is None: raise ValueError("Master node not connected") state_index = 0x3000 setpoint_index = 0x3001 print(f"[DEBUG] Writing state {state_map[state]} to master OD 0x{state_index:04X}:{pu_number:02X}") master_node.sdo[state_index][pu_number].raw = state_map[state] & 0xFF print(f"[DEBUG] Writing ploop_setpoint {ploop_setpoint} to master OD 0x{setpoint_index:04X}:{pu_number:02X}") master_node.sdo[setpoint_index][1].raw = int(ploop_setpoint * 100) except Exception as e: print(f"[MASTER SDO WRITE ERROR] PU{pu_number}: {e}") raise def send_thermal_loop_cleaning(self, mode: str, pu_number: int): """ Activate or deactivate thermal loop cleaning mode. :param mode: "IDLE" or "ACTIVE" :param pu_number: PU1 or PU2 """ if not self.connected: raise RuntimeError("CAN not connected") mode_map = { "IDLE": 0, "ACTIVE": 1 } if mode not in mode_map: raise ValueError(f"Invalid thermal loop mode: {mode}") try: node = self.nodes.get(pu_number) if node is None: raise ValueError(f"PU{pu_number} not connected") print(f"[DEBUG] Sending thermal loop mode {mode} to 0x2024:{pu_number}") node.sdo[0x2024][pu_number].raw = mode_map[mode] except Exception as e: print(f"[THERMAL LOOP ERROR] PU{pu_number}: {e}") raise