NorthStar-Endurance-TestBench/test.py

92 lines
3.2 KiB
Python

import pandas as pd
from datetime import datetime
import os
import time
import canopen
import random # Required for status (Success/Error)
num_devices = 20 # One device (Valve 26)
tpdo_object_ids = [6004, 6005, 6006]
# Initialize data blocks to store time, written, read, and status
device_data_blocks = {f"Valve {device_id}": {"Time": [], "Written": [], "Read": [], "Status": []} for device_id in range(1, num_devices + 1)}
# CANopen Network Setup
network = canopen.Network()
network.connect(bustype='pcan', channel='PCAN_USBBUS1', bitrate=250000)
# Load node with EDS file
node = canopen.RemoteNode(26, r'C:\Users\vineetagupta\Documents\NorthStar-Motor-Valve-Board-Firmware\MotorValveBoard\coappl\motorcontrollervalve.eds')
network.add_node(node)
# Set node to operational
node.nmt.state = 'OPERATIONAL'
time.sleep(0.5)
# Read TPDO config
node.tpdo.read()
node.tpdo[1].enabled = True # Assuming TPDO1 has the object mapped
# Print mapped variables for debugging
print("Mapped TPDO1 objects:")
for var in node.tpdo[1].map:
print(f" - {var.name} (Index: {hex(var.index)}, Subindex: {var.subindex})")
# Create a callback to handle incoming TPDO data
written_data = None
read_data = None
def pdo_callback(map_obj):
global written_data, read_data
for var in map_obj:
# Check the index of the received TPDO
if var.index == 0x6004: # Written data
written_data = var.raw
print(f"Received Written data (6004): {written_data}")
elif var.index == 0x6005: # Read data
read_data = var.raw
print(f"Received Read data (6005): {read_data}")
# Add the callback to TPDO
node.tpdo[1].add_callback(pdo_callback)
# Append data to the data structure
def append_data():
global written_data, read_data
for device_id in range(1, num_devices + 1):
if written_data is not None and read_data is not None:
# Append the new data with current timestamp
current_time = datetime.now().strftime('%H:%M:%S')
device_data_blocks[f"Valve {device_id}"]["Time"].append(current_time)
device_data_blocks[f"Valve {device_id}"]["Written"].append(written_data)
device_data_blocks[f"Valve {device_id}"]["Read"].append(read_data)
device_data_blocks[f"Valve {device_id}"]["Status"].append(random.choice(["Success", "Error"]))
# Run the loop to keep fetching and logging data
try:
while True:
append_data()
time.sleep(1) # Adjust the time as needed
except KeyboardInterrupt:
print("\nLogging stopped by user (Ctrl+C). Saving data...")
# After stopping, convert the dynamic data to a pandas DataFrame
device_dataframes = []
for device_id in range(1, num_devices + 1):
data = device_data_blocks[f"Valve {device_id}"]
df = pd.DataFrame(data)
df.columns = pd.MultiIndex.from_product([[f"Valve {device_id}"], df.columns])
device_dataframes.append(df)
# Combine all device dataframes
final_df = pd.concat(device_dataframes, axis=1)
# Save to Excel with multi-level headers
file_name = 'ValveData.xlsx'
final_df.to_excel(file_name)
# Show file path
file_path = os.path.abspath(file_name)
print(f"Grouped data saved to: {file_path}")