NorthStar-HMI/main.py

103 lines
3.2 KiB
Python

from fastapi import FastAPI, HTTPException, Query
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse
import logging
import os
from fastapi import Request, APIRouter
import subprocess
import platform
# Choose the CAN backend module by host OS: MockCAN on macOS ("Darwin"),
# classCAN (the real backend) on every other platform.
# NOTE(review): the original comment read "macOS or Windows", but only
# "Darwin" is matched, so Windows gets classCAN - confirm the intent.
if platform.system() == "Darwin":
    from MockCAN import CANBackend
else:
    from classCAN import CANBackend  # Your real backend

app = FastAPI()
router = APIRouter()
logging.basicConfig(level=logging.INFO)

# Single backend instance shared by all request handlers in this module.
can_backend = CANBackend()

# Serve static files (HTML, JS, CSS)
app.mount(
    "/static",
    StaticFiles(directory="static"),
    name="static",
)
# @router.post("/webhook")
# async def github_webhook(request: Request):
# payload = await request.json()
# print("[WEBHOOK] Received webhook:", payload.get("head_commit", {}).get("message"))
# try:
# # Call the update script on the HOST using host bash
# subprocess.run(
# ["/usr/bin/bash", "-c", "bash /home/hmi/Desktop/HMI/update_hmi.sh"],
# check=True,
# capture_output=True,
# text=True
# )
# return {"status": "Update triggered"}
# except subprocess.CalledProcessError as e:
# print(f"[WEBHOOK] Update failed:\n{e.stderr}")
# return {"status": "Update failed", "error": str(e)}
@app.post("/connect_toggle")
def connect_toggle():
    """Flip the CAN connection state.

    When the backend reports it is connected, shut it down; otherwise try
    to connect using node id 0x02 and the bundled EDS file.

    Returns:
        ``{"connected": False}`` after a shutdown, ``{"connected": True}``
        after a successful connect.

    Raises:
        HTTPException: 500 when ``can_backend.connect`` returns a falsy value.
    """
    logging.info("Toggling CAN connection...")

    # Currently connected: tear the link down and report the new state.
    if can_backend.connected:
        can_backend.shutdown()
        return {"connected": False}

    # The EDS file is resolved relative to this module, not the working dir.
    module_dir = os.path.dirname(__file__)
    eds_file = os.path.join(module_dir, "eds_file", "processBoard_0.eds")

    if can_backend.connect(node_id=0x02, eds_path=eds_file):
        return {"connected": True}
    raise HTTPException(status_code=500, detail="Connection failed.")
@app.post("/command/{state}/pu/{pu_number}")
def send_command(state: str, pu_number: int, ploop_setpoint: float = Query(...)):
    """Send a state command to a specific PU.

    Args:
        state: State name taken from the URL path; it is upper-cased before
            being handed to the backend.
        pu_number: PU number taken from the URL path.
        ploop_setpoint: Required query parameter forwarded to the backend
            unchanged.

    Returns:
        A dict echoing the upper-cased command, the PU number and the
        setpoint, with ``"status": "success"``.

    Raises:
        HTTPException: 500 carrying the backend error text when
            ``can_backend.send_state_command`` raises.
    """
    command = state.upper()
    logging.info(f"Sending state '{state}' to PU {pu_number}")
    try:
        can_backend.send_state_command(command, pu_number, ploop_setpoint)
    except Exception as e:
        # Keep the traceback in the server log; the client only receives the
        # error message, so without this the failure cause would be lost.
        logging.exception(
            "Failed to send state '%s' to PU %s", command, pu_number
        )
        raise HTTPException(status_code=500, detail=str(e)) from e
    return {
        "status": "success",
        "command": command,
        "pu": pu_number,
        "ploop_setpoint": ploop_setpoint,
    }
@app.get("/monitor")
def get_monitor_data():
    """Return the latest backend readings as three-element lists per metric.

    Only the "FM2" and "PS1" entries of the backend data are used (each
    defaulting to 0.0 when absent); every other slot is a fixed 0.0.
    """
    data = can_backend.get_latest_data()
    print(f"[MONITOR] Latest SDO: {data}")

    # NOTE(review): "FM2" fills both the first and second Qperm slots -
    # confirm this is intended rather than a placeholder for another key.
    fm2 = data.get("FM2", 0.0)
    ps1 = data.get("PS1", 0.0)

    payload = {
        "Qperm": [fm2, fm2, 0.0],
        "Pdilute": [ps1, 0.0, 0.0],
    }
    # These metrics are not read from the backend here; they stay at zero.
    for metric in ("Conductivity", "Pro"):
        payload[metric] = [0.0] * 3
    return payload
@app.get("/can_status")
def can_status():
    """Report the backend's current ``connected`` flag."""
    is_connected = can_backend.connected
    return dict(connected=is_connected)
@app.get("/", response_class=HTMLResponse)
async def read_root():
    """Serve main HTML page.

    Reads ``static/index.html`` (relative to the process working directory)
    and returns its contents as an HTML response with status 200.
    """
    # Decode explicitly instead of relying on the host locale's default
    # encoding, which differs between platforms.
    # NOTE(review): assumes index.html is saved as UTF-8 - confirm.
    with open("static/index.html", "r", encoding="utf-8") as file:
        return HTMLResponse(content=file.read(), status_code=200)
# Attach the (currently route-less) router to the application.
app.include_router(router)

if __name__ == "__main__":
    import uvicorn

    # Settings used when this file is run directly as a script: bind to
    # localhost:8000 with reload enabled for the current directory.
    server_options = {
        "host": "127.0.0.1",
        "port": 8000,
        "reload": True,
        "reload_dirs": ["."],
    }
    uvicorn.run("main:app", **server_options)