|
|
import gradio as gr |
|
|
import paho.mqtt.client as mqtt |
|
|
import json |
|
|
import time |
|
|
import os |
|
|
import requests |
|
|
import traceback |
|
|
from PIL import Image |
|
|
import io |
|
|
import threading |
|
|
|
|
|
print("Starting MQTT Print Control...")

# ---------------------------------------------------------------------------
# Connection settings.
# Every value is read from the environment; the second argument is the
# fallback used when the variable is not set.
# ---------------------------------------------------------------------------

# Broker used for the Bambu printer connection.
BAMBU_HOST = os.getenv("BAMBU_HOST", "default_host")
BAMBU_PORT = int(os.getenv("BAMBU_PORT", "1883"))
BAMBU_USERNAME = os.getenv("BAMBU_USERNAME", "default_user")
BAMBU_PASSWORD = os.getenv("BAMBU_PASSWORD", "default_pass")

# Serial number interpolated into the MQTT request/response topic names.
DEFAULT_SERIAL = os.getenv("DEFAULT_SERIAL", "default_serial")

# Broker used for the Raspberry Pi connection.
RPI_HOST = os.getenv("RPI_HOST", "default_host")
RPI_PORT = int(os.getenv("RPI_PORT", "1883"))
RPI_USERNAME = os.getenv("RPI_USERNAME", "default_user")
RPI_PASSWORD = os.getenv("RPI_PASSWORD", "default_pass")

# Most recent values received over MQTT. The message callbacks merge decoded
# payloads into this dict and the UI handlers read from it.
latest_data = {}
|
|
|
|
|
|
|
|
|
|
|
def setup_mqtt_client(host, port, username, password, on_message):
    """Create an MQTT client, connect it and start its network loop.

    Args:
        host: Broker host name or address.
        port: Broker TCP port.
        username: User name passed to ``username_pw_set``.
        password: Password passed to ``username_pw_set``.
        on_message: Callback ``(client, userdata, message)`` installed as the
            client's ``on_message`` handler.

    Returns:
        The client, after ``connect()`` has returned and ``loop_start()`` has
        been called.

    Raises:
        Whatever ``connect()`` raises when the broker cannot be reached; the
        error is not handled here so the caller can decide what to do.
    """
    # paho-mqtt 2.x added a ``callback_api_version`` constructor argument
    # together with the ``CallbackAPIVersion`` enum, and 2.0 rejects a bare
    # ``Client()``. 1.x has neither, so detect the enum and only pass it when
    # it exists. The ``on_message`` signature is the same under both API
    # versions, so the handler installed below works either way.
    api_versions = getattr(mqtt, "CallbackAPIVersion", None)
    if api_versions is None:
        client = mqtt.Client()
    else:
        client = mqtt.Client(callback_api_version=api_versions.VERSION2)

    client.username_pw_set(username, password)
    client.on_message = on_message
    client.connect(host, port)
    # Process network traffic on paho's background thread.
    client.loop_start()
    return client
