253 lines
7.2 KiB
Python
253 lines
7.2 KiB
Python
from fastapi import FastAPI, HTTPException, Query, Form, Depends
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
|
|
import logging
|
|
import os
|
|
from fastapi import Request, APIRouter
|
|
import platform
|
|
from fastapi.templating import (
|
|
Jinja2Templates,
|
|
) # pip install fastapi uvicorn jinja2 python-multipart passlib
|
|
from starlette.middleware.sessions import SessionMiddleware
|
|
from starlette.exceptions import HTTPException as StarletteHTTPException
|
|
from starlette.status import HTTP_302_FOUND
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Optional, Dict, Any
|
|
from fastapi import Query
|
|
import asyncio
|
|
import datetime
|
|
from valveBackend import ValveBackend
|
|
|
|
# Choose the CAN backend implementation from the host operating system:
# MockCAN on macOS (platform.system() == "Darwin"), classCAN everywhere else.
# NOTE(review): the previous comment said "macOS or Windows", but only
# "Darwin" is matched, so Windows gets the real backend — confirm intent.
if platform.system() == "Darwin":
    from MockCAN import CANBackend
else:
    from classCAN import CANBackend  # the real backend
|
|
|
|
# Configure logging before anything else in this block: emitting a record
# through the root logger first would implicitly call basicConfig() and make
# this call a no-op.
logging.basicConfig(level=logging.INFO)

# Key used by SessionMiddleware. It is read from the SESSION_SECRET_KEY
# environment variable when set (and non-empty); otherwise the historical
# hard-coded value is kept so existing setups behave exactly as before.
# NOTE(review): the fallback is a placeholder visible in the source — set the
# environment variable for any real deployment.
_DEFAULT_SESSION_SECRET_KEY = "your_super_secret_key"
_SESSION_SECRET_KEY = (
    os.environ.get("SESSION_SECRET_KEY") or _DEFAULT_SESSION_SECRET_KEY
)
if _SESSION_SECRET_KEY == _DEFAULT_SESSION_SECRET_KEY:
    logging.warning(
        "SESSION_SECRET_KEY is not set; using the built-in placeholder session key."
    )

# Path of the EDS file handed to ValveBackend; overridable through the
# HMI_EDS_FILE environment variable, defaulting to the original location.
_EDS_FILE = (
    os.environ.get("HMI_EDS_FILE")
    or "/home/hmi/Desktop/HMI/eds_file/inletvalveboard.eds"
)

app = FastAPI()
app.add_middleware(SessionMiddleware, secret_key=_SESSION_SECRET_KEY)
router = APIRouter()
templates = Jinja2Templates(directory="templates")
can_backend = CANBackend()
valve_backend = ValveBackend(eds_file=_EDS_FILE)

# Serve static files (HTML, JS, CSS)
app.mount("/static", StaticFiles(directory="static"), name="static")
|
|
|
|
## BUFFER CREATION AND HELPER
# Module-level buffer with one slot per PU ("PU_1" .. "PU_3").
# Every slot starts out as None.
latest_data: Dict[str, Any] = dict.fromkeys(("PU_1", "PU_2", "PU_3"))
|
|
|
|
|
|
# (output key, raw sample key) pairs for fields that are renamed on output.
_RENAMED_FIELDS = (
    ("Qperm", "FM2"),
    ("Qdilute", "FM1"),
    ("Qdrain", "FM4"),
    ("Qrecirc", "FM3"),
    ("Pro", "PS1"),
    ("Pdilute", "PS3"),
    ("Prentate", "PS2"),
    ("Conductivity", "Cond"),
)

# Valve names copied through unchanged, each with a matching "<name>_sp" entry.
_VALVE_NAMES = ("MV02", "MV03", "MV04", "MV05", "MV06", "MV07", "MV08")


