# NorthStar-HMI/main.py
#
# 244 lines
# 7.1 KiB
# Python

from fastapi import FastAPI, HTTPException, Query, Form, Depends
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
import logging
import os
from fastapi import Request, APIRouter
import platform
from fastapi.templating import (
Jinja2Templates,
) # pip install fastapi uvicorn jinja2 python-multipart passlib
from starlette.middleware.sessions import SessionMiddleware
from starlette.exceptions import HTTPException as StarletteHTTPException
from starlette.status import HTTP_302_FOUND
import json
from pathlib import Path
from typing import Optional, Dict, Any
from fastapi import Query
import asyncio
import datetime
from valveBackend import ValveBackend
# Pick the CAN backend implementation by host OS: only "Darwin" (macOS)
# selects the mock; every other platform imports the real one.
# NOTE(review): an earlier comment here said "macOS or Windows", but the list
# below contains only "Darwin" - confirm whether "Windows" should be added.
if platform.system() in ["Darwin"]:
    from MockCAN import CANBackend
else:
    from classCAN import CANBackend  # Your real backend
app = FastAPI()
# Session cookies are signed with this key. Supply a real secret through the
# SESSION_SECRET_KEY environment variable; the literal fallback only keeps
# existing setups starting and, being committed to source, offers no protection.
app.add_middleware(
    SessionMiddleware,
    secret_key=os.environ.get("SESSION_SECRET_KEY", "your_super_secret_key"),
)
router = APIRouter()
templates = Jinja2Templates(directory="templates")
logging.basicConfig(level=logging.INFO)
# Hardware backends are instantiated once at import time and shared by all routes.
can_backend = CANBackend()
# NOTE(review): the EDS path is absolute and machine-specific - confirm it
# exists on every host this module is deployed to.
valve_backend = ValveBackend(eds_file="/home/hmi/Desktop/HMI/eds_file/inletvalveboard.eds")
# Serve static files (HTML, JS, CSS)
app.mount("/static", StaticFiles(directory="static"), name="static")
## BUFFER CREATION AND HELPER
# Module-level cache holding the most recent formatted reading per PU.
# Each slot starts as None until a reading has been stored for that PU.
latest_data: Dict[str, Any] = dict.fromkeys(("PU_1", "PU_2", "PU_3"))
# (output key, key in the raw reading) pairs, listed in emission order.
_FIELD_SOURCES = (
    ("Qperm", "FM2"),
    ("Qdilute", "FM1"),
    ("Qdrain", "FM4"),
    ("Qrecirc", "FM3"),
    ("Pro", "PS1"),
    ("Pdilute", "PS3"),
    ("Prentate", "PS2"),
    ("Conductivity", "Cond"),
    ("MV02", "MV02"),
    ("MV02_sp", "MV02_sp"),
    ("MV03", "MV03"),
    ("MV03_sp", "MV03_sp"),
    ("MV05", "MV05"),
    ("MV05_sp", "MV05_sp"),
    ("MV06", "MV06"),
    ("MV06_sp", "MV06_sp"),
    ("MV07", "MV07"),
    ("MV07_sp", "MV07_sp"),
    ("MV08", "MV08"),
    ("MV08_sp", "MV08_sp"),
)


