NorthStar-Endurance-TestBench/single-valve-bench/main.py

138 lines
3.7 KiB
Python

from fastapi import FastAPI, Request, BackgroundTasks
from fastapi.staticfiles import StaticFiles
from MockValveController import ValveController
from ClassCAN import ValveControllerCAN
from starlette.concurrency import run_in_threadpool
import asyncio
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
from fastapi.templating import (
Jinja2Templates,
)
import logging
from crankshaft_functions import *
import asyncio
import random
app = FastAPI()
logging.getLogger("uvicorn.access").setLevel(logging.WARNING)
valve = ValveControllerCAN()
templates = Jinja2Templates(directory="static")
@app.get("/", response_class=HTMLResponse)
def login_form(request: Request):
return templates.TemplateResponse("index.html", {"request": request})
@app.post("/connect")
async def connect():
try:
status = valve.connect()
# if status :
# valve.start_listener()
return {"status": "connected" if status else "failed"}
except Exception as e:
import traceback; traceback.print_exc()
return {"status": "error", "detail": str(e)}
@app.post("/pid")
async def set_setpoint(req: Request):
data = await req.json()
kp = data["kp"]
ki = data["ki"]
tol = data["tol"]
valve.send_PID_coeffs(kp, ki, tol)
return {"status": "ok"}
@app.post("/setpoint")
async def set_setpoint(req: Request):
data = await req.json()
sp = data["setpoint"]
valve.send_setpoint(sp)
return {"setpoint": sp}
@app.post("/speed")
async def set_speed(req: Request):
data = await req.json()
sp = data["speed_sp"]
valve.send_speed(sp)
return {"speed_sp": sp}
@app.get("/position")
async def get_position():
# Return last 10s of data
data = list(valve.data_window)
return {
"time": [d[0] for d in data],
"position": [d[1] for d in data],
"setpoint": [d[2] for d in data],
"speed": [d[3] for d in data],
"speed_sp": [d[4] for d in data],
"Kp_term": [d[5] for d in data],
"Ki_term": [d[6] for d in data],
"Kd_term": [d[7] for d in data],
}
@app.post("/start_recording")
async def start_recording():
if valve.running:
return {"status": "already recording"}
valve._record_task = asyncio.create_task(valve.record_loop())
return {"status": "started"}
@app.post("/stop_recording")
async def stop_recording():
await valve.stop_recording()
return {"status": "stopped", "file": str(valve.csv_path.resolve())}
@app.post("/prbs")
async def run_prbs():
"""
Generate and send a true PRBS (Pseudo-Random Binary Sequence).
Hardcoded params:
- amplitude: 0 / 100
- dwell time: 0.1 s
- order n = 7 → sequence length = 127 steps
"""
async def prbs_task():
amplitude = 100
dwell_time = 0.3 # system τ ~0.1s
n = 6 # PRBS order
seq_length = 2**n - 1
# --- LFSR initialization ---
state = [1] * n # initial seed (nonzero)
prbs_bits = []
for _ in range(seq_length):
# Example taps for n=7 (maximal length LFSR: taps [7,6])
new_bit = state[-1] ^ state[-2] # XOR feedback
prbs_bits.append(state[-1])
state = [new_bit] + state[:-1]
# Convert bits → setpoints {0, amplitude}
prbs_sequence = [amplitude if bit == 1 else 0 for bit in prbs_bits]
for sp in prbs_sequence:
valve.send_setpoint(sp)
# valve.send_speed(sp)
await asyncio.sleep(dwell_time)
# return to 0 at the end
valve.send_setpoint(0)
asyncio.create_task(prbs_task())
return {"status": "started PRBS"}
app.mount("/static", StaticFiles(directory="static"), name="static")