def format_data(data):
    """Convert a raw sample mapping into the dict served to the monitor UI.

    Args:
        data: Mapping of raw keys ("FM1".."FM4", "PS1".."PS3", "Cond",
            "MVxx", "MVxx_sp") to values. ``None`` or an empty mapping is
            accepted and yields all defaults.

    Returns:
        A new dict with a "timestamp" (ISO-8601 string of the local time at
        call) followed by the renamed sensor fields and the valve fields.
        Every key missing from ``data`` defaults to ``0.0``.
    """
    sample = data or {}

    formatted = {"timestamp": datetime.datetime.now().isoformat()}

    for out_key, raw_key in _RENAMED_FIELDS:
        formatted[out_key] = sample.get(raw_key, 0.0)

    # Each valve reads its OWN keys. Previously "MV04"/"MV04_sp" were filled
    # from "MV05"/"MV05_sp", duplicating MV05 and hiding the real MV04 value.
    for valve in _VALVE_NAMES:
        setpoint_key = f"{valve}_sp"
        formatted[valve] = sample.get(valve, 0.0)
        formatted[setpoint_key] = sample.get(setpoint_key, 0.0)

    return formatted
|
|
|
|
|
|
# CREDENTIALS
|
|
|
|
# Load the login credentials from credentials.json (relative to the working
# directory) once, at import time.
CREDENTIAL_PATH = Path("credentials.json")
if CREDENTIAL_PATH.exists():
    # JSON text is UTF-8; do not depend on the platform's locale encoding.
    with CREDENTIAL_PATH.open("r", encoding="utf-8") as f:
        CREDENTIALS = json.load(f)
else:
    CREDENTIALS = {}

# Fail fast with an actionable message. Previously a missing file (or a file
# without these keys) surfaced as a bare KeyError with no hint of the cause.
try:
    USERNAME = CREDENTIALS["username"]
    PASSWORD = CREDENTIALS["password"]
except (KeyError, TypeError) as exc:
    raise RuntimeError(
        f"{CREDENTIAL_PATH} must exist and contain a JSON object with "
        "'username' and 'password' entries"
    ) from exc
|
|
|
|
|
|
# ======== LOGIN & SESSION HANDLING ========
|
|
|
|
|
|
def require_login(request: Request):
    """Return the logged-in user, or raise a redirect to the login form.

    Args:
        request: Incoming request; its session is checked for a "user" entry.

    Returns:
        The session's "user" value when it equals ``USERNAME``.

    Raises:
        StarletteHTTPException: Status 302 with a ``Location: /`` header when
            the session does not hold the expected user.
    """
    user = request.session.get("user")
    if user != USERNAME:
        # A dependency cannot return a RedirectResponse, so the redirect is
        # raised as an exception. The Location header is what makes the 302
        # an actual redirect; without it the client only receives a bare
        # status code and stays where it is.
        raise StarletteHTTPException(
            status_code=HTTP_302_FOUND,
            detail="Redirect to login",
            headers={"Location": "/"},
        )
    return user
|
|
|
|
|
|
@app.get("/", response_class=HTMLResponse)
def login_form(request: Request):
    """Render the ``login.html`` template for the site root."""
    context = {"request": request}
    return templates.TemplateResponse("login.html", context)
|
|
|
|
|
|
def _credential_matches(supplied: str, expected) -> bool:
    """Compare a submitted value with a stored credential in constant time.

    A stored value that is not a string never matches, which is the same
    outcome the previous ``==`` comparison against a str produced.
    """
    import secrets  # local import keeps this block self-contained

    if not isinstance(expected, str):
        return False
    # compare_digest only accepts ASCII str, so compare the UTF-8 bytes.
    return secrets.compare_digest(
        supplied.encode("utf-8"), expected.encode("utf-8")
    )


@app.post("/login")
def login(request: Request, username: str = Form(...), password: str = Form(...)):
    """Validate the submitted login form.

    Args:
        request: Incoming request; its session receives the "user" entry on
            success.
        username: Submitted "username" form field.
        password: Submitted "password" form field.

    Returns:
        A 302 redirect to ``/control`` when both fields match the configured
        credentials; otherwise the ``login.html`` template re-rendered with
        an "error" message.
    """
    # Evaluate both comparisons unconditionally so the response time does not
    # reveal which of the two fields was wrong.
    username_ok = _credential_matches(username, USERNAME)
    password_ok = _credential_matches(password, PASSWORD)

    if username_ok and password_ok:
        request.session["user"] = username
        return RedirectResponse("/control", status_code=HTTP_302_FOUND)

    # The message previously read "Invalid credentials.json", exposing a file
    # name to the user; that looked like a search-and-replace artifact.
    return templates.TemplateResponse(
        "login.html", {"request": request, "error": "Invalid credentials"}
    )
