# Source: bambu-a1-mini / app.py (Neil-YL, revision 911cfbb, verified)
import gradio as gr
import paho.mqtt.client as mqtt
import json
import time
import os
import requests
import traceback
from PIL import Image
import io
import threading
print("Starting MQTT Print Control...")

# Connection settings are read from the environment once, at import time.
# Every value falls back to a placeholder default when its variable is unset.

# Settings passed to the Bambu MQTT client
BAMBU_HOST = os.getenv("BAMBU_HOST", "default_host")
BAMBU_PORT = int(os.getenv("BAMBU_PORT", 1883))
BAMBU_USERNAME = os.getenv("BAMBU_USERNAME", "default_user")
BAMBU_PASSWORD = os.getenv("BAMBU_PASSWORD", "default_pass")

# Serial number embedded in the request/response topic names
DEFAULT_SERIAL = os.getenv("DEFAULT_SERIAL", "default_serial")

# Settings passed to the RPi MQTT client
RPI_HOST = os.getenv("RPI_HOST", "default_host")
RPI_PORT = int(os.getenv("RPI_PORT", 1883))
RPI_USERNAME = os.getenv("RPI_USERNAME", "default_user")
RPI_PASSWORD = os.getenv("RPI_PASSWORD", "default_pass")

# Merged contents of the JSON payloads received so far, plus "update_time"
latest_data = {}
# Centralized MQTT client setup
def setup_mqtt_client(host, port, username, password, on_message):
    """Create an MQTT client, connect it and start its network loop.

    Args:
        host: Broker host passed to ``connect``.
        port: Broker port passed to ``connect``.
        username: User name set on the client before connecting.
        password: Password set on the client before connecting.
        on_message: Callable installed as the client's ``on_message`` handler.

    Returns:
        The client, after ``connect`` and ``loop_start`` have been called.
    """
    mqtt_client = mqtt.Client()
    # Install the handler and the credentials before the connection is opened.
    mqtt_client.on_message = on_message
    mqtt_client.username_pw_set(username, password)
    mqtt_client.connect(host, port)
    mqtt_client.loop_start()
    return mqtt_client
