# can_backend.py import os, time, threading from collections import deque import can, canopen import csv import asyncio from pathlib import Path import logging import numpy as np from crankshaft_functions import * logging.basicConfig(level=logging.INFO) class ValveControllerCAN: VALVE_NODE_ID = 0x05 SETPOINT_INDEX = 0x2002 SPEED_INDEDX = 0x3001 EDS_FILENAME = "motorcontrollervalve.eds" def __init__(self, eds_file: str | None = None): self.connected = False self.lock = threading.Lock() self.listener_active = False self.listener_thread = None self.network = None self.valve_node = None self.bus = None # in __init__ self.notifier = None self.latest = { "setpoint": 0.0, "position": 0.0, "timestamp": None, "speed_sp": None, "previous_position": 0.0, "speed": None, "Kp_term": None, "Ki_term": None, "Kd_term": None, "cmd": None, } self.data_window = deque(maxlen=1000) base = os.path.dirname(__file__) self.eds_path = eds_file or os.path.join(base, "eds_file", self.EDS_FILENAME) self.FEEDBACK_COB_ID = 0x180 + self.VALVE_NODE_ID # TPDO1 self.FEEDBACK_CMD_COB_ID = 0x280 + self.VALVE_NODE_ID # TPDO1 self.running = False self._record_task = None self._update_task = None self.csv_path = Path("recorded_data.csv") def connect(self) -> bool: if self.connected: return True try: self.network = canopen.Network() self.network.connect( interface="pcan", channel="PCAN_USBBUS1", bitrate=250000 ) self.valve_node = canopen.RemoteNode(self.VALVE_NODE_ID, self.eds_path) self.network.add_node(self.valve_node) try: self.valve_node.nmt.state = "OPERATIONAL" except Exception: pass # Add your callback to the network's notifier self.network.notifier.add_listener(self._feedback_loop) self.connected = True return True except Exception as e: logging.debug(f"[CONNECT ERROR] {e}") self.connected = False return False def disconnect(self): self.listener_active = False if self.listener_thread and self.listener_thread.is_alive(): self.listener_thread.join(timeout=1.0) if self.bus: try: self.bus.shutdown() except Exception: pass self.bus = None if self.network: try: self.network.disconnect() except Exception: pass self.network, self.valve_node = None, None self.connected = False def send_PID_coeffs(self, kp : float, ki : float, tol : float) -> bool: if not (self.connected and self.valve_node): logging.debug("[SPEED] Not connected") return False try: self.valve_node.sdo[self.SPEED_INDEDX][1].raw = kp * 100 self.valve_node.sdo[self.SPEED_INDEDX][2].raw = ki * 100 self.valve_node.sdo[self.SPEED_INDEDX][5].raw = tol * 1000 return True except Exception as e: logging.debug(f"[SDO WRITE ERROR] {e}") return False def send_speed(self, value: float) -> bool: if not (self.connected and self.valve_node): logging.debug("[SPEED] Not connected") return False try: scaled_value = value * 127 self.valve_node.sdo[self.SPEED_INDEDX][3].raw = scaled_value with self.lock: self.latest["speed_sp"] = value return True except Exception as e: logging.debug(f"[SDO WRITE ERROR] {e}") return False def send_setpoint(self, value: float) -> bool: if not (self.connected and self.valve_node): logging.debug("[SETPOINT] Not connected") return False try: scaled_value = round((int(value) * 10)) self.valve_node.sdo[self.SETPOINT_INDEX].raw = scaled_value with self.lock: self.latest["setpoint"] = value return True except Exception as e: logging.debug(f"[SDO WRITE ERROR] {e}") return False def _feedback_loop(self, msg: can.Message): self._handle_tpdo(msg) def _handle_tpdo(self, msg: can.Message): # Debug print so you SEE frames arriving logging.debug( f"[TPDO] {msg.arbitration_id:03X} [{len(msg.data)}]: {' '.join(f'{b:02X}' for b in msg.data)}" ) cob_id = msg.arbitration_id data = bytes(msg.data) if len(data) >= 2 and cob_id == self.FEEDBACK_COB_ID: raw_position = int.from_bytes(data[0:2], "little", signed=False) # Scale 0–255 → 0–100 position = raw_position / 10 now = time.time() with self.lock: previous_position = self.latest["position"] self.latest.update({"position": position, "timestamp": now}) sp = self.latest["setpoint"] speed_sp = self.latest["speed_sp"] speed = min(np.round(self.latest["position"] - previous_position, 1), 2) self.latest["speed"] = speed #print(self.latest["cmd"]) # theta = find_theta(14*self.latest["position"]/100, theta0 = 0.0) # print('theta',theta) self.data_window.append( ( now, position, sp, speed, speed_sp, self.latest["Kp_term"], self.latest["Ki_term"], self.latest["Kd_term"], self.latest["cmd"], ) ) if len(data) >= 8 and cob_id == self.FEEDBACK_CMD_COB_ID: now = time.time() with self.lock: self.latest.update( { "Kp_term": int.from_bytes(data[0:2], "little") / 10000.0, "Ki_term": int.from_bytes(data[2:4], "little") / 10000.0, "Kd_term": int.from_bytes(data[4:6], "little") / 10000.0, "cmd": int.from_bytes(data[4:6], "little") / 10000.0, } ) self.data_window.append( ( now, self.latest["position"], self.latest["setpoint"], self.latest["speed"], self.latest["speed_sp"], self.latest["Kp_term"], self.latest["Ki_term"], self.latest["Kd_term"], ) ) def get_feedback(self) -> dict: with self.lock: return dict(self.latest) def ensure_connected(self) -> bool: return self.connected or self.connect() async def record_loop(self): self.running = True with self.csv_path.open("w", newline="") as f: writer = csv.writer(f) writer.writerow(["timestamp", "position", "setpoint", "speed", "speed_sp","Kp_term", "Ki_term", "Kd_term" ]) while self.running: if self.data_window: # print( self.data_window[-1]) t, pos, sp, speed, speed_sp, Kp_term, Ki_term, Kd_term = self.data_window[-1] writer.writerow([t, pos, sp, speed, speed_sp, Kp_term, Ki_term, Kd_term ]) await asyncio.sleep(0.02) # match 20Hz async def stop_recording(self): self.running = False if self._record_task: await self._record_task self._record_task = None