from prefect import flow,get_run_logger, pause_flow_run, settings from prefect.blocks.notifications import SlackWebhook from prefect.context import get_run_context from DB_utils import set_maintenance_status,generate_empty_well,insert_maintenance_log from prefect.input import RunInput class UserInput(RunInput): Maintenance_Completed : bool name: str @flow def request_wells_maintenance(maintenance_type): MESSAGE = "HITL request" logger = get_run_logger() slack_block = SlackWebhook.load("prefect-test") message = str(MESSAGE) flow_run = get_run_context().flow_run if flow_run and settings.PREFECT_UI_URL: flow_run_url = ( f"{settings.PREFECT_UI_URL.value()}/flow-runs/flow-run/{flow_run.id}" ) message += f"\n\nOT2-LCM requests a {maintenance_type}, please open the <{flow_run_url}|paused flow run>, complete with your user name and then click 'Resume'" slack_block.notify(message) description = f""" OT2-LCM requests a {maintenance_type}! Please finish the maintenance and enter your name: """ user_input = pause_flow_run( wait_for_input=UserInput.with_initial_data( description=description ), timeout = 3000 #Should we have a timeout, or just let it waits until maintenance complete? ) #Same as above, and even without this timeout handler, following database update codes will not run if not user_input: logger.warning("[DEBUG]Flow resumed automatically due to timeout.") return "Maintenance flow timed out with no user interaction. " \ "The requested maintenance was not carried out." if maintenance_type == "wellplate_maintenance": generate_empty_well() msg_out = f"Updating wells status on DB by {user_input.name}" logger.info(msg_out) insert_maintenance_log(maintenance_type, user_input.name) set_maintenance_status(maintenance_type,0) return msg_out