121 lines
4.9 KiB
Python
121 lines
4.9 KiB
Python
# serial_csv_logger.py
|
|
import os, csv, datetime, json
|
|
from typing import Dict, Any, Tuple, Optional
|
|
|
|
class SerialCsvLogger:
|
|
"""
|
|
Writes parsed serial frames to CSV, segregated by message type:
|
|
- D0VP_YYYY-MM-DD.csv (Docking valves)
|
|
- D0CO_YYYY-MM-DD.csv (Docking conductivity)
|
|
- P1VP_YYYY-MM-DD.csv (PU1 valves), P2VP..., P3VP...
|
|
- P1CO_YYYY-MM-DD.csv (PU1 conductivity), etc.
|
|
- Unknown_YYYY-MM-DD.csv (for anything unmatched)
|
|
"""
|
|
def __init__(self, out_dir: str = "serial_logs", rotate_daily: bool = True):
|
|
self.out_dir = out_dir
|
|
self.rotate_daily = rotate_daily
|
|
self._writers: Dict[str, Tuple[csv.DictWriter, Any, str]] = {} # key -> (writer, file, date_str)
|
|
os.makedirs(self.out_dir, exist_ok=True)
|
|
|
|
def close(self):
|
|
for _, (_, f, _) in self._writers.items():
|
|
try: f.close()
|
|
except: pass
|
|
self._writers.clear()
|
|
|
|
# ---------- public API ----------
|
|
def log(self, parsed: Dict[str, Any]):
|
|
msg_id = parsed.get("msg_id", "Unknown")
|
|
date_str = datetime.date.today().isoformat() if self.rotate_daily else "all"
|
|
key = f"{msg_id}"
|
|
|
|
# rotate if day changed
|
|
if key in self._writers and self._writers[key][2] != date_str:
|
|
self._writers[key][1].close()
|
|
del self._writers[key]
|
|
|
|
writer, _, _ = self._ensure_writer(key, msg_id, date_str)
|
|
row = self._build_row(msg_id, parsed)
|
|
writer.writerow(row)
|
|
|
|
# ---------- internals ----------
|
|
def _ensure_writer(self, key: str, msg_id: str, date_str: str):
|
|
if key in self._writers:
|
|
return self._writers[key]
|
|
|
|
fname = f"{msg_id}_{date_str}.csv"
|
|
path = os.path.join(self.out_dir, fname)
|
|
f = open(path, "a", newline="")
|
|
headers = self._headers_for(msg_id)
|
|
writer = csv.DictWriter(f, fieldnames=headers)
|
|
|
|
# write header only if file is empty
|
|
if f.tell() == 0:
|
|
writer.writeheader()
|
|
|
|
self._writers[key] = (writer, f, date_str)
|
|
return self._writers[key]
|
|
|
|
def _headers_for(self, msg_id: str):
|
|
# Common heads
|
|
base = ["ts_iso", "ts_ms", "version", "msg_id"]
|
|
|
|
if msg_id == "D0VP":
|
|
return base + ["mv01","mv09","mv10","mv11","mmv01","mmv02","mmv03","sv01","sv02","sv03"]
|
|
|
|
if msg_id in ("D0CO", "DOCO"):
|
|
# write both scaled (uS) and raw (0.1 uS) for traceability
|
|
return base + ["cs01_uS","cs01_0p1uS","cs02_uS","cs02_0p1uS"]
|
|
|
|
if msg_id.endswith("VP") and len(msg_id) == 4 and msg_id[0] == "P":
|
|
# P1VP / P2VP / P3VP
|
|
return base + ["pu","mv02","mv03","mv04","mv05","mv06","mv07","mv08"]
|
|
|
|
if msg_id.endswith("CO") and len(msg_id) == 4 and msg_id[0] == "P":
|
|
# P1CO / P2CO / P3CO
|
|
return base + ["pu","cs03_uS","cs03_0p1uS","cs04_uS","cs04_0p1uS","cs05_uS","cs05_0p1uS"]
|
|
|
|
# fallback
|
|
return base + ["payload_json"]
|
|
|
|
def _build_row(self, msg_id: str, p: Dict[str, Any]) -> Dict[str, Any]:
|
|
ts_iso = datetime.datetime.fromtimestamp(p.get("ts_ms", 0)/1000.0).isoformat() if "ts_ms" in p else ""
|
|
row = {"ts_iso": ts_iso, "ts_ms": p.get("ts_ms", ""), "version": p.get("version",""), "msg_id": msg_id}
|
|
|
|
if msg_id == "D0VP":
|
|
row.update({
|
|
"mv01": p.get("mv01"), "mv09": p.get("mv09"), "mv10": p.get("mv10"), "mv11": p.get("mv11"),
|
|
"mmv01": p.get("mmv01"), "mmv02": p.get("mmv02"), "mmv03": p.get("mmv03"),
|
|
"sv01": p.get("sv01"), "sv02": p.get("sv02"), "sv03": p.get("sv03"),
|
|
})
|
|
return row
|
|
|
|
if msg_id in ("D0CO", "DOCO"):
|
|
row.update({
|
|
"cs01_uS": p.get("cs01_uS"), "cs01_0p1uS": p.get("cs01_0p1uS"),
|
|
"cs02_uS": p.get("cs02_uS"), "cs02_0p1uS": p.get("cs02_0p1uS"),
|
|
})
|
|
return row
|
|
|
|
if msg_id.endswith("VP") and len(msg_id) == 4 and msg_id[0] == "P":
|
|
row.update({
|
|
"pu": p.get("pu"),
|
|
"mv02": p.get("mv02"), "mv03": p.get("mv03"), "mv04": p.get("mv04"),
|
|
"mv05": p.get("mv05"), "mv06": p.get("mv06"), "mv07": p.get("mv07"), "mv08": p.get("mv08"),
|
|
})
|
|
return row
|
|
|
|
if msg_id.endswith("CO") and len(msg_id) == 4 and msg_id[0] == "P":
|
|
row.update({
|
|
"pu": p.get("pu"),
|
|
"cs03_uS": p.get("cs03_uS"), "cs03_0p1uS": p.get("cs03_0p1uS"),
|
|
"cs04_uS": p.get("cs04_uS"), "cs04_0p1uS": p.get("cs04_0p1uS"),
|
|
"cs05_uS": p.get("cs05_uS"), "cs05_0p1uS": p.get("cs05_0p1uS"),
|
|
})
|
|
return row
|
|
|
|
# Unknown → keep full payload as JSON for later inspection
|
|
pay = {k:v for k,v in p.items() if k not in ("version","msg_id","ts_ms")}
|
|
row["payload_json"] = json.dumps(pay, separators=(",",":"))
|
|
return row
|