From 2446fb7b592b1292d28600ce78454e4e3d95b2cf Mon Sep 17 00:00:00 2001 From: aniketSaha Date: Thu, 14 Aug 2025 11:27:20 +0200 Subject: [PATCH] Added serial communication and removed obsolete files --- main.py | 154 +++++------- protocol_decoder.py | 94 +++++++ serial_csv_logger.py | 120 +++++++++ serial_manager.py | 234 ++++++++++++++++++ update_hmi.sh | 12 - .../analyze_from_csv.m | 0 .../download_recordings | 0 valveBackend.py | 30 --- 8 files changed, 513 insertions(+), 131 deletions(-) create mode 100644 protocol_decoder.py create mode 100644 serial_csv_logger.py create mode 100644 serial_manager.py delete mode 100644 update_hmi.sh rename analyze_from_csv.m => utils/analyze_from_csv.m (100%) rename download_recordings => utils/download_recordings (100%) delete mode 100644 valveBackend.py diff --git a/main.py b/main.py index d9f891f..4addd27 100644 --- a/main.py +++ b/main.py @@ -5,9 +5,7 @@ import logging import os from fastapi import Request, APIRouter import platform -from fastapi.templating import ( - Jinja2Templates, -) # pip install fastapi uvicorn jinja2 python-multipart passlib +from fastapi.templating import Jinja2Templates from starlette.middleware.sessions import SessionMiddleware from starlette.exceptions import HTTPException as StarletteHTTPException from starlette.status import HTTP_302_FOUND @@ -23,14 +21,15 @@ import numpy as np import aiohttp import httpx +from serial_manager import SerialConfig, SerialStore, SerialReader +from protocol_decoder import decode_frames +from serial_csv_logger import SerialCsvLogger # <-- CSV logger + if platform.system() in ["Darwin"]: # macOS or Windows from MockCAN import CANBackend - logging.basicConfig(level=logging.INFO) - else: from classCAN import CANBackend # Your real backend - logging.basicConfig(level=logging.INFO) logging.getLogger("uvicorn.access").setLevel(logging.WARNING) @@ -67,10 +66,12 @@ write_buffer = deque() flush_interval = 1.0 # flush every 1 second last_flush_time = datetime.datetime.now() +# ---- Serial intake globals ---- +serial_store = SerialStore(capacity=5000) +serial_reader: SerialReader | None = None +serial_csv: SerialCsvLogger | None = None # <-- added ## LOGGING - - def format_PU_data(data): return { "timestamp": datetime.datetime.now().isoformat(), @@ -78,8 +79,7 @@ def format_PU_data(data): "Qdilute": np.round(data.get("FM1", 0.0), 1), "Qdrain": np.round(data.get("FM4", 0.0), 1), "Qrecirc": np.round(data.get("FM3", 0.0), 1), - "QdrainEDI": np.round(data.get("FM2", 0.0), 1) - - np.round(data.get("FM1", 0.0), 1), + "QdrainEDI": np.round(data.get("FM2", 0.0), 1) - np.round(data.get("FM1", 0.0), 1), "Pro": np.round(data.get("PS2", 0.0), 2), "Pdilute": np.round(data.get("PS3", 0.0), 2), "Pretentate": np.round(data.get("PS1", 0.0), 2), @@ -102,7 +102,6 @@ def format_PU_data(data): "MV08_sp": np.round(data.get("MV08_sp", 0.0), 1), } - def format_DS_data(data): return { "timestamp": datetime.datetime.now().isoformat(), @@ -112,10 +111,7 @@ def format_DS_data(data): "Qoutlet": np.round(data.get("Outlet_flow", 0.0), 1), } - # CREDENTIALS - -# Load users from JSON file at startup CREDENTIAL_PATH = Path("credentials.json") if CREDENTIAL_PATH.exists(): with CREDENTIAL_PATH.open("r") as f: @@ -126,7 +122,6 @@ else: USERNAME = CREDENTIALS["username"] PASSWORD = CREDENTIALS["password"] - # ======== LOGIN & SESSION HANDLING ======== def require_login(request: Request): user = request.session.get("user") @@ -135,12 +130,10 @@ def require_login(request: Request): raise StarletteHTTPException(status_code=302, detail="Redirect to login") return user - @app.get("/", response_class=HTMLResponse) def login_form(request: Request): return templates.TemplateResponse("login.html", {"request": request}) - @app.post("/login") def login(request: Request, username: str = Form(...), password: str = Form(...)): if username == USERNAME and password == PASSWORD: @@ -150,22 +143,46 @@ def login(request: Request, username: str = Form(...), password: str = Form(...) "login.html", {"request": request, "error": "Invalid credentials.json"} ) - @app.get("/logout") def logout(request: Request): request.session.clear() return RedirectResponse("/", status_code=HTTP_302_FOUND) - -# ======== PROTECTED INTERFACE ======== - - +# ======== PROTECTED INTERFACE / STARTUP-SHUTDOWN ======== @app.on_event("startup") async def startup_event(): + # ----- CSV logger ----- + global serial_csv + serial_csv = SerialCsvLogger(out_dir="serial_logs", rotate_daily=True) + + # ----- start the serial reader ----- + global serial_reader + cfg = SerialConfig( + port=os.getenv("SERIAL_PORT", "/dev/ttyUSB0"), + baudrate=int(os.getenv("SERIAL_BAUD", "115200")), + csv_log_path=None, # disable the generic CSV inside reader; use segregated logger instead + ring_capacity=int(os.getenv("SERIAL_RING", "5000")), + ) + serial_reader = SerialReader( + cfg, + serial_store, + decoder=decode_frames, + on_message=(lambda p: serial_csv.log(p)) # write CSV per message type + ) + serial_reader.start() + + # ----- your existing tasks ----- asyncio.create_task(update_latest_data()) asyncio.create_task(update_latest_flow()) +@app.on_event("shutdown") +def _serial_stop(): + if serial_reader: + serial_reader.stop() + if serial_csv: + serial_csv.close() +# ======== PAGES ======== @app.get("/control", response_class=HTMLResponse) def control_page(request: Request): can_backend.connect() @@ -173,28 +190,35 @@ def control_page(request: Request): return RedirectResponse("/", status_code=HTTP_302_FOUND) return templates.TemplateResponse("control.html", {"request": request}) - @app.get("/monitor-DS", response_class=HTMLResponse) def monitor_page(request: Request): with open("static/monitor_DS.html") as f: return HTMLResponse(f.read()) - @app.get("/monitor-PU", response_class=HTMLResponse) def monitor_page(request: Request): with open("static/monitor_PU.html") as f: return HTMLResponse(f.read()) - @app.get("/multi-monitor-PU", response_class=HTMLResponse) def monitor_page(request: Request): with open("static/multi_pu_dashboard.html") as f: return HTMLResponse(f.read()) +# ======== SERIAL API ======== +@app.get("/serial/messages") +def serial_messages(n: int = 100): + return serial_store.latest(min(max(n, 1), 1000)) + +@app.get("/serial/stats") +def serial_stats(): + return serial_store.stats() + +@app.get("/serial/snapshot") +def serial_snapshot(): + return serial_store.latest_by_id() # ======== CAN + BACKEND ROUTES ======== - - @app.post("/connect_toggle") def connect_toggle(): logging.info(f"Toggling CAN connection, CAN is {can_backend.connected}") @@ -209,15 +233,11 @@ def connect_toggle(): raise HTTPException(status_code=500, detail="Connection failed.") return {"connected": can_backend.connected} - @app.get("/is_connected") def is_connected(): return {"connected": can_backend.connected} - # PU CONTROL - - @app.post("/command/{state}/pu/{pu_number}") def send_command(state: str, pu_number: int, ploop_setpoint: float = Query(...)): global DEFAULT_FEED_VALVE @@ -262,12 +282,7 @@ def send_command(state: str, pu_number: int, ploop_setpoint: float = Query(...)) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) - - - ## MONITORING - - @app.get("/api/pu_status") def get_pu_status(): global active_PUs @@ -287,13 +302,12 @@ def get_pu_status(): return JSONResponse(content=states) - async def update_latest_data(): global active_PUs while True: # DS data = can_backend.get_latest_data(pu_number=0) - latest_data[f"DS"] = format_DS_data(data) + latest_data["DS"] = format_DS_data(data) # PUs for pu in active_PUs: @@ -303,13 +317,11 @@ async def update_latest_data(): logging.debug(f"[MONITOR DS BUFFER] latest_data: {latest_data}") await asyncio.sleep(0.05) - @app.get("/monitor") async def get_monitor_data(): return latest_data # return JSONResponse(content=latest_data) - # LOCAL RECORDER @app.post("/start_recording") async def start_recording(): @@ -324,11 +336,11 @@ async def start_recording(): filepath = os.path.join("recordings", filename) recording_file = open(filepath, "w", newline="") - fieldnames_common = ["timestamp", "pu", "QSkid"] + fieldnames_common = ["timestamp", "pu", "QSkid"] fieldnames_DS = list(format_DS_data({}).keys()) - fieldnames_DS.pop(0) # .pop(0) removing extra timestamp + fieldnames_DS.pop(0) # removing extra timestamp fieldnames_PUs = list(format_PU_data({}).keys()) - fieldnames_PUs.pop(0) # .pop(0) removing extra timestamp + fieldnames_PUs.pop(0) # removing extra timestamp fieldnames = fieldnames_common + fieldnames_DS + fieldnames_PUs @@ -340,7 +352,6 @@ async def start_recording(): logging.info(f"[RECORDING STARTED] File: {filepath}") return {"status": "recording started", "file": filename} - @app.post("/stop_recording") async def stop_recording(): global recording_flag, recording_task, recording_file @@ -360,7 +371,6 @@ async def stop_recording(): logging.info("[RECORDING STOPPED]") return {"status": "recording stopped"} - async def record_data_loop(): global recording_writer, recording_file, write_buffer, last_flush_time @@ -372,18 +382,13 @@ async def record_data_loop(): recording_writer.writerow(row) # Flush every flush_interval seconds - if ( - datetime.datetime.now() - last_flush_time - ).total_seconds() >= flush_interval: + if (datetime.datetime.now() - last_flush_time).total_seconds() >= flush_interval: recording_file.flush() last_flush_time = datetime.datetime.now() await asyncio.sleep(0.05) # 10 Hz - ## AUTOMATIC TESTING - - async def send_command_with_delay( state: str, pu: int, delay_s: int = 0, ploop_setpoint: float = 0.0 ): @@ -391,15 +396,11 @@ async def send_command_with_delay( logging.info(f"[AUTO TEST] Sending {state} to PU{pu} after {delay_s}s") can_backend.send_state_command(state, pu, ploop_setpoint) - async def set_patients_with_delay(count: int, delay_s: int): await asyncio.sleep(delay_s) - logging.info( - f"[AUTO TEST] Sending {count} patients to patient skid after {delay_s}s" - ) + logging.info(f"[AUTO TEST] Sending {count} patients to patient skid after {delay_s}s") set_patient_skid_users(count) - @router.post("/test/auto/1") async def auto_test_pu1(ploop_setpoint: float = Query(0.0)): pu = 1 @@ -407,52 +408,39 @@ async def auto_test_pu1(ploop_setpoint: float = Query(0.0)): asyncio.create_task(run_auto_test_pu1(pu, ploop_setpoint)) return {"status": "started", "pu": pu} - @router.post("/test/auto/2") async def auto_test_pu2(ploop_setpoint: float = Query(0.0)): logging.info("[AUTO TEST] Starting automatic test for 2 PUs") asyncio.create_task(run_auto_test_pu2(ploop_setpoint)) return {"status": "started", "pu": [1, 2]} - async def run_auto_test_pu1(pu: int, ploop_setpoint: float): - await send_command_with_delay( - "PRE-PRODUCTION", pu, delay_s=0, ploop_setpoint=ploop_setpoint - ) - await send_command_with_delay( - "PRODUCTION", pu, delay_s=180, ploop_setpoint=ploop_setpoint - ) + await send_command_with_delay("PRE-PRODUCTION", pu, delay_s=0, ploop_setpoint=ploop_setpoint) + await send_command_with_delay("PRODUCTION", pu, delay_s=180, ploop_setpoint=ploop_setpoint) await set_patients_with_delay(5, delay_s=60) await set_patients_with_delay(10, delay_s=60) await send_command_with_delay("IDLE", pu, delay_s=60, ploop_setpoint=ploop_setpoint) logging.info("[AUTO TEST] Finished PU1 test") - async def run_auto_test_pu2(ploop_setpoint: float): # Step 1: Run PU1 test await run_auto_test_pu1(1, ploop_setpoint) # Step 2: PU2 sequence - await send_command_with_delay( - "PRE-PRODUCTION", 2, delay_s=0, ploop_setpoint=ploop_setpoint - ) - await send_command_with_delay( - "PRODUCTION", 2, delay_s=180, ploop_setpoint=ploop_setpoint - ) + await send_command_with_delay("PRE-PRODUCTION", 2, delay_s=0, ploop_setpoint=ploop_setpoint) + await send_command_with_delay("PRODUCTION", 2, delay_s=180, ploop_setpoint=ploop_setpoint) await set_patients_with_delay(15, delay_s=60) await set_patients_with_delay(0, delay_s=60) await send_command_with_delay("IDLE", 2, delay_s=60, ploop_setpoint=ploop_setpoint) await send_command_with_delay("IDLE", 1, delay_s=60, ploop_setpoint=ploop_setpoint) logging.info("[AUTO TEST] Finished PU1 + PU2 test") - @router.post("/test/auto/3") async def auto_test_pu3(): # Call the function for PU3 auto test logging.info("Start auto test of 3 PU") return {"status": "started", "pu": 3} - # PATIENT SKID HELPERS async def update_latest_flow(): global active_PUs @@ -464,12 +452,10 @@ async def update_latest_flow(): latest_flow = int(data["log"]) logging.debug(f"Updated flow: {latest_flow}") latest_data["PatientSkid"]["QSkid"] = latest_flow - except Exception as e: logging.error(f"Error fetching flow: {e}") await asyncio.sleep(1.0) - def stop_patient_skid(): try: url = f"http://192.168.1.28:8000/stop_test" @@ -478,13 +464,9 @@ def stop_patient_skid(): if response.status_code == 200: return {"status": "success", "detail": response.json()} else: - raise HTTPException( - status_code=502, detail=f"Remote server error: {response.text}" - ) + raise HTTPException(status_code=502, detail=f"Remote server error: {response.text}") except httpx.RequestError as e: - raise HTTPException( - status_code=500, detail=f"Request to external server failed: {str(e)}" - ) + raise HTTPException(status_code=500, detail=f"Request to external server failed: {str(e)}") def set_patient_skid_users(count: int = 1): try: @@ -494,20 +476,14 @@ def set_patient_skid_users(count: int = 1): if response.status_code == 200: return {"status": "success", "detail": response.json()} else: - raise HTTPException( - status_code=502, detail=f"Remote server error: {response.text}" - ) + raise HTTPException(status_code=502, detail=f"Remote server error: {response.text}") except httpx.RequestError as e: - raise HTTPException( - status_code=500, detail=f"Request to external server failed: {str(e)}" - ) - + raise HTTPException(status_code=500, detail=f"Request to external server failed: {str(e)}") app.include_router(router) if __name__ == "__main__": import uvicorn - uvicorn.run( "main:app", host="127.0.0.1", diff --git a/protocol_decoder.py b/protocol_decoder.py new file mode 100644 index 0000000..a64e4fb --- /dev/null +++ b/protocol_decoder.py @@ -0,0 +1,94 @@ +from typing import Dict, Any, List, Tuple +import re + +RE_PU_VP = re.compile(r'^P(?P[1-3])VP$') +RE_PU_CO = re.compile(r'^P(?P[1-3])CO$') +RE_DOCK_VP = re.compile(r'^D0VP$') +RE_DOCK_CO = re.compile(r'^(D0CO|DOCO)$') # be tolerant + +def _to_i(s: str) -> int: + try: return int(s.strip()) + except: return 0 + +def _to_pct(s: str) -> int: + try: return int(s.strip()) + except: + try: return int(float(s)) + except: return 0 + +def _to_bool(s: str) -> bool: + return str(s).strip() in ("1","true","True","TRUE") + +def _dock_vp(vals: List[str]) -> Dict[str, Any]: + names = ["mv01","mv09","mv10","mv11","mmv01","mmv02","mmv03","sv01","sv02","sv03"] + out: Dict[str, Any] = {} + for k, v in zip(names, vals): + out[k] = _to_bool(v) if k.startswith("sv") else _to_pct(v) + return out + +def _dock_co(vals: List[str]) -> Dict[str, Any]: + out: Dict[str, Any] = {} + for name, v in zip(["cs01","cs02"], vals): + q = _to_i(v) # 0.1 µS + out[f"{name}_0p1uS"] = q + out[f"{name}_uS"] = q*0.1 + return out + +def _pu_vp(pu: int, vals: List[str]) -> Dict[str, Any]: + out: Dict[str, Any] = {"pu": pu} + for k, v in zip(["mv02","mv03","mv04","mv05","mv06","mv07","mv08"], vals): + out[k] = _to_pct(v) + return out + +def _pu_co(pu: int, vals: List[str]) -> Dict[str, Any]: + out: Dict[str, Any] = {"pu": pu} + for name, v in zip(["cs03","cs04","cs05"], vals): + q = _to_i(v) + out[f"{name}_0p1uS"] = q + out[f"{name}_uS"] = q*0.1 + return out + +def decode_frames(buffer: bytes) -> Tuple[List[Tuple[bytes, Dict[str, Any]]], bytes, int]: + msgs: List[Tuple[bytes, Dict[str, Any]]] = [] + errors = 0 + parts = buffer.split(b"\n") + remaining = parts[-1] + + for line in parts[:-1]: + raw = line.strip().rstrip(b"\r") + if not raw: continue + try: + t = raw.decode("utf-8") + fields = [f.strip() for f in t.split(",")] + if len(fields) < 3: raise ValueError("too few fields") + version, msg_id, ts_ms = fields[0], fields[1], fields[2] + data = fields[3:] + + parsed: Dict[str, Any] = {"version":version, "msg_id":msg_id, "ts_ms": int(ts_ms)} + + if RE_DOCK_VP.match(msg_id): + parsed.update({"src":"dock","type":"valves"}) + parsed.update(_dock_vp(data)) + elif RE_DOCK_CO.match(msg_id): + parsed.update({"src":"dock","type":"cond"}) + parsed.update(_dock_co(data)) + else: + m = RE_PU_VP.match(msg_id) + if m: + pu = int(m.group("pu")) + parsed.update({"src":"pu","type":"valves","pu":pu}) + parsed.update(_pu_vp(pu, data)) + else: + m = RE_PU_CO.match(msg_id) + if m: + pu = int(m.group("pu")) + parsed.update({"src":"pu","type":"cond","pu":pu}) + parsed.update(_pu_co(pu, data)) + else: + parsed.update({"src":"unknown","type":"raw","data":data}) + + msgs.append((raw, parsed)) + except Exception: + errors += 1 + + return msgs, remaining, errors diff --git a/serial_csv_logger.py b/serial_csv_logger.py new file mode 100644 index 0000000..df25574 --- /dev/null +++ b/serial_csv_logger.py @@ -0,0 +1,120 @@ +# serial_csv_logger.py +import os, csv, datetime, json +from typing import Dict, Any, Tuple, Optional + +class SerialCsvLogger: + """ + Writes parsed serial frames to CSV, segregated by message type: + - D0VP_YYYY-MM-DD.csv (Docking valves) + - D0CO_YYYY-MM-DD.csv (Docking conductivity) + - P1VP_YYYY-MM-DD.csv (PU1 valves), P2VP..., P3VP... + - P1CO_YYYY-MM-DD.csv (PU1 conductivity), etc. + - Unknown_YYYY-MM-DD.csv (for anything unmatched) + """ + def __init__(self, out_dir: str = "serial_logs", rotate_daily: bool = True): + self.out_dir = out_dir + self.rotate_daily = rotate_daily + self._writers: Dict[str, Tuple[csv.DictWriter, Any, str]] = {} # key -> (writer, file, date_str) + os.makedirs(self.out_dir, exist_ok=True) + + def close(self): + for _, (_, f, _) in self._writers.items(): + try: f.close() + except: pass + self._writers.clear() + + # ---------- public API ---------- + def log(self, parsed: Dict[str, Any]): + msg_id = parsed.get("msg_id", "Unknown") + date_str = datetime.date.today().isoformat() if self.rotate_daily else "all" + key = f"{msg_id}" + + # rotate if day changed + if key in self._writers and self._writers[key][2] != date_str: + self._writers[key][1].close() + del self._writers[key] + + writer, _, _ = self._ensure_writer(key, msg_id, date_str) + row = self._build_row(msg_id, parsed) + writer.writerow(row) + + # ---------- internals ---------- + def _ensure_writer(self, key: str, msg_id: str, date_str: str): + if key in self._writers: + return self._writers[key] + + fname = f"{msg_id}_{date_str}.csv" + path = os.path.join(self.out_dir, fname) + f = open(path, "a", newline="") + headers = self._headers_for(msg_id) + writer = csv.DictWriter(f, fieldnames=headers) + + # write header only if file is empty + if f.tell() == 0: + writer.writeheader() + + self._writers[key] = (writer, f, date_str) + return self._writers[key] + + def _headers_for(self, msg_id: str): + # Common heads + base = ["ts_iso", "ts_ms", "version", "msg_id"] + + if msg_id == "D0VP": + return base + ["mv01","mv09","mv10","mv11","mmv01","mmv02","mmv03","sv01","sv02","sv03"] + + if msg_id in ("D0CO", "DOCO"): + # write both scaled (uS) and raw (0.1 uS) for traceability + return base + ["cs01_uS","cs01_0p1uS","cs02_uS","cs02_0p1uS"] + + if msg_id.endswith("VP") and len(msg_id) == 4 and msg_id[0] == "P": + # P1VP / P2VP / P3VP + return base + ["pu","mv02","mv03","mv04","mv05","mv06","mv07","mv08"] + + if msg_id.endswith("CO") and len(msg_id) == 4 and msg_id[0] == "P": + # P1CO / P2CO / P3CO + return base + ["pu","cs03_uS","cs03_0p1uS","cs04_uS","cs04_0p1uS","cs05_uS","cs05_0p1uS"] + + # fallback + return base + ["payload_json"] + + def _build_row(self, msg_id: str, p: Dict[str, Any]) -> Dict[str, Any]: + ts_iso = datetime.datetime.fromtimestamp(p.get("ts_ms", 0)/1000.0).isoformat() if "ts_ms" in p else "" + row = {"ts_iso": ts_iso, "ts_ms": p.get("ts_ms", ""), "version": p.get("version",""), "msg_id": msg_id} + + if msg_id == "D0VP": + row.update({ + "mv01": p.get("mv01"), "mv09": p.get("mv09"), "mv10": p.get("mv10"), "mv11": p.get("mv11"), + "mmv01": p.get("mmv01"), "mmv02": p.get("mmv02"), "mmv03": p.get("mmv03"), + "sv01": p.get("sv01"), "sv02": p.get("sv02"), "sv03": p.get("sv03"), + }) + return row + + if msg_id in ("D0CO", "DOCO"): + row.update({ + "cs01_uS": p.get("cs01_uS"), "cs01_0p1uS": p.get("cs01_0p1uS"), + "cs02_uS": p.get("cs02_uS"), "cs02_0p1uS": p.get("cs02_0p1uS"), + }) + return row + + if msg_id.endswith("VP") and len(msg_id) == 4 and msg_id[0] == "P": + row.update({ + "pu": p.get("pu"), + "mv02": p.get("mv02"), "mv03": p.get("mv03"), "mv04": p.get("mv04"), + "mv05": p.get("mv05"), "mv06": p.get("mv06"), "mv07": p.get("mv07"), "mv08": p.get("mv08"), + }) + return row + + if msg_id.endswith("CO") and len(msg_id) == 4 and msg_id[0] == "P": + row.update({ + "pu": p.get("pu"), + "cs03_uS": p.get("cs03_uS"), "cs03_0p1uS": p.get("cs03_0p1uS"), + "cs04_uS": p.get("cs04_uS"), "cs04_0p1uS": p.get("cs04_0p1uS"), + "cs05_uS": p.get("cs05_uS"), "cs05_0p1uS": p.get("cs05_0p1uS"), + }) + return row + + # Unknown → keep full payload as JSON for later inspection + pay = {k:v for k,v in p.items() if k not in ("version","msg_id","ts_ms")} + row["payload_json"] = json.dumps(pay, separators=(",",":")) + return row diff --git a/serial_manager.py b/serial_manager.py new file mode 100644 index 0000000..3e8a5f4 --- /dev/null +++ b/serial_manager.py @@ -0,0 +1,234 @@ +# serial_manager.py +import threading +import time +import csv +from collections import deque +from dataclasses import dataclass +from typing import Any, Callable, Deque, Dict, List, Optional, Tuple + +import serial # provided by python3-serial + + +@dataclass +class SerialConfig: + """ + Configuration for the read-only serial intake. + """ + port: str = "/dev/ttyUSB0" + baudrate: int = 115200 + bytesize: int = serial.EIGHTBITS + parity: str = serial.PARITY_NONE + stopbits: int = serial.STOPBITS_ONE + timeout: float = 0.05 + rtscts: bool = False + dsrdtr: bool = False + xonxoff: bool = False + ring_capacity: int = 5000 + # If set, a single "generic" CSV will be written here (append mode). + # If you want segregated CSVs per message type, leave this as None and + # supply an `on_message` callback that writes where you want. + csv_log_path: Optional[str] = None # e.g. "/home/pi/hmi/serial_log.csv" + + +class SerialStore: + """ + Thread-safe store for recent parsed messages and intake stats. + Stores parsed dicts as returned by the decoder. + """ + def __init__(self, capacity: int): + self._buf: Deque[Dict[str, Any]] = deque(maxlen=capacity) + self._lock = threading.Lock() + self._stats = { + "frames_in": 0, + "frames_ok": 0, + "frames_bad": 0, + "restarts": 0, + "last_err": "", + } + self._latest_by_id: Dict[str, Dict[str, Any]] = {} + + def add(self, msg: Dict[str, Any], ok: bool = True): + with self._lock: + self._buf.append(msg) + self._stats["frames_in"] += 1 + if ok: + self._stats["frames_ok"] += 1 + else: + self._stats["frames_bad"] += 1 + mid = msg.get("msg_id") + if mid: + self._latest_by_id[mid] = msg + + def latest(self, n: int = 100) -> List[Dict[str, Any]]: + with self._lock: + return list(self._buf)[-n:] + + def latest_by_id(self) -> Dict[str, Dict[str, Any]]: + with self._lock: + return dict(self._latest_by_id) + + def stats(self) -> Dict[str, Any]: + with self._lock: + return dict(self._stats) + + def set_error(self, err: str): + with self._lock: + self._stats["last_err"] = err + + def inc_restart(self): + with self._lock: + self._stats["restarts"] += 1 + + +class SerialReader: + """ + Background read-only serial reader. + + Args: + cfg: SerialConfig + store: SerialStore + decoder: function(buffer: bytes) -> + (messages: List[Tuple[raw_frame: bytes, parsed: Dict]], remaining: bytes, errors: int) + on_message: optional callback called for each parsed dict (e.g., segregated CSV logger) + """ + def __init__( + self, + cfg: SerialConfig, + store: SerialStore, + decoder: Callable[[bytes], Tuple[List[Tuple[bytes, Dict[str, Any]]], bytes, int]], + on_message: Optional[Callable[[Dict[str, Any]], None]] = None, + ): + self.cfg = cfg + self.store = store + self.decoder = decoder + self.on_message = on_message + + self._ser: Optional[serial.Serial] = None + self._th: Optional[threading.Thread] = None + self._stop = threading.Event() + self._buffer = b"" + + # Optional generic CSV (single file) if cfg.csv_log_path is set + self._csv_file = None + self._csv_writer = None + + # ---------- lifecycle ---------- + def start(self): + self._stop.clear() + self._open_serial() + self._open_csv() + self._th = threading.Thread(target=self._run, name="SerialReader", daemon=True) + self._th.start() + + def stop(self): + self._stop.set() + if self._th and self._th.is_alive(): + self._th.join(timeout=2.0) + self._close_serial() + self._close_csv() + + # ---------- internals ---------- + def _open_serial(self): + try: + self._ser = serial.Serial( + port=self.cfg.port, + baudrate=self.cfg.baudrate, + bytesize=self.cfg.bytesize, + parity=self.cfg.parity, + stopbits=self.cfg.stopbits, + timeout=self.cfg.timeout, + rtscts=self.cfg.rtscts, + dsrdtr=self.cfg.dsrdtr, + xonxoff=self.cfg.xonxoff, + ) + except Exception as e: + self.store.set_error(f"Open error: {e}") + self._ser = None + + def _close_serial(self): + try: + if self._ser and self._ser.is_open: + self._ser.close() + except Exception: + pass + self._ser = None + + def _open_csv(self): + if not self.cfg.csv_log_path: + return + try: + self._csv_file = open(self.cfg.csv_log_path, "a", newline="") + self._csv_writer = csv.writer(self._csv_file) + # Write header only if file is empty (avoid duplicates on restart) + if self._csv_file.tell() == 0: + self._csv_writer.writerow(["ts_ms", "msg_id", "raw_hex", "parsed"]) + self._csv_file.flush() + except Exception as e: + self.store.set_error(f"CSV open error: {e}") + self._csv_file = None + self._csv_writer = None + + def _close_csv(self): + try: + if self._csv_file: + self._csv_file.close() + except Exception: + pass + self._csv_file = None + self._csv_writer = None + + def _log_csv(self, raw: bytes, parsed: Dict[str, Any]): + """Write to the optional single generic CSV.""" + if not self._csv_writer: + return + try: + self._csv_writer.writerow( + [parsed.get("ts_ms"), parsed.get("msg_id"), raw.hex(), parsed] + ) + self._csv_file.flush() + except Exception as e: + self.store.set_error(f"CSV write error: {e}") + + def _run(self): + backoff = 0.5 + while not self._stop.is_set(): + if not self._ser or not self._ser.is_open: + # reconnect with exponential backoff (capped) + self._close_serial() + time.sleep(backoff) + self.store.inc_restart() + self._open_serial() + backoff = min(backoff * 1.5, 5.0) + continue + + backoff = 0.5 + + try: + data = self._ser.read(4096) # non-blocking due to timeout + if data: + self._buffer += data + frames, remaining, errors = self.decoder(self._buffer) + self._buffer = remaining + + for raw, parsed in frames: + # store + self.store.add(parsed, ok=True) + # optional generic CSV + self._log_csv(raw, parsed) + # optional segregated sink + if self.on_message: + try: + self.on_message(parsed) + except Exception as e: + self.store.set_error(f"CSV sink error: {e}") + + # count decode errors + for _ in range(errors): + self.store.add({"error": "decode"}, ok=False) + else: + time.sleep(0.01) + + except Exception as e: + self.store.set_error(f"Read/Decode error: {e}") + self._close_serial() + time.sleep(0.5) diff --git a/update_hmi.sh b/update_hmi.sh deleted file mode 100644 index d1506cb..0000000 --- a/update_hmi.sh +++ /dev/null @@ -1,12 +0,0 @@ -#!/bin/bash -set -e - -echo "[UPDATE] Pulling latest code..." -cd /home/hmi/Desktop/HMI || exit 1 -git reset --hard HEAD -git pull origin main - -echo "[RESTART] Restarting HMI service..." -sudo /bin/systemctl restart hmi.service - -echo "[DONE] HMI updated." diff --git a/analyze_from_csv.m b/utils/analyze_from_csv.m similarity index 100% rename from analyze_from_csv.m rename to utils/analyze_from_csv.m diff --git a/download_recordings b/utils/download_recordings similarity index 100% rename from download_recordings rename to utils/download_recordings diff --git a/valveBackend.py b/valveBackend.py deleted file mode 100644 index 82c7ef2..0000000 --- a/valveBackend.py +++ /dev/null @@ -1,30 +0,0 @@ -import canopen -import os - - -class ValveBackend: - def __init__(self, eds_file: str, node_id: int = 0x0F): - self.eds_file = eds_file - self.node_id = node_id - self.network = None - self.node = None - - def connect(self): - try: - self.network = canopen.Network() - self.network.connect(channel='can0', bustype='socketcan') - self.node = canopen.RemoteNode(self.node_id, self.eds_file) - self.network.add_node(self.node) - return True - except Exception as e: - print(f"[VALVE CONNECT ERROR] {e}") - return False - - def send_command(self, opening: int): - try: - if self.node is None: - raise RuntimeError("Valve node not initialized") - self.node.sdo[0x6000].raw = opening - print(f"[VALVE] Opening set to {opening}") - except Exception as e: - print(f"[VALVE CMD ERROR] {e}")