"""Decode newline-delimited, comma-separated text frames into dictionaries.

Each frame has the form ``version,msg_id,ts_ms,<data...>``.  The ``msg_id``
selects how the data fields are named and converted; see ``decode_frames``.
"""
from typing import Dict, Any, List, Tuple
import re

# Message-id patterns.  The per-unit ids carry the unit number (1-3) in the
# named group ``pu``, which ``_parse_line`` reads back via ``m.group("pu")``.
RE_PU_VP = re.compile(r'^P(?P<pu>[1-3])VP$')
RE_PU_CO = re.compile(r'^P(?P<pu>[1-3])CO$')
RE_DOCK_VP = re.compile(r'^D0VP$')
RE_DOCK_CO = re.compile(r'^(D0CO|DOCO)$')  # be tolerant: letter O typed for zero

# Positional field names for each message type, in wire order.
_DOCK_VP_FIELDS = (
    "mv01", "mv09", "mv10", "mv11",
    "mmv01", "mmv02", "mmv03",
    "sv01", "sv02", "sv03",
)
_DOCK_CO_FIELDS = ("cs01", "cs02")
_PU_VP_FIELDS = ("mv02", "mv03", "mv04", "mv05", "mv06", "mv07", "mv08")
_PU_CO_FIELDS = ("cs03", "cs04", "cs05")

_TRUE_TOKENS = ("1", "true", "True", "TRUE")


def _to_i(s: str) -> int:
    """Convert *s* to an int, returning 0 when it is not a valid integer."""
    try:
        return int(s.strip())
    except (ValueError, TypeError, AttributeError):
        return 0


def _to_pct(s: str) -> int:
    """Convert *s* to an int, accepting decimal text (truncated toward zero).

    Returns 0 when *s* is neither an integer nor a finite float literal.
    """
    try:
        return int(s.strip())
    except (ValueError, TypeError, AttributeError):
        pass
    try:
        # int(float("inf")) raises OverflowError; int(float("nan")) ValueError.
        return int(float(s))
    except (ValueError, TypeError, OverflowError):
        return 0


def _to_bool(s: str) -> bool:
    """Return True only for the tokens "1", "true", "True" or "TRUE"."""
    return str(s).strip() in _TRUE_TOKENS


def _cond_fields(names, vals: List[str]) -> Dict[str, Any]:
    """Build the paired ``<name>_0p1uS`` / ``<name>_uS`` entries.

    Each raw value is an integer count of 0.1 µS steps; the ``_uS`` entry is
    the same reading scaled to µS.  Extra names or values are ignored (zip).
    """
    out: Dict[str, Any] = {}
    for name, v in zip(names, vals):
        q = _to_i(v)  # raw reading in units of 0.1 µS
        out[f"{name}_0p1uS"] = q
        # True division is correctly rounded (3 -> 0.3), unlike q * 0.1
        # which yields 0.30000000000000004.
        out[f"{name}_uS"] = q / 10
    return out


def _dock_vp(vals: List[str]) -> Dict[str, Any]:
    """Name the dock valve fields; ``sv*`` become bools, the rest ints."""
    return {
        k: _to_bool(v) if k.startswith("sv") else _to_pct(v)
        for k, v in zip(_DOCK_VP_FIELDS, vals)
    }


def _dock_co(vals: List[str]) -> Dict[str, Any]:
    """Name the dock conductivity fields (cs01, cs02)."""
    return _cond_fields(_DOCK_CO_FIELDS, vals)


def _pu_vp(pu: int, vals: List[str]) -> Dict[str, Any]:
    """Name the valve fields (mv02..mv08) of unit *pu*; includes ``"pu"``."""
    out: Dict[str, Any] = {"pu": pu}
    out.update((k, _to_pct(v)) for k, v in zip(_PU_VP_FIELDS, vals))
    return out


def _pu_co(pu: int, vals: List[str]) -> Dict[str, Any]:
    """Name the conductivity fields (cs03..cs05) of unit *pu*; includes ``"pu"``."""
    out: Dict[str, Any] = {"pu": pu}
    out.update(_cond_fields(_PU_CO_FIELDS, vals))
    return out


def _parse_line(raw: bytes) -> Dict[str, Any]:
    """Parse one stripped, non-empty frame into a dict.

    Raises:
        ValueError: if *raw* is not valid UTF-8 (``UnicodeDecodeError`` is a
            ``ValueError`` subclass), has fewer than three comma-separated
            fields, or its timestamp field is not an integer.
    """
    fields = [f.strip() for f in raw.decode("utf-8").split(",")]
    if len(fields) < 3:
        raise ValueError("too few fields")
    version, msg_id, ts_ms = fields[:3]
    data = fields[3:]
    parsed: Dict[str, Any] = {
        "version": version,
        "msg_id": msg_id,
        "ts_ms": int(ts_ms),
    }

    if RE_DOCK_VP.match(msg_id):
        parsed.update({"src": "dock", "type": "valves"})
        parsed.update(_dock_vp(data))
        return parsed

    if RE_DOCK_CO.match(msg_id):
        parsed.update({"src": "dock", "type": "cond"})
        parsed.update(_dock_co(data))
        return parsed

    m = RE_PU_VP.match(msg_id)
    if m:
        pu = int(m.group("pu"))
        parsed.update({"src": "pu", "type": "valves", "pu": pu})
        parsed.update(_pu_vp(pu, data))
        return parsed

    m = RE_PU_CO.match(msg_id)
    if m:
        pu = int(m.group("pu"))
        parsed.update({"src": "pu", "type": "cond", "pu": pu})
        parsed.update(_pu_co(pu, data))
        return parsed

    # Unrecognised message id: keep the data fields verbatim.
    parsed.update({"src": "unknown", "type": "raw", "data": data})
    return parsed


def decode_frames(buffer: bytes) -> Tuple[List[Tuple[bytes, Dict[str, Any]]], bytes, int]:
    """Decode every complete line in *buffer*.

    Lines are separated by ``\\n``; surrounding whitespace (including a
    trailing ``\\r``) is stripped and blank lines are skipped.  The text
    after the last ``\\n`` is treated as an incomplete frame and handed back
    unchanged so the caller can prepend it to the next chunk.

    Args:
        buffer: Raw bytes received so far.

    Returns:
        A tuple ``(msgs, remaining, errors)`` where ``msgs`` is a list of
        ``(raw_line, parsed_dict)`` pairs in input order, ``remaining`` is
        the unterminated tail of *buffer*, and ``errors`` counts the lines
        that could not be parsed (bad UTF-8, fewer than three fields, or a
        non-integer timestamp).  Malformed lines are dropped, never raised.
    """
    msgs: List[Tuple[bytes, Dict[str, Any]]] = []
    errors = 0
    *lines, remaining = buffer.split(b"\n")
    for line in lines:
        raw = line.strip()
        if not raw:
            continue
        try:
            parsed = _parse_line(raw)
        except ValueError:
            # Best-effort stream decoding: count the bad line and move on.
            errors += 1
        else:
            msgs.append((raw, parsed))
    return msgs, remaining, errors