
Merge branch 'master' into guard-dxpy-imports
jtratner authored Apr 19, 2024
2 parents 932d85a + ff05b17, commit b087797
Showing 6 changed files with 192 additions and 73 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pythonpackage.yml
@@ -7,7 +7,7 @@ env:
 jobs:
   test:

-    runs-on: ubuntu-latest
+    runs-on: ubuntu-20.04
     strategy:
       matrix:
         python-version: [3.6, 3.7, 3.8]
8 changes: 6 additions & 2 deletions docs/release_notes.rst
@@ -1,10 +1,14 @@
 Release Notes
 =============

-v4.0.3
+v4.2.0
 ------
-* Guard dxpy imports in utils so you can use stor without dxpy installed.
+* Guard dxpy imports in utils so you can use stor without either dxpy or swift installed.

+v4.1.0
+------
+* Replace ``multiprocessing.ThreadPool`` with ``concurrent.futures.ThreadPoolExecutor``
+  to support using stor on AWS Lambda with a Python 3.8 runtime.
+
 v4.0.2
 ------
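Note on the v4.1.0 entry: ``multiprocessing.pool.ThreadPool`` is thread-based, but it still pulls in multiprocessing synchronization machinery that AWS Lambda's runtime reportedly does not support, while ``concurrent.futures.ThreadPoolExecutor`` needs only threads. A minimal sketch of the migration pattern, assuming an illustrative ``fetch`` worker that is not part of stor::

    from concurrent.futures import ThreadPoolExecutor, as_completed

    def fetch(item):
        # Illustrative worker; stands in for a per-file transfer function.
        return item * 2

    items = [1, 2, 3]

    # Before (can fail on AWS Lambda, which lacks the shared-memory
    # primitives the multiprocessing machinery expects):
    #     from multiprocessing.pool import ThreadPool
    #     with ThreadPool(4) as pool:
    #         results = list(pool.imap_unordered(fetch, items))

    # After: threads only, no multiprocessing dependency.
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = {executor.submit(fetch, item): item for item in items}
        results = sorted(fut.result() for fut in as_completed(futures))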
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "stor"
-version = "4.0.3"
+version = "4.2.0"
 description = "Cross-compatible API for accessing Posix and OBS storage systems"
 authors = ["Counsyl Inc. <[email protected]>"]
 license = "MIT"
85 changes: 44 additions & 41 deletions stor/s3.py
@@ -3,7 +3,7 @@
"""
from functools import partial
import logging
from multiprocessing.pool import ThreadPool
from concurrent.futures import as_completed, ThreadPoolExecutor
import os
import tempfile
import threading
@@ -580,28 +580,30 @@ def download(self, dest, condition=None, use_manifest=False, **kwargs):

         downloaded = {'completed': [], 'failed': []}
         with S3DownloadLogger(len(files_to_download)) as dl:
-            pool = ThreadPool(options['object_threads'])
-            try:
-                result_iter = pool.imap_unordered(download_w_config, files_to_download)
-                while True:
-                    try:
-                        result = result_iter.next(0xFFFF)
-                        if result['success']:
-                            dl.add_result(result)
-                            downloaded['completed'].append(result)
-                        else:
-                            downloaded['failed'].append(result)
-                    except StopIteration:
-                        break
-                pool.close()
-            except BaseException:
-                pool.terminate()
-                raise
-            finally:
-                pool.join()
-
-        if downloaded['failed']:
-            raise exceptions.FailedDownloadError('an error occurred while downloading', downloaded)
+            with ThreadPoolExecutor(max_workers=options.get("object_threads")) as executor:
+                futures = {
+                    executor.submit(download_w_config, file_to_download): file_to_download
+                    for file_to_download in files_to_download
+                }
+                for fut in as_completed(futures.keys()):
+                    try:
+                        result = fut.result()
+                    except Exception as e:
+                        raise exceptions.FailedDownloadError(
+                            "An exception occurred while attempting to download file "
+                            f'{futures[fut]["source"]}: {e}'
+                        )
+
+                    if result["success"]:
+                        dl.add_result(result)
+                        downloaded["completed"].append(result)
+                    else:
+                        downloaded["failed"].append(result)
+
+        if downloaded["failed"]:
+            raise exceptions.FailedDownloadError(
+                f"An error occurred while downloading the following files: {downloaded}"
+            )

         utils.check_condition(condition, [r['source'] for r in downloaded['completed']])
         return downloaded
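One subtlety in this hunk: the old code indexed ``options['object_threads']`` and would raise ``KeyError`` if the setting were missing, while ``options.get("object_threads")`` returns ``None``, which ``ThreadPoolExecutor`` treats as "pick a default worker count". A quick illustration, assuming a bare ``options`` dict::

    from concurrent.futures import ThreadPoolExecutor

    options = {}  # illustrative: no "object_threads" configured

    # options["object_threads"] would raise KeyError here; options.get(...)
    # yields None, and max_workers=None tells the executor to choose a
    # default based on os.cpu_count().
    with ThreadPoolExecutor(max_workers=options.get("object_threads")) as executor:
        assert executor.submit(sum, [1, 2, 3]).result() == 6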
@@ -723,29 +725,30 @@ def upload(self, source, condition=None, use_manifest=False, headers=None, **kwargs):

         uploaded = {'completed': [], 'failed': []}
         with S3UploadLogger(len(files_to_upload)) as ul:
-            pool = ThreadPool(options['object_threads'])
-            try:
-                result_iter = pool.imap_unordered(upload_w_config, files_to_upload)
-                while True:
-                    try:
-                        result = result_iter.next(0xFFFF)
-                        if result['success']:
-                            ul.add_result(result)
-                            uploaded['completed'].append(result)
-                        else:
-                            uploaded['failed'].append(result)
-                    except StopIteration:
-                        break
-                pool.close()
-            except BaseException:
-                pool.terminate()
-                raise
-            finally:
-                pool.join()
+            with ThreadPoolExecutor(max_workers=options.get("object_threads")) as executor:
+                futures = {
+                    executor.submit(upload_w_config, file_to_upload): file_to_upload
+                    for file_to_upload in files_to_upload
+                }
+                for fut in as_completed(futures.keys()):
+                    try:
+                        result = fut.result()
+                    except Exception as e:
+                        raise exceptions.FailedUploadError(
+                            "An exception occurred while attempting to upload file "
+                            f"{futures[fut].source}: {e}"
+                        )
+
+                    if result["success"]:
+                        ul.add_result(result)
+                        uploaded["completed"].append(result)
+                    else:
+                        uploaded["failed"].append(result)

         if uploaded['failed']:
             raise exceptions.FailedUploadError(
-                'an error occurred while uploading, info={info}'.format(info=uploaded))
+                f"An error occurred while uploading the following files: {uploaded}"
+            )

         utils.check_condition(condition, [r['dest'] for r in uploaded['completed']])
         return uploaded
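Both hunks above share one idiom: a dict that maps each ``Future`` back to the item that produced it, so a failure can name the exact file. A generic sketch of the idiom, assuming placeholder ``process``/``TransferError`` names rather than stor's real API::

    from concurrent.futures import ThreadPoolExecutor, as_completed

    class TransferError(Exception):
        """Placeholder for FailedDownloadError / FailedUploadError."""

    def process(item):
        # Placeholder worker; stor submits download_w_config / upload_w_config.
        return {"success": True, "source": item}

    def run_all(items, threads=None):
        completed, failed = [], []
        with ThreadPoolExecutor(max_workers=threads) as executor:
            # Keyed by future so the except-block can report which input failed.
            futures = {executor.submit(process, item): item for item in items}
            for fut in as_completed(futures):
                try:
                    result = fut.result()
                except Exception as e:
                    # Raising inside the `with` block means __exit__ still waits
                    # for any in-flight futures before the exception propagates.
                    raise TransferError(f"failed on {futures[fut]}: {e}") from e
                (completed if result["success"] else failed).append(result)
        if failed:
            raise TransferError(f"some transfers failed: {failed}")
        return completed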
70 changes: 67 additions & 3 deletions stor/test.py
@@ -1,14 +1,15 @@
+from concurrent.futures import Executor
 import inspect
-import unittest
 import os
 import sys
+from threading import Lock
+import unittest
+from unittest import mock
 import uuid

 import dxpy
 import vcr

-from unittest import mock
-
 from stor import Path
 from stor import s3
 from stor.s3 import S3Path
@@ -264,6 +265,10 @@ def setUp(self):
         except AttributeError:
             pass

+    def tearDown(self):
+        super(S3TestCase, self).tearDown()
+        self.doCleanups()
+

 class DXTestCase(DXTestMixin, unittest.TestCase):
     """A TestCase class that sets up DNAnexus vars and provides additional assertions.
@@ -345,3 +350,62 @@ def teardown_posix_files(self):
     def teardown_project(self):
         self.project_handler.destroy()
         self.project_handler = None
+
+
+class MockFuture():
+    """A class to minimally mock methods for Future objects in ThreadPoolExecutor.
+    This mock Future class returns a completed result immediately to prevent test hanging.
+    'dest' and 'source' keys are expected in the inputs provided to set the result value;
+    this follows the result construction of Futures in file download and upload."""
+    def __init__(self, **kwargs):
+        self.__result = {
+            "success": "complete",
+            "dest": kwargs.get("dest"),
+            "source": kwargs.get("source")
+        }
+
+    def result(self):
+        return self.__result
+
+
+class MockExecutor(Executor):
+    """An Executor class to minimally mock methods for ThreadPoolExecutor.
+    This mock executor returns mock Future objects to avoid additional complexity in creating a
+    queue and executing passed-in functions. This prevents unit tests from hanging."""
+    def __init__(self):
+        self._shutdown = False
+        self._shutdownLock = Lock()
+
+    def submit(self, fn, *args, **kwargs):
+        """Mock the executor.submit function to immediately return a done mocked Future.
+        Does not implement a work queue nor use the passed-in function to calculate results.
+        Based on the object submitted for the job, the Future will have specific dict keys to
+        be used in testing to confirm use of the upload/download information passed in.
+        The object submitted for download is a dict with keys "source" and "dest".
+        The object submitted for upload is an OBSUploadObject.
+        Args:
+            fn: function used to get a result value
+            args: args to be passed into fn
+            kwargs: kwargs to be passed into fn
+        Returns:
+            MockFuture: a mock Future class with a completed result containing a success status,
+                a source value, and a dest value
+        """
+        obj = args[0]
+        if isinstance(obj, dict):
+            dest = obj.get("dest")
+            source = obj.get("source")
+        else:
+            dest = None
+            source = obj.source
+
+        return MockFuture(dest=dest, source=source)
+
+    def shutdown(self, wait=True):
+        with self._shutdownLock:
+            self._shutdown = True
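A minimal usage sketch for these test doubles (the dict literal is an arbitrary example input; the patch target in the comment mirrors the one used in test_s3.py below)::

    from stor.test import MockExecutor

    executor = MockExecutor()
    # submit() ignores the callable and synthesizes an already-finished
    # MockFuture from the submitted object's "source"/"dest" information.
    fut = executor.submit(lambda obj: obj, {"source": "s3://bucket/a", "dest": "a"})
    assert fut.result() == {"success": "complete", "dest": "a", "source": "s3://bucket/a"}

    # In a test, the executor is swapped in for the real thread pool:
    #     @mock.patch("stor.s3.ThreadPoolExecutor", autospec=True,
    #                 return_value=MockExecutor())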
98 changes: 73 additions & 25 deletions stor/tests/test_s3.py
@@ -18,7 +18,7 @@
 from stor import settings
 from stor import s3
 from stor.s3 import S3Path
-from stor.test import S3TestCase
+from stor.test import MockExecutor, S3TestCase
 from stor.tests.shared_obs import SharedOBSFileCases
 from stor import utils

@@ -1085,19 +1085,39 @@ def test_upload_w_use_manifest_single_file(self, mock_getsize, mock_files):
             S3Path('s3://bucket/path').upload(['file'],
                                               use_manifest=True)

-    @mock.patch('stor.s3.ThreadPool', autospec=True)
-    def test_upload_object_threads(self, mock_pool, mock_getsize, mock_files):
+    @mock.patch(
+        "stor.s3.as_completed",
+        autospec=True,
+        side_effect=lambda futures: [fut for fut in futures]
+    )
+    @mock.patch("stor.s3.ThreadPoolExecutor", autospec=True, return_value=MockExecutor())
+    def test_upload_object_threads(
+        self, mock_pool, mock_completed, mock_getsize, mock_files
+    ):
         mock_files.return_value = {
-            'file%s' % i: 20
+            f"file{i}": 20
             for i in range(20)
         }
         mock_getsize.return_value = 20
-        mock_pool.return_value.imap_unordered.return_value.next.side_effect = StopIteration

-        s3_p = S3Path('s3://bucket')
-        with settings.use({'s3:upload': {'object_threads': 20}}):
-            s3_p.upload(['test'])
-        mock_pool.assert_called_once_with(20)
+        s3_p = S3Path("s3://bucket")
+        with settings.use({"s3:upload": {"object_threads": 20}}):
+            s3_p.upload(["test"])
+
+        # confirm ThreadPoolExecutor called with expected args
+        mock_pool.assert_called_once_with(max_workers=20)
+
+        # confirm as_completed called with expected args
+        mock_completed.assert_called_once()
+        # check length of MockFutures list passed in to as_completed
+        mock_completed_called_with_futures = mock_completed.call_args[0][0]
+        assert len(mock_completed_called_with_futures) == 20
+        assert all(
+            [
+                fut.result()["dest"] is None and fut.result()["source"] == f"file{idx}"
+                for idx, fut in enumerate(mock_completed_called_with_futures)
+            ]
+        )
     def test_upload_remote_error(self, mock_getsize, mock_files):
         mock_files.return_value = {
@@ -1115,11 +1135,14 @@ def test_upload_remote_error(self, mock_getsize, mock_files):
     def test_upload_other_error(self, mock_getsize, mock_files):
         mock_files.return_value = {
             'file1': 20,
-            'file2': 10
+            'file2': 10,
         }
-        self.mock_s3_transfer.upload_file.side_effect = [None, ValueError]
+        self.mock_s3_transfer.upload_file.side_effect = [None, ValueError("Error information")]

-        with self.assertRaises(ValueError):
+        with self.assertRaisesRegex(
+            exceptions.FailedUploadError,
+            f"{list(mock_files.return_value.keys())[1]}"
+        ):
             S3Path('s3://bucket/path').upload(['test'])

     def test_upload_multipart_settings(self, mock_getsize, mock_files):
@@ -1275,19 +1298,42 @@ def test_download_w_condition_and_use_manifest(self, mock_stream, mock_list, moc
                            condition=lambda results: len(results) == 3)
         self.assertEquals(self.mock_s3_transfer.download_file.call_count, 3)

-    @mock.patch.object(S3Path, 'list', autospec=True)
-    @mock.patch('stor.s3.ThreadPool', autospec=True)
-    def test_download_object_threads(self, mock_pool, mock_list, mock_getsize,
-                                     mock_make_dest_dir):
+    @mock.patch.object(S3Path, "list", autospec=True)
+    @mock.patch(
+        "stor.s3.as_completed",
+        autospec=True,
+        side_effect=lambda futures: [fut for fut in futures]
+    )
+    @mock.patch("stor.s3.ThreadPoolExecutor", autospec=True, return_value=MockExecutor())
+    def test_download_object_threads(
+        self, mock_pool, mock_completed, mock_list, mock_getsize, mock_make_dest_dir
+    ):
         mock_list.return_value = [
-            S3Path('s3://bucket/file%s' % i)
+            S3Path(f"s3://bucket/file{i}")
             for i in range(20)
         ]
-        mock_pool.return_value.imap_unordered.return_value.next.side_effect = StopIteration
-        s3_p = S3Path('s3://bucket')
-        with settings.use({'s3:download': {'object_threads': 20}}):
-            s3_p.download(['test'])
-        mock_pool.assert_called_once_with(20)
+        s3_p = S3Path("s3://bucket")
+
+        with settings.use({"s3:download": {"object_threads": 20}}):
+            s3_p.download(["test"])
+
+        # confirm ThreadPoolExecutor called with expected args
+        mock_pool.assert_called_once_with(max_workers=20)
+
+        # confirm as_completed called with expected args
+        mock_completed.assert_called_once()
+        # check length of MockFutures list passed in to as_completed
+        mock_completed_called_with_futures = mock_completed.call_args[0][0]
+        assert len(mock_completed_called_with_futures) == 20
+        assert all(
+            [
+                (
+                    fut.result()["dest"] == ["test"] and
+                    fut.result()["source"] == f"s3://bucket/file{idx}"
+                )
+                for idx, fut in enumerate(mock_completed_called_with_futures)
+            ]
+        )

     @mock.patch.object(S3Path, 'list', autospec=True)
     def test_download_remote_error(self, mock_list, mock_getsize, mock_make_dest_dir):
@@ -1306,11 +1352,13 @@ def test_download_other_error(self, mock_list, mock_getsize, mock_make_dest_dir)
         mock_list.return_value = [
             S3Path('s3://bucket/my/obj1'),
             S3Path('s3://bucket/my/obj2'),
             S3Path('s3://bucket/my/obj3')
         ]
-        self.mock_s3_transfer.download_file.side_effect = [None, ValueError]
+        self.mock_s3_transfer.download_file.side_effect = [None, ValueError("Error information")]

-        with self.assertRaises(ValueError):
+        with self.assertRaisesRegex(
+            exceptions.FailedDownloadError,
+            f"{mock_list.return_value[1]}"
+        ):
             S3Path('s3://bucket/path').download('test')

     @mock.patch.object(S3Path, 'list', autospec=True)
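The switch from ``assertRaises(ValueError)`` to ``assertRaisesRegex`` in the tests above tracks the new behavior in stor/s3.py: worker exceptions are now wrapped in ``FailedUploadError``/``FailedDownloadError`` whose message names the failing file. A toy sketch of the assertion style, assuming an illustrative ``ToyError``::

    import unittest

    class ToyError(Exception):
        pass

    class RegexAssertionDemo(unittest.TestCase):
        def test_error_message_names_failing_file(self):
            # assertRaisesRegex checks the exception type *and* that the
            # message matches the pattern -- here, the failing file's name.
            with self.assertRaisesRegex(ToyError, "file2"):
                raise ToyError("An exception occurred while attempting to upload file file2")

    if __name__ == "__main__":
        unittest.main()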