|
|
|
|
|
|
@app.get("/logout")
def logout(request: Request):
    """Empty the caller's session and redirect (302) to ``/``."""
    session = request.session
    session.clear()
    return RedirectResponse(url="/", status_code=HTTP_302_FOUND)
|
|
|
|
|
|
# ======== PROTECTED INTERFACE ========
|
|
|
|
|
|
@app.get("/control", response_class=HTMLResponse)
def control_page(request: Request):
    """Render ``control.html`` for the logged-in user.

    Any request whose session "user" differs from ``USERNAME`` is redirected
    (302) to ``/`` instead.
    """
    session_user = request.session.get("user")
    if session_user == USERNAME:
        return templates.TemplateResponse("control.html", {"request": request})
    return RedirectResponse("/", status_code=HTTP_302_FOUND)
|
|
|
|
|
|
@app.get("/monitor-page", response_class=HTMLResponse)
def monitor_page(request: Request):
    """Return the contents of ``static/monitor.html`` as an HTML response.

    The file is read from disk on every request; ``request`` is not used.
    """
    # NOTE(review): unlike /control, no session check happens here even though
    # this route sits in the "protected interface" section — confirm intent.
    page_source = Path("static/monitor.html").read_text()
    return HTMLResponse(page_source)
|
|
|
|
|
|
# ======== CAN + BACKEND ROUTES ========
|
|
|
|
|
|
@app.post("/connect_toggle")
def connect_toggle():
    """Toggle the CAN connection.

    When the CAN backend is connected it is shut down. Otherwise the CAN
    backend is connected and a connection to the valve backend is attempted
    as well.

    Returns:
        ``{"connected": False}`` after a shutdown, ``{"connected": True}``
        after a successful CAN connect.

    Raises:
        HTTPException: Status 500 when ``can_backend.connect()`` reports
            failure.
    """
    logging.info("Toggling CAN connection...")

    if can_backend.connected:
        can_backend.shutdown()
        return {"connected": False}

    success = can_backend.connect()

    # The valve connection is best-effort: a failure must not abort the
    # toggle. It is reported through logging, with the traceback, like every
    # other message in this module (it used to go to stdout via print()).
    try:
        valve_backend.connect()
    except Exception:
        logging.exception("Connection error")

    if not success:
        raise HTTPException(status_code=500, detail="Connection failed.")
    return {"connected": True}
|
|
|
|
|
|
# States accepted by send_command (compared after upper-casing). Built once
# at import time instead of on every request.
_VALID_STATES = frozenset(
    {
        "IDLE",
        "PRE-PRODUCTION",
        "PRODUCTION",
        "FIRST_START",
        "THERMALLOOPCLEANING",
        "DISINFECTION",
        "SLEEP",
    }
)


@app.post("/command/{state}/pu/{pu_number}")
def send_command(state: str, pu_number: int, ploop_setpoint: float = Query(...)):
    """Send a state command to one PU and report the state read back.

    Args:
        state: Requested state name (case-insensitive); must be one of the
            accepted states.
        pu_number: PU the command is addressed to.
        ploop_setpoint: Required query parameter forwarded to the CAN backend
            together with the state.

    Returns:
        A dict with "status", "command", "pu", "ploop_setpoint" and the
        "current_state" read from the backend after sending.

    Raises:
        HTTPException: Status 400 for an unknown state; status 500 (with the
            error text as detail) when the CAN backend raises.
    """
    state = state.upper()

    if state not in _VALID_STATES:
        raise HTTPException(status_code=400, detail=f"Invalid state '{state}'")

    logging.info(f"Sending state '{state}' to PU {pu_number}")

    try:
        can_backend.send_state_command(state, pu_number, ploop_setpoint)
        current_state = can_backend.read_current_state(pu_number)
    except Exception as e:
        # Keep the traceback in the server log and chain the cause; the
        # client still receives the same 500 response as before.
        logging.exception(f"Command '{state}' to PU {pu_number} failed")
        raise HTTPException(status_code=500, detail=str(e)) from e

    return {
        "status": "success",
        "command": state,
        "pu": pu_number,
        "ploop_setpoint": ploop_setpoint,
        "current_state": current_state,
    }
