Skip to content

Commit

Permalink
WIP: repair fileio
Browse files Browse the repository at this point in the history
  • Loading branch information
albu-diku committed Nov 5, 2024
1 parent 171b092 commit 87548dd
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 58 deletions.
32 changes: 13 additions & 19 deletions mig/shared/fileio.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,19 @@
from __future__ import print_function
from __future__ import absolute_import

import codecs
import errno
import fcntl
import os
import shutil
import sys
import tempfile
import traceback
import time
import zipfile

PY2 = sys.version_info[0] == 2

# NOTE: We expose optimized walk function directly for ease and efficiency.
# Requires stand-alone scandir module on python 2 whereas the native os
# functions are built-in and optimized similarly on python 3+
Expand Down Expand Up @@ -70,19 +74,8 @@
exit(1)


def _auto_adjust_mode(data, mode):
"""Select suitable file open mode based on string type of data. I.e. whether
to use binary or text mode depending on data in bytes or unicode format.
"""

# NOTE: detect byte/unicode writes and handle explicitly in a portable way
if isinstance(data, bytes):
if 'b' not in mode:
mode = "%sb" % mode # appended to avoid mode ordering error on PY2
else:
if 'b' in mode:
mode = mode.strip('b')
return mode
def _is_string(value):
return isinstance(value, unicode) if PY2 else isinstance(value, str)


