diff --git a/katdal/chunkstore.py b/katdal/chunkstore.py index fb35372c..ad6938b8 100644 --- a/katdal/chunkstore.py +++ b/katdal/chunkstore.py @@ -38,6 +38,11 @@ class StoreUnavailable(OSError, ChunkStoreError): """Could not access underlying storage medium (offline, auth failed, etc).""" +class UnsupportedStoreFeature(ChunkStoreError): + """The underlying store does not support the requested operation + (e.g. minio doesn't support lifecycle policies on buckets)""" + + class ChunkNotFound(KeyError, ChunkStoreError): """The store was accessible but a chunk with the given name was not found.""" diff --git a/katdal/chunkstore_s3.py b/katdal/chunkstore_s3.py index 4555c94f..ffd786d9 100644 --- a/katdal/chunkstore_s3.py +++ b/katdal/chunkstore_s3.py @@ -24,7 +24,7 @@ standard_library.install_aliases() # noqa: E402 from builtins import object import future.utils -from future.utils import raise_, bytes_to_native_str +from future.utils import raise_, bytes_to_native_str, raise_from import contextlib import io @@ -53,9 +53,11 @@ botocore = None from .chunkstore import (ChunkStore, StoreUnavailable, ChunkNotFound, BadChunk, - npy_header_and_body) + UnsupportedStoreFeature, npy_header_and_body) from .sensordata import to_str +from . import schemas + # Lifecycle policies unfortunately use XML encoding rather than JSON # Following path of least resistance we simply .format() this string @@ -180,6 +182,8 @@ def _raise_for_status(response): except requests.HTTPError as error: if response.status_code == 404: raise ChunkNotFound(str(error)) + elif response.status_code == 501: + raise_from(UnsupportedStoreFeature(str(error)), error) else: raise StoreUnavailable(str(error)) @@ -280,6 +284,11 @@ class S3ChunkStore(ChunkStore): expiry_days : int If set to a value greater than 0 will set a future expiry time in days for any new buckets created. + validate_xml_policies : bool + If set to true, S3 operations that use XML policies will be validated + against the inbuilt schemas. Note that these are relatively minimal + and not a guarantee of operation success on passing validation. + Requires lxml Raises @@ -288,7 +297,7 @@ class S3ChunkStore(ChunkStore): If requests is not installed (it's an optional dependency otherwise) """ - def __init__(self, session_factory, url, public_read=False, expiry_days=0): + def __init__(self, session_factory, url, public_read=False, expiry_days=0, validate_xml_policies=False): try: # Quick smoke test to see if the S3 server is available, by listing # buckets. Depending on the server in use, this may return a 403 @@ -309,9 +318,10 @@ def __init__(self, session_factory, url, public_read=False, expiry_days=0): self._url = to_str(url) self.public_read = public_read self.expiry_days = int(expiry_days) + self.validate_xml_policies = validate_xml_policies @classmethod - def _from_url(cls, url, timeout, token, credentials, public_read, expiry_days): + def _from_url(cls, url, timeout, token, credentials, public_read, expiry_days, validate_xml_policies): """Construct S3 chunk store from endpoint URL (see :meth:`from_url`).""" if token is not None: parsed = urllib.parse.urlparse(url) @@ -333,12 +343,12 @@ def session_factory(): session.mount(url, adapter) return session - return cls(session_factory, url, public_read, expiry_days) + return cls(session_factory, url, public_read, expiry_days, validate_xml_policies) @classmethod def from_url(cls, url, timeout=300, extra_timeout=10, token=None, credentials=None, public_read=False, - expiry_days=0, **kwargs): + expiry_days=0, validate_xml_policies=False, **kwargs): """Construct S3 chunk store from endpoint URL. Parameters @@ -360,6 +370,11 @@ def from_url(cls, url, timeout=300, extra_timeout=10, expiry_days : int If set to a value greater than 0 will set a future expiry time in days for any new buckets created. + validate_xml_policies : bool + If set to true, S3 operations that use XML policies will be validated + against the inbuilt schemas. Note that these are relatively minimal + and not a guarantee of operation success on passing validation. + Requires lxml kwargs : dict Extra keyword arguments (unused) @@ -375,15 +390,16 @@ def from_url(cls, url, timeout=300, extra_timeout=10, # (avoiding extra dependency on Python 2, revisit when Python 3 only) q = queue.Queue() - def _from_url(url, timeout, token, credentials, public_read, expiry_days): + def _from_url(url, timeout, token, credentials, public_read, expiry_days, validate_xml_policies): """Construct chunk store and return it (or exception) via queue.""" try: - q.put(cls._from_url(url, timeout, token, credentials, public_read, expiry_days)) + q.put(cls._from_url(url, timeout, token, credentials, public_read, expiry_days, validate_xml_policies)) except BaseException: q.put(sys.exc_info()) thread = threading.Thread(target=_from_url, - args=(url, timeout, token, credentials, public_read, expiry_days)) + args=(url, timeout, token, credentials, public_read, expiry_days, + validate_xml_policies)) thread.daemon = True thread.start() if timeout is not None: @@ -467,6 +483,10 @@ def create_array(self, array_name): if self.expiry_days > 0: xml_payload = _BASE_LIFECYCLE_POLICY.format(self.expiry_days) + if self.validate_xml_policies: + if not schemas.has_lxml: + raise ImportError("XML schema validation requires lxml to be installed.") + schemas.validate('MINIMAL_LIFECYCLE_POLICY', xml_payload) b64_md5 = base64.b64encode(hashlib.md5(xml_payload.encode('utf-8')).digest()).decode('utf-8') lifecycle_headers = {'Content-Type': 'text/xml', 'Content-MD5': b64_md5} with self.request(None, 'PUT', url, params='lifecycle', data=xml_payload, headers=lifecycle_headers): diff --git a/katdal/schemas/__init__.py b/katdal/schemas/__init__.py new file mode 100644 index 00000000..c4f65c28 --- /dev/null +++ b/katdal/schemas/__init__.py @@ -0,0 +1,58 @@ +"""Makes packaged XSD schemas available as validators.""" + +import pkg_resources +from future.utils import raise_from + +has_lxml = False +validators = {} + +try: + from lxml import etree + has_lxml = True +except ImportError: + pass + + +class ValidatorWithLog(object): + def __init__(self, validator): + self.validator = validator + + def validate(self, xml_string): + """Validates a supplied XML string against the instantiated validator. + + Parameters + --------- + xml_string : str + String representation of the XML to be turned into a document + and validated. + + Raises + ------ + etree.DocumentInvalid + if `xml_string` does not validate against the XSD schema + ValueError + if `xml_string` cannot be parsed into a valid XML document + """ + try: + xml_doc = etree.fromstring(bytes(bytearray(xml_string, encoding='utf-8'))) + except etree.XMLSyntaxError as e: + raise_from(ValueError("Supplied string cannot be parsed as XML"), e) + if not self.validator.validate(xml_doc): + log = self.validator.error_log + raise etree.DocumentInvalid(log.last_error) + return True + + +def validate(validator_name, string_to_validate): + try: + return validators[validator_name].validate(string_to_validate) + except KeyError: + raise_from(KeyError("Specified validator {} doesn't map to an installed" + " schema.".format(validator_name)), None) + + +for name in pkg_resources.resource_listdir(__name__, '.'): + if name.endswith('.xsd') and has_lxml: + xmlschema_doc = etree.parse(pkg_resources.resource_stream(__name__, name)) + xml_validator = etree.XMLSchema(xmlschema_doc) + validators[name[:-4].upper()] = ValidatorWithLog(xml_validator) diff --git a/katdal/schemas/minimal_lifecycle_policy.xsd b/katdal/schemas/minimal_lifecycle_policy.xsd new file mode 100644 index 00000000..87feaea5 --- /dev/null +++ b/katdal/schemas/minimal_lifecycle_policy.xsd @@ -0,0 +1,38 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/katdal/test/test_chunkstore_s3.py b/katdal/test/test_chunkstore_s3.py index b914642b..49de308b 100644 --- a/katdal/test/test_chunkstore_s3.py +++ b/katdal/test/test_chunkstore_s3.py @@ -45,6 +45,7 @@ import contextlib import io import warnings +import lxml import numpy as np from nose import SkipTest @@ -53,9 +54,15 @@ import requests from katdal.chunkstore_s3 import S3ChunkStore, _AWSAuth, read_array -from katdal.chunkstore import StoreUnavailable +from katdal.chunkstore import StoreUnavailable, UnsupportedStoreFeature from katdal.test.test_chunkstore import ChunkStoreTestBase +# No expiration rule included +_INVALID_LIFECYCLE_POLICY = """ + +katdal_expiry_{0}_daysEnabled +""" + def gethostbyname_slow(host): """Mock DNS lookup that is meant to be slow.""" @@ -231,6 +238,28 @@ def test_public_read(self): y = reader.get_chunk('public', slices, x.dtype) np.testing.assert_array_equal(x, y) + def test_bucket_expiry(self): + # NOTE: Minimum bucket expiry time is 1 day so real world testing is impractical. + # We expect not supported since minio doesn't allow lifecycle policies + test_store = self.from_url(self.url, expiry_days=1) + assert_raises(UnsupportedStoreFeature, test_store.create_array, 'test-expiry') + + def test_bucket_expiry_with_validation(self): + test_store = self.from_url(self.url, expiry_days=1, validate_xml_policies=True) + assert_raises(UnsupportedStoreFeature, test_store.create_array, 'test-expiry') + + @mock.patch('katdal.chunkstore_s3._BASE_LIFECYCLE_POLICY', _INVALID_LIFECYCLE_POLICY) + def test_bucket_expiry_invalid_schema(self): + # Now test with an invalid policy + test_store = self.from_url(self.url, expiry_days=1, validate_xml_policies=True) + assert_raises(lxml.etree.DocumentInvalid, test_store.create_array, 'test-expiry') + + @mock.patch('katdal.chunkstore_s3._BASE_LIFECYCLE_POLICY', "") + def test_bucket_expiry_not_xml(self): + # Code path coverage to test a policy that is not even valid XML + test_store = self.from_url(self.url, expiry_days=1, validate_xml_policies=True) + assert_raises(ValueError, test_store.create_array, 'test-expiry') + @timed(0.1 + 0.05) def test_store_unavailable_invalid_url(self): # Ensure that timeouts work diff --git a/setup.py b/setup.py index 0cdf1dd5..2de98238 100755 --- a/setup.py +++ b/setup.py @@ -35,6 +35,8 @@ author='Ludwig Schwardt', author_email='ludwig@ska.ac.za', packages=find_packages(), + package_data={'katdal': ['schemas/*']}, + include_package_data=True, scripts=[ 'scripts/h5list.py', 'scripts/h5toms.py', diff --git a/test-requirements.txt b/test-requirements.txt index d2b201f3..5df7ec00 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -1,5 +1,6 @@ coverage funcsigs +lxml==4.3.3 mock nose pbr