From 3c2079e8f40ac3fad2ab013f7de2e5c2962c4bd5 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Fri, 19 Jan 2024 12:40:48 +0100 Subject: [PATCH] Adapt piped tests for binary reading only --- tests/test_piped.py | 28 ++++++++++------------------ 1 file changed, 10 insertions(+), 18 deletions(-) diff --git a/tests/test_piped.py b/tests/test_piped.py index 7242f06..114d879 100644 --- a/tests/test_piped.py +++ b/tests/test_piped.py @@ -47,8 +47,8 @@ base = os.path.join(os.path.dirname(__file__), "file.txt") files = [base + ext for ext in extensions] TEST_DIR = Path(__file__).parent -CONTENT_LINES = ["Testing, testing ...\n", "The second line.\n"] -CONTENT = "".join(CONTENT_LINES) +CONTENT_LINES = [b"Testing, testing ...\n", b"The second line.\n"] +CONTENT = b"".join(CONTENT_LINES) def available_gzip_readers_and_writers(): @@ -147,7 +147,7 @@ def writer(request): def test_reader_readinto(reader): opener, extension = reader - content = CONTENT.encode("utf-8") + content = CONTENT with opener(TEST_DIR / f"file.txt{extension}", "rb") as f: b = bytearray(len(content) + 100) length = f.readinto(b) @@ -159,22 +159,16 @@ def test_reader_textiowrapper(reader): opener, extension = reader with opener(TEST_DIR / f"file.txt{extension}", "rb") as f: wrapped = io.TextIOWrapper(f, encoding="utf-8") - assert wrapped.read() == CONTENT + assert wrapped.read() == CONTENT.decode("utf-8") def test_reader_readline(reader): opener, extension = reader - first_line = CONTENT_LINES[0].encode("utf-8") + first_line = CONTENT_LINES[0] with opener(TEST_DIR / f"file.txt{extension}", "rb") as f: assert f.readline() == first_line -def test_reader_readline_text(reader): - opener, extension = reader - with opener(TEST_DIR / f"file.txt{extension}", "r") as f: - assert f.readline() == CONTENT_LINES[0] - - def test_reader_readlines(reader): opener, extension = reader with opener(TEST_DIR / f"file.txt{extension}", "r") as f: @@ -216,11 +210,10 @@ def test_reader_iter_without_with(reader): f.close() -@pytest.mark.parametrize("mode", ["rb", "rt"]) -def test_reader_close(mode, reader, create_large_file): +def test_reader_close(reader, create_large_file): reader, extension = reader large_file = create_large_file(extension) - with reader(large_file, mode=mode) as f: + with reader(large_file, mode="rb") as f: f.readline() time.sleep(0.2) # The subprocess should be properly terminated now @@ -249,7 +242,7 @@ def test_invalid_zstd_compression_level(tmp_path): def test_readers_read(reader): opener, extension = reader - with opener(TEST_DIR / f"file.txt{extension}", "rt") as f: + with opener(TEST_DIR / f"file.txt{extension}", "rb") as f: assert f.read() == CONTENT @@ -337,13 +330,12 @@ def test_piped_compression_reader_seek_and_tell(reader): assert f.read(8) == b"Testing," -@pytest.mark.parametrize("mode", ["r", "rt"]) +@pytest.mark.parametrize("mode", ["r", "rb"]) def test_piped_compression_reader_peek_text(reader, mode): opener, extension = reader compressed_file = TEST_DIR / f"file.txt{extension}" with opener(compressed_file, mode) as read_h: - with pytest.raises(AttributeError): - read_h.peek(1) + assert read_h.peek(1)[0] == CONTENT[0] def writers_and_levels():