|
|
|
|
|
|
@app.get("/api/pu_status")
def get_pu_status():
    """Return the current state of PU 1, 2 and 3 as JSON.

    The states are read from the CAN backend in ascending PU order and
    returned under the keys "PU1", "PU2" and "PU3".
    """
    states = {
        f"PU{number}": can_backend.read_current_state(number)
        for number in (1, 2, 3)
    }
    logging.info(f"[PU STATUS] {states}")
    return JSONResponse(content=states)
|
|
|
|
|
|
async def update_latest_data(interval_s: float = 0.2):
    """Refresh ``latest_data`` for PU 1-3 forever.

    Each sweep fetches the latest sample of every PU from the CAN backend,
    formats it with ``format_data`` and stores it under "PU_<n>".

    Args:
        interval_s: Pause between two sweeps, in seconds. Defaults to 0.2
            (200 ms), the value that was previously hard-coded.
    """
    while True:
        for pu in (1, 2, 3):
            # One failing PU (backend error, malformed sample) must neither
            # stop the other PUs nor end this task: an exception escaping
            # here would silently stop all further refreshes.
            # CancelledError is not an Exception subclass, so task
            # cancellation still propagates.
            try:
                data = can_backend.get_latest_data(pu_number=pu)
                logging.info(f"[MONITOR BUFFER] PU{pu}: {data}")
                latest_data[f"PU_{pu}"] = format_data(data)
            except Exception:
                logging.exception(f"[MONITOR BUFFER] PU{pu}: update failed")
        await asyncio.sleep(interval_s)
|
|
|
|
|
|
@app.get("/monitor")
async def get_monitor_data(pu_number: Optional[int] = Query(None)):
    """Return buffered monitor data for one PU or for all of them.

    Args:
        pu_number: Optional query parameter selecting a single PU.

    Returns:
        The whole ``latest_data`` buffer when ``pu_number`` is omitted;
        otherwise the entry stored under "PU_<pu_number>", or ``{}`` when no
        such entry exists.
    """
    logging.info(f"[MONITOR DATA QUERY] {pu_number}")

    # Test for None explicitly: a plain truthiness check treated
    # pu_number=0 as "not supplied" and returned the entire buffer.
    if pu_number is None:
        return latest_data
    return latest_data.get(f"PU_{pu_number}", {})
|
|
|
|
|
|
# Strong references to running background tasks. The event loop keeps only
# weak references, so a task nobody else references may be garbage-collected
# before it finishes.
_background_tasks = set()


@app.on_event("startup")
async def startup_event():
    """Start the background task that keeps ``latest_data`` refreshed."""
    task = asyncio.create_task(update_latest_data())
    _background_tasks.add(task)
    # Drop the reference once the task ends so the set does not grow.
    task.add_done_callback(_background_tasks.discard)
|
|
|
|
|
|
@app.get("/can_status")
def can_status():
    """Report the CAN backend's ``connected`` flag."""
    is_connected = can_backend.connected
    return {"connected": is_connected}
|
|
|
|
|
|
@app.post("/command/feed_valve")
def feedvalve_control(MV01_opening: int = Query(...)):
    """Forward an MV01 feed-valve opening to the valve backend.

    ``MV01_opening`` is a required integer query parameter. It is passed to
    ``valve_backend.send_command`` and logged afterwards.
    """
    requested_opening = MV01_opening
    valve_backend.send_command(requested_opening)
    logging.info(f"Feed valve opening to {requested_opening}")

    response = {"status": "ok"}
    return response
|
|
|
|
# Register the module-level router on the application.
app.include_router(router)


if __name__ == "__main__":
    # Development entry point: serve "main:app" with uvicorn on
    # 127.0.0.1:8080, with auto-reload watching the current directory.
    import uvicorn

    server_options = {
        "host": "127.0.0.1",
        "port": 8080,
        "reload": True,
        "reload_dirs": ["."],
    }
    uvicorn.run("main:app", **server_options)
|