fan-control / app_backup.py
sgbaird's picture
app backup
7385f67
import json
import queue
import time
import secrets
import paho.mqtt.client as mqtt
import streamlit as st
import matplotlib.pyplot as plt
# Initialize Streamlit app
st.title("Motor Control Panel")
# Description and context
st.markdown(
"""
This application accesses a public test demo of a motor control located in
Toronto, ON, Canada (as of 2024-07-27). Send speed commands to the motor and
visualize the RPM data.
For more context, you
can refer to this [Colab
notebook](https://colab.research.google.com/github/sparks-baird/self-driving-lab-demo/blob/main/notebooks/4.2-paho-mqtt-colab-sdl-demo-test.ipynb)
and the [self-driving-lab-demo
project](https://github.com/sparks-baird/self-driving-lab-demo). You may also be
interested in the Acceleration Consortium's ["Hello World"
microcourse](https://ac-microcourses.readthedocs.io/en/latest/courses/hello-world/index.html)
for self-driving labs.
"""
)
with st.form("mqtt_form"):
# MQTT Configuration
HIVEMQ_HOST = st.text_input(
"Enter your HiveMQ host:",
"248cc294c37642359297f75b7b023374.s2.eu.hivemq.cloud",
type="password",
)
HIVEMQ_USERNAME = st.text_input("Enter your HiveMQ username:", "sgbaird")
HIVEMQ_PASSWORD = st.text_input(
"Enter your HiveMQ password:", "D.Pq5gYtejYbU#L", type="password"
)
PORT = st.number_input(
"Enter the port number:", min_value=1, max_value=65535, value=8883
)
# User input for the Pico ID
PICO_ID = st.text_input("Enter your Pico ID:", "test-fan", type="password")
# Sliders for speed values
speed = st.slider("Select the Speed value:", min_value=0, max_value=100, value=0)
submit_button = st.form_submit_button(label="Send Speed Command")
command_topic = f"sdl-demo/picow/{PICO_ID}/motor/speed/"
rpm_data_topic = f"sdl-demo/picow/{PICO_ID}/motor/rpm/"
# random session id to keep track of the session and filter out old data
experiment_id = secrets.token_hex(4) # 4 bytes = 8 characters
rpm_data_queue = queue.Queue()
# Singleton: https://docs.streamlit.io/develop/api-reference/caching-and-state/st.cache_resource
@st.cache_resource
def create_paho_client(tls=True):
client = mqtt.Client(protocol=mqtt.MQTTv5)
if tls:
client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLS_CLIENT)
return client
# Define setup separately since sensor_data is dynamic
def setup_paho_client(
client, rpm_data_topic, hostname, username, password=None, port=8883
):
def on_message(client, userdata, msg):
rpm_data_queue.put(json.loads(msg.payload))
def on_connect(client, userdata, flags, rc, properties=None):
if rc != 0:
print("Connected with result code " + str(rc))
client.subscribe(rpm_data_topic, qos=1)
client.on_connect = on_connect
client.on_message = on_message
client.username_pw_set(username, password)
client.connect(hostname, port)
client.loop_start() # Use a non-blocking loop
return client
def send_command(client, command_topic, msg):
print("Sending command...")
result = client.publish(command_topic, json.dumps(msg), qos=2)
result.wait_for_publish() # Ensure the message is sent
if result.rc == mqtt.MQTT_ERR_SUCCESS:
print(f"Command sent: {msg} to topic {command_topic}")
else:
print(f"Failed to send command: {result.rc}")
# Helper function to plot RPM data
def plot_rpm_data(ax1, ax2, rpm_data):
times = [data["time"] for data in rpm_data]
rpm_values = [data["rpm"] for data in rpm_data]
ax1.clear()
ax1.plot(times, rpm_values, "-o", color="blue")
ax1.set_xlabel("Time (s)", fontsize=14)
ax1.set_ylabel("RPM", fontsize=14)
ax1.set_title("Motor RPM over Time", fontsize=16)
ax2.clear()
ax2.plot(times, rpm_values, "-o", color="blue")
ax2.set_ylim(0, max(1.1 * max(rpm_values), 1.1)) # Set fixed y-axis limits
ax2.set_xlabel("Time (s)")
ax2.set_ylabel("RPM (scaled)")
ax2.set_title("RPM Data Stream")
st.pyplot(fig)
# Function to check and update RPM data periodically
def check_rpm_data():
rpm_data = []
global fig, ax1, ax2
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 8))
start_time = time.time()
last_data_time = start_time
timeout = 15 # Timeout duration in seconds
rpm_display = st.empty()
while True:
current_time = time.time()
if not rpm_data_queue.empty():
data = rpm_data_queue.get()
elapsed_time = current_time - start_time
data["time"] = elapsed_time
rpm_data.append(data)
last_data_time = current_time
with rpm_display.container():
plot_rpm_data(ax1, ax2, rpm_data)
st.write("RPM Data Received:", data)
if current_time - last_data_time > timeout:
st.error(
f"No RPM data received for {timeout} seconds. Please check the system."
)
break
time.sleep(1)
# Publish button
if submit_button:
if not PICO_ID or not HIVEMQ_HOST or not HIVEMQ_USERNAME or not HIVEMQ_PASSWORD:
st.error("Please enter all required fields.")
else:
st.info(
f"Please wait while the command {speed} for experiment {experiment_id} is sent..."
)
client = create_paho_client(tls=True)
client = setup_paho_client(
client,
rpm_data_topic,
HIVEMQ_HOST,
HIVEMQ_USERNAME,
password=HIVEMQ_PASSWORD,
port=int(PORT),
)
command_msg = {"speed": speed, "_experiment_id": experiment_id}
send_command(client, command_topic, command_msg)
st.success(f"Command {speed} for experiment {experiment_id} sent successfully!")
# Start periodic check for RPM data
check_rpm_data()