|
|
|
|
|
|
|
|
|
|
|
def _merge_payload(message):
    """Decode one MQTT message and merge it into ``latest_data``.

    The payload must be a JSON object. Anything else (undecodable bytes,
    invalid JSON, or a JSON value that is not an object) is reported and
    dropped, so that a single bad message cannot raise out of the MQTT
    callback and into paho's network loop.

    On success ``latest_data["update_time"]`` is set to the local time of the
    merge, formatted as ``YYYY-MM-DD HH:MM:SS``.
    """
    try:
        payload = json.loads(message.payload)
    except (ValueError, TypeError) as exc:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors;
        # TypeError covers a payload that is not str/bytes at all.
        print(f"Ignoring undecodable MQTT payload: {exc}")
        return

    if not isinstance(payload, dict):
        print(f"Ignoring non-object MQTT payload of type {type(payload).__name__}")
        return

    latest_data.update(payload)
    latest_data["update_time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())


def bambu_on_message(client, userdata, message):
    """``on_message`` callback for the Bambu broker connection.

    ``client`` and ``userdata`` are accepted for paho's callback signature
    and are not used; the message payload is merged into ``latest_data``.
    """
    _merge_payload(message)


def rpi_on_message(client, userdata, message):
    """``on_message`` callback for the Raspberry Pi broker connection.

    ``client`` and ``userdata`` are accepted for paho's callback signature
    and are not used; the message payload is merged into ``latest_data``.
    """
    _merge_payload(message)
|
|
|
|
|
|
|
|
|
|
|
def main_logic():
    """Connect both MQTT clients and subscribe them to the response topic.

    Assigns the module-level ``bambu_client`` and ``rpi_client``. The two
    brokers are set up independently: a failure on one is printed with its
    traceback and does not stop the other from being connected, so the RPi
    client used for sending parameters still comes up when the Bambu broker
    is unreachable (and vice versa).

    Nothing is raised to the caller; this function is the top of a
    background thread, so failures are reported on stdout/stderr instead.
    """
    global bambu_client, rpi_client

    response_topic = f"bambu_a1_mini/response/{DEFAULT_SERIAL}"

    try:
        bambu_client = setup_mqtt_client(
            BAMBU_HOST, BAMBU_PORT, BAMBU_USERNAME, BAMBU_PASSWORD, bambu_on_message
        )
        bambu_client.subscribe(response_topic)
    except Exception:
        # Thread boundary: report and carry on with the second broker.
        print("MQTT setup failed:")
        traceback.print_exc()

    try:
        rpi_client = setup_mqtt_client(
            RPI_HOST, RPI_PORT, RPI_USERNAME, RPI_PASSWORD, rpi_on_message
        )
        rpi_client.subscribe(response_topic)
    except Exception:
        print("MQTT setup failed:")
        traceback.print_exc()
|
|
|
|
|
|
|
|
|
|
|
def send_print_parameters(nozzle_temp, bed_temp, print_speed, fan_speed):
    """Publish a ``generate_gcode`` request with the given print settings.

    The four values are sent as a JSON object, together with
    ``"command": "generate_gcode"``, on the request topic for
    ``DEFAULT_SERIAL`` through the RPi MQTT client.

    Args:
        nozzle_temp: Value sent as ``nozzle_temp``.
        bed_temp: Value sent as ``bed_temp``.
        print_speed: Value sent as ``print_speed``.
        fan_speed: Value sent as ``fan_speed``.

    Returns:
        ``"Parameters sent successfully."`` when the client accepted the
        message for sending, otherwise ``"Failed to send parameters."``.
        Never raises; failures are printed.
    """
    # ``rpi_client`` is only assigned by main_logic() on a background thread,
    # so the name may not exist yet (or at all, if that connection failed).
    client = globals().get("rpi_client")
    if client is None:
        print("Failed to send parameters: RPi MQTT client is not connected.")
        return "Failed to send parameters."

    try:
        params = {
            "nozzle_temp": nozzle_temp,
            "bed_temp": bed_temp,
            "print_speed": print_speed,
            "fan_speed": fan_speed,
            "command": "generate_gcode",
        }
        topic = f"bambu_a1_mini/request/{DEFAULT_SERIAL}"
        result = client.publish(topic, json.dumps(params))

        # publish() reports problems such as "no connection" through the
        # returned info object's ``rc`` rather than by raising, so a
        # non-success code must not be reported as success.
        if result.rc != mqtt.MQTT_ERR_SUCCESS:
            print(f"Failed to send parameters: publish returned rc={result.rc}")
            return "Failed to send parameters."

        return "Parameters sent successfully."
    except Exception:
        print("Failed to send parameters:")
        traceback.print_exc()
        return "Failed to send parameters."
|
|
|
|
|
|
|
|
|
|
|
def get_status():
    """Return the values shown in the status panel.

    Returns:
        A 4-tuple ``(status, bed_temperature, nozzle_temperature,
        update_time)`` read from ``latest_data``. A missing key yields
        ``"N/A"``, except ``update_time`` which yields
        ``"Waiting for data..."``.
    """
    # (key in latest_data, placeholder when the key is absent), in the order
    # the values are returned.
    fields = (
        ("status", "N/A"),
        ("bed_temperature", "N/A"),
        ("nozzle_temperature", "N/A"),
        ("update_time", "Waiting for data..."),
    )
    return tuple(latest_data.get(key, placeholder) for key, placeholder in fields)
|
|
|
|
|
|
|
|
|
|
|
def capture_image():
    """Download the image referenced by ``latest_data["image_url"]``.

    Returns:
        A PIL image opened from the response body when the URL is present,
        is not the ``"N/A"`` placeholder, and the request answers with HTTP
        200. ``None`` in every other case, including any exception, which is
        printed with its traceback instead of being raised.
    """
    try:
        url = latest_data.get("image_url")

        # Nothing to fetch: key missing, empty, or placeholder value.
        if not url or url == "N/A":
            return None

        reply = requests.get(url, timeout=10)
        if reply.status_code != 200:
            return None

        return Image.open(io.BytesIO(reply.content))
    except Exception:
        print("Image capture failed:")
        traceback.print_exc()
        return None
|
|
|
|
|
|
|
|
# Gradio page definition. Components are created in the order they should
# appear on the page, so the statement order below is significant.
with gr.Blocks(title="Simplified Printer Control") as demo:
    # --- Read-only views, filled in by the button handlers below ---
    status = gr.Textbox(label="Status")
    bed_temp = gr.Textbox(label="Bed Temperature")
    nozzle_temp = gr.Textbox(label="Nozzle Temperature")
    update_time = gr.Textbox(label="Last Update")
    image_display = gr.Image(label="Captured Image", type="pil")

    # --- Print settings sent by the "Send Parameters" button ---
    nozzle_input = gr.Slider(
        minimum=180,
        maximum=250,
        value=200,
        label="Nozzle Temperature",
    )
    bed_input = gr.Slider(
        minimum=40,
        maximum=100,
        value=60,
        label="Bed Temperature",
    )
    speed_input = gr.Slider(
        minimum=20,
        maximum=150,
        value=60,
        label="Print Speed",
    )
    fan_input = gr.Slider(
        minimum=0,
        maximum=100,
        value=100,
        label="Fan Speed",
    )

    # "Refresh" copies the latest values into the four text boxes.
    refresh = gr.Button("Refresh")
    refresh.click(
        fn=get_status,
        outputs=[status, bed_temp, nozzle_temp, update_time],
    )

    # "Capture Image" shows the downloaded image, or nothing when the
    # handler returns None.
    capture_btn = gr.Button("Capture Image")
    capture_btn.click(fn=capture_image, outputs=image_display)

    # "Send Parameters" publishes the slider values; the handler's result
    # message is written to the Status box.
    send_params_btn = gr.Button("Send Parameters")
    send_params_btn.click(
        fn=send_print_parameters,
        inputs=[nozzle_input, bed_input, speed_input, fan_input],
        outputs=status,
    )
|
|
|
|
|
print("App initializing...")

# Set up the MQTT connections on a daemon thread so the UI can start without
# waiting for the brokers, and so the thread does not keep the process alive
# once the main thread exits.
threading.Thread(
    target=main_logic,
    daemon=True,
).start()

# Start the Gradio server for the page defined above.
demo.launch()