|
| 1 | +import binascii |
1 | 2 | import codecs |
2 | 3 | import datetime |
3 | 4 | from email.message import Message |
4 | 5 | from typing import Any, Callable, Optional, TypeVar, Union |
5 | 6 |
|
6 | | -from tls_requests.exceptions import HTTPError |
| 7 | +from tls_requests.exceptions import Base64DecodeError, HTTPError |
7 | 8 | from tls_requests.models.cookies import Cookies |
8 | 9 | from tls_requests.models.encoders import StreamEncoder |
9 | 10 | from tls_requests.models.headers import Headers |
|
12 | 13 | from tls_requests.models.tls import TLSResponse |
13 | 14 | from tls_requests.settings import CHUNK_SIZE |
14 | 15 | from tls_requests.types import CookieTypes, HeaderTypes, ResponseHistory |
15 | | -from tls_requests.utils import chardet, to_json |
| 16 | +from tls_requests.utils import b64decode, chardet, to_json |
16 | 17 |
|
17 | 18 | __all__ = ["Response"] |
18 | 19 |
|
@@ -231,10 +232,21 @@ async def aclose(self) -> None: |
231 | 232 | return self.close() |
232 | 233 |
|
233 | 234 | @classmethod |
234 | | - def from_tls_response(cls, response: TLSResponse) -> "Response": |
| 235 | + def from_tls_response(cls, response: TLSResponse, is_byte_response: bool = False) -> "Response": |
| 236 | + def _parse_response_body(value: Optional[str]) -> bytes: |
| 237 | + if value: |
| 238 | + if is_byte_response: |
| 239 | + try: |
| 240 | + value = b64decode(value.split(",")[-1]) |
| 241 | + return value |
| 242 | + except (binascii.Error, AssertionError): |
| 243 | + raise Base64DecodeError("Couldn't decode the base64 string into bytes.") |
| 244 | + return value.encode("utf-8") |
| 245 | + return b"" |
| 246 | + |
235 | 247 | ret = cls( |
236 | 248 | status_code=response.status, |
237 | | - body=response.body.encode("utf-8") if response.body else b"", |
| 249 | + body=_parse_response_body(response.body), |
238 | 250 | headers=response.headers, |
239 | 251 | cookies=response.cookies, |
240 | 252 | ) |
|
0 commit comments