diff --git a/nomad/archive/query_reader.py b/nomad/archive/query_reader.py
index 4ea91ed14..d02859228 100644
--- a/nomad/archive/query_reader.py
+++ b/nomad/archive/query_reader.py
@@ -713,8 +713,7 @@ def load_archive(self, upload_id: str, entry_id: str) -> ArchiveDict:
             self.upload_pool[upload_id] = upload.upload_files

         try:
-            with self.upload_pool[upload_id].read_archive(entry_id) as archive:
-                return archive[entry_id]
+            return self.upload_pool[upload_id].read_archive(entry_id)[entry_id]
         except KeyError:
             raise ArchiveError(f'Archive {entry_id} does not exist in upload {entry_id}.')

diff --git a/nomad/archive/required.py b/nomad/archive/required.py
index 42bb2d6cf..9724889fd 100644
--- a/nomad/archive/required.py
+++ b/nomad/archive/required.py
@@ -30,6 +30,7 @@
     Package
 from .query import ArchiveQueryError, to_json, _query_archive_key_pattern, _extract_key_and_index, _extract_child
 from .storage import ArchiveReader, ArchiveList, ArchiveError, ArchiveDict
+from .storage_v2 import ArchiveDict as NewArchiveDict
 from ..datamodel.context import parse_path, ServerContext


@@ -443,10 +444,13 @@ def _apply_required(
         if archive_item is None:
             return None  # type: ignore

-        archive_item = to_json(archive_item)
         result: dict = {}

-        if isinstance(archive_item, dict) and 'm_def' in archive_item:
+        # avoid the bug in the old reader that primitive key-value is not included in toc
+        if isinstance(archive_item, ArchiveDict):
+            archive_item = to_json(archive_item)
+
+        if isinstance(archive_item, (dict, NewArchiveDict)) and 'm_def' in archive_item:
             dataset = dataset.replace(definition=self._resolve_definition(
                 dataset.upload_id, archive_item['m_def'].split('@')[0], dataset.archive_root))
             result['m_def'] = archive_item['m_def']
@@ -459,7 +463,7 @@
             return self._resolve_refs(dataset.definition, archive_item, dataset)

         if directive in ['*', 'include']:
-            return archive_item
+            return to_json(archive_item)

         raise ArchiveQueryError(f'unknown directive {required}')

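# ---------------------------------------------------------------------------
# Not part of the patch: a minimal sketch of the type dispatch that
# _apply_required now performs, assuming the imports shown in the hunk above
# (to_json from .query, ArchiveDict from .storage, NewArchiveDict from
# .storage_v2). The helper name `normalise_item` is hypothetical.
from nomad.archive.query import to_json
from nomad.archive.storage import ArchiveDict
from nomad.archive.storage_v2 import ArchiveDict as NewArchiveDict


def normalise_item(archive_item):
    # Items from the old reader are converted eagerly because their toc omits
    # primitive key-values; plain dicts and new-reader items stay lazy and are
    # only serialised with to_json() when the 'include' directive returns them.
    if isinstance(archive_item, ArchiveDict):
        return to_json(archive_item)
    return archive_item
# ---------------------------------------------------------------------------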
diff --git a/nomad/archive/storage.py b/nomad/archive/storage.py
index 342433789..be55df9db 100644
--- a/nomad/archive/storage.py
+++ b/nomad/archive/storage.py
@@ -15,7 +15,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-import os
 from typing import Iterable, Any, Tuple, Dict, BinaryIO, Union, List, cast
 from io import BytesIO, BufferedReader
 from collections.abc import Mapping, Sequence
@@ -26,7 +25,7 @@
 import struct
 import json

-from nomad import utils, config
+from nomad import utils
 from nomad.config import archive

 __packer = msgpack.Packer(autoreset=True, use_bin_type=True)
@@ -552,11 +551,6 @@ def read_archive(file_or_path: Union[str, BytesIO], **kwargs) -> ArchiveReader:
     '''
     from .storage_v2 import ArchiveWriter as ArchiveWriterNew, ArchiveReader as ArchiveReaderNew

-    # if the file is smaller than the threshold, just read it into memory to avoid multiple small reads
-    if isinstance(file_or_path, str) and os.path.getsize(file_or_path) < 4 * config.archive.read_buffer_size:
-        with open(file_or_path, 'rb') as f:
-            file_or_path = BytesIO(f.read())
-
     if isinstance(file_or_path, str):
         with open(file_or_path, 'rb') as f:
             magic = f.read(ArchiveWriterNew.magic_len)
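# ---------------------------------------------------------------------------
# Not part of the patch: a sketch of how a caller could keep the removed
# small-file fast path on its own side, since read_archive() still accepts
# either a path or a BytesIO (see the signature in the hunk above). The 4 MB
# threshold is an assumption standing in for the old config-derived limit.
import os
from io import BytesIO

from nomad.archive.storage import read_archive


def open_archive(path: str):
    if os.path.getsize(path) < 4 * 1024 * 1024:
        # small file: load it into memory once to avoid many small reads
        with open(path, 'rb') as f:
            return read_archive(BytesIO(f.read()))
    # large file: hand the path to the reader and let it read on demand
    return read_archive(path)
# ---------------------------------------------------------------------------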
diff --git a/nomad/cli/admin/uploads.py b/nomad/cli/admin/uploads.py
index 9189a9947..238c09361 100644
--- a/nomad/cli/admin/uploads.py
+++ b/nomad/cli/admin/uploads.py
@@ -268,6 +268,83 @@ def _query_uploads(
     return final_query, proc.Upload.objects(final_query)


+@uploads.command(help='List selected uploads')
+@click.argument('UPLOADS', nargs=-1)
+@click.option('--required', type=str, help='The required in JSON format')
+@click.option('-o', '--output', type=str, help='The file to write data to')
+@click.pass_context
+def export(ctx, uploads, required, output: str):
+    import sys
+    from nomad.processing import Entry
+    from nomad.utils import get_logger
+    from nomad.files import UploadFiles
+    from nomad.archive import ArchiveQueryError, RequiredReader
+    import time
+    import zipfile
+
+    logger = get_logger(__name__)
+
+    if not output:
+        logger.error('no output given')
+        sys.exit(1)
+
+    if not output.endswith('.zip'):
+        logger.error('only zip output is supported')
+        sys.exit(1)
+
+    output_file = zipfile.ZipFile(output, 'w', allowZip64=True)
+
+    def write(entry_id, archive_data):
+        archive_json = json.dumps(archive_data)
+        output_file.writestr(f'{entry_id}.json', archive_json, compress_type=zipfile.ZIP_DEFLATED)
+
+    _, uploads = _query_uploads(uploads, **ctx.obj.uploads_kwargs)
+
+    try:
+        required_data = json.loads(required)
+    except Exception as e:
+        logger.error('could not parse required', exc_info=e)
+        sys.exit(1)
+
+    try:
+        required_reader = RequiredReader(required_data)
+    except Exception as e:
+        logger.error('could not validate required', exc_info=e)
+        sys.exit(1)
+
+    def get_rss():
+        return time.time()
+
+    start_time = get_rss()
+
+    upload_count = 0
+    total_count = 0
+    for upload in uploads:
+        upload_id = upload.upload_id
+        upload_files = UploadFiles.get(upload_id)
+        upload_count += 1
+        entry_ids = list(entry.entry_id for entry in Entry.objects(upload_id=upload_id))
+        entry_count = 0
+        for entry_id in entry_ids:
+            entry_count += 1
+            total_count += 1
+            try:
+                archive = upload_files.read_archive(entry_id, use_blocked_toc=False)
+                archive_data = required_reader.read(archive, entry_id, upload_id)
+                write(entry_id, archive_data)
+            except ArchiveQueryError as e:
+                logger.error('could not read archive', exc_info=e, entry_id=entry_id)
+            except KeyError as e:
+                logger.error('missing archive', exc_info=e, entry_id=entry_id)
+
+            if total_count % 100 == 0:
+                print(f'{upload_count:5}/{len(uploads)} {entry_count:5}/{len(entry_ids)} {total_count:5} {((get_rss() - start_time))} {upload_id}')
+
+        upload_files.close()
+
+    output_file.close()
+
+
 @uploads.command(help='List selected uploads')
 @click.argument('UPLOADS', nargs=-1)
 @click.option('-e', '--entries', is_flag=True, help='Show details about entries.')
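# ---------------------------------------------------------------------------
# Not part of the patch: a minimal sketch of the per-entry read path the new
# export command wires together, using only calls that appear in the diff
# above. The identifiers and the required spec are placeholders/assumptions,
# and the CLI invocation in the comment below is the presumed one.
#
#   nomad admin uploads export <upload_id> --required '{"metadata": "*"}' -o out.zip
#
from nomad.files import UploadFiles
from nomad.archive import RequiredReader

upload_id, entry_id = '<upload_id>', '<entry_id>'    # placeholder identifiers
required_reader = RequiredReader({'metadata': '*'})  # assumed required spec

upload_files = UploadFiles.get(upload_id)
archive = upload_files.read_archive(entry_id, use_blocked_toc=False)
archive_data = required_reader.read(archive, entry_id, upload_id)
upload_files.close()
# ---------------------------------------------------------------------------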