# NorthStar-HMI/protocol_decoder.py
#
# Repository listing metadata: 95 lines, 3.3 KiB, Python

from typing import Dict, Any, List, Tuple
import re
# Message-ID patterns used by decode_frames() to route each frame to a decoder.
# "P<n>VP" / "P<n>CO": frames tagged src="pu"; the digit (1-3) is captured as
# the named group "pu".  VP frames are decoded as valves, CO frames as cond.
RE_PU_VP = re.compile(r'^P(?P<pu>[1-3])VP$')
RE_PU_CO = re.compile(r'^P(?P<pu>[1-3])CO$')
# "D0VP" (digit zero only): frame tagged src="dock", type="valves".
RE_DOCK_VP = re.compile(r'^D0VP$')
# Frame tagged src="dock", type="cond".  Accepts both "D0CO" (digit zero) and
# "DOCO" (capital letter O).
RE_DOCK_CO = re.compile(r'^(D0CO|DOCO)$') # be tolerant
def _to_i(s: str) -> int:
try: return int(s.strip())
except: return 0
def _to_pct(s: str) -> int:
try: return int(s.strip())
except:
try: return int(float(s))
except: return 0
def _to_bool(s: str) -> bool:
return str(s).strip() in ("1","true","True","TRUE")
def _dock_vp(vals: List[str]) -> Dict[str, Any]:
    """Map the positional data fields of a dock valve frame to named values.

    Fields are assigned in wire order to mv01, mv09, mv10, mv11, mmv01-mmv03
    and sv01-sv03.  Names starting with ``sv`` are converted with
    ``_to_bool``; all others with ``_to_pct``.  ``zip`` stops at the shorter
    sequence, so missing trailing fields are simply absent from the result
    and surplus fields are ignored.
    """
    field_order = (
        "mv01", "mv09", "mv10", "mv11",
        "mmv01", "mmv02", "mmv03",
        "sv01", "sv02", "sv03",
    )
    return {
        field: (_to_bool(text) if field.startswith("sv") else _to_pct(text))
        for field, text in zip(field_order, vals)
    }
def _dock_co(vals: List[str]) -> Dict[str, Any]:
    """Decode the data fields of a dock conductivity frame.

    The first two fields map to cs01 and cs02.  Each is parsed with ``_to_i``
    as a count of 0.1 µS steps and reported twice: the raw integer under
    ``<name>_0p1uS`` and the scaled float under ``<name>_uS``.  Missing
    trailing fields are absent from the result; surplus fields are ignored.
    """
    out: Dict[str, Any] = {}
    for name, v in zip(("cs01", "cs02"), vals):
        tenths = _to_i(v)  # 0.1 µS
        out[f"{name}_0p1uS"] = tenths
        # Divide rather than multiply by 0.1: 0.1 is not exactly representable
        # in binary, so 3 * 0.1 gives 0.30000000000000004 while 3 / 10 is 0.3.
        out[f"{name}_uS"] = tenths / 10
    return out
def _pu_vp(pu: int, vals: List[str]) -> Dict[str, Any]:
    """Map the positional data fields of a PU valve frame to named values.

    The result always carries ``pu`` first, followed by mv02-mv08 in wire
    order, each converted with ``_to_pct``.  Missing trailing fields are
    absent from the result; surplus fields are ignored.
    """
    field_order = ("mv02", "mv03", "mv04", "mv05", "mv06", "mv07", "mv08")
    decoded: Dict[str, Any] = {"pu": pu}
    decoded.update(
        (field, _to_pct(text)) for field, text in zip(field_order, vals)
    )
    return decoded
def _pu_co(pu: int, vals: List[str]) -> Dict[str, Any]:
    """Decode the data fields of a PU conductivity frame.

    The result always carries ``pu`` first.  The first three fields map to
    cs03, cs04 and cs05.  Each is parsed with ``_to_i`` and reported twice:
    the raw integer under ``<name>_0p1uS`` and that integer divided by ten
    under ``<name>_uS`` (the key suffixes suggest the raw unit is 0.1 µS).
    Missing trailing fields are absent from the result; surplus fields are
    ignored.
    """
    out: Dict[str, Any] = {"pu": pu}
    for name, v in zip(("cs03", "cs04", "cs05"), vals):
        tenths = _to_i(v)
        out[f"{name}_0p1uS"] = tenths
        # Divide rather than multiply by 0.1: 0.1 is not exactly representable
        # in binary, so 3 * 0.1 gives 0.30000000000000004 while 3 / 10 is 0.3.
        out[f"{name}_uS"] = tenths / 10
    return out
def decode_frames(buffer: bytes) -> Tuple[List[Tuple[bytes, Dict[str, Any]]], bytes, int]:
    """Split *buffer* into newline-terminated frames and decode each one.

    A frame is a UTF-8, comma-separated line of the form
    ``version,msg_id,ts_ms[,data...]``.  The message ID selects the decoder:
    dock valves, dock conductivity, PU valves or PU conductivity.  IDs that
    match none of the patterns are kept with ``src="unknown"``,
    ``type="raw"`` and the untouched data fields under ``data``.

    Returns a 3-tuple ``(msgs, remaining, errors)``:

    * ``msgs`` - ``(raw_line, parsed_dict)`` pairs for every decoded line,
      in input order; ``raw_line`` is the stripped bytes of the line.
    * ``remaining`` - the bytes after the last ``\\n`` (an incomplete
      trailing line, or ``b""``), for the caller to prepend to later input.
    * ``errors`` - number of non-empty lines dropped because decoding raised
      (bad UTF-8, fewer than three fields, non-integer timestamp, ...).
    """
    *complete_lines, remaining = buffer.split(b"\n")
    msgs: List[Tuple[bytes, Dict[str, Any]]] = []
    errors = 0

    for chunk in complete_lines:
        raw = chunk.strip().rstrip(b"\r")
        if not raw:
            # Blank lines are neither frames nor errors.
            continue
        try:
            fields = [token.strip() for token in raw.decode("utf-8").split(",")]
            if len(fields) < 3:
                raise ValueError("too few fields")
            version, msg_id, ts_ms, *data = fields
            parsed: Dict[str, Any] = {
                "version": version,
                "msg_id": msg_id,
                "ts_ms": int(ts_ms),
            }

            pu_valves = RE_PU_VP.match(msg_id)
            pu_cond = RE_PU_CO.match(msg_id)
            if RE_DOCK_VP.match(msg_id):
                parsed.update({"src": "dock", "type": "valves"})
                parsed.update(_dock_vp(data))
            elif RE_DOCK_CO.match(msg_id):
                parsed.update({"src": "dock", "type": "cond"})
                parsed.update(_dock_co(data))
            elif pu_valves:
                pu = int(pu_valves.group("pu"))
                parsed.update({"src": "pu", "type": "valves", "pu": pu})
                parsed.update(_pu_vp(pu, data))
            elif pu_cond:
                pu = int(pu_cond.group("pu"))
                parsed.update({"src": "pu", "type": "cond", "pu": pu})
                parsed.update(_pu_co(pu, data))
            else:
                parsed.update({"src": "unknown", "type": "raw", "data": data})

            msgs.append((raw, parsed))
        except Exception:
            # Best effort: a malformed line is counted and skipped so the
            # rest of the buffer is still decoded.
            errors += 1

    return msgs, remaining, errors