File size: 5,840 Bytes
7385f67 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 |
import json
import queue
import time
import secrets
import paho.mqtt.client as mqtt
import streamlit as st
import matplotlib.pyplot as plt
# Page heading for the Streamlit app
st.title("Motor Control Panel")

# Markdown blurb describing the demo and linking to background material
_APP_DESCRIPTION = """
This application accesses a public test demo of a motor control located in
Toronto, ON, Canada (as of 2024-07-27). Send speed commands to the motor and
visualize the RPM data.
For more context, you
can refer to this [Colab
notebook](https://colab.research.google.com/github/sparks-baird/self-driving-lab-demo/blob/main/notebooks/4.2-paho-mqtt-colab-sdl-demo-test.ipynb)
and the [self-driving-lab-demo
project](https://github.com/sparks-baird/self-driving-lab-demo). You may also be
interested in the Acceleration Consortium's ["Hello World"
microcourse](https://ac-microcourses.readthedocs.io/en/latest/courses/hello-world/index.html)
for self-driving labs.
"""
st.markdown(_APP_DESCRIPTION)
# Gather the broker settings, the target device and the requested speed in one
# form; `submit_button` is the flag the rest of the script checks before sending.
with st.form(key="mqtt_form"):
    # Broker connection details (host and password inputs are masked)
    HIVEMQ_HOST = st.text_input(
        label="Enter your HiveMQ host:",
        value="248cc294c37642359297f75b7b023374.s2.eu.hivemq.cloud",
        type="password",
    )
    HIVEMQ_USERNAME = st.text_input(
        label="Enter your HiveMQ username:",
        value="sgbaird",
    )
    HIVEMQ_PASSWORD = st.text_input(
        label="Enter your HiveMQ password:",
        value="D.Pq5gYtejYbU#L",
        type="password",
    )
    PORT = st.number_input(
        label="Enter the port number:",
        min_value=1,
        max_value=65535,
        value=8883,
    )

    # Identifier of the Pico board; it becomes part of the MQTT topic names
    PICO_ID = st.text_input(
        label="Enter your Pico ID:",
        value="test-fan",
        type="password",
    )

    # Requested motor speed, limited to the 0-100 range
    speed = st.slider(
        label="Select the Speed value:",
        min_value=0,
        max_value=100,
        value=0,
    )

    submit_button = st.form_submit_button("Send Speed Command")
# Queue that the MQTT message callback fills and the plotting loop drains
rpm_data_queue = queue.Queue()

# Random per-run identifier (4 bytes -> 8 hex characters) sent along with the
# command so a session's data can be told apart from older data
experiment_id = secrets.token_hex(nbytes=4)

# Topics for this Pico: one for outgoing speed commands, one for incoming RPM data
rpm_data_topic = f"sdl-demo/picow/{PICO_ID}/motor/rpm/"
command_topic = f"sdl-demo/picow/{PICO_ID}/motor/speed/"
# Cached as a shared resource, see:
# https://docs.streamlit.io/develop/api-reference/caching-and-state/st.cache_resource
@st.cache_resource
def create_paho_client(tls=True):
    """Build an MQTT v5 client, enabling TLS unless ``tls`` is falsy.

    Args:
        tls: When truthy, configure the client for TLS using
            ``PROTOCOL_TLS_CLIENT``.

    Returns:
        The newly created (not yet connected) paho-mqtt client.
    """
    paho_client = mqtt.Client(protocol=mqtt.MQTTv5)
    if not tls:
        return paho_client
    paho_client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLS_CLIENT)
    return paho_client
# Define setup separately since sensor_data is dynamic
def setup_paho_client(
    client, rpm_data_topic, hostname, username, password=None, port=8883
):
    """Attach callbacks to ``client``, connect to the broker and start its loop.

    Args:
        client: paho-mqtt client to configure; its ``on_connect`` and
            ``on_message`` callbacks and its credentials are overwritten.
        rpm_data_topic: Topic subscribed to (QoS 1) from the connect callback.
        hostname: Broker host name.
        username: Broker user name.
        password: Broker password, or ``None``.
        port: Broker port.

    Returns:
        The same ``client``, with its non-blocking network loop started.
    """

    def on_message(client, userdata, msg):
        # This callback is invoked by paho's network loop, so a bad payload
        # must not raise out of it; undecodable messages are dropped instead.
        try:
            payload = json.loads(msg.payload)
        except ValueError as exc:  # covers JSONDecodeError and UnicodeDecodeError
            print(f"Ignoring non-JSON message on {msg.topic}: {exc}")
            return
        # Consumers of the queue add and read keys on each item, so only JSON
        # objects are forwarded.
        if not isinstance(payload, dict):
            print(f"Ignoring non-object JSON message on {msg.topic}: {payload!r}")
            return
        rpm_data_queue.put(payload)

    def on_connect(client, userdata, flags, rc, properties=None):
        # Only a non-zero result code is logged
        if rc != 0:
            print("Connected with result code " + str(rc))
        client.subscribe(rpm_data_topic, qos=1)

    client.on_connect = on_connect
    client.on_message = on_message
    client.username_pw_set(username, password)
    client.connect(hostname, port)
    client.loop_start()  # Use a non-blocking loop
    return client
