# NorthStar-HMI/main.py
#
# 238 lines
# 7.9 KiB
# Python

from fastapi import FastAPI, HTTPException, Query, Form, Depends
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
import logging
import os
from fastapi import Request, APIRouter
import platform
from fastapi.templating import Jinja2Templates # pip install fastapi uvicorn jinja2 python-multipart passlib
from starlette.middleware.sessions import SessionMiddleware
from starlette.exceptions import HTTPException as StarletteHTTPException
from starlette.status import HTTP_302_FOUND
import json
from pathlib import Path
# Pick the CAN backend implementation by platform: macOS (platform.system()
# reports "Darwin") gets the mock backend; every other platform, Windows
# included, imports the real backend from classCAN.
if platform.system() == "Darwin":
    from MockCAN import CANBackend
else:
    from classCAN import CANBackend  # real CAN backend
# ---- Application wiring ----
app = FastAPI()

# The session cookie is signed with this key. It can be supplied through the
# SESSION_SECRET_KEY environment variable; the previous literal is kept as
# the fallback so existing deployments keep working unchanged.
# NOTE(review): the fallback is a placeholder checked into source control --
# set SESSION_SECRET_KEY to a random value for any real deployment.
app.add_middleware(
    SessionMiddleware,
    secret_key=os.environ.get("SESSION_SECRET_KEY", "your_super_secret_key"),
)
router = APIRouter()
logging.basicConfig(level=logging.INFO)

# One CAN backend instance shared by every route. It used to be constructed
# twice in a row, with the first instance discarded immediately.
can_backend = CANBackend()

# Serve static files (HTML, JS, CSS). Mounted once: a second mount on the
# same path only adds a route that can never be reached.
app.mount("/static", StaticFiles(directory="static"), name="static")

# Jinja2 templates
templates = Jinja2Templates(directory="templates")
# CREDENTIALS
# Load the login credentials from a JSON file once, at import time.
CREDENTIAL_PATH = Path("credentials.json")
if CREDENTIAL_PATH.exists():
    # Explicit encoding so the file is parsed the same way on every platform.
    with CREDENTIAL_PATH.open("r", encoding="utf-8") as f:
        CREDENTIALS = json.load(f)
else:
    CREDENTIALS = {}

# Fail fast with an actionable message. Previously a missing file (or a
# missing key) surfaced as a bare KeyError on the lookups below.
_missing_credential_keys = [
    key for key in ("username", "password") if key not in CREDENTIALS
]
if _missing_credential_keys:
    raise RuntimeError(
        f"{CREDENTIAL_PATH} must exist and define 'username' and 'password' "
        f"(missing: {', '.join(_missing_credential_keys)})"
    )
USERNAME = CREDENTIALS["username"]
PASSWORD = CREDENTIALS["password"]
# ======== LOGIN & SESSION HANDLING ========
def require_login(request: Request):
    """Return the logged-in user, or raise a redirect to the login page.

    Reads ``"user"`` from the session and compares it with ``USERNAME``.

    Returns:
        The session's user value when it equals ``USERNAME``.

    Raises:
        HTTPException: status 302 with a ``Location: /`` header when the
            session user does not match ``USERNAME``.
    """
    user = request.session.get("user")
    if user != USERNAME:
        # The redirect is raised as an exception rather than returned. A 302
        # only redirects when it carries a Location header, so point it at
        # the login form served on "/". fastapi.HTTPException subclasses the
        # Starlette exception, so existing handlers still match it.
        raise HTTPException(
            status_code=HTTP_302_FOUND,
            detail="Redirect to login",
            headers={"Location": "/"},
        )
    return user
@app.get("/", response_class=HTMLResponse)
def login_form(request: Request):
    """Render the login page from the ``login.html`` template."""
    context = {"request": request}
    return templates.TemplateResponse("login.html", context)
