NorthStar-Endurance-TestBench/single-valve-bench/MockValveController.py

58 lines
1.8 KiB
Python

import asyncio
import time
from collections import deque
from pathlib import Path
import csv
class ValveController:
def __init__(self):
self.connected = False
self.setpoint = 0.0
self.current_position = 0.0
self.data_window = deque(maxlen=200) # 10s at 20Hz
self.running = False
self._record_task = None
self._update_task = None
self.csv_path = Path("prbs_6_speed.csv")
async def connect(self):
self.connected = True
if not self._update_task:
self._update_task = asyncio.create_task(self.update_loop())
async def send_setpoint(self, value: float):
self.setpoint = value
async def read_position(self):
# Simple model: position moves toward setpoint
self.current_position += 0.1 * (self.setpoint - self.current_position)
return self.current_position
# async def update_loop(self):
# """Runs continuously to update valve position and data window at 20Hz"""
# while True:
# pos = await self.read_position()
# t = time.time()
# self.data_window.append((t, pos, self.setpoint))
# await asyncio.sleep(0.05) # 20Hz
async def record_loop(self):
self.running = True
with self.csv_path.open("w", newline="") as f:
writer = csv.writer(f)
writer.writerow(["timestamp", "position", "setpoint"])
while self.running:
if self.data_window:
t, pos, sp = self.data_window[-1]
writer.writerow([t, pos, sp])
await asyncio.sleep(0.05) # match 20Hz
async def stop_recording(self):
self.running = False
if self._record_task:
await self._record_task
self._record_task = None