import json import queue import time import secrets import paho.mqtt.client as mqtt import streamlit as st import matplotlib.pyplot as plt # Initialize Streamlit app st.title("Motor Control Panel") # Description and context st.markdown( """ This application accesses a public test demo of a motor control located in Toronto, ON, Canada (as of 2024-07-27). Send speed commands to the motor and visualize the RPM data. For more context, you can refer to this [Colab notebook](https://colab.research.google.com/github/sparks-baird/self-driving-lab-demo/blob/main/notebooks/4.2-paho-mqtt-colab-sdl-demo-test.ipynb) and the [self-driving-lab-demo project](https://github.com/sparks-baird/self-driving-lab-demo). You may also be interested in the Acceleration Consortium's ["Hello World" microcourse](https://ac-microcourses.readthedocs.io/en/latest/courses/hello-world/index.html) for self-driving labs. """ ) with st.form("mqtt_form"): # MQTT Configuration HIVEMQ_HOST = st.text_input( "Enter your HiveMQ host:", "248cc294c37642359297f75b7b023374.s2.eu.hivemq.cloud", type="password", ) HIVEMQ_USERNAME = st.text_input("Enter your HiveMQ username:", "sgbaird") HIVEMQ_PASSWORD = st.text_input( "Enter your HiveMQ password:", "D.Pq5gYtejYbU#L", type="password" ) PORT = st.number_input( "Enter the port number:", min_value=1, max_value=65535, value=8883 ) # User input for the Pico ID PICO_ID = st.text_input("Enter your Pico ID:", "test-fan", type="password") # Sliders for speed values speed = st.slider("Select the Speed value:", min_value=0, max_value=100, value=0) submit_button = st.form_submit_button(label="Send Speed Command") command_topic = f"sdl-demo/picow/{PICO_ID}/motor/speed/" rpm_data_topic = f"sdl-demo/picow/{PICO_ID}/motor/rpm/" # random session id to keep track of the session and filter out old data experiment_id = secrets.token_hex(4) # 4 bytes = 8 characters rpm_data_queue = queue.Queue() # Singleton: https://docs.streamlit.io/develop/api-reference/caching-and-state/st.cache_resource @st.cache_resource def create_paho_client(tls=True): client = mqtt.Client(protocol=mqtt.MQTTv5) if tls: client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLS_CLIENT) return client # Define setup separately since sensor_data is dynamic def setup_paho_client( client, rpm_data_topic, hostname, username, password=None, port=8883 ): def on_message(client, userdata, msg): rpm_data_queue.put(json.loads(msg.payload)) def on_connect(client, userdata, flags, rc, properties=None): if rc != 0: print("Connected with result code " + str(rc)) client.subscribe(rpm_data_topic, qos=1) client.on_connect = on_connect client.on_message = on_message client.username_pw_set(username, password) client.connect(hostname, port) client.loop_start() # Use a non-blocking loop return client def send_command(client, command_topic, msg): print("Sending command...") result = client.publish(command_topic, json.dumps(msg), qos=2) result.wait_for_publish() # Ensure the message is sent if result.rc == mqtt.MQTT_ERR_SUCCESS: print(f"Command sent: {msg} to topic {command_topic}") else: print(f"Failed to send command: {result.rc}") # Helper function to plot RPM data def plot_rpm_data(ax1, ax2, rpm_data): times = [data["time"] for data in rpm_data] rpm_values = [data["rpm"] for data in rpm_data] ax1.clear() ax1.plot(times, rpm_values, "-o", color="blue") ax1.set_xlabel("Time (s)", fontsize=14) ax1.set_ylabel("RPM", fontsize=14) ax1.set_title("Motor RPM over Time", fontsize=16) ax2.clear() ax2.plot(times, rpm_values, "-o", color="blue") ax2.set_ylim(0, max(1.1 * max(rpm_values), 1.1)) # Set fixed y-axis limits ax2.set_xlabel("Time (s)") ax2.set_ylabel("RPM (scaled)") ax2.set_title("RPM Data Stream") st.pyplot(fig) # Function to check and update RPM data periodically def check_rpm_data(): rpm_data = [] global fig, ax1, ax2 fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 8)) start_time = time.time() last_data_time = start_time timeout = 15 # Timeout duration in seconds rpm_display = st.empty() while True: current_time = time.time() if not rpm_data_queue.empty(): data = rpm_data_queue.get() elapsed_time = current_time - start_time data["time"] = elapsed_time rpm_data.append(data) last_data_time = current_time with rpm_display.container(): plot_rpm_data(ax1, ax2, rpm_data) st.write("RPM Data Received:", data) if current_time - last_data_time > timeout: st.error( f"No RPM data received for {timeout} seconds. Please check the system." ) break time.sleep(1) # Publish button if submit_button: if not PICO_ID or not HIVEMQ_HOST or not HIVEMQ_USERNAME or not HIVEMQ_PASSWORD: st.error("Please enter all required fields.") else: st.info( f"Please wait while the command {speed} for experiment {experiment_id} is sent..." ) client = create_paho_client(tls=True) client = setup_paho_client( client, rpm_data_topic, HIVEMQ_HOST, HIVEMQ_USERNAME, password=HIVEMQ_PASSWORD, port=int(PORT), ) command_msg = {"speed": speed, "_experiment_id": experiment_id} send_command(client, command_topic, command_msg) st.success(f"Command {speed} for experiment {experiment_id} sent successfully!") # Start periodic check for RPM data check_rpm_data()