# Single callback responsibility
def _merge_payload(message):
    """Merge one MQTT JSON payload into ``latest_data`` and stamp the time.

    A payload that is not valid JSON, or whose top-level value is not a JSON
    object, is reported and skipped, so no exception escapes from the
    ``on_message`` callback and ``latest_data`` is left untouched.

    Args:
        message: Object with a ``payload`` attribute holding the JSON text.
    """
    try:
        payload = json.loads(message.payload)
    except ValueError as exc:
        # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        print(f"Ignoring malformed MQTT payload: {exc}")
        return
    if not isinstance(payload, dict):
        # dict.update() would raise on a list, number, string or null.
        print(f"Ignoring non-object MQTT payload of type {type(payload).__name__}")
        return
    latest_data.update(payload)
    latest_data["update_time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())


def bambu_on_message(client, userdata, message):
    """``on_message`` callback for the Bambu client; stores the payload."""
    _merge_payload(message)


def rpi_on_message(client, userdata, message):
    """``on_message`` callback for the RPi client; stores the payload."""
    _merge_payload(message)
# MQTT client start-up (connect and subscribe); failures are reported here
def main_logic():
    """Connect both MQTT clients and subscribe them to the response topic.

    The clients are stored in the module globals ``bambu_client`` and
    ``rpi_client``. Each broker is set up in its own ``try`` block, so a
    failure on one of them (for example an unreachable host) does not stop
    the other from being connected. Failures are printed together with a
    traceback and are not re-raised.
    """
    global bambu_client, rpi_client
    response_topic = f"bambu_a1_mini/response/{DEFAULT_SERIAL}"

    try:
        bambu_client = setup_mqtt_client(
            BAMBU_HOST, BAMBU_PORT, BAMBU_USERNAME, BAMBU_PASSWORD, bambu_on_message
        )
        bambu_client.subscribe(response_topic)
    except Exception:
        print("MQTT setup failed:")
        traceback.print_exc()

    # Attempted even when the Bambu setup above failed: this is the client
    # that publishes the print parameters.
    try:
        rpi_client = setup_mqtt_client(
            RPI_HOST, RPI_PORT, RPI_USERNAME, RPI_PASSWORD, rpi_on_message
        )
        rpi_client.subscribe(response_topic)
    except Exception:
        print("MQTT setup failed:")
        traceback.print_exc()
# Sending print parameters functionality
def send_print_parameters(nozzle_temp, bed_temp, print_speed, fan_speed):
    """Publish a ``generate_gcode`` request carrying the given print settings.

    The settings are serialized to JSON and published through ``rpi_client``
    on ``bambu_a1_mini/request/<DEFAULT_SERIAL>``.

    Args:
        nozzle_temp: Value sent as ``nozzle_temp``.
        bed_temp: Value sent as ``bed_temp``.
        print_speed: Value sent as ``print_speed``.
        fan_speed: Value sent as ``fan_speed``.

    Returns:
        ``"Parameters sent successfully."`` when the client accepted the
        message, otherwise ``"Failed to send parameters."``. No exception is
        raised to the caller.
    """
    params = {
        "nozzle_temp": nozzle_temp,
        "bed_temp": bed_temp,
        "print_speed": print_speed,
        "fan_speed": fan_speed,
        "command": "generate_gcode",
    }
    try:
        topic = f"bambu_a1_mini/request/{DEFAULT_SERIAL}"
        result = rpi_client.publish(topic, json.dumps(params))
        # publish() reports problems such as a missing connection through its
        # return code rather than by raising, so check it explicitly.
        if result.rc != mqtt.MQTT_ERR_SUCCESS:
            print(f"Failed to send parameters: publish returned rc={result.rc}")
            return "Failed to send parameters."
        return "Parameters sent successfully."
    except Exception:
        print("Failed to send parameters:")
        traceback.print_exc()
        return "Failed to send parameters."
# Status snapshot shown in the Gradio interface
def get_status():
    """Return the current status fields read from ``latest_data``.

    Returns:
        A 4-tuple ``(status, bed_temperature, nozzle_temperature,
        update_time)``. A missing key yields ``"N/A"``, except
        ``update_time``, which yields ``"Waiting for data..."``.
    """
    field_defaults = (
        ("status", "N/A"),
        ("bed_temperature", "N/A"),
        ("nozzle_temperature", "N/A"),
        ("update_time", "Waiting for data..."),
    )
    return tuple(latest_data.get(key, fallback) for key, fallback in field_defaults)
# Image capture functionality
def capture_image():
    """Download the image referenced by ``latest_data["image_url"]``.

    Returns:
        The image opened with PIL when the URL is present, is not ``"N/A"``
        and the request answers with status 200; otherwise ``None``. Any
        exception is printed with a traceback and also results in ``None``.
    """
    try:
        url = latest_data.get("image_url", None)
        # Nothing to fetch when the URL is missing, empty or the placeholder.
        if not url or url == "N/A":
            return None
        reply = requests.get(url, timeout=10)
        if reply.status_code != 200:
            return None
        return Image.open(io.BytesIO(reply.content))
    except Exception:
        print("Image capture failed:")
        traceback.print_exc()
        return None
with gr.Blocks(title="Simplified Printer Control") as demo:
    # Text fields filled in by the "Refresh" button.
    status = gr.Textbox(label="Status")
    bed_temp = gr.Textbox(label="Bed Temperature")
    nozzle_temp = gr.Textbox(label="Nozzle Temperature")
    update_time = gr.Textbox(label="Last Update")

    # Target for the "Capture Image" button.
    image_display = gr.Image(label="Captured Image", type="pil")

    # Sliders whose values are passed to send_print_parameters.
    nozzle_input = gr.Slider(
        label="Nozzle Temperature",
        minimum=180,
        maximum=250,
        value=200,
    )
    bed_input = gr.Slider(
        label="Bed Temperature",
        minimum=40,
        maximum=100,
        value=60,
    )
    speed_input = gr.Slider(
        label="Print Speed",
        minimum=20,
        maximum=150,
        value=60,
    )
    fan_input = gr.Slider(
        label="Fan Speed",
        minimum=0,
        maximum=100,
        value=100,
    )

    refresh = gr.Button("Refresh")
    refresh.click(
        fn=get_status,
        outputs=[status, bed_temp, nozzle_temp, update_time],
    )

    capture_btn = gr.Button("Capture Image")
    capture_btn.click(fn=capture_image, outputs=image_display)

    # The result message of the send operation is written to the Status box.
    send_params_btn = gr.Button("Send Parameters")
    send_params_btn.click(
        fn=send_print_parameters,
        inputs=[nozzle_input, bed_input, speed_input, fan_input],
        outputs=status,
    )

print("App initializing...")

# MQTT setup runs on a daemon thread, started before the UI is launched.
mqtt_thread = threading.Thread(target=main_logic, daemon=True)
mqtt_thread.start()

demo.launch()