@app.post("/login")
def login(request: Request, username: str = Form(...), password: str = Form(...)):
    """Check the submitted form credentials and open a session on success.

    Args:
        request: Incoming request; its session receives the ``"user"`` key.
        username: ``username`` form field.
        password: ``password`` form field.

    Returns:
        A 302 redirect to ``/control`` when both values match ``USERNAME``
        and ``PASSWORD``; otherwise the login page re-rendered with an
        ``error`` message.
    """
    import secrets  # function-scope import keeps this block self-contained

    # Constant-time comparison on bytes (compare_digest rejects non-ASCII
    # str). Both checks are always evaluated so timing does not reveal which
    # field was wrong.
    # assumes USERNAME and PASSWORD are JSON strings -- TODO confirm
    username_ok = secrets.compare_digest(
        username.encode("utf-8"), USERNAME.encode("utf-8")
    )
    password_ok = secrets.compare_digest(
        password.encode("utf-8"), PASSWORD.encode("utf-8")
    )
    if username_ok and password_ok:
        request.session["user"] = username
        return RedirectResponse("/control", status_code=HTTP_302_FOUND)
    # The message used to read "Invalid credentials.json", which exposed the
    # credentials file name to the user instead of describing the failure.
    return templates.TemplateResponse(
        "login.html", {"request": request, "error": "Invalid credentials"}
    )
@app.get("/logout")
def logout(request: Request):
    """Clear the session and redirect (302) to the login page at ``/``."""
    request.session.clear()
    login_redirect = RedirectResponse("/", status_code=HTTP_302_FOUND)
    return login_redirect
# ======== PROTECTED INTERFACE ========
@app.get("/control", response_class=HTMLResponse)
def control_page(request: Request):
    """Render ``control.html`` for the logged-in user.

    A session whose ``"user"`` value does not equal ``USERNAME`` is
    redirected (302) to the login page at ``/``.
    """
    session_user = request.session.get("user")
    if session_user == USERNAME:
        return templates.TemplateResponse("control.html", {"request": request})
    return RedirectResponse("/", status_code=HTTP_302_FOUND)
@app.get("/monitor-page", response_class=HTMLResponse)
def monitor_page(request: Request):
    """Serve the contents of ``static/monitor.html`` as an HTML response.

    Raises:
        HTTPException: status 404 when the file does not exist.
    """
    # NOTE(review): unlike /control, this page performs no session check --
    # confirm that serving it without login is intended.
    try:
        # Explicit encoding: the platform default is not UTF-8 everywhere.
        with open("static/monitor.html", encoding="utf-8") as f:
            return HTMLResponse(f.read())
    except FileNotFoundError as exc:
        # A missing page is a 404, not an unhandled server error.
        raise HTTPException(status_code=404, detail="monitor.html not found") from exc
# ======== CAN + BACKEND ROUTES ========
@app.post("/connect_toggle")
def connect_toggle():
    """Flip the CAN connection state.

    If the backend reports itself connected it is shut down; otherwise a
    connection attempt is made. Returns ``{"connected": <bool>}`` with the
    resulting state and raises a 500 ``HTTPException`` when connecting fails.
    """
    logging.info("Toggling CAN connection...")
    if not can_backend.connected:
        if can_backend.connect():
            return {"connected": True}
        raise HTTPException(status_code=500, detail="Connection failed.")
    can_backend.shutdown()
    return {"connected": False}
