From 87548dde794e219278d29d1c33cc4f5c6a3b2510 Mon Sep 17 00:00:00 2001 From: Alex Burke Date: Fri, 31 May 2024 11:11:01 +0200 Subject: [PATCH] WIP: repair fileio --- mig/shared/fileio.py | 32 ++++------ tests/test_mig_shared_fileio.py | 101 ++++++++++++++++++++------------ 2 files changed, 75 insertions(+), 58 deletions(-) diff --git a/mig/shared/fileio.py b/mig/shared/fileio.py index 3e2233687..c4b213da7 100644 --- a/mig/shared/fileio.py +++ b/mig/shared/fileio.py @@ -30,15 +30,19 @@ from __future__ import print_function from __future__ import absolute_import +import codecs import errno import fcntl import os import shutil import sys import tempfile +import traceback import time import zipfile +PY2 = sys.version_info[0] == 2 + # NOTE: We expose optimized walk function directly for ease and efficiency. # Requires stand-alone scandir module on python 2 whereas the native os # functions are built-in and optimized similarly on python 3+ @@ -70,19 +74,8 @@ exit(1) -def _auto_adjust_mode(data, mode): - """Select suitable file open mode based on string type of data. I.e. whether - to use binary or text mode depending on data in bytes or unicode format. - """ - - # NOTE: detect byte/unicode writes and handle explicitly in a portable way - if isinstance(data, bytes): - if 'b' not in mode: - mode = "%sb" % mode # appended to avoid mode ordering error on PY2 - else: - if 'b' in mode: - mode = mode.strip('b') - return mode +def _is_string(value): + return isinstance(value, unicode) if PY2 else isinstance(value, str) def _write_chunk(path, chunk, offset, logger=None, mode='r+b', @@ -100,6 +93,12 @@ def _write_chunk(path, chunk, offset, logger=None, mode='r+b', (offset, path)) return False + if _is_string(chunk): + chunk = bytearray(chunk, 'utf8') + # detect byte writes and handle explicitly in a portable way + if isinstance(chunk, (bytes, bytearray)) and 'b' not in mode: + mode = "%sb" % mode # appended to avoid mode ordering error on PY2 + # create dir and file if it does not exists (head, _) = os.path.split(path) @@ -140,6 +139,7 @@ def _write_chunk(path, chunk, offset, logger=None, mode='r+b', # logger.debug("file %r chunk written at %d" % (path, offset)) return True except Exception as err: + #traceback.print_exc() logger.error("could not write %r chunk at %d: %s" % (path, offset, err)) return False @@ -152,9 +152,6 @@ def write_chunk(path, chunk, offset, logger, mode='r+b', force_string=False): if not logger: logger = null_logger("dummy") - # TODO: enable this again once throuroughly tested and assured py2+3 safe - # mode = _auto_adjust_mode(chunk, mode) - return _write_chunk(path, chunk, offset, logger, mode, force_string=force_string) @@ -169,9 +166,6 @@ def write_file(content, path, logger, mode='w', make_parent=True, umask=None, if umask is not None: old_umask = os.umask(umask) - # TODO: enable this again once throuroughly tested and assured py2+3 safe - #mode = _auto_adjust_mode(content, mode) - retval = _write_chunk(path, content, offset=0, logger=logger, mode=mode, make_parent=make_parent, create_file=False, force_string=force_string) diff --git a/tests/test_mig_shared_fileio.py b/tests/test_mig_shared_fileio.py index e368a3051..4ad9fd2b6 100644 --- a/tests/test_mig_shared_fileio.py +++ b/tests/test_mig_shared_fileio.py @@ -28,13 +28,14 @@ """Unit test fileio functions""" import binascii +import codecs import os import sys import unittest # NOTE: wrap next imports in try except to prevent autopep8 shuffling up try: - from tests.support import MigTestCase, cleanpath, temppath, testmain + from tests.support import PY2, MigTestCase, cleanpath, temppath, testmain import mig.shared.fileio as fileio except ImportError as ioe: print("Failed to import mig core modules: %s" % ioe) @@ -43,15 +44,47 @@ DUMMY_BYTES = binascii.unhexlify('DEADBEEF') # 4 bytes DUMMY_BYTES_LENGTH = 4 DUMMY_UNICODE = u'UniCode123½¾µßðþđŋħĸþł@ª€£$¥©®' -DUMMY_UNICODE_LENGTH = len(DUMMY_UNICODE) +DUMMY_UNICODE_BYTES = bytearray(DUMMY_UNICODE, 'utf8') +DUMMY_UNICODE_BYTES_LENGTH = len(DUMMY_UNICODE_BYTES) DUMMY_FILE_WRITECHUNK = 'fileio/write_chunk' DUMMY_FILE_WRITEFILE = 'fileio/write_file' assert isinstance(DUMMY_BYTES, bytes) +def _as_unicode_string(value): + assert isinstance(value, bytearray) + return unicode(codecs.decode(value, 'utf8')) if PY2 else str(value, 'utf8') + + +class _ByteText: + """File-like object that allows interacting with a text file as a bytearray. + + Supports reading and directly usable as a context manager.""" + + def __init__(self, path, mode='r'): + self._file = None + self._path = path + self._mode = "%s%s" % (mode, 'b') if 'b' not in mode else mode + + def read(self, size): + content = self._file.read(size) + # always read back the content as though it was raw bytes + return bytearray(content) + + def __enter__(self): + self._file = open(self._path, mode=self._mode) + return self + + def __exit__(self, *args): + self._file.close() + if args[1] is not None: + raise args[1] + + class MigSharedFileio__write_chunk(MigTestCase): - # TODO: Add docstrings to this class and its methods + """Coverage of the write_chunk() function.""" + def setUp(self): super(MigSharedFileio__write_chunk, self).setUp() self.tmp_path = temppath(DUMMY_FILE_WRITECHUNK, self, skip_clean=True) @@ -60,9 +93,7 @@ def setUp(self): def test_return_false_on_invalid_data(self): self.logger.forgive_errors() - # NOTE: we make sure to disable any forced stringification here - did_succeed = fileio.write_chunk(self.tmp_path, 1234, 0, self.logger, - force_string=False) + did_succeed = fileio.write_chunk(self.tmp_path, 1234, 0, self.logger) self.assertFalse(did_succeed) def test_return_false_on_invalid_offset(self): @@ -106,7 +137,6 @@ def test_store_bytes_at_offset(self): "expected a hole was left") self.assertEqual(content[3:], DUMMY_BYTES) - @unittest.skip("TODO: enable again - requires the temporarily disabled auto mode select") def test_store_bytes_in_text_mode(self): fileio.write_chunk(self.tmp_path, DUMMY_BYTES, 0, self.logger, mode="r+") @@ -116,28 +146,29 @@ def test_store_bytes_in_text_mode(self): self.assertEqual(len(content), DUMMY_BYTES_LENGTH) self.assertEqual(content[:], DUMMY_BYTES) - @unittest.skip("TODO: enable again - requires the temporarily disabled auto mode select") def test_store_unicode(self): - fileio.write_chunk(self.tmp_path, DUMMY_UNICODE, 0, self.logger, - mode='r+') + did_succeed = fileio.write_chunk(self.tmp_path, DUMMY_UNICODE, 0, self.logger, + mode='r+') + self.assertTrue(did_succeed) - with open(self.tmp_path, 'r') as file: + with _ByteText(self.tmp_path) as file: content = file.read(1024) - self.assertEqual(len(content), DUMMY_UNICODE_LENGTH) - self.assertEqual(content[:], DUMMY_UNICODE) + self.assertEqual(len(content), DUMMY_UNICODE_BYTES_LENGTH) + self.assertEqual(content[:], DUMMY_UNICODE_BYTES) - @unittest.skip("TODO: enable again - requires the temporarily disabled auto mode select") def test_store_unicode_in_binary_mode(self): fileio.write_chunk(self.tmp_path, DUMMY_UNICODE, 0, self.logger, mode='r+b') - with open(self.tmp_path, 'r') as file: + with _ByteText(self.tmp_path) as file: content = file.read(1024) - self.assertEqual(len(content), DUMMY_UNICODE_LENGTH) - self.assertEqual(content[:], DUMMY_UNICODE) + self.assertEqual(len(content), DUMMY_UNICODE_BYTES_LENGTH) + self.assertEqual(_as_unicode_string(content[:]), DUMMY_UNICODE) class MigSharedFileio__write_file(MigTestCase): + """Coverage of the write_file() function.""" + def setUp(self): super(MigSharedFileio__write_file, self).setUp() self.tmp_path = temppath(DUMMY_FILE_WRITEFILE, self, skip_clean=True) @@ -146,9 +177,7 @@ def setUp(self): def test_return_false_on_invalid_data(self): self.logger.forgive_errors() - # NOTE: we make sure to disable any forced stringification here - did_succeed = fileio.write_file(1234, self.tmp_path, self.logger, - force_string=False) + did_succeed = fileio.write_file(1234, self.tmp_path, self.logger) self.assertFalse(did_succeed) def test_return_false_on_invalid_dir(self): @@ -156,7 +185,8 @@ def test_return_false_on_invalid_dir(self): os.makedirs(self.tmp_path) - did_succeed = fileio.write_file(DUMMY_BYTES, self.tmp_path, self.logger) + did_succeed = fileio.write_file( + DUMMY_BYTES, self.tmp_path, self.logger) self.assertFalse(did_succeed) def test_return_false_on_missing_dir(self): @@ -167,20 +197,16 @@ def test_return_false_on_missing_dir(self): self.assertFalse(did_succeed) def test_creates_directory(self): - # TODO: temporarily use empty string to avoid any byte/unicode issues - # did_succeed = fileio.write_file(DUMMY_BYTES, self.tmp_path, self.logger) - did_succeed = fileio.write_file('', self.tmp_path, self.logger) + did_succeed = fileio.write_file( + DUMMY_BYTES, self.tmp_path, self.logger) self.assertTrue(did_succeed) path_kind = self.assertPathExists(DUMMY_FILE_WRITEFILE) self.assertEqual(path_kind, "file") def test_store_bytes(self): - mode = 'w' - # TODO: remove next once we have auto adjust mode in write helper - mode = fileio._auto_adjust_mode(DUMMY_BYTES, mode) - did_succeed = fileio.write_file(DUMMY_BYTES, self.tmp_path, self.logger, - mode=mode) + did_succeed = fileio.write_file( + DUMMY_BYTES, self.tmp_path, self.logger) self.assertTrue(did_succeed) with open(self.tmp_path, 'rb') as file: @@ -188,7 +214,6 @@ def test_store_bytes(self): self.assertEqual(len(content), DUMMY_BYTES_LENGTH) self.assertEqual(content[:], DUMMY_BYTES) - @unittest.skip("TODO: enable again - requires the temporarily disabled auto mode select") def test_store_bytes_in_text_mode(self): did_succeed = fileio.write_file(DUMMY_BYTES, self.tmp_path, self.logger, mode="w") @@ -199,28 +224,26 @@ def test_store_bytes_in_text_mode(self): self.assertEqual(len(content), DUMMY_BYTES_LENGTH) self.assertEqual(content[:], DUMMY_BYTES) - @unittest.skip("TODO: enable again - requires the temporarily disabled auto mode select") def test_store_unicode(self): did_succeed = fileio.write_file(DUMMY_UNICODE, self.tmp_path, self.logger, mode='w') self.assertTrue(did_succeed) - with open(self.tmp_path, 'r') as file: + with _ByteText(self.tmp_path, 'r') as file: content = file.read(1024) - self.assertEqual(len(content), DUMMY_UNICODE_LENGTH) - self.assertEqual(content[:], DUMMY_UNICODE) + self.assertEqual(len(content), DUMMY_UNICODE_BYTES_LENGTH) + self.assertEqual(content[:], DUMMY_UNICODE_BYTES) - @unittest.skip("TODO: enable again - requires the temporarily disabled auto mode select") def test_store_unicode_in_binary_mode(self): did_succeed = fileio.write_file(DUMMY_UNICODE, self.tmp_path, self.logger, mode='wb') self.assertTrue(did_succeed) - with open(self.tmp_path, 'r') as file: + with _ByteText(self.tmp_path, 'r') as file: content = file.read(1024) - self.assertEqual(len(content), DUMMY_UNICODE_LENGTH) - self.assertEqual(content[:], DUMMY_UNICODE) + self.assertEqual(len(content), DUMMY_UNICODE_BYTES_LENGTH) + self.assertEqual(content[:], DUMMY_UNICODE_BYTES) if __name__ == '__main__': - testmain() + testmain(failfast=True)