|
|
import asyncio
|
|
|
import base64
|
|
|
import os
|
|
|
import random
|
|
|
import logging
|
|
|
import functools
|
|
|
from typing import Any, Callable, Optional, Tuple, Union, TypeVar, Awaitable
|
|
|
|
|
|
from playwright._impl._errors import Error as PlaywrightError
|
|
|
from playwright._impl._errors import TimeoutError, TargetClosedError
|
|
|
from playwright.async_api import Download, Page
|
|
|
from playwright.async_api import TimeoutError as PlaywrightTimeoutError
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
CUA_KEY_TO_PLAYWRIGHT_KEY = {
|
|
|
"/": "Divide",
|
|
|
"\\": "Backslash",
|
|
|
"alt": "Alt",
|
|
|
"arrowdown": "ArrowDown",
|
|
|
"arrowleft": "ArrowLeft",
|
|
|
"arrowright": "ArrowRight",
|
|
|
"arrowup": "ArrowUp",
|
|
|
"backspace": "Backspace",
|
|
|
"capslock": "CapsLock",
|
|
|
"cmd": "Meta",
|
|
|
"ctrl": "Control",
|
|
|
"delete": "Delete",
|
|
|
"end": "End",
|
|
|
"enter": "Enter",
|
|
|
"esc": "Escape",
|
|
|
"home": "Home",
|
|
|
"insert": "Insert",
|
|
|
"option": "Alt",
|
|
|
"pagedown": "PageDown",
|
|
|
"pageup": "PageUp",
|
|
|
"shift": "Shift",
|
|
|
"space": " ",
|
|
|
"super": "Meta",
|
|
|
"tab": "Tab",
|
|
|
"win": "Meta",
|
|
|
}
|
|
|
|
|
|
F = TypeVar("F", bound=Callable[..., Awaitable[Any]])
|
|
|
|
|
|
|
|
|
def handle_target_closed(max_retries: int = 2, timeout_secs: int = 30):
|
|
|
"""
|
|
|
Decorator to handle TargetClosedError and tunnel connection errors by attempting to recover the page.
|
|
|
|
|
|
Args:
|
|
|
max_retries: Maximum number of retry attempts
|
|
|
timeout_secs: Timeout for page operations during recovery
|
|
|
"""
|
|
|
|
|
|
def decorator(func: F) -> F:
|
|
|
@functools.wraps(func)
|
|
|
async def wrapper(*args, **kwargs):
|
|
|
|
|
|
logger = args[0].logger
|
|
|
page = None
|
|
|
if len(args) >= 2 and hasattr(
|
|
|
args[1], "url"
|
|
|
):
|
|
|
page = args[1]
|
|
|
|
|
|
retries = 0
|
|
|
last_error = None
|
|
|
|
|
|
while retries <= max_retries:
|
|
|
try:
|
|
|
return await func(*args, **kwargs)
|
|
|
except (TargetClosedError, PlaywrightError) as e:
|
|
|
|
|
|
is_tunnel_error = "net::ERR_TUNNEL_CONNECTION_FAILED" in str(e)
|
|
|
is_target_closed = isinstance(
|
|
|
e, TargetClosedError
|
|
|
) or "Target page, context or browser has been closed" in str(e)
|
|
|
|
|
|
if not (is_tunnel_error or is_target_closed):
|
|
|
|
|
|
raise e
|
|
|
|
|
|
last_error = e
|
|
|
retries += 1
|
|
|
|
|
|
if retries > max_retries:
|
|
|
raise e
|
|
|
|
|
|
if page is None:
|
|
|
|
|
|
raise e
|
|
|
|
|
|
error_type = (
|
|
|
"tunnel connection" if is_tunnel_error else "target closed"
|
|
|
)
|
|
|
logger.warning(
|
|
|
f"{error_type} error in {func.__name__}, attempting recovery (retry {retries}/{max_retries})"
|
|
|
)
|
|
|
|
|
|
try:
|
|
|
|
|
|
await _recover_page(page, timeout_secs, logger)
|
|
|
|
|
|
await asyncio.sleep(0.5)
|
|
|
except Exception as recovery_error:
|
|
|
logger.error(f"Page recovery failed: {recovery_error}")
|
|
|
|
|
|
raise e from recovery_error
|
|
|
|
|
|
|
|
|
raise last_error
|
|
|
|
|
|
return wrapper
|
|
|
|
|
|
return decorator
|
|
|
|
|
|
|
|
|
async def _recover_page(page: Page, timeout_secs: int = 30, logger=None) -> None:
|
|
|
"""
|
|
|
Attempt to recover a closed page by reloading it.
|
|
|
|
|
|
Args:
|
|
|
page: The Playwright page object to recover
|
|
|
timeout_secs: Timeout for recovery operations
|
|
|
"""
|
|
|
logger = logger or logging.getLogger("playwright_controller")
|
|
|
try:
|
|
|
|
|
|
await page.evaluate("1", timeout=1000)
|
|
|
|
|
|
return
|
|
|
except Exception:
|
|
|
|
|
|
pass
|
|
|
|
|
|
try:
|
|
|
|
|
|
await page.evaluate("window.stop()", timeout=2000)
|
|
|
except Exception:
|
|
|
|
|
|
pass
|
|
|
|
|
|
try:
|
|
|
|
|
|
await page.reload(timeout=timeout_secs * 1000)
|
|
|
await page.wait_for_load_state("load", timeout=timeout_secs * 1000)
|
|
|
logger.info("playwright_controller._recover_page(): Page recovery successful")
|
|
|
except Exception as e:
|
|
|
logger.error(f"playwright_controller._recover_page(): Page reload failed: {e}")
|
|
|
|
|
|
|
|
|
try:
|
|
|
current_url = page.url
|
|
|
if current_url and current_url != "about:blank":
|
|
|
await page.goto(current_url, timeout=timeout_secs * 1000)
|
|
|
await page.wait_for_load_state("load", timeout=timeout_secs * 1000)
|
|
|
logger.info(
|
|
|
"playwright_controller._recover_page(): Page recovery via goto successful"
|
|
|
)
|
|
|
else:
|
|
|
raise Exception(
|
|
|
"playwright_controller._recover_page(): No valid URL to navigate to"
|
|
|
)
|
|
|
except Exception as goto_error:
|
|
|
raise Exception(
|
|
|
f"playwright_controller._recover_page(): All recovery methods failed. Reload error: {e}, Goto error: {goto_error}"
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
def handle_target_closed_with_context(max_retries: int = 2, timeout_secs: int = 30):
|
|
|
"""
|
|
|
Enhanced decorator that can also handle browser context recreation.
|
|
|
Use this for critical operations where you have access to the browser context.
|
|
|
"""
|
|
|
|
|
|
def decorator(func: F) -> F:
|
|
|
@functools.wraps(func)
|
|
|
async def wrapper(*args, **kwargs):
|
|
|
logger = args[0].logger
|
|
|
page = None
|
|
|
if len(args) >= 2 and hasattr(args[1], "url"):
|
|
|
page = args[1]
|
|
|
|
|
|
retries = 0
|
|
|
last_error = None
|
|
|
|
|
|
while retries <= max_retries:
|
|
|
try:
|
|
|
return await func(*args, **kwargs)
|
|
|
except (TargetClosedError, PlaywrightError) as e:
|
|
|
|
|
|
is_tunnel_error = "net::ERR_TUNNEL_CONNECTION_FAILED" in str(e)
|
|
|
is_target_closed = isinstance(
|
|
|
e, TargetClosedError
|
|
|
) or "Target page, context or browser has been closed" in str(e)
|
|
|
|
|
|
if not (is_tunnel_error or is_target_closed):
|
|
|
|
|
|
raise e
|
|
|
|
|
|
last_error = e
|
|
|
retries += 1
|
|
|
|
|
|
if retries > max_retries:
|
|
|
raise e
|
|
|
|
|
|
if page is None:
|
|
|
raise e
|
|
|
|
|
|
error_type = (
|
|
|
"tunnel connection" if is_tunnel_error else "target closed"
|
|
|
)
|
|
|
logger.warning(
|
|
|
f"playwright_controller.handle_target_closed_with_context(): {error_type} error in {func.__name__}, attempting enhanced recovery (retry {retries}/{max_retries})"
|
|
|
)
|
|
|
|
|
|
try:
|
|
|
|
|
|
context = page.context
|
|
|
browser = context.browser
|
|
|
|
|
|
if browser and not browser.is_connected():
|
|
|
|
|
|
logger.error(
|
|
|
"playwright_controller.handle_target_closed_with_context(): Browser connection lost - cannot recover automatically"
|
|
|
)
|
|
|
raise e
|
|
|
|
|
|
|
|
|
await _recover_page(page, timeout_secs)
|
|
|
await asyncio.sleep(0.5)
|
|
|
|
|
|
except Exception as recovery_error:
|
|
|
logger.error(
|
|
|
f"playwright_controller.handle_target_closed_with_context(): Enhanced page recovery failed: {recovery_error}"
|
|
|
)
|
|
|
raise e from recovery_error
|
|
|
|
|
|
raise last_error
|
|
|
|
|
|
return wrapper
|
|
|
|
|
|
return decorator
|
|
|
|
|
|
|
|
|
class PlaywrightController:
|
|
|
def __init__(
|
|
|
self,
|
|
|
animate_actions: bool = False,
|
|
|
downloads_folder: Optional[str] = None,
|
|
|
viewport_width: int = 1440,
|
|
|
viewport_height: int = 900,
|
|
|
_download_handler: Optional[Callable[[Download], None]] = None,
|
|
|
to_resize_viewport: bool = True,
|
|
|
single_tab_mode: bool = False,
|
|
|
sleep_after_action: int = 10,
|
|
|
timeout_load: int = 1,
|
|
|
logger=None,
|
|
|
) -> None:
|
|
|
"""
|
|
|
A controller for Playwright to interact with web pages.
|
|
|
animate_actions: If True, actions will be animated.
|
|
|
downloads_folder: The folder to save downloads to.
|
|
|
viewport_width: The width of the viewport.
|
|
|
viewport_height: The height of the viewport.
|
|
|
_download_handler: A handler for downloads.
|
|
|
to_resize_viewport: If True, the viewport will be resized.
|
|
|
single_tab_mode (bool): If True, forces navigation to happen in the same tab rather than opening new tabs/windows.
|
|
|
|
|
|
"""
|
|
|
self.animate_actions = animate_actions
|
|
|
self.downloads_folder = downloads_folder
|
|
|
self.viewport_width = viewport_width
|
|
|
self.viewport_height = viewport_height
|
|
|
self._download_handler = _download_handler
|
|
|
self.to_resize_viewport = to_resize_viewport
|
|
|
self.single_tab_mode = single_tab_mode
|
|
|
self._sleep_after_action = sleep_after_action
|
|
|
self._timeout_load = timeout_load
|
|
|
self.logger = logger or logging.getLogger("playwright_controller")
|
|
|
|
|
|
|
|
|
self.last_cursor_position: Tuple[float, float] = (0.0, 0.0)
|
|
|
|
|
|
async def sleep(self, page: Page, duration: Union[int, float]) -> None:
|
|
|
await asyncio.sleep(duration)
|
|
|
|
|
|
@handle_target_closed()
|
|
|
async def on_new_page(self, page: Page) -> None:
|
|
|
assert page is not None
|
|
|
|
|
|
await page.bring_to_front()
|
|
|
page.on("download", self._download_handler)
|
|
|
if self.to_resize_viewport and self.viewport_width and self.viewport_height:
|
|
|
await page.set_viewport_size(
|
|
|
{"width": self.viewport_width, "height": self.viewport_height}
|
|
|
)
|
|
|
await self.sleep(page, 0.2)
|
|
|
try:
|
|
|
await page.wait_for_load_state(timeout=30000)
|
|
|
except PlaywrightTimeoutError:
|
|
|
self.logger.error("WARNING: Page load timeout, page might not be loaded")
|
|
|
|
|
|
await page.evaluate("window.stop()")
|
|
|
|
|
|
@handle_target_closed()
|
|
|
async def _ensure_page_ready(self, page: Page) -> None:
|
|
|
assert page is not None
|
|
|
await self.on_new_page(page)
|
|
|
|
|
|
@handle_target_closed()
|
|
|
async def get_screenshot(self, page: Page, path: str | None = None) -> bytes:
|
|
|
"""
|
|
|
Capture a screenshot of the current page.
|
|
|
|
|
|
Args:
|
|
|
page (Page): The Playwright page object.
|
|
|
path (str, optional): The file path to save the screenshot. If None, the screenshot will be returned as bytes. Default: None
|
|
|
"""
|
|
|
await self._ensure_page_ready(page)
|
|
|
try:
|
|
|
screenshot = await page.screenshot(path=path, timeout=15000)
|
|
|
return screenshot
|
|
|
except Exception:
|
|
|
await page.evaluate("window.stop()")
|
|
|
|
|
|
screenshot = await page.screenshot(path=path, timeout=15000)
|
|
|
return screenshot
|
|
|
|
|
|
@handle_target_closed()
|
|
|
async def back(self, page: Page) -> None:
|
|
|
await self._ensure_page_ready(page)
|
|
|
await page.go_back()
|
|
|
|
|
|
@handle_target_closed()
|
|
|
async def visit_page(self, page: Page, url: str) -> Tuple[bool, bool]:
|
|
|
await self._ensure_page_ready(page)
|
|
|
reset_prior_metadata_hash = False
|
|
|
reset_last_download = False
|
|
|
try:
|
|
|
|
|
|
await page.goto(url)
|
|
|
await page.wait_for_load_state()
|
|
|
reset_prior_metadata_hash = True
|
|
|
except Exception as e_outer:
|
|
|
|
|
|
if self.downloads_folder and "net::ERR_ABORTED" in str(e_outer):
|
|
|
async with page.expect_download() as download_info:
|
|
|
try:
|
|
|
await page.goto(url)
|
|
|
except Exception as e_inner:
|
|
|
if "net::ERR_ABORTED" in str(e_inner):
|
|
|
pass
|
|
|
else:
|
|
|
raise e_inner
|
|
|
download = await download_info.value
|
|
|
fname = os.path.join(
|
|
|
self.downloads_folder, download.suggested_filename
|
|
|
)
|
|
|
await download.save_as(fname)
|
|
|
message = f"<body style=\"margin: 20px;\"><h1>Successfully downloaded '{download.suggested_filename}' to local path:<br><br>{fname}</h1></body>"
|
|
|
await page.goto(
|
|
|
"data:text/html;base64,"
|
|
|
+ base64.b64encode(message.encode("utf-8")).decode("utf-8")
|
|
|
)
|
|
|
reset_last_download = True
|
|
|
else:
|
|
|
raise e_outer
|
|
|
return reset_prior_metadata_hash, reset_last_download
|
|
|
|
|
|
@handle_target_closed()
|
|
|
async def page_down(
|
|
|
self, page: Page, amount: int = 400, full_page: bool = False
|
|
|
) -> None:
|
|
|
await self._ensure_page_ready(page)
|
|
|
if full_page:
|
|
|
await page.mouse.wheel(0, self.viewport_height - 50)
|
|
|
else:
|
|
|
await page.mouse.wheel(0, amount)
|
|
|
|
|
|
@handle_target_closed()
|
|
|
async def page_up(
|
|
|
self, page: Page, amount: int = 400, full_page: bool = False
|
|
|
) -> None:
|
|
|
await self._ensure_page_ready(page)
|
|
|
if full_page:
|
|
|
await page.mouse.wheel(0, -self.viewport_height + 50)
|
|
|
else:
|
|
|
await page.mouse.wheel(0, -amount)
|
|
|
|
|
|
async def gradual_cursor_animation(
|
|
|
self, page: Page, start_x: float, start_y: float, end_x: float, end_y: float
|
|
|
) -> None:
|
|
|
|
|
|
|
|
|
await page.evaluate("""
|
|
|
(function() {
|
|
|
if (!document.getElementById('red-cursor')) {
|
|
|
let cursor = document.createElement('div');
|
|
|
cursor.id = 'red-cursor';
|
|
|
cursor.style.width = '10px';
|
|
|
cursor.style.height = '10px';
|
|
|
cursor.style.backgroundColor = 'red';
|
|
|
cursor.style.position = 'absolute';
|
|
|
cursor.style.borderRadius = '50%';
|
|
|
cursor.style.zIndex = '10000';
|
|
|
document.body.appendChild(cursor);
|
|
|
}
|
|
|
})();
|
|
|
""")
|
|
|
|
|
|
steps = 20
|
|
|
for step in range(steps):
|
|
|
x = start_x + (end_x - start_x) * (step / steps)
|
|
|
y = start_y + (end_y - start_y) * (step / steps)
|
|
|
|
|
|
await page.evaluate(f"""
|
|
|
(function() {{
|
|
|
let cursor = document.getElementById('red-cursor');
|
|
|
if (cursor) {{
|
|
|
cursor.style.left = '{x}px';
|
|
|
cursor.style.top = '{y}px';
|
|
|
}}
|
|
|
}})();
|
|
|
""")
|
|
|
await asyncio.sleep(0.05)
|
|
|
|
|
|
self.last_cursor_position = (end_x, end_y)
|
|
|
await asyncio.sleep(1.0)
|
|
|
|
|
|
@handle_target_closed()
|
|
|
async def click_coords(self, page: Page, x: float, y: float) -> None:
|
|
|
new_page: Page | None = None
|
|
|
await self._ensure_page_ready(page)
|
|
|
|
|
|
if self.animate_actions:
|
|
|
|
|
|
start_x, start_y = self.last_cursor_position
|
|
|
await self.gradual_cursor_animation(page, start_x, start_y, x, y)
|
|
|
await asyncio.sleep(0.1)
|
|
|
|
|
|
try:
|
|
|
|
|
|
async with page.expect_event("popup", timeout=1000) as page_info:
|
|
|
await page.mouse.click(x, y, delay=10)
|
|
|
new_page = await page_info.value
|
|
|
assert isinstance(new_page, Page)
|
|
|
await self.on_new_page(new_page)
|
|
|
except TimeoutError:
|
|
|
pass
|
|
|
else:
|
|
|
try:
|
|
|
|
|
|
async with page.expect_event("popup", timeout=1000) as page_info:
|
|
|
await page.mouse.click(x, y, delay=10)
|
|
|
new_page = await page_info.value
|
|
|
assert isinstance(new_page, Page)
|
|
|
await self.on_new_page(new_page)
|
|
|
except TimeoutError:
|
|
|
pass
|
|
|
return new_page
|
|
|
|
|
|
@handle_target_closed()
|
|
|
async def hover_coords(self, page: Page, x: float, y: float) -> None:
|
|
|
"""
|
|
|
Hovers the mouse over the specified coordinates.
|
|
|
|
|
|
Args:
|
|
|
page (Page): The Playwright page object.
|
|
|
x (float): The x coordinate to hover over.
|
|
|
y (float): The y coordinate to hover over.
|
|
|
"""
|
|
|
await self._ensure_page_ready(page)
|
|
|
|
|
|
if self.animate_actions:
|
|
|
|
|
|
start_x, start_y = self.last_cursor_position
|
|
|
await self.gradual_cursor_animation(page, start_x, start_y, x, y)
|
|
|
await asyncio.sleep(0.1)
|
|
|
|
|
|
await page.mouse.move(x, y)
|
|
|
|
|
|
@handle_target_closed()
|
|
|
async def fill_coords(
|
|
|
self,
|
|
|
page: Page,
|
|
|
x: float,
|
|
|
y: float,
|
|
|
value: str,
|
|
|
press_enter: bool = True,
|
|
|
delete_existing_text: bool = False,
|
|
|
) -> None:
|
|
|
await self._ensure_page_ready(page)
|
|
|
new_page: Page | None = None
|
|
|
|
|
|
if self.animate_actions:
|
|
|
|
|
|
start_x, start_y = self.last_cursor_position
|
|
|
await self.gradual_cursor_animation(page, start_x, start_y, x, y)
|
|
|
await asyncio.sleep(0.1)
|
|
|
|
|
|
await page.mouse.click(x, y)
|
|
|
|
|
|
if delete_existing_text:
|
|
|
await page.keyboard.press("ControlOrMeta+A")
|
|
|
await page.keyboard.press("Backspace")
|
|
|
|
|
|
|
|
|
if len(value) < 100:
|
|
|
delay_typing_speed = 50 + 100 * random.random()
|
|
|
else:
|
|
|
delay_typing_speed = 10
|
|
|
|
|
|
if self.animate_actions:
|
|
|
try:
|
|
|
|
|
|
async with page.expect_event("popup", timeout=1000) as page_info:
|
|
|
try:
|
|
|
await page.keyboard.type(value)
|
|
|
except PlaywrightError:
|
|
|
await page.keyboard.type(value, delay=delay_typing_speed)
|
|
|
if press_enter:
|
|
|
await page.keyboard.press("Enter")
|
|
|
new_page = await page_info.value
|
|
|
assert isinstance(new_page, Page)
|
|
|
await self.on_new_page(new_page)
|
|
|
except TimeoutError:
|
|
|
pass
|
|
|
else:
|
|
|
try:
|
|
|
|
|
|
async with page.expect_event("popup", timeout=1000) as page_info:
|
|
|
try:
|
|
|
await page.keyboard.type(value)
|
|
|
except PlaywrightError:
|
|
|
await page.keyboard.type(value, delay=delay_typing_speed)
|
|
|
if press_enter:
|
|
|
await page.keyboard.press("Enter")
|
|
|
new_page = await page_info.value
|
|
|
assert isinstance(new_page, Page)
|
|
|
await self.on_new_page(new_page)
|
|
|
except TimeoutError:
|
|
|
pass
|
|
|
|
|
|
return new_page
|
|
|
|
|
|
async def keypress(self, page: Page, keys: list[str]) -> None:
|
|
|
"""
|
|
|
Press specified keys in sequence.
|
|
|
|
|
|
Args:
|
|
|
page (Page): The Playwright page object
|
|
|
keys (List[str]): List of keys to press
|
|
|
"""
|
|
|
await self._ensure_page_ready(page)
|
|
|
mapped_keys = [CUA_KEY_TO_PLAYWRIGHT_KEY.get(key.lower(), key) for key in keys]
|
|
|
try:
|
|
|
for key in mapped_keys:
|
|
|
await page.keyboard.down(key)
|
|
|
for key in reversed(mapped_keys):
|
|
|
await page.keyboard.up(key)
|
|
|
except Exception as e:
|
|
|
raise RuntimeError(
|
|
|
f"I tried to keypress(keys={keys}), but I got an error: {e}"
|
|
|
) from None
|
|
|
|
|
|
@handle_target_closed()
|
|
|
async def wait_for_load_state(
|
|
|
self, page: Page, state: str = "load", timeout: Optional[int] = None
|
|
|
) -> None:
|
|
|
"""Wait for the page to reach a specific load state."""
|
|
|
await page.wait_for_load_state(state, timeout=timeout)
|
|
|
|
|
|
@handle_target_closed()
|
|
|
async def get_page_url(self, page: Page) -> str:
|
|
|
"""Get the current page URL."""
|
|
|
await self._ensure_page_ready(page)
|
|
|
return page.url
|
|
|
|