# serial_manager.py
import csv
import threading
import time
from collections import deque
from dataclasses import dataclass
from typing import Any, Callable, Deque, Dict, List, Optional, Tuple

import serial  # provided by python3-serial


@dataclass
class SerialConfig:
    """Configuration for the read-only serial intake."""

    port: str = "/dev/ttyUSB0"
    baudrate: int = 115200
    bytesize: int = serial.EIGHTBITS
    parity: str = serial.PARITY_NONE
    stopbits: int = serial.STOPBITS_ONE
    # Short read timeout keeps the reader loop responsive to stop().
    timeout: float = 0.05
    rtscts: bool = False
    dsrdtr: bool = False
    xonxoff: bool = False
    # Number of most-recent parsed messages kept in memory.
    ring_capacity: int = 5000
    # If set, a single "generic" CSV will be written here (append mode).
    # If you want segregated CSVs per message type, leave this as None and
    # supply an `on_message` callback that writes where you want.
    csv_log_path: Optional[str] = None  # e.g. "/home/pi/hmi/serial_log.csv"


class SerialStore:
    """Thread-safe store for recent parsed messages and intake stats.

    Stores parsed dicts as returned by the decoder.
    """

    def __init__(self, capacity: int):
        self._buf: Deque[Dict[str, Any]] = deque(maxlen=capacity)
        self._lock = threading.Lock()
        self._stats: Dict[str, Any] = {
            "frames_in": 0,
            "frames_ok": 0,
            "frames_bad": 0,
            "restarts": 0,
            "last_err": "",
        }
        self._latest_by_id: Dict[str, Dict[str, Any]] = {}

    def add(self, msg: Dict[str, Any], ok: bool = True) -> None:
        """Append *msg* to the ring buffer and update intake counters.

        Messages carrying a truthy "msg_id" also update the per-id
        latest-message index.
        """
        with self._lock:
            self._buf.append(msg)
            self._stats["frames_in"] += 1
            if ok:
                self._stats["frames_ok"] += 1
            else:
                self._stats["frames_bad"] += 1
            mid = msg.get("msg_id")
            if mid:
                self._latest_by_id[mid] = msg

    def latest(self, n: int = 100) -> List[Dict[str, Any]]:
        """Return up to the *n* most recent messages, oldest first."""
        with self._lock:
            return list(self._buf)[-n:]

    def latest_by_id(self) -> Dict[str, Dict[str, Any]]:
        """Return a snapshot mapping msg_id -> most recent message."""
        with self._lock:
            return dict(self._latest_by_id)

    def stats(self) -> Dict[str, Any]:
        """Return a snapshot of the intake statistics."""
        with self._lock:
            return dict(self._stats)

    def set_error(self, err: str) -> None:
        """Record the most recent error string (overwrites previous)."""
        with self._lock:
            self._stats["last_err"] = err

    def inc_restart(self) -> None:
        """Count one serial (re)connection attempt."""
        with self._lock:
            self._stats["restarts"] += 1