def _write_chunk(path, chunk, offset, logger=None, mode='r+b',
Expand All @@ -100,6 +93,12 @@ def _write_chunk(path, chunk, offset, logger=None, mode='r+b',
(offset, path))
return False

if _is_string(chunk):
chunk = bytearray(chunk, 'utf8')
# detect byte writes and handle explicitly in a portable way
if isinstance(chunk, (bytes, bytearray)) and 'b' not in mode:
mode = "%sb" % mode # appended to avoid mode ordering error on PY2

# create dir and file if it does not exists

(head, _) = os.path.split(path)
Expand Down Expand Up @@ -140,6 +139,7 @@ def _write_chunk(path, chunk, offset, logger=None, mode='r+b',
# logger.debug("file %r chunk written at %d" % (path, offset))
return True
except Exception as err:
#traceback.print_exc()
logger.error("could not write %r chunk at %d: %s" %
(path, offset, err))
return False
Expand All @@ -152,9 +152,6 @@ def write_chunk(path, chunk, offset, logger, mode='r+b', force_string=False):
if not logger:
logger = null_logger("dummy")

# TODO: enable this again once throuroughly tested and assured py2+3 safe
# mode = _auto_adjust_mode(chunk, mode)

return _write_chunk(path, chunk, offset, logger, mode,
force_string=force_string)

Expand All @@ -169,9 +166,6 @@ def write_file(content, path, logger, mode='w', make_parent=True, umask=None,
if umask is not None:
old_umask = os.umask(umask)

# TODO: enable this again once throuroughly tested and assured py2+3 safe
#mode = _auto_adjust_mode(content, mode)

retval = _write_chunk(path, content, offset=0, logger=logger, mode=mode,
make_parent=make_parent, create_file=False,
force_string=force_string)
Expand Down
101 changes: 62 additions & 39 deletions tests/test_mig_shared_fileio.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@
"""Unit test fileio functions"""

import binascii
import codecs
import os
import sys
import unittest

# NOTE: wrap next imports in try except to prevent autopep8 shuffling up
try:
from tests.support import MigTestCase, cleanpath, temppath, testmain
from tests.support import PY2, MigTestCase, cleanpath, temppath, testmain
import mig.shared.fileio as fileio
except ImportError as ioe:
print("Failed to import mig core modules: %s" % ioe)
Expand All @@ -43,15 +44,47 @@
DUMMY_BYTES = binascii.unhexlify('DEADBEEF') # 4 bytes
DUMMY_BYTES_LENGTH = 4
DUMMY_UNICODE = u'UniCode123½¾µßðþđŋħĸþł@ª€£$¥©®'
DUMMY_UNICODE_LENGTH = len(DUMMY_UNICODE)
DUMMY_UNICODE_BYTES = bytearray(DUMMY_UNICODE, 'utf8')
DUMMY_UNICODE_BYTES_LENGTH = len(DUMMY_UNICODE_BYTES)
DUMMY_FILE_WRITECHUNK = 'fileio/write_chunk'
DUMMY_FILE_WRITEFILE = 'fileio/write_file'

assert isinstance(DUMMY_BYTES, bytes)


def _as_unicode_string(value):
assert isinstance(value, bytearray)
return unicode(codecs.decode(value, 'utf8')) if PY2 else str(value, 'utf8')


class _ByteText:
"""File-like object that allows interacting with a text file as a bytearray.
Supports reading and directly usable as a context manager."""

def __init__(self, path, mode='r'):
self._file = None
self._path = path
self._mode = "%s%s" % (mode, 'b') if 'b' not in mode else mode

def read(self, size):
content = self._file.read(size)
# always read back the content as though it was raw bytes
return bytearray(content)

def __enter__(self):
self._file = open(self._path, mode=self._mode)
return self

def __exit__(self, *args):
self._file.close()
if args[1] is not None:
raise args[1]


class MigSharedFileio__write_chunk(MigTestCase):
# TODO: Add docstrings to this class and its methods
"""Coverage of the write_chunk() function."""

def setUp(self):
super(MigSharedFileio__write_chunk, self).setUp()
self.tmp_path = temppath(DUMMY_FILE_WRITECHUNK, self, skip_clean=True)
Expand All @@ -60,9 +93,7 @@ def setUp(self):
def test_return_false_on_invalid_data(self):
self.logger.forgive_errors()

# NOTE: we make sure to disable any forced stringification here
did_succeed = fileio.write_chunk(self.tmp_path, 1234, 0, self.logger,
force_string=False)
did_succeed = fileio.write_chunk(self.tmp_path, 1234, 0, self.logger)
self.assertFalse(did_succeed)

def test_return_false_on_invalid_offset(self):
Expand Down Expand Up @@ -106,7 +137,6 @@ def test_store_bytes_at_offset(self):
"expected a hole was left")
self.assertEqual(content[3:], DUMMY_BYTES)

@unittest.skip("TODO: enable again - requires the temporarily disabled auto mode select")
def test_store_bytes_in_text_mode(self):
fileio.write_chunk(self.tmp_path, DUMMY_BYTES, 0, self.logger,
mode="r+")
Expand All @@ -116,28 +146,29 @@ def test_store_bytes_in_text_mode(self):
self.assertEqual(len(content), DUMMY_BYTES_LENGTH)
self.assertEqual(content[:], DUMMY_BYTES)

@unittest.skip("TODO: enable again - requires the temporarily disabled auto mode select")
def test_store_unicode(self):
fileio.write_chunk(self.tmp_path, DUMMY_UNICODE, 0, self.logger,
mode='r+')
did_succeed = fileio.write_chunk(self.tmp_path, DUMMY_UNICODE, 0, self.logger,
mode='r+')
self.assertTrue(did_succeed)

with open(self.tmp_path, 'r') as file:
with _ByteText(self.tmp_path) as file:
content = file.read(1024)
self.assertEqual(len(content), DUMMY_UNICODE_LENGTH)
self.assertEqual(content[:], DUMMY_UNICODE)
self.assertEqual(len(content), DUMMY_UNICODE_BYTES_LENGTH)
self.assertEqual(content[:], DUMMY_UNICODE_BYTES)

@unittest.skip("TODO: enable again - requires the temporarily disabled auto mode select")
def test_store_unicode_in_binary_mode(self):
fileio.write_chunk(self.tmp_path, DUMMY_UNICODE, 0, self.logger,
mode='r+b')

with open(self.tmp_path, 'r') as file:
with _ByteText(self.tmp_path) as file:
content = file.read(1024)
self.assertEqual(len(content), DUMMY_UNICODE_LENGTH)
self.assertEqual(content[:], DUMMY_UNICODE)
self.assertEqual(len(content), DUMMY_UNICODE_BYTES_LENGTH)
self.assertEqual(_as_unicode_string(content[:]), DUMMY_UNICODE)


class MigSharedFileio__write_file(MigTestCase):
"""Coverage of the write_file() function."""

def setUp(self):
super(MigSharedFileio__write_file, self).setUp()
self.tmp_path = temppath(DUMMY_FILE_WRITEFILE, self, skip_clean=True)
Expand All @@ -146,17 +177,16 @@ def setUp(self):
def test_return_false_on_invalid_data(self):
self.logger.forgive_errors()

# NOTE: we make sure to disable any forced stringification here
did_succeed = fileio.write_file(1234, self.tmp_path, self.logger,
force_string=False)
did_succeed = fileio.write_file(1234, self.tmp_path, self.logger)
self.assertFalse(did_succeed)

def test_return_false_on_invalid_dir(self):
self.logger.forgive_errors()

os.makedirs(self.tmp_path)

did_succeed = fileio.write_file(DUMMY_BYTES, self.tmp_path, self.logger)
did_succeed = fileio.write_file(
DUMMY_BYTES, self.tmp_path, self.logger)
self.assertFalse(did_succeed)

def test_return_false_on_missing_dir(self):
Expand All @@ -167,28 +197,23 @@ def test_return_false_on_missing_dir(self):
self.assertFalse(did_succeed)

def test_creates_directory(self):
# TODO: temporarily use empty string to avoid any byte/unicode issues
# did_succeed = fileio.write_file(DUMMY_BYTES, self.tmp_path, self.logger)
did_succeed = fileio.write_file('', self.tmp_path, self.logger)
did_succeed = fileio.write_file(
DUMMY_BYTES, self.tmp_path, self.logger)
self.assertTrue(did_succeed)

path_kind = self.assertPathExists(DUMMY_FILE_WRITEFILE)
self.assertEqual(path_kind, "file")

def test_store_bytes(self):
mode = 'w'
# TODO: remove next once we have auto adjust mode in write helper
mode = fileio._auto_adjust_mode(DUMMY_BYTES, mode)
did_succeed = fileio.write_file(DUMMY_BYTES, self.tmp_path, self.logger,
mode=mode)
did_succeed = fileio.write_file(
DUMMY_BYTES, self.tmp_path, self.logger)
self.assertTrue(did_succeed)

with open(self.tmp_path, 'rb') as file:
content = file.read(1024)
self.assertEqual(len(content), DUMMY_BYTES_LENGTH)
self.assertEqual(content[:], DUMMY_BYTES)

@unittest.skip("TODO: enable again - requires the temporarily disabled auto mode select")
def test_store_bytes_in_text_mode(self):
did_succeed = fileio.write_file(DUMMY_BYTES, self.tmp_path, self.logger,
mode="w")
Expand All @@ -199,28 +224,26 @@ def test_store_bytes_in_text_mode(self):
self.assertEqual(len(content), DUMMY_BYTES_LENGTH)
self.assertEqual(content[:], DUMMY_BYTES)

@unittest.skip("TODO: enable again - requires the temporarily disabled auto mode select")
def test_store_unicode(self):
did_succeed = fileio.write_file(DUMMY_UNICODE, self.tmp_path,
self.logger, mode='w')
self.assertTrue(did_succeed)

with open(self.tmp_path, 'r') as file:
with _ByteText(self.tmp_path, 'r') as file:
content = file.read(1024)
self.assertEqual(len(content), DUMMY_UNICODE_LENGTH)
self.assertEqual(content[:], DUMMY_UNICODE)
self.assertEqual(len(content), DUMMY_UNICODE_BYTES_LENGTH)
self.assertEqual(content[:], DUMMY_UNICODE_BYTES)

@unittest.skip("TODO: enable again - requires the temporarily disabled auto mode select")
def test_store_unicode_in_binary_mode(self):
did_succeed = fileio.write_file(DUMMY_UNICODE, self.tmp_path,
self.logger, mode='wb')
self.assertTrue(did_succeed)

with open(self.tmp_path, 'r') as file:
with _ByteText(self.tmp_path, 'r') as file:
content = file.read(1024)
self.assertEqual(len(content), DUMMY_UNICODE_LENGTH)
self.assertEqual(content[:], DUMMY_UNICODE)
self.assertEqual(len(content), DUMMY_UNICODE_BYTES_LENGTH)
self.assertEqual(content[:], DUMMY_UNICODE_BYTES)


if __name__ == '__main__':
testmain()
testmain(failfast=True)

0 comments on commit 87548dd

Please sign in to comment.