diff --git a/kiss_headers/api.py b/kiss_headers/api.py
index b1fc370..6c697ec 100644
--- a/kiss_headers/api.py
+++ b/kiss_headers/api.py
@@ -1,6 +1,6 @@
 from email.message import Message
 from email.parser import HeaderParser
-from io import RawIOBase
+from io import BufferedReader, RawIOBase
 from typing import Any, Iterable, List, Mapping, Optional, Tuple, Type, TypeVar, Union
 
 from kiss_headers.models import Header, Headers
@@ -31,7 +31,11 @@ def parse_it(raw_headers: Any) -> Headers:
 
     if isinstance(raw_headers, str):
         headers = HeaderParser().parsestr(raw_headers, headersonly=True).items()
-    elif isinstance(raw_headers, bytes) or isinstance(raw_headers, RawIOBase):
+    elif (
+        isinstance(raw_headers, bytes)
+        or isinstance(raw_headers, RawIOBase)
+        or isinstance(raw_headers, BufferedReader)
+    ):
         decoded, not_decoded = extract_encoded_headers(
             raw_headers if isinstance(raw_headers, bytes) else raw_headers.read() or b""
         )
diff --git a/kiss_headers/models.py b/kiss_headers/models.py
index af74685..b59d510 100644
--- a/kiss_headers/models.py
+++ b/kiss_headers/models.py
@@ -4,6 +4,7 @@
 
 from kiss_headers.structures import AttributeBag, CaseInsensitiveDict
 from kiss_headers.utils import (
+    escape_double_quote,
     extract_comments,
     header_content_split,
     header_name_to_class,
@@ -11,6 +12,7 @@
     normalize_list,
     normalize_str,
     prettify_header_name,
+    unescape_double_quote,
     unfold,
     unpack_protected_keyword,
     unquote,
@@ -1203,7 +1205,7 @@ def __init__(self, members: List[str]):
                     self.insert(unquote(member), None)
                     continue
 
-                self.insert(key, unquote(value))
+                self.insert(key, unescape_double_quote(unquote(value)))
                 continue
 
             self.insert(unquote(member), None)
@@ -1220,7 +1222,9 @@ def __str__(self) -> str:
 
             if value is not None:
                 content += '{semi_colon_r}{key}="{value}"'.format(
-                    key=key, value=value, semi_colon_r="; " if content != "" else "",
+                    key=key,
+                    value=escape_double_quote(value),
+                    semi_colon_r="; " if content != "" else "",
                 )
             else:
                 content += "; " + key if content != "" else key
diff --git a/kiss_headers/utils.py b/kiss_headers/utils.py
index d35e809..ac427a1 100644
--- a/kiss_headers/utils.py
+++ b/kiss_headers/utils.py
@@ -1,5 +1,5 @@
 from email.header import decode_header
-from re import findall, search
+from re import findall, search, sub
 from typing import Any, Iterable, List, Optional, Set, Tuple, Type
 
 RESERVED_KEYWORD: Set[str] = {
@@ -89,6 +89,8 @@ def header_content_split(string: str, delimiter: str) -> List[str]:
     ['Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:50.0) Gecko/20100101 Firefox/50.0']
     >>> header_content_split("text/html; charset=UTF-8", ";")
     ['text/html', 'charset=UTF-8']
+    >>> header_content_split('text/html; charset="UTF-\\\"8"', ";")
+    ['text/html', 'charset="UTF-"8"']
     """
     if len(delimiter) != 1 or delimiter not in {";", ",", " "}:
         raise ValueError("Delimiter should be either semi-colon, a coma or a space.")
@@ -394,16 +396,12 @@ def extract_comments(content: str) -> List[str]:
 
 
 def unfold(content: str) -> str:
-    """Some header content may have folded content (LF + 9 spaces, LF + 7 spaces, or LF + 1 spaces) in it, making your job at reading them a little more difficult.
+    r"""Some header content may have folded content (CRLF or bare LF, followed by one or more spaces or tabs) in it, making your job at reading them a little more difficult.
     This function undoes the folding in the given content.
-    >>> unfold("eqHS2AQD+hfNNlTiLej73CiBUGVQifX4watAaxUkdjGeH578i7n3Wwcdw2nLz+U0bH\\n         ehSe/2QytZGWM5CewwNdumT1IVGzjFs+cRgfK0V6JlEIOoV3bRXxnjenWFfWdVNXtw8s")
-    'eqHS2AQD+hfNNlTiLej73CiBUGVQifX4watAaxUkdjGeH578i7n3Wwcdw2nLz+U0bHehSe/2QytZGWM5CewwNdumT1IVGzjFs+cRgfK0V6JlEIOoV3bRXxnjenWFfWdVNXtw8s'
+    >>> unfold("___utmvbtouVBFmB=gZg\r\n XbNOjalT: Lte; path=/; Max-Age=900")
+    '___utmvbtouVBFmB=gZg XbNOjalT: Lte; path=/; Max-Age=900'
     """
-    return (
-        content.replace("\n" + (9 * " "), "")
-        .replace("\n" + (7 * " "), " ")
-        .replace("\n" + (1 * " "), " ")
-    )
+    return sub(r"\r?\n[ \t]+", " ", content)
 
 
 def extract_encoded_headers(payload: bytes) -> Tuple[str, bytes]:
@@ -427,3 +425,25 @@ def extract_encoded_headers(payload: bytes) -> Tuple[str, bytes]:
             break
 
     return result, b"\r\n".join(lines[index + 1 :])
+
+
+def unescape_double_quote(content: str) -> str:
+    """
+    Replace escaped double quote in content by removing the backslash.
+    >>> unescape_double_quote(r'UTF\"-8')
+    'UTF"-8'
+    >>> unescape_double_quote(r'UTF"-8')
+    'UTF"-8'
+    """
+    return content.replace(r"\"", '"')
+
+
+def escape_double_quote(content: str) -> str:
+    r"""
+    Replace not escaped double quote in content by adding a backslash beforehand.
+    >>> escape_double_quote(r'UTF\"-8')
+    'UTF\\"-8'
+    >>> escape_double_quote(r'UTF"-8')
+    'UTF\\"-8'
+    """
+    return unescape_double_quote(content).replace('"', r"\"")
diff --git a/kiss_headers/version.py b/kiss_headers/version.py
index 40ebdb0..e9dab66 100644
--- a/kiss_headers/version.py
+++ b/kiss_headers/version.py
@@ -2,5 +2,5 @@
 Expose version
 """
 
-__version__ = "2.2.2"
+__version__ = "2.2.3"
 VERSION = __version__.split(".")
diff --git a/tests/test_attributes.py b/tests/test_attributes.py
index d728ef0..5b0f0f0 100644
--- a/tests/test_attributes.py
+++ b/tests/test_attributes.py
@@ -1,6 +1,7 @@
 import unittest
 
 from kiss_headers import Attributes
+from kiss_headers.utils import header_content_split
 
 
 class AttributesTestCase(unittest.TestCase):
@@ -22,6 +23,19 @@ def test_eq(self):
 
         self.assertNotEqual(attr_a, attr_e)
 
+    def test_esc_double_quote(self):
+
+        with self.subTest(
+            "Ensure that the double quote character is handled correctly."
+        ):
+            attributes = Attributes(
+                header_content_split(r'text/html; charset="UTF-\"8"', ";")
+            )
+
+            self.assertEqual(attributes["charset"], 'UTF-"8')
+
+            self.assertEqual(str(attributes), r'text/html; charset="UTF-\"8"')
+
 
 if __name__ == "__main__":
     unittest.main()