def send_command(client, command_topic, msg):
    """Publish ``msg`` as JSON on ``command_topic`` (QoS 2) and log the outcome.

    Args:
        client: Client object used to publish.
        command_topic: Topic the command is published to.
        msg: JSON-serialisable command payload.
    """
    print("Sending command...")
    payload = json.dumps(msg)
    result = client.publish(command_topic, payload, qos=2)
    # Block until the publish has completed before inspecting the result code
    result.wait_for_publish()
    if result.rc != mqtt.MQTT_ERR_SUCCESS:
        print(f"Failed to send command: {result.rc}")
        return
    print(f"Command sent: {msg} to topic {command_topic}")
# Helper function to plot RPM data
def plot_rpm_data(ax1, ax2, rpm_data):
    """Redraw both RPM plots from ``rpm_data`` and render them in Streamlit.

    Args:
        ax1: Axes for the auto-scaled "Motor RPM over Time" plot.
        ax2: Axes for the same series with a y-axis that starts at 0.
        rpm_data: Sequence of dicts, each providing ``"time"`` and ``"rpm"``.
            May be empty, in which case empty plots are drawn.
    """
    times = [sample["time"] for sample in rpm_data]
    rpm_values = [sample["rpm"] for sample in rpm_data]

    ax1.clear()
    ax1.plot(times, rpm_values, "-o", color="blue")
    ax1.set_xlabel("Time (s)", fontsize=14)
    ax1.set_ylabel("RPM", fontsize=14)
    ax1.set_title("Motor RPM over Time", fontsize=16)

    ax2.clear()
    ax2.plot(times, rpm_values, "-o", color="blue")
    # Upper limit is 10% above the largest reading but never below 1.1;
    # default=0 keeps the expression valid when there are no readings yet.
    ax2.set_ylim(0, max(1.1 * max(rpm_values, default=0), 1.1))
    ax2.set_xlabel("Time (s)")
    ax2.set_ylabel("RPM (scaled)")
    ax2.set_title("RPM Data Stream")

    # Render the figure that owns the axes instead of depending on a
    # module-level `fig` having been created beforehand.
    st.pyplot(ax1.figure)
# Function to check and update RPM data periodically
def check_rpm_data():
    """Poll the RPM queue about once per second and redraw the plots.

    At most one queued message is handled per pass: it is stamped with the
    seconds elapsed since this function started, appended to the series and
    plotted. The loop ends, after showing an error, once no message has been
    handled for ``timeout`` (15) seconds. The matplotlib figure created here
    is closed when the function exits, including when it exits by exception.
    """
    # Kept as module globals so other code in this script can reach the
    # current figure and axes.
    global fig, ax1, ax2

    rpm_data = []
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 8))
    start_time = time.time()
    last_data_time = start_time
    timeout = 15  # Timeout duration in seconds
    rpm_display = st.empty()

    try:
        while True:
            current_time = time.time()

            if not rpm_data_queue.empty():
                data = rpm_data_queue.get()
                elapsed_time = current_time - start_time
                data["time"] = elapsed_time
                rpm_data.append(data)
                last_data_time = current_time
                # Replace the previous plot/readout in the same placeholder
                with rpm_display.container():
                    plot_rpm_data(ax1, ax2, rpm_data)
                    st.write("RPM Data Received:", data)

            if current_time - last_data_time > timeout:
                st.error(
                    f"No RPM data received for {timeout} seconds. Please check the system."
                )
                break

            time.sleep(1)
    finally:
        # Figures created through pyplot stay registered until closed; without
        # this, every call would leave one more open figure behind.
        plt.close(fig)
# Act on a form submission: validate the inputs, send the speed command, then
# stream the RPM data back
if submit_button:
    required_fields = (PICO_ID, HIVEMQ_HOST, HIVEMQ_USERNAME, HIVEMQ_PASSWORD)
    if all(required_fields):
        st.info(
            f"Please wait while the command {speed} for experiment {experiment_id} is sent..."
        )

        # Fetch the cached client, then (re)configure and connect it
        client = setup_paho_client(
            create_paho_client(tls=True),
            rpm_data_topic,
            HIVEMQ_HOST,
            HIVEMQ_USERNAME,
            password=HIVEMQ_PASSWORD,
            port=int(PORT),
        )

        # The experiment id travels with the command
        command_msg = {"speed": speed, "_experiment_id": experiment_id}
        send_command(client, command_topic, command_msg)
        st.success(f"Command {speed} for experiment {experiment_id} sent successfully!")

        # Keep plotting incoming RPM data until it stops arriving
        check_rpm_data()
    else:
        st.error("Please enter all required fields.")
|