@app.post("/command/{state}/pu/{pu_number}")
def send_command(state: str, pu_number: int, ploop_setpoint: float = Query(...)):
    """Send a state command to one PU and report the state read back.

    Args:
        state: Requested state from the URL path; matched case-insensitively
            against the accepted state names.
        pu_number: PU index from the URL path, passed through to the backend.
        ploop_setpoint: Required query parameter forwarded with the command.

    Returns:
        A dict with ``status``, ``command``, ``pu``, ``ploop_setpoint`` and
        the ``current_state`` read from the backend after sending.

    Raises:
        HTTPException: 400 for an unknown state; 500 when the backend raises
            while sending the command or reading the state back.
    """
    VALID_STATES = {
        "IDLE", "PRE-PRODUCTION", "PRODUCTION", "FIRST_START",
        "THERMALLOOPCLEANING", "DISINFECTION", "SLEEP",
    }
    state = state.upper()
    if state not in VALID_STATES:
        raise HTTPException(status_code=400, detail=f"Invalid state '{state}'")

    logging.info(f"Sending state '{state}' to PU {pu_number}")
    try:
        # Only the two backend calls can fail; keep the try body minimal.
        can_backend.send_state_command(state, pu_number, ploop_setpoint)
        current_state = can_backend.read_current_state(pu_number)
    except Exception as e:
        # Boundary handler: log the traceback (it was previously lost) and
        # keep the original error as the cause of the HTTP error.
        logging.exception("Failed to send state '%s' to PU %s", state, pu_number)
        raise HTTPException(status_code=500, detail=str(e)) from e

    return {
        "status": "success",
        "command": state,
        "pu": pu_number,
        "ploop_setpoint": ploop_setpoint,
        "current_state": current_state,
    }
@app.get("/api/pu_status")
def get_pu_status():
    """Return the backend's current state for PU 1, 2 and 3.

    The JSON response maps ``"PU1"``, ``"PU2"`` and ``"PU3"`` to whatever
    ``can_backend.read_current_state`` returns for that unit.
    """
    states = {
        f"PU{unit}": can_backend.read_current_state(unit) for unit in (1, 2, 3)
    }
    logging.info(f"[PU STATUS] {states}")
    return JSONResponse(content=states)
# (response key, key in the backend's latest-data mapping) pairs, listed in
# the order they appear inside each PU section of the /monitor response.
_MONITOR_FIELDS = (
    ("Qperm", "FM2"),
    ("Qdilute", "FM3"),
    ("Qdrain", "FM4"),
    ("Qrecirc", "FM5"),
    ("Pro", "PS1"),
    ("Pdilute", "PS2"),
    ("Prentate", "PS3"),
    ("Conductivity", "Cond"),
    ("MV02", "MV02"),
    ("MV02_sp", "MV02_sp"),
    ("MV03", "MV03"),
    ("MV03_sp", "MV03_sp"),
    ("MV05", "MV05"),
    ("MV05_sp", "MV05_sp"),
    ("MV06", "MV06"),
    ("MV06_sp", "MV06_sp"),
    ("MV07", "MV07"),
    ("MV07_sp", "MV07_sp"),
    ("MV08", "MV08"),
    ("MV08_sp", "MV08_sp"),
)


@app.get("/monitor")
def get_monitor_data():
    """Return the latest backend readings, keyed by PU.

    Fetches ``can_backend.get_latest_data()`` and renames its entries
    according to ``_MONITOR_FIELDS``; a reading that is absent defaults to
    ``0.0``.

    Returns:
        A dict with keys ``"PU_1"``, ``"PU_2"`` and ``"PU_3"``, each mapping
        to its own dict of readings.
    """
    data = can_backend.get_latest_data()
    # Logged rather than printed, matching the other routes in this file.
    logging.info(f"[MONITOR] Latest SDO: {data}")

    readings = {name: data.get(source, 0.0) for name, source in _MONITOR_FIELDS}
    # NOTE(review): all three PU sections are built from the same lookups, so
    # they always carry identical values -- confirm whether per-PU data is
    # meant to be reported here.
    return {f"PU_{unit}": dict(readings) for unit in (1, 2, 3)}
@app.get("/can_status")
def can_status():
    """Report the CAN backend's ``connected`` flag as ``{"connected": ...}``."""
    is_connected = can_backend.connected
    return {"connected": is_connected}
# Register the router on the application (no routes are attached to it in
# the code shown here).
app.include_router(router)

if __name__ == "__main__":
    import uvicorn

    # Script entry point: serve "main:app" on 127.0.0.1:8080 with
    # auto-reload watching the current directory.
    server_options = {
        "host": "127.0.0.1",
        "port": 8080,
        "reload": True,
        "reload_dirs": ["."],
    }
    uvicorn.run("main:app", **server_options)