Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Repair fileio. #80

Open
wants to merge 1 commit into
base: edge
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 11 additions & 20 deletions mig/shared/fileio.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@
import time
import zipfile

from mig.shared.compat import PY2

# NOTE: We expose optimized walk function directly for ease and efficiency.
# Requires stand-alone scandir module on python 2 whereas the native os
# functions are built-in and optimized similarly on python 3+
slow_walk, slow_listdir = False, False
if sys.version_info[0] > 2:
if not PY2:
from os import walk, listdir
else:
try:
Expand All @@ -70,19 +72,8 @@
exit(1)


def _auto_adjust_mode(data, mode):
"""Select suitable file open mode based on string type of data. I.e. whether
to use binary or text mode depending on data in bytes or unicode format.
"""

# NOTE: detect byte/unicode writes and handle explicitly in a portable way
if isinstance(data, bytes):
if 'b' not in mode:
mode = "%sb" % mode # appended to avoid mode ordering error on PY2
else:
if 'b' in mode:
mode = mode.strip('b')
return mode
def _is_string(value):
return isinstance(value, unicode) if PY2 else isinstance(value, str)


def _write_chunk(path, chunk, offset, logger=None, mode='r+b',
Expand All @@ -100,6 +91,12 @@ def _write_chunk(path, chunk, offset, logger=None, mode='r+b',
(offset, path))
return False

if _is_string(chunk):
chunk = bytearray(chunk, 'utf8')
# detect byte writes and handle explicitly in a portable way
if isinstance(chunk, (bytes, bytearray)) and 'b' not in mode:
mode = "%sb" % mode # appended to avoid mode ordering error on PY2

# create dir and file if it does not exists

(head, _) = os.path.split(path)
Expand Down Expand Up @@ -152,9 +149,6 @@ def write_chunk(path, chunk, offset, logger, mode='r+b', force_string=False):
if not logger:
logger = null_logger("dummy")

# TODO: enable this again once throuroughly tested and assured py2+3 safe
# mode = _auto_adjust_mode(chunk, mode)

return _write_chunk(path, chunk, offset, logger, mode,
force_string=force_string)

Expand All @@ -169,9 +163,6 @@ def write_file(content, path, logger, mode='w', make_parent=True, umask=None,
if umask is not None:
old_umask = os.umask(umask)

# TODO: enable this again once throuroughly tested and assured py2+3 safe
#mode = _auto_adjust_mode(content, mode)

retval = _write_chunk(path, content, offset=0, logger=logger, mode=mode,
make_parent=make_parent, create_file=False,
force_string=force_string)
Expand Down
99 changes: 61 additions & 38 deletions tests/test_mig_shared_fileio.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@
"""Unit test fileio functions"""

import binascii
import codecs
import os
import sys
import unittest

# NOTE: wrap next imports in try except to prevent autopep8 shuffling up
try:
from tests.support import MigTestCase, cleanpath, temppath, testmain
from tests.support import PY2, MigTestCase, cleanpath, temppath, testmain
import mig.shared.fileio as fileio
except ImportError as ioe:
print("Failed to import mig core modules: %s" % ioe)
Expand All @@ -43,15 +44,47 @@
DUMMY_BYTES = binascii.unhexlify('DEADBEEF') # 4 bytes
DUMMY_BYTES_LENGTH = 4
DUMMY_UNICODE = u'UniCode123½¾µßðþđŋħĸþł@ª€£$¥©®'
DUMMY_UNICODE_LENGTH = len(DUMMY_UNICODE)
DUMMY_UNICODE_BYTES = bytearray(DUMMY_UNICODE, 'utf8')
DUMMY_UNICODE_BYTES_LENGTH = len(DUMMY_UNICODE_BYTES)
DUMMY_FILE_WRITECHUNK = 'fileio/write_chunk'
DUMMY_FILE_WRITEFILE = 'fileio/write_file'

assert isinstance(DUMMY_BYTES, bytes)


def _as_unicode_string(value):
assert isinstance(value, bytearray)
return unicode(codecs.decode(value, 'utf8')) if PY2 else str(value, 'utf8')


class _ByteText:
"""File-like object that allows interacting with a text file as a bytearray.

Supports reading and directly usable as a context manager."""

def __init__(self, path, mode='r'):
self._file = None
self._path = path
self._mode = "%s%s" % (mode, 'b') if 'b' not in mode else mode

def read(self, size):
content = self._file.read(size)
# always read back the content as though it was raw bytes
return bytearray(content)

def __enter__(self):
self._file = open(self._path, mode=self._mode)
return self

def __exit__(self, *args):
self._file.close()
if args[1] is not None:
raise args[1]


class MigSharedFileio__write_chunk(MigTestCase):
# TODO: Add docstrings to this class and its methods
"""Coverage of the write_chunk() function."""

def setUp(self):
super(MigSharedFileio__write_chunk, self).setUp()
self.tmp_path = temppath(DUMMY_FILE_WRITECHUNK, self, skip_clean=True)
Expand All @@ -60,9 +93,7 @@ def setUp(self):
def test_return_false_on_invalid_data(self):
self.logger.forgive_errors()

# NOTE: we make sure to disable any forced stringification here
did_succeed = fileio.write_chunk(self.tmp_path, 1234, 0, self.logger,
force_string=False)
did_succeed = fileio.write_chunk(self.tmp_path, 1234, 0, self.logger)
self.assertFalse(did_succeed)

def test_return_false_on_invalid_offset(self):
Expand Down Expand Up @@ -106,7 +137,6 @@ def test_store_bytes_at_offset(self):
"expected a hole was left")
self.assertEqual(content[3:], DUMMY_BYTES)

@unittest.skip("TODO: enable again - requires the temporarily disabled auto mode select")
def test_store_bytes_in_text_mode(self):
fileio.write_chunk(self.tmp_path, DUMMY_BYTES, 0, self.logger,
mode="r+")
Expand All @@ -116,28 +146,29 @@ def test_store_bytes_in_text_mode(self):
self.assertEqual(len(content), DUMMY_BYTES_LENGTH)
self.assertEqual(content[:], DUMMY_BYTES)

@unittest.skip("TODO: enable again - requires the temporarily disabled auto mode select")
def test_store_unicode(self):
fileio.write_chunk(self.tmp_path, DUMMY_UNICODE, 0, self.logger,
mode='r+')
did_succeed = fileio.write_chunk(self.tmp_path, DUMMY_UNICODE, 0, self.logger,
mode='r+')
self.assertTrue(did_succeed)

with open(self.tmp_path, 'r') as file:
with _ByteText(self.tmp_path) as file:
content = file.read(1024)
self.assertEqual(len(content), DUMMY_UNICODE_LENGTH)
self.assertEqual(content[:], DUMMY_UNICODE)
self.assertEqual(len(content), DUMMY_UNICODE_BYTES_LENGTH)
self.assertEqual(content[:], DUMMY_UNICODE_BYTES)

@unittest.skip("TODO: enable again - requires the temporarily disabled auto mode select")
def test_store_unicode_in_binary_mode(self):
fileio.write_chunk(self.tmp_path, DUMMY_UNICODE, 0, self.logger,
mode='r+b')

with open(self.tmp_path, 'r') as file:
with _ByteText(self.tmp_path) as file:
content = file.read(1024)
self.assertEqual(len(content), DUMMY_UNICODE_LENGTH)
self.assertEqual(content[:], DUMMY_UNICODE)
self.assertEqual(len(content), DUMMY_UNICODE_BYTES_LENGTH)
self.assertEqual(_as_unicode_string(content[:]), DUMMY_UNICODE)


class MigSharedFileio__write_file(MigTestCase):
"""Coverage of the write_file() function."""

def setUp(self):
super(MigSharedFileio__write_file, self).setUp()
self.tmp_path = temppath(DUMMY_FILE_WRITEFILE, self, skip_clean=True)
Expand All @@ -146,17 +177,16 @@ def setUp(self):
def test_return_false_on_invalid_data(self):
self.logger.forgive_errors()

# NOTE: we make sure to disable any forced stringification here
did_succeed = fileio.write_file(1234, self.tmp_path, self.logger,
force_string=False)
did_succeed = fileio.write_file(1234, self.tmp_path, self.logger)
self.assertFalse(did_succeed)

def test_return_false_on_invalid_dir(self):
self.logger.forgive_errors()

os.makedirs(self.tmp_path)

did_succeed = fileio.write_file(DUMMY_BYTES, self.tmp_path, self.logger)
did_succeed = fileio.write_file(
DUMMY_BYTES, self.tmp_path, self.logger)
self.assertFalse(did_succeed)

def test_return_false_on_missing_dir(self):
Expand All @@ -167,28 +197,23 @@ def test_return_false_on_missing_dir(self):
self.assertFalse(did_succeed)

def test_creates_directory(self):
# TODO: temporarily use empty string to avoid any byte/unicode issues
# did_succeed = fileio.write_file(DUMMY_BYTES, self.tmp_path, self.logger)
did_succeed = fileio.write_file('', self.tmp_path, self.logger)
did_succeed = fileio.write_file(
DUMMY_BYTES, self.tmp_path, self.logger)
self.assertTrue(did_succeed)

path_kind = self.assertPathExists(DUMMY_FILE_WRITEFILE)
self.assertEqual(path_kind, "file")

def test_store_bytes(self):
mode = 'w'
# TODO: remove next once we have auto adjust mode in write helper
mode = fileio._auto_adjust_mode(DUMMY_BYTES, mode)
did_succeed = fileio.write_file(DUMMY_BYTES, self.tmp_path, self.logger,
mode=mode)
did_succeed = fileio.write_file(
DUMMY_BYTES, self.tmp_path, self.logger)
self.assertTrue(did_succeed)

with open(self.tmp_path, 'rb') as file:
content = file.read(1024)
self.assertEqual(len(content), DUMMY_BYTES_LENGTH)
self.assertEqual(content[:], DUMMY_BYTES)

@unittest.skip("TODO: enable again - requires the temporarily disabled auto mode select")
def test_store_bytes_in_text_mode(self):
did_succeed = fileio.write_file(DUMMY_BYTES, self.tmp_path, self.logger,
mode="w")
Expand All @@ -199,27 +224,25 @@ def test_store_bytes_in_text_mode(self):
self.assertEqual(len(content), DUMMY_BYTES_LENGTH)
self.assertEqual(content[:], DUMMY_BYTES)

@unittest.skip("TODO: enable again - requires the temporarily disabled auto mode select")
def test_store_unicode(self):
did_succeed = fileio.write_file(DUMMY_UNICODE, self.tmp_path,
self.logger, mode='w')
self.assertTrue(did_succeed)

with open(self.tmp_path, 'r') as file:
with _ByteText(self.tmp_path, 'r') as file:
content = file.read(1024)
self.assertEqual(len(content), DUMMY_UNICODE_LENGTH)
self.assertEqual(content[:], DUMMY_UNICODE)
self.assertEqual(len(content), DUMMY_UNICODE_BYTES_LENGTH)
self.assertEqual(content[:], DUMMY_UNICODE_BYTES)

@unittest.skip("TODO: enable again - requires the temporarily disabled auto mode select")
def test_store_unicode_in_binary_mode(self):
did_succeed = fileio.write_file(DUMMY_UNICODE, self.tmp_path,
self.logger, mode='wb')
self.assertTrue(did_succeed)

with open(self.tmp_path, 'r') as file:
with _ByteText(self.tmp_path, 'r') as file:
content = file.read(1024)
self.assertEqual(len(content), DUMMY_UNICODE_LENGTH)
self.assertEqual(content[:], DUMMY_UNICODE)
self.assertEqual(len(content), DUMMY_UNICODE_BYTES_LENGTH)
self.assertEqual(content[:], DUMMY_UNICODE_BYTES)


if __name__ == '__main__':
Expand Down
Loading