def format_data(data: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Translate a raw reading into the key names exposed by the monitor API.

    Args:
        data: Mapping keyed by raw signal name ("FM1", "PS2", "Cond",
            "MV02", ...). ``None`` is accepted and treated as an empty
            reading so a backend with nothing to report does not raise.

    Returns:
        A new dict whose first entry is ``"timestamp"`` (local time at the
        moment of the call, ISO-8601 text) followed by one entry per output
        field. Any signal missing from ``data`` is reported as ``0.0``.
    """
    readings = {} if data is None else data
    formatted: Dict[str, Any] = {"timestamp": datetime.datetime.now().isoformat()}
    for out_key, source_key in _FIELD_SOURCES:
        formatted[out_key] = readings.get(source_key, 0.0)
    return formatted
# CREDENTIALS
# Load the login account from a JSON file at startup. The path is relative,
# so it is resolved against the process's working directory.
CREDENTIAL_PATH = Path("credentials.json")
if CREDENTIAL_PATH.exists():
    with CREDENTIAL_PATH.open("r", encoding="utf-8") as f:
        CREDENTIALS = json.load(f)
else:
    CREDENTIALS = {}
# Fail fast with an actionable message: without both entries nobody can log
# in. KeyError is kept as the exception type so start-up failure looks the
# same to anything that already handled it.
_missing_credential_keys = [
    key for key in ("username", "password") if key not in CREDENTIALS
]
if _missing_credential_keys:
    raise KeyError(
        f"{CREDENTIAL_PATH} is missing or lacks required entries: "
        f"{', '.join(_missing_credential_keys)}"
    )
USERNAME = CREDENTIALS["username"]
PASSWORD = CREDENTIALS["password"]
# ======== LOGIN & SESSION HANDLING ========
def require_login(request: Request):
    """Return the logged-in user, or abort the request with a redirect to "/".

    Intended for use as a dependency: a dependency cannot return a redirect
    response itself, so the redirect is expressed as an HTTP exception.

    Args:
        request: Incoming request; its session is checked for a "user" entry.

    Returns:
        The session's user name when it equals the configured USERNAME.

    Raises:
        HTTPException: Status 302 with a ``Location: /`` header (the login
            form) when the session does not belong to the configured user.
            FastAPI's HTTPException subclasses Starlette's, so handlers
            written for the Starlette class still catch it.
    """
    user = request.session.get("user")
    if user != USERNAME:
        # A 302 without a Location header gives the client nowhere to go,
        # so the target is attached explicitly.
        raise HTTPException(
            status_code=HTTP_302_FOUND,
            detail="Redirect to login",
            headers={"Location": "/"},
        )
    return user
@app.get("/", response_class=HTMLResponse)
def login_form(request: Request):
    """Render the ``login.html`` template for the incoming request."""
    context = {"request": request}
    return templates.TemplateResponse("login.html", context)
def _constant_time_equal(supplied: str, expected: Any) -> bool:
    """Return True when ``supplied`` equals ``expected``, compared in constant time.

    A non-string ``expected`` never matches, which mirrors what a plain
    ``==`` between a form string and a non-string value would yield.
    """
    import secrets  # local import keeps this block self-contained

    if not isinstance(expected, str):
        return False
    # compare_digest rejects non-ASCII str, so compare the UTF-8 bytes.
    return secrets.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8"))


@app.post("/login")
def login(request: Request, username: str = Form(...), password: str = Form(...)):
    """Check the submitted form credentials and open a session on success.

    Args:
        request: Incoming request; its session receives the "user" entry.
        username: Required form field.
        password: Required form field.

    Returns:
        A 302 redirect to "/control" when both fields match the configured
        USERNAME / PASSWORD; otherwise the login template re-rendered with an
        "error" entry in its context.
    """
    # Evaluate both comparisons unconditionally so response timing does not
    # reveal which of the two fields was wrong.
    username_ok = _constant_time_equal(username, USERNAME)
    password_ok = _constant_time_equal(password, PASSWORD)
    if username_ok and password_ok:
        request.session["user"] = username
        return RedirectResponse("/control", status_code=HTTP_302_FOUND)
    return templates.TemplateResponse(
        "login.html", {"request": request, "error": "Invalid credentials"}
    )
@app.get("/logout")
def logout(request: Request):
    """Empty the session and redirect the browser to "/"."""
    request.session.clear()
    back_to_login = RedirectResponse("/", status_code=HTTP_302_FOUND)
    return back_to_login
# ======== PROTECTED INTERFACE ========
@app.get("/control", response_class=HTMLResponse)
def control_page(request: Request):
    """Render ``control.html`` for the logged-in user, else redirect to "/"."""
    is_logged_in = request.session.get("user") == USERNAME
    if is_logged_in:
        return templates.TemplateResponse("control.html", {"request": request})
    return RedirectResponse("/", status_code=HTTP_302_FOUND)
@app.get("/monitor-page", response_class=HTMLResponse)
def monitor_page(request: Request):
    """Return the contents of ``static/monitor.html`` as an HTML response.

    The file is read on every request, relative to the working directory.
    It is decoded explicitly as UTF-8 so the result does not depend on the
    host's locale encoding.

    NOTE(review): unlike "/control", this route performs no session check -
    confirm that is intentional.
    """
    html = Path("static/monitor.html").read_text(encoding="utf-8")
    return HTMLResponse(html)
# ======== CAN + BACKEND ROUTES ========
@app.post("/connect_toggle")
def connect_toggle():
    """Toggle the CAN connection and report the resulting state.

    When the CAN backend is connected it is shut down. Otherwise the CAN
    backend is connected and a valve-backend connection is attempted as well.

    Returns:
        ``{"connected": False}`` after a shutdown, ``{"connected": True}``
        after a successful CAN connect.

    Raises:
        HTTPException: Status 500 when the CAN backend reports that
            connecting failed.
    """
    logging.info("Toggling CAN connection...")
    if can_backend.connected:
        can_backend.shutdown()
        return {"connected": False}

    success = can_backend.connect()
    try:
        valve_backend.connect()
    except Exception as e:
        # Best effort: a valve failure must not mask the CAN result, but it
        # is recorded through logging (with traceback) like everything else
        # in this module rather than printed to stdout.
        logging.exception(f"Connection error : {e}")
    if not success:
        raise HTTPException(status_code=500, detail="Connection failed.")
    return {"connected": True}
@app.post("/command/{state}/pu/{pu_number}")
def send_command(state: str, pu_number: int, ploop_setpoint: float = Query(...)):
    """Send a state command to one PU and report the state read back.

    Args:
        state: Requested state from the URL path; matched case-insensitively
            against the accepted set below.
        pu_number: Target PU from the URL path.
        ploop_setpoint: Required query parameter forwarded with the command.

    Returns:
        A dict echoing the command, PU and setpoint together with the state
        the CAN backend reports after the command was sent.

    Raises:
        HTTPException: Status 400 for a state outside the accepted set;
            status 500 (detail is the error text) when the backend raises.
    """
    VALID_STATES = {
        "IDLE",
        "PRE-PRODUCTION",
        "PRODUCTION",
        "FIRST_START",
        "THERMALLOOPCLEANING",
        "DISINFECTION",
        "SLEEP",
    }
    state = state.upper()
    if state not in VALID_STATES:
        raise HTTPException(status_code=400, detail=f"Invalid state '{state}'")
    logging.info(f"Sending state '{state}' to PU {pu_number}")
    try:
        can_backend.send_state_command(state, pu_number, ploop_setpoint)
        current_state = can_backend.read_current_state(pu_number)
    except Exception as e:
        # Keep the traceback in the server log and chain the cause; the
        # client still receives only the error text.
        logging.exception(f"Failed to send state '{state}' to PU {pu_number}")
        raise HTTPException(status_code=500, detail=str(e)) from e
    return {
        "status": "success",
        "command": state,
        "pu": pu_number,
        "ploop_setpoint": ploop_setpoint,
        "current_state": current_state,
    }
@app.get("/api/pu_status")
def get_pu_status():
    """Read the current state of PU 1, 2 and 3 and return them as JSON.

    The response maps "PU1", "PU2" and "PU3" to whatever the CAN backend
    reports for the matching PU number; the mapping is also logged.
    """
    states = {
        f"PU{number}": can_backend.read_current_state(number)
        for number in (1, 2, 3)
    }
    logging.info(f"[PU STATUS] {states}")
    return JSONResponse(content=states)
async def update_latest_data():
    """Continuously refresh ``latest_data`` from the CAN backend.

    Runs forever: each sweep fetches the latest reading for PU 1, 2 and 3,
    logs it, stores the formatted result under "PU_<n>", then sleeps 0.2 s.
    A failure on one PU is logged and skipped so a single bad reading cannot
    end the refresh loop for the rest of the process lifetime.
    """
    while True:
        for pu in (1, 2, 3):
            try:
                data = can_backend.get_latest_data(pu_number=pu)
                logging.info(f"[MONITOR BUFFER] PU{pu}: {data}")
                latest_data[f"PU_{pu}"] = format_data(data)
            except Exception:
                # Task-level boundary: record the traceback and carry on with
                # the next PU. Cancellation is not an Exception subclass and
                # still propagates.
                logging.exception(f"[MONITOR BUFFER] PU{pu}: refresh failed")
        # NOTE(review): one sleep per sweep is assumed here - confirm it was
        # not meant to run after each individual PU.
        await asyncio.sleep(0.2)  # Update every 200 ms
@app.get("/monitor")
async def get_monitor_data(pu_number: Optional[int] = Query(None)):
    """Return buffered monitor data for one PU, or for all PUs.

    Args:
        pu_number: Optional query parameter. When omitted, the whole
            ``latest_data`` mapping is returned.

    Returns:
        The entry stored under "PU_<pu_number>" (an empty dict when no such
        key exists), or the full ``latest_data`` mapping.
    """
    logging.info(f"[MONITOR DATA QUERY] {pu_number}")
    # Test against None rather than truthiness so an explicit pu_number=0 is
    # handled as a (non-existent) PU instead of as "parameter not supplied".
    if pu_number is not None:
        return latest_data.get(f"PU_{pu_number}", {})
    return latest_data
@app.on_event("startup")
async def startup_event():
    """Start the background task that keeps ``latest_data`` refreshed."""
    # The event loop holds only weak references to tasks, so a task whose
    # result is discarded can be garbage-collected while still running.
    # Keeping it on app.state pins it for the lifetime of the application.
    app.state.monitor_task = asyncio.create_task(update_latest_data())
@app.get("/can_status")
def can_status():
    """Report the CAN backend's ``connected`` flag under the key "connected"."""
    is_connected = can_backend.connected
    return {"connected": is_connected}
@app.post("/command/feed_valve")
def feedvalve_control(MV01_opening: int = Query(...)):
    """Forward the requested MV01 feed valve opening to the valve backend.

    Args:
        MV01_opening: Required integer query parameter, passed to the valve
            backend unchanged.

    Returns:
        ``{"status": "ok"}`` once the command was sent and logged.
    """
    valve_backend.send_command(MV01_opening)
    log_line = f"Feed valve opening to {MV01_opening}"
    logging.info(log_line)
    return dict(status="ok")
# Attach the module-level router to the application.
app.include_router(router)

if __name__ == "__main__":
    # Direct execution: serve on localhost:8080 with auto-reload watching
    # the current directory.
    import uvicorn

    server_options = {
        "host": "127.0.0.1",
        "port": 8080,
        "reload": True,
        "reload_dirs": ["."],
    }
    uvicorn.run("main:app", **server_options)