diff --git a/pex/compatibility.py b/pex/compatibility.py index d89c7391b..80b8610df 100644 --- a/pex/compatibility.py +++ b/pex/compatibility.py @@ -120,10 +120,13 @@ def exec_function(ast, globals_map): if PY3: + from http.client import HTTPConnection as HTTPConnection + from http.client import HTTPResponse as HTTPResponse from urllib import parse as _url_parse from urllib.error import HTTPError as HTTPError from urllib.parse import quote as _url_quote from urllib.parse import unquote as _url_unquote + from urllib.request import AbstractHTTPHandler as AbstractHTTPHandler from urllib.request import FileHandler as FileHandler from urllib.request import HTTPBasicAuthHandler as HTTPBasicAuthHandler from urllib.request import HTTPDigestAuthHandler as HTTPDigestAuthHandler @@ -137,6 +140,9 @@ def exec_function(ast, globals_map): from urllib import unquote as _url_unquote import urlparse as _url_parse + from httplib import HTTPConnection as HTTPConnection + from httplib import HTTPResponse as HTTPResponse + from urllib2 import AbstractHTTPHandler as AbstractHTTPHandler from urllib2 import FileHandler as FileHandler from urllib2 import HTTPBasicAuthHandler as HTTPBasicAuthHandler from urllib2 import HTTPDigestAuthHandler as HTTPDigestAuthHandler diff --git a/pex/fetcher.py b/pex/fetcher.py index b1614a6d2..425a2f840 100644 --- a/pex/fetcher.py +++ b/pex/fetcher.py @@ -5,6 +5,7 @@ import contextlib import os +import socket import sys import threading import time @@ -13,16 +14,21 @@ from pex import asserts from pex.auth import PasswordDatabase, PasswordEntry from pex.compatibility import ( + PY2, + AbstractHTTPHandler, FileHandler, HTTPBasicAuthHandler, + HTTPConnection, HTTPDigestAuthHandler, HTTPError, HTTPPasswordMgrWithDefaultRealm, + HTTPResponse, HTTPSHandler, ProxyHandler, Request, build_opener, in_main_thread, + urlparse, ) from pex.network_configuration import NetworkConfiguration from pex.typing import TYPE_CHECKING, cast @@ -30,7 +36,7 @@ if TYPE_CHECKING: from ssl import SSLContext - from typing import BinaryIO, Dict, Iterable, Iterator, Mapping, Optional, Text + from typing import Any, BinaryIO, Dict, Iterable, Iterator, Mapping, Optional, Text import attr # vendor:skip else: @@ -148,6 +154,68 @@ def initialize_ssl_context(network_configuration=None): initialize_ssl_context() +class UnixHTTPConnection(HTTPConnection): + def __init__( + self, + *args, # type: Any + **kwargs # type: Any + ): + # type: (...) -> None + path = kwargs.pop("path") + super(UnixHTTPConnection, self).__init__(*args, **kwargs) + self.path = path + + def connect(self): + # type: () -> None + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.connect(self.path) + self.sock = sock + + +class UnixHTTPHandler(AbstractHTTPHandler): + # N.B.: The naming scheme here is _; thus `unix` captures unix:// URLs and + # `open` captures the open event for unix:// URLs. + def unix_open(self, req): + # type: (Request) -> HTTPResponse + url_info = urlparse.urlparse(req.get_full_url()) + + path = "" + unix_socket_path = url_info.path + while not os.path.basename(unix_socket_path).endswith(".sock"): + path = os.path.join(path, os.path.basename(unix_socket_path)) + new_unix_socket_path = os.path.dirname(unix_socket_path) + if new_unix_socket_path == unix_socket_path: + # There was no *.sock component, so just use the full path. + path = "" + unix_socket_path = url_info.path + break + unix_socket_path = new_unix_socket_path + + # :///;?# + url = urlparse.urlunparse( + ("unix", "localhost", path, url_info.params, url_info.query, url_info.fragment) + ) + kwargs = {} if PY2 else {"method": req.get_method()} + modified_req = Request( + url, + data=req.data, + headers=req.headers, + # N.B.: MyPy for Python 2.7 needs the cast. + origin_req_host=cast(str, req.origin_req_host), + unverifiable=req.unverifiable, + **kwargs + ) + + # The stdlib actually sets timeout this way - it is not a constructor argument in any + # Python version. + modified_req.timeout = req.timeout + + # N.B.: MyPy for Python 2.7 needs the cast. + return cast( + HTTPResponse, self.do_open(UnixHTTPConnection, modified_req, path=unix_socket_path) + ) + + class URLFetcher(object): USER_AGENT = "pex/{version}".format(version=__version__) @@ -171,6 +239,7 @@ def __init__( handlers = [ ProxyHandler(proxies), HTTPSHandler(context=get_ssl_context(network_configuration=network_configuration)), + UnixHTTPHandler(), ] if handle_file_urls: handlers.append(FileHandler()) diff --git a/testing/data/locks/issue-2415.lock.json b/testing/data/locks/issue-2415.lock.json index ce9d2d6c4..59fc6d3bc 100644 --- a/testing/data/locks/issue-2415.lock.json +++ b/testing/data/locks/issue-2415.lock.json @@ -1491,7 +1491,7 @@ "requirements": [ "flask", "gevent>=1.3.4", - "gunicorn" + "gunicorn[gevent]" ], "requires_python": [ "<3.13,>=3.8" diff --git a/tests/integration/test_issue_2415.py b/tests/integration/test_issue_2415.py index de6ef27e8..0b0dbf000 100644 --- a/tests/integration/test_issue_2415.py +++ b/tests/integration/test_issue_2415.py @@ -3,27 +3,20 @@ import atexit import os.path -import re import subprocess -import threading +import time from textwrap import dedent -from threading import Event -from typing import Optional import pytest -from pex.common import safe_open +from pex.common import safe_mkdir, safe_open from pex.fetcher import URLFetcher from pex.typing import TYPE_CHECKING -from testing import IS_PYPY, PY_VER, data, run_pex_command +from testing import IS_MAC, IS_PYPY, PY_VER, data, run_pex_command if TYPE_CHECKING: from typing import Any - import attr # vendor:skip -else: - from pex.third_party import attr - @pytest.mark.skipif( IS_PYPY or PY_VER < (3, 8) or PY_VER >= (3, 13), @@ -39,18 +32,15 @@ def test_gevent_monkeypatch(tmpdir): app_fp.write( dedent( """\ - from gevent import monkey - monkey.patch_all() - from flask import Flask app = Flask(__name__) - @app.route("/") - def hello_world(): - return "Hello, World!" + @app.route("/") + def hello_world(username): + return "Hello, {}!".format(username) """ ) ) @@ -67,10 +57,11 @@ def hello_world(): # --pip-version latest \ # --style universal \ # --interpreter-constraint ">=3.8,<3.13" \ + # --no-build \ # --indent 2 \ # flask \ # "gevent>=1.3.4" \ - # gunicorn + # gunicorn[gevent] lock = data.path("locks", "issue-2415.lock.json") run_pex_command( @@ -86,58 +77,45 @@ def hello_world(): "-c", "gunicorn", "--inject-args", - "app:app", + "--worker-class gevent app:app", "-o", pex, ], cwd=str(tmpdir), ).assert_success() - log = os.path.join(str(tmpdir), "log") - os.mkfifo(log) - - @attr.s - class LogScanner(object): - port_seen = attr.ib(factory=Event, init=False) # type: Event - _port = attr.ib(default=None) # type: Optional[int] - - def scan_log(self): - # type: () -> None - - with open(log) as log_fp: - for line in log_fp: - if self._port is None: - match = re.search(r"Listening at: http://127.0.0.1:(?P\d{1,5})", line) - if match: - self._port = int(match.group("port")) - self.port_seen.set() - - @property - def port(self): - # type: () -> int - self.port_seen.wait() - assert self._port is not None - return self._port - - log_scanner = LogScanner() - log_scan_thread = threading.Thread(target=log_scanner.scan_log) - log_scan_thread.daemon = True - log_scan_thread.start() - + socket = os.path.join( + safe_mkdir(os.path.expanduser("~/Library/Caches/TemporaryItems")) + if IS_MAC + else str(tmpdir), + "gunicorn.sock", + ) with open(os.path.join(str(tmpdir), "stderr"), "wb+") as stderr_fp: gunicorn = subprocess.Popen( - args=[pex, "--bind", "127.0.0.1:0", "--log-file", log], stderr=stderr_fp + args=[pex, "--bind", "unix:{socket}".format(socket=socket)], stderr=stderr_fp ) atexit.register(gunicorn.kill) - with URLFetcher().get_body_stream( - "http://127.0.0.1:{port}".format(port=log_scanner.port) - ) as http_fp: - assert b"Hello, World!" == http_fp.read().strip() + start = time.time() + while not os.path.exists(socket): + if time.time() - start > 60: + break + # Local testing on an unloaded system shows gunicorn takes about a second to start up. + time.sleep(1.0) + assert os.path.exists(socket), ( + "Timed out after waiting {time:.3f}s for gunicorn to start and open a unix socket at " + "{socket}".format(time=time.time() - start, socket=socket) + ) + print( + "Waited {time:.3f}s for gunicorn to start and open a unix socket at {socket}".format( + time=time.time() - start, socket=socket + ) + ) + with URLFetcher().get_body_stream("unix://{socket}/World".format(socket=socket)) as http_fp: + assert b"Hello, World!" == http_fp.read().strip() gunicorn.kill() - log_scan_thread.join() - stderr_fp.flush() + stderr_fp.seek(0) stderr = stderr_fp.read() assert b"MonkeyPatchWarning: Monkey-patching ssl after ssl " not in stderr, stderr.decode(