# Tar-extraction guard added by the patch to pardata/_dataset.py, inside
# ``_extract_as_tar``; it replaces the bare ``tar.extractall(path=self._data_dir)``.
# Call site in that method: ``safe_extract(tar, path=self._data_dir)``.


def is_within_directory(directory, target):
    """Return True if ``target`` is ``directory`` itself or lies beneath it.

    Both paths are made absolute and normalized with :func:`os.path.abspath`
    (so ``..`` segments are collapsed) and then compared component by
    component.

    :param directory: Directory that is supposed to contain ``target``.
    :param target: Path to check.
    :return: ``True`` when ``target`` is inside ``directory``, else ``False``.
    """
    # Imported locally so the helper does not depend on a module-level import
    # that is not visible in this hunk.
    import os

    abs_directory = os.path.abspath(directory)
    abs_target = os.path.abspath(target)

    # ``os.path.commonprefix`` works character by character, so it would treat
    # ``/x/data_evil/f`` as being inside ``/x/data``.  ``commonpath`` compares
    # whole path components instead.
    try:
        common = os.path.commonpath([abs_directory, abs_target])
    except ValueError:
        # Raised when the paths cannot share a root (e.g. different drives);
        # such a target is by definition outside ``directory``.
        return False

    return common == abs_directory


def safe_extract(tar, path=".", members=None, *, numeric_owner=False):
    """Extract ``tar`` into ``path`` after rejecting path-traversal members.

    Every member returned by ``tar.getmembers()`` is checked, regardless of
    ``members``; the archive is extracted only if all member names resolve to
    a location inside ``path``.  Only ``member.name`` is inspected -- link
    targets of symlink/hardlink members are not validated here.

    :param tar: An open tar archive object providing ``getmembers`` and
        ``extractall``.
    :param path: Destination directory.
    :param members: Passed through to ``tar.extractall``.
    :param numeric_owner: Passed through to ``tar.extractall``.
    :raises Exception: If any member name resolves outside ``path``.
    """
    import os

    for member in tar.getmembers():
        member_path = os.path.join(path, member.name)
        if not is_within_directory(path, member_path):
            raise Exception("Attempted Path Traversal in Tar File")

    tar.extractall(path, members, numeric_owner=numeric_owner)
# Tar-extraction guard added by the patch to tests/conftest.py, inside
# ``_make_zip_copy``; it replaces the bare ``f.extractall(tmpdir)``.
# Call site in that function: ``safe_extract(f, tmpdir)``.


def is_within_directory(directory, target):
    """Return True if ``target`` is ``directory`` itself or lies beneath it.

    Both paths are made absolute and normalized with :func:`os.path.abspath`
    (so ``..`` segments are collapsed) and then compared component by
    component.

    :param directory: Directory that is supposed to contain ``target``.
    :param target: Path to check.
    :return: ``True`` when ``target`` is inside ``directory``, else ``False``.
    """
    # Imported locally so the helper does not depend on a module-level import
    # that is not visible in this hunk.
    import os

    abs_directory = os.path.abspath(directory)
    abs_target = os.path.abspath(target)

    # A character-wise prefix test (``os.path.commonprefix``) would accept a
    # sibling such as ``/x/tmp_evil/f`` for ``/x/tmp``; ``commonpath``
    # compares whole path components and does not.
    try:
        common = os.path.commonpath([abs_directory, abs_target])
    except ValueError:
        # Raised when the paths cannot share a root (e.g. different drives);
        # such a target is outside ``directory``.
        return False

    return common == abs_directory


def safe_extract(tar, path=".", members=None, *, numeric_owner=False):
    """Extract ``tar`` into ``path`` after rejecting path-traversal members.

    Every member returned by ``tar.getmembers()`` is checked, regardless of
    ``members``; the archive is extracted only if all member names resolve to
    a location inside ``path``.  Only ``member.name`` is inspected -- link
    targets of symlink/hardlink members are not validated here.

    :param tar: An open tar archive object providing ``getmembers`` and
        ``extractall``.
    :param path: Destination directory.
    :param members: Passed through to ``tar.extractall``.
    :param numeric_owner: Passed through to ``tar.extractall``.
    :raises Exception: If any member name resolves outside ``path``.
    """
    import os

    for member in tar.getmembers():
        member_path = os.path.join(path, member.name)
        if not is_within_directory(path, member_path):
            raise Exception("Attempted Path Traversal in Tar File")

    tar.extractall(path, members, numeric_owner=numeric_owner)