# serial_csv_logger.py
import os
import csv
import datetime
import json
from typing import Dict, Any, Tuple, Optional


class SerialCsvLogger:
    """
    Writes parsed serial frames to CSV, segregated by message type:
      - D0VP_YYYY-MM-DD.csv  (Docking valves)
      - D0CO_YYYY-MM-DD.csv  (Docking conductivity; the "DOCO" spelling is
                              accepted with the same columns)
      - P1VP_YYYY-MM-DD.csv  (PU1 valves), P2VP..., P3VP...
      - P1CO_YYYY-MM-DD.csv  (PU1 conductivity), etc.
      - Unknown_YYYY-MM-DD.csv (for anything unmatched; the received id is
                              kept in the ``msg_id`` column and the rest of
                              the frame in ``payload_json``)

    When ``rotate_daily`` is false the date part of the file name is the
    literal ``all``.

    The logger can be used as a context manager; leaving the ``with`` block
    closes every open file.
    """

    # Columns present in every file, in output order.
    _BASE_FIELDS: Tuple[str, ...] = ("ts_iso", "ts_ms", "version", "msg_id")

    # Payload columns per message family. Single source of truth for both
    # the CSV header and the row builder so the two cannot drift apart.
    _DOCKING_VALVE_FIELDS: Tuple[str, ...] = (
        "mv01", "mv09", "mv10", "mv11",
        "mmv01", "mmv02", "mmv03",
        "sv01", "sv02", "sv03",
    )
    # Both scaled (uS) and raw (0.1 uS) values are written for traceability.
    _DOCKING_COND_FIELDS: Tuple[str, ...] = (
        "cs01_uS", "cs01_0p1uS", "cs02_uS", "cs02_0p1uS",
    )
    _PU_VALVE_FIELDS: Tuple[str, ...] = (
        "pu", "mv02", "mv03", "mv04", "mv05", "mv06", "mv07", "mv08",
    )
    _PU_COND_FIELDS: Tuple[str, ...] = (
        "pu",
        "cs03_uS", "cs03_0p1uS",
        "cs04_uS", "cs04_0p1uS",
        "cs05_uS", "cs05_0p1uS",
    )

    # Keys already covered by the base columns; left out of payload_json.
    _ENVELOPE_KEYS = ("version", "msg_id", "ts_ms")

    def __init__(self, out_dir: str = "serial_logs", rotate_daily: bool = True):
        """
        Create the output directory (if needed) and start with no open files.

        Args:
            out_dir: directory that receives the CSV files.
            rotate_daily: when true, file names carry today's ISO date and a
                new file is started when the date changes; when false, the
                suffix is the fixed string ``all``.
        """
        self.out_dir = out_dir
        self.rotate_daily = rotate_daily
        # key -> (writer, file, date_str)
        self._writers: Dict[str, Tuple[csv.DictWriter, Any, str]] = {}
        os.makedirs(self.out_dir, exist_ok=True)

    def __enter__(self) -> "SerialCsvLogger":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    def close(self):
        """Close every open CSV file. Safe to call more than once."""
        for _, handle, _ in self._writers.values():
            try:
                handle.close()
            except OSError:
                # Best effort: one failing handle must not keep the
                # remaining files from being closed.
                pass
        self._writers.clear()

    def flush(self):
        """Push buffered rows of every open file to the operating system."""
        for _, handle, _ in self._writers.values():
            handle.flush()

    # ---------- public API ----------
    def log(self, parsed: Dict[str, Any]):
        """
        Append one parsed frame to the CSV file for its message type.

        Args:
            parsed: frame as a dict. ``msg_id`` selects the file and column
                layout (missing or ``None`` means "Unknown"); ``ts_ms`` and
                ``version`` fill the common columns; the remaining keys are
                looked up by column name.
        """
        raw_id = parsed.get("msg_id", "Unknown")
        # A non-string id (e.g. None from a failed parse) would break the
        # string matching below, so normalise it first.
        msg_id = "Unknown" if raw_id is None else str(raw_id)
        date_str = datetime.date.today().isoformat() if self.rotate_daily else "all"
        key = self._file_stem(msg_id)

        # rotate if day changed
        current = self._writers.get(key)
        if current is not None and current[2] != date_str:
            current[1].close()
            del self._writers[key]

        writer, _, _ = self._ensure_writer(key, msg_id, date_str)
        writer.writerow(self._build_row(msg_id, parsed))

    # ---------- internals ----------
    def _payload_fields(self, msg_id: str) -> Optional[Tuple[str, ...]]:
        """Return the payload columns for ``msg_id``, or None if unmatched."""
        if msg_id == "D0VP":
            return self._DOCKING_VALVE_FIELDS
        if msg_id in ("D0CO", "DOCO"):
            return self._DOCKING_COND_FIELDS
        if len(msg_id) == 4 and msg_id[0] == "P":
            if msg_id.endswith("VP"):  # P1VP / P2VP / P3VP
                return self._PU_VALVE_FIELDS
            if msg_id.endswith("CO"):  # P1CO / P2CO / P3CO
                return self._PU_COND_FIELDS
        return None

    def _file_stem(self, msg_id: str) -> str:
        """
        Return the file-name stem (also the writer cache key) for ``msg_id``.

        Unmatched ids all share "Unknown" so corrupted ids cannot create an
        unbounded number of files and open handles. Matched ids have every
        character other than letters, digits, "-" and "_" replaced by "_",
        because the id comes from the wire and must not be able to inject
        path separators into the file name.
        """
        if self._payload_fields(msg_id) is None:
            return "Unknown"
        return "".join(ch if ch.isalnum() or ch in "-_" else "_" for ch in msg_id)

    def _ensure_writer(self, key: str, msg_id: str, date_str: str):
        """
        Return the cached ``(writer, file, date_str)`` for ``key``, opening
        ``<key>_<date_str>.csv`` in append mode on first use.

        The header row is written only when the file is empty, so restarting
        the program keeps appending to an existing file.
        """
        if key in self._writers:
            return self._writers[key]

        path = os.path.join(self.out_dir, f"{key}_{date_str}.csv")
        # The handle is deliberately kept open in the cache; close() (or the
        # daily rotation in log()) releases it.
        f = open(path, "a", newline="", encoding="utf-8")
        writer = csv.DictWriter(f, fieldnames=self._headers_for(msg_id))
        # write header only if file is empty
        if f.tell() == 0:
            writer.writeheader()
        self._writers[key] = (writer, f, date_str)
        return self._writers[key]

    def _headers_for(self, msg_id: str):
        """Return the ordered list of CSV column names for ``msg_id``."""
        fields = self._payload_fields(msg_id)
        if fields is None:
            # fallback
            return list(self._BASE_FIELDS) + ["payload_json"]
        return list(self._BASE_FIELDS) + list(fields)

    @staticmethod
    def _format_timestamp(p: Dict[str, Any]) -> str:
        """
        Return ``ts_ms`` (milliseconds since the epoch) as a local-time ISO
        string, or "" when it is absent or cannot be converted.
        """
        if "ts_ms" not in p:
            return ""
        try:
            return datetime.datetime.fromtimestamp(p["ts_ms"] / 1000.0).isoformat()
        except (TypeError, ValueError, OverflowError, OSError):
            # A malformed timestamp must not cost us the whole frame; the
            # raw value is still written to the ts_ms column.
            return ""

    def _build_row(self, msg_id: str, p: Dict[str, Any]) -> Dict[str, Any]:
        """
        Build the CSV row for one frame.

        Matched message types copy their payload columns from ``p`` by name
        (missing keys become None, which the csv module writes as an empty
        cell). Unmatched types store everything except the envelope keys as
        compact JSON in ``payload_json``.
        """
        row: Dict[str, Any] = {
            "ts_iso": self._format_timestamp(p),
            "ts_ms": p.get("ts_ms", ""),
            "version": p.get("version", ""),
            "msg_id": msg_id,
        }

        fields = self._payload_fields(msg_id)
        if fields is not None:
            row.update((name, p.get(name)) for name in fields)
            return row

        # Unknown -> keep full payload as JSON for later inspection
        payload = {k: v for k, v in p.items() if k not in self._ENVELOPE_KEYS}
        # default=repr: this is a diagnostic dump, so a value json cannot
        # encode (e.g. raw bytes) is recorded as its repr instead of raising.
        row["payload_json"] = json.dumps(payload, separators=(",", ":"), default=repr)
        return row