class SerialReader:
    """Background read-only serial reader.

    Args:
        cfg: SerialConfig
        store: SerialStore
        decoder: function(buffer: bytes) ->
            (messages: List[Tuple[raw_frame: bytes, parsed: Dict]],
             remaining: bytes, errors: int)
        on_message: optional callback called for each parsed dict
            (e.g., segregated CSV logger)
    """

    def __init__(
        self,
        cfg: SerialConfig,
        store: SerialStore,
        decoder: Callable[
            [bytes], Tuple[List[Tuple[bytes, Dict[str, Any]]], bytes, int]
        ],
        on_message: Optional[Callable[[Dict[str, Any]], None]] = None,
    ):
        self.cfg = cfg
        self.store = store
        self.decoder = decoder
        self.on_message = on_message
        self._ser: Optional[serial.Serial] = None
        self._th: Optional[threading.Thread] = None
        self._stop = threading.Event()
        self._buffer = b""
        # Optional generic CSV (single file) if cfg.csv_log_path is set
        self._csv_file = None
        self._csv_writer = None

    # ---------- lifecycle ----------

    def start(self) -> None:
        """Open the port and CSV sink, then launch the reader thread.

        Idempotent: calling start() while the reader thread is already
        running is a no-op (BUG FIX: previously leaked a second thread).
        """
        if self._th and self._th.is_alive():
            return
        self._stop.clear()
        self._open_serial()
        self._open_csv()
        self._th = threading.Thread(
            target=self._run, name="SerialReader", daemon=True
        )
        self._th.start()

    def stop(self) -> None:
        """Signal the reader thread to exit, then release resources.

        All sleeps in the reader loop are interruptible Event.wait calls,
        so the 2 s join timeout comfortably covers one loop iteration.
        """
        self._stop.set()
        if self._th and self._th.is_alive():
            self._th.join(timeout=2.0)
        self._close_serial()
        self._close_csv()

    # ---------- internals ----------

    def _open_serial(self) -> None:
        """Open the serial port per cfg; on failure record the error
        and leave self._ser as None (the run loop will retry)."""
        try:
            self._ser = serial.Serial(
                port=self.cfg.port,
                baudrate=self.cfg.baudrate,
                bytesize=self.cfg.bytesize,
                parity=self.cfg.parity,
                stopbits=self.cfg.stopbits,
                timeout=self.cfg.timeout,
                rtscts=self.cfg.rtscts,
                dsrdtr=self.cfg.dsrdtr,
                xonxoff=self.cfg.xonxoff,
            )
        except Exception as e:
            self.store.set_error(f"Open error: {e}")
            self._ser = None

    def _close_serial(self) -> None:
        """Best-effort close of the serial port; never raises."""
        try:
            if self._ser and self._ser.is_open:
                self._ser.close()
        except Exception:
            pass
        self._ser = None

    def _open_csv(self) -> None:
        """Open the optional generic CSV sink in append mode."""
        if not self.cfg.csv_log_path:
            return
        try:
            self._csv_file = open(self.cfg.csv_log_path, "a", newline="")
            self._csv_writer = csv.writer(self._csv_file)
            # Write header only if file is empty (avoid duplicates on restart)
            if self._csv_file.tell() == 0:
                self._csv_writer.writerow(["ts_ms", "msg_id", "raw_hex", "parsed"])
                self._csv_file.flush()
        except Exception as e:
            self.store.set_error(f"CSV open error: {e}")
            self._csv_file = None
            self._csv_writer = None

    def _close_csv(self) -> None:
        """Best-effort close of the CSV sink; never raises."""
        try:
            if self._csv_file:
                self._csv_file.close()
        except Exception:
            pass
        self._csv_file = None
        self._csv_writer = None

    def _log_csv(self, raw: bytes, parsed: Dict[str, Any]) -> None:
        """Write to the optional single generic CSV."""
        if not self._csv_writer:
            return
        try:
            self._csv_writer.writerow(
                [parsed.get("ts_ms"), parsed.get("msg_id"), raw.hex(), parsed]
            )
            self._csv_file.flush()
        except Exception as e:
            self.store.set_error(f"CSV write error: {e}")

    def _run(self) -> None:
        """Reader loop: read bytes, decode frames, store/log results.

        Reconnects with capped exponential backoff on port loss. All
        waits use self._stop.wait(...) instead of time.sleep so stop()
        takes effect immediately (BUG FIX: a 5 s backoff sleep previously
        outlasted stop()'s 2 s join timeout, closing the port under a
        still-running thread).
        """
        backoff = 0.5
        while not self._stop.is_set():
            if not self._ser or not self._ser.is_open:
                # reconnect with exponential backoff (capped)
                self._close_serial()
                if self._stop.wait(backoff):
                    break
                self.store.inc_restart()
                # BUG FIX: drop any partial frame left over from the old
                # connection -- it can never be completed and would corrupt
                # the first decode after reconnecting.
                self._buffer = b""
                self._open_serial()
                backoff = min(backoff * 1.5, 5.0)
                continue
            backoff = 0.5
            try:
                # Blocks for at most cfg.timeout (0.05 s default).
                data = self._ser.read(4096)
                if data:
                    self._buffer += data
                    frames, remaining, errors = self.decoder(self._buffer)
                    self._buffer = remaining
                    for raw, parsed in frames:
                        # store
                        self.store.add(parsed, ok=True)
                        # optional generic CSV
                        self._log_csv(raw, parsed)
                        # optional segregated sink
                        if self.on_message:
                            try:
                                self.on_message(parsed)
                            except Exception as e:
                                self.store.set_error(f"CSV sink error: {e}")
                    # count decode errors
                    for _ in range(errors):
                        self.store.add({"error": "decode"}, ok=False)
                else:
                    # Idle: yield briefly without blocking stop().
                    self._stop.wait(0.01)
            except Exception as e:
                self.store.set_error(f"Read/Decode error: {e}")
                self._close_serial()
                self._stop.wait(0.5)