NorthStar-HMI/main.py
2025-07-08 17:02:10 +02:00

148 lines
5.2 KiB
Python

from fastapi import FastAPI, HTTPException, Query
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse
import logging
import platform
# Select the CAN backend implementation for the current host.
# NOTE(review): only "Darwin" (macOS) selects MockCAN; Windows hosts fall
# through to classCAN. Confirm whether Windows should also use the mock.
if platform.system() == "Darwin":
    from MockCAN import CANBackend
else:
    from classCAN import CANBackend  # Your real backend

app = FastAPI()
logging.basicConfig(level=logging.INFO)

# Single backend instance shared by every endpoint in this module.
can_backend = CANBackend()

# Serve static files (HTML, JS, CSS)
app.mount("/static", StaticFiles(directory="static"), name="static")
@app.post("/connect_toggle")
def connect_toggle():
    """Toggle CAN connection.

    When the backend reports it is connected, it is shut down; otherwise a
    connection attempt is made with the fixed node id and EDS path below.

    Returns:
        ``{"connected": False}`` after a shutdown, ``{"connected": True}``
        after a successful connect.

    Raises:
        HTTPException: 500 when ``can_backend.connect`` returns a falsy value.
    """
    logging.info("Toggling CAN connection...")

    # Already connected: the toggle means "disconnect".
    if can_backend.connected:
        can_backend.shutdown()
        return {"connected": False}

    connected_ok = can_backend.connect(
        node_id=0x02,
        eds_path=r"/eds_file/processBoard_0.eds",
    )
    if connected_ok:
        return {"connected": True}
    raise HTTPException(status_code=500, detail="Connection failed.")
@app.post("/command/{state}/pu/{pu_number}")
def send_command(state: str, pu_number: int, ploop_setpoint: float = Query(...)):
    """Send a state command to a specific PU.

    Args:
        state: State name taken from the URL path; it is upper-cased before
            being handed to the backend.
        pu_number: PU number taken from the URL path.
        ploop_setpoint: Required query parameter, forwarded unchanged to the
            backend.

    Returns:
        A dict echoing the upper-cased command, the PU number and the
        setpoint, with ``"status": "success"``.

    Raises:
        HTTPException: 500 carrying the backend error text when
            ``can_backend.send_state_command`` raises.
    """
    command = state.upper()
    logging.info(
        "Sending state '%s' to PU %s with ploop_setpoint %s",
        state,
        pu_number,
        ploop_setpoint,
    )

    # Only the backend call is guarded, so errors elsewhere are not mislabelled.
    try:
        can_backend.send_state_command(command, pu_number, ploop_setpoint)
    except Exception as e:
        # API boundary: keep the traceback in the server log and chain the
        # cause, while the client still receives the same 500 + message.
        logging.exception("Failed to send state '%s' to PU %s", command, pu_number)
        raise HTTPException(status_code=500, detail=str(e)) from e

    return {
        "status": "success",
        "command": command,
        "pu": pu_number,
        "ploop_setpoint": ploop_setpoint,
    }
# (response field, backend data key) pairs, in the order they appear in each
# PU entry of the /monitor response.
# NOTE(review): "Prentate" may be a typo (possibly "Pretentate"); the key is
# kept as-is because clients may already depend on it.
_MONITOR_FIELDS = (
    ("Qperm", "FM2"),
    ("Qdilute", "FM3"),
    ("Qdrain", "FM4"),
    ("Qrecirc", "FM5"),
    ("Pro", "PS1"),
    ("Pdilute", "PS2"),
    ("Prentate", "PS3"),
    ("Conductivity", "Cond"),
    ("MV02", "MV02"),
    ("MV02_sp", "MV02_sp"),
    ("MV03", "MV03"),
    ("MV03_sp", "MV03_sp"),
    ("MV05", "MV05"),
    ("MV05_sp", "MV05_sp"),
    ("MV06", "MV06"),
    ("MV06_sp", "MV06_sp"),
    ("MV07", "MV07"),
    ("MV07_sp", "MV07_sp"),
    ("MV08", "MV08"),
    ("MV08_sp", "MV08_sp"),
)

# Top-level keys of the /monitor response.
_MONITOR_PU_NAMES = ("PU_1", "PU_2", "PU_3")


def _monitor_snapshot(data):
    """Build one PU entry from *data*, defaulting missing readings to 0.0."""
    return {field: data.get(key, 0.0) for field, key in _MONITOR_FIELDS}


@app.get("/monitor")
def get_monitor_data():
    """Return the latest backend readings, grouped per PU.

    Returns:
        A dict keyed ``PU_1``..``PU_3``; each value maps the response field
        names in ``_MONITOR_FIELDS`` to the matching backend reading, or
        ``0.0`` when the backend data has no such key.
    """
    # TODO : agree on the structure
    # Every PU entry is currently built from the same backend data, so all
    # three entries carry identical values.
    data = can_backend.get_latest_data()
    logging.info("[MONITOR] Latest SDO: %s", data)
    # A fresh dict per PU, so the entries never alias each other.
    return {pu_name: _monitor_snapshot(data) for pu_name in _MONITOR_PU_NAMES}
@app.get("/can_status")
def can_status():
    """Report the backend's current ``connected`` flag as ``{"connected": ...}``."""
    is_connected = can_backend.connected
    return {"connected": is_connected}
@app.get("/", response_class=HTMLResponse)
async def read_root():
    """Serve main HTML page.

    Reads ``static/index.html`` (resolved against the process working
    directory) and returns its contents as a 200 HTML response.
    """
    # Decode explicitly as UTF-8 rather than the locale default, so the text
    # matches the UTF-8 charset the HTML response is sent with.
    with open("static/index.html", "r", encoding="utf-8") as file:
        return HTMLResponse(content=file.read(), status_code=200)
@app.get("/monitor-page", response_class=HTMLResponse)
async def read_monitor_page():
    """Serve monitor HTML page.

    Reads ``static/monitor.html`` (resolved against the process working
    directory) and returns its contents as a 200 HTML response.
    """
    # Decode explicitly as UTF-8 rather than the locale default, so the text
    # matches the UTF-8 charset the HTML response is sent with.
    with open("static/monitor.html", "r", encoding="utf-8") as file:
        return HTMLResponse(content=file.read(), status_code=200)
if __name__ == "__main__":
    import uvicorn

    # Script entry point: run the app under uvicorn with auto-reload enabled,
    # watching the current directory for changes.
    server_options = {
        "host": "0.0.0.0",
        "port": 8000,
        "reload": True,
        "reload_dirs": ["."],
    }
    uvicorn.run("main:app", **server_options)