|
| 1 | +from __future__ import annotations |
| 2 | + |
| 3 | +import asyncio |
| 4 | +import logging |
| 5 | +from typing import TYPE_CHECKING, Optional |
| 6 | + |
| 7 | +from zendriver import cdp, util |
| 8 | +from zendriver.core.element import Element |
| 9 | + |
| 10 | +if TYPE_CHECKING: |
| 11 | + from zendriver.core.tab import Tab |
| 12 | + |
| 13 | + |
| 14 | +logger = logging.getLogger(__name__) |
| 15 | + |
| 16 | + |
async def cf_find_interactive_challenge(
    tab: Tab,
) -> tuple[Element | None, Element | None, Element | None]:
    """
    Locate the elements that make up a Cloudflare interactive challenge.

    The document is fetched through CDP and every node that owns at least one
    shadow root is inspected. A shadow root whose HTML mentions
    ``challenges.cloudflare.com`` is treated as the challenge container; its
    first direct child whose HTML mentions the same domain is treated as the
    challenge iframe.

    Args:
        tab: The tab whose DOM is searched.

    Returns:
        ``(host_element, shadow_root_element, challenge_iframe)`` for the first
        match, or ``(None, None, None)`` when the document is unavailable or
        no shadow root matches.
    """
    marker = "challenges.cloudflare.com"
    nothing = (None, None, None)

    logger.debug("Searching for Cloudflare interactive challenge elements...")
    document = await tab.send(cdp.dom.get_document(-1, True))
    if not document:
        logger.debug("DOM document not found.")
        return nothing

    def owns_shadow_root(node) -> bool:
        # A node qualifies when it exposes a non-empty ``shadow_roots``.
        return bool(getattr(node, "shadow_roots", None))

    hosts = util.filter_recurse_all(document, owns_shadow_root)
    logger.debug(f"Found {len(hosts)} shadow host nodes.")

    for host in hosts:
        for root_node in host.shadow_roots or ():
            # The shadow root node doubles as its own tree when wrapped.
            root = Element(root_node, tab, root_node)

            if marker not in await root.get_html():
                continue
            logger.debug("Found Cloudflare challenge in a shadow root.")

            # Only direct children of the shadow root are considered.
            for candidate in root.children:
                if marker in await candidate.get_html():
                    logger.debug("Found challenge iframe.")
                    return Element(host, tab, document), root, candidate

    logger.debug("Cloudflare interactive challenge not found.")
    return nothing
| 69 | + |
| 70 | + |
async def cf_wait_for_interactive_challenge(
    tab: Tab, timeout: float = 5
) -> tuple[Element | None, Element | None, Element | None]:
    """
    Waits for the Cloudflare challenge iframe to appear and be visible.

    The DOM is polled with ``cf_find_interactive_challenge`` until the iframe
    is found without an inline ``display: none`` style, or until ``timeout``
    elapses. At least one lookup is always performed, so a ``timeout`` of 0
    acts as a single immediate check.

    Args:
        tab: The tab whose DOM is polled.
        timeout: The maximum time in seconds to wait for the elements.

    Returns:
        A tuple containing the host element, shadow root, and iframe element
        if found and visible, otherwise (None, None, None).
    """
    poll_interval = 0.5
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout

    while True:
        logger.debug("Waiting for challenge elements to appear...")
        (
            host_element,
            shadow_root_element,
            challenge_iframe,
        ) = await cf_find_interactive_challenge(tab)

        if challenge_iframe:
            # Normalise the inline style so "display:none", "display: none"
            # and "DISPLAY : NONE" are all recognised as hidden.
            inline_style = challenge_iframe.attrs.get("style", "") or ""
            compact_style = "".join(inline_style.split()).lower()
            if "display:none" not in compact_style:
                logger.debug("Cloudflare challenge elements found and visible.")
                return host_element, shadow_root_element, challenge_iframe

        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        # Never sleep past the deadline; the loop then performs one final
        # lookup right at the deadline before giving up.
        await asyncio.sleep(min(poll_interval, remaining))

    logger.warning(
        f"Timeout: Cloudflare challenge elements not found or not visible within {timeout} seconds."
    )
    return None, None, None
| 105 | + |
| 106 | + |
async def cf_is_interactive_challenge_present(tab: Tab, timeout: float = 5) -> bool:
    """
    Report whether a visible Cloudflare interactive challenge shows up in time.

    Delegates to ``cf_wait_for_interactive_challenge`` and only looks at the
    iframe slot of its result.

    Args:
        tab: The tab to inspect.
        timeout: The maximum time in seconds to wait for the challenge.

    Returns:
        True when a challenge iframe was returned, False otherwise.
    """
    logger.debug(
        f"Checking for Cloudflare challenge with a timeout of {timeout} seconds."
    )
    _host, _shadow_root, iframe = await cf_wait_for_interactive_challenge(
        tab, timeout
    )
    found = iframe is not None
    logger.debug(f"Challenge present: {found}")
    return found
| 124 | + |
| 125 | + |
async def _cf_locate_response_input(
    host_element: Element, challenge_selector: Optional[str]
) -> tuple[Element | None, str]:
    """Query the host for the challenge response input; return it with the selector used."""
    # Selector priority:
    #   1. challenge_selector (explicit; no fallback is attempted)
    #   2. input[name=cf-turnstile-response] (default for turnstile challenges)
    #   3. input[name=cf_challenge_response]
    # Some turnstile challenges carry both inputs 2 and 3.
    if challenge_selector:
        found = await host_element.query_selector(challenge_selector)
        return found, challenge_selector

    selector = "input[name=cf-turnstile-response]"
    found = await host_element.query_selector(selector)
    if not found:
        selector = "input[name=cf_challenge_response]"
        found = await host_element.query_selector(selector)
    return found, selector


async def _cf_challenge_pending(
    input_element: Element,
    selector: str,
    host_element: Element,
    checkbox_clicked: bool,
) -> bool:
    """Return True while the response input is still present and without a value."""
    if not input_element:
        return False
    try:
        # NOTE(review): awaiting the Element presumably refreshes its cached
        # attributes before "value" is read below — confirm against Element.
        await input_element
        fresh_input = await host_element.query_selector(selector)
    except Exception as e:
        raise Exception(f"Error checking input element: {e}.") from e
    if (input_element.attrs.get("value") or not fresh_input) and checkbox_clicked:
        # If the input disappears or gets a value, assume it's successfully completed.
        logger.debug("Input element check successful (disappeared or has value).")
        return False
    return True


async def verify_cf(
    tab: Tab,
    click_delay: float = 5,
    timeout: float = 15,
    challenge_selector: Optional[str] = None,
    flash_corners: bool = False,
) -> None:
    """
    Finds and solves the Cloudflare checkbox challenge.

    The challenge iframe is awaited, scrolled into view and measured through
    its CDP box model. The click point is 15% of the content box width from
    its left edge, vertically centred. The point is clicked repeatedly until
    the challenge response input gains a value or disappears.

    The total time for finding and clicking is governed by `timeout`. The
    deadline is checked before each click, so a final `click_delay` sleep may
    run past it.

    The function returns without clicking when no host element or no
    response input can be located.

    Args:
        tab: The tab showing the challenge.
        click_delay: The delay in seconds between clicks.
        timeout: The total time in seconds to wait for the challenge and solve it.
        challenge_selector: An optional CSS selector for the challenge input element.
            When given, the built-in fallback selectors are not tried.
        flash_corners: If True, flash the corners of the challenge element.

    Raises:
        TimeoutError: If the checkbox is not found or solved within the timeout.
        Exception: If refreshing the input element or clicking fails; the
            original error is attached as the cause.
    """
    logger.debug("Waiting for Cloudflare checkbox...")
    loop = asyncio.get_running_loop()
    start_time = loop.time()

    (
        host_element,
        _shadow_root_element,
        challenge_iframe,
    ) = await cf_wait_for_interactive_challenge(tab, timeout)

    if not challenge_iframe:
        raise TimeoutError(
            f"Cloudflare checkbox not found or not visible within {timeout} seconds."
        )

    logger.debug("Cloudflare checkbox found, starting clicks.")

    await challenge_iframe.scroll_into_view()

    # To get the element's dimensions, its numeric 'node_id' is needed.
    # This ID is obtained from the underlying node object.
    logger.debug(
        f"Getting box model for challenge iframe (node_id: {challenge_iframe.node.node_id})"
    )
    box_model_result = await tab.send(
        cdp.dom.get_box_model(node_id=challenge_iframe.node.node_id)
    )
    # 'content_quad' is a list of 8 numbers representing the (x, y) coordinates
    # of the four corners of the element's "content-box": [x1, y1, x2, y2, x3, y3, x4, y4].
    content_quad = box_model_result.content

    # Even indices are the x coordinates of the 4 corners, odd indices the y ones.
    x_coords = content_quad[0::2]
    y_coords = content_quad[1::2]

    min_x = min(x_coords)
    max_x = max(x_coords)
    min_y = min(y_coords)
    max_y = max(y_coords)

    # Aim at the left part of the widget, vertically centred.
    click_x = min_x + (max_x - min_x) * 0.15
    click_y = min_y + (max_y - min_y) / 2

    logger.debug(
        f"Checkbox dimensions (content box): width={max_x - min_x}, height={max_y - min_y}"
    )

    if flash_corners:
        logger.debug("Showing flash_point at the 4 corners.")
        for x_corner, y_corner in zip(x_coords, y_coords):
            await tab.flash_point(x=x_corner, y=y_corner, duration=10)

    if not host_element:
        logger.debug("No host element for the challenge; nothing to click.")
        return

    input_element, current_selector = await _cf_locate_response_input(
        host_element, challenge_selector
    )
    if not input_element:
        logger.debug(
            f"Challenge response input not found ({current_selector}); nothing to click."
        )
        return

    checkbox_clicked = False

    while await _cf_challenge_pending(
        input_element, current_selector, host_element, checkbox_clicked
    ):
        if loop.time() - start_time >= timeout:
            raise TimeoutError(
                f"Could not solve the checkbox in {timeout} seconds (timeout during clicks)."
            )
        try:
            await tab.mouse_click(click_x, click_y)
        except Exception as e:
            if "could not find position" in str(e) and checkbox_clicked:
                logger.debug("Checkbox disappeared after click. Assuming success.")
                break
            raise Exception(f"Error clicking checkbox: {e}.") from e
        else:
            # Give the challenge time to react before re-checking the input.
            await asyncio.sleep(click_delay)
            checkbox_clicked = True

    logger.debug("Checkbox challenge completed. ✔")
    return
0 commit comments