Merge branch 'data-export' into 'develop'
CLI utility to export archive data

See merge request nomad-lab/nomad-FAIR!1452
markus1978 committed Sep 19, 2023
2 parents 9d907b5 + 88c59a9 commit def05d2
Showing 4 changed files with 86 additions and 12 deletions.
3 changes: 1 addition & 2 deletions nomad/archive/query_reader.py
@@ -713,8 +713,7 @@ def load_archive(self, upload_id: str, entry_id: str) -> ArchiveDict:
        self.upload_pool[upload_id] = upload.upload_files

        try:
-            with self.upload_pool[upload_id].read_archive(entry_id) as archive:
-                return archive[entry_id]
+            return self.upload_pool[upload_id].read_archive(entry_id)[entry_id]
        except KeyError:
            raise ArchiveError(f'Archive {entry_id} does not exist in upload {upload_id}.')

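The new version hands out the entry's sub-archive directly instead of wrapping the read in a context manager, presumably so the lazily loaded data stays accessible after load_archive returns (the underlying reader is kept in upload_pool). A minimal sketch of the difference, assuming an UploadFiles instance named upload_files and an existing entry_id:

# example (not part of the commit)
# before: the reader was closed as soon as the with-block exited
with upload_files.read_archive(entry_id) as archive:
    entry = archive[entry_id]

# after: return the entry directly and leave the pooled reader open
entry = upload_files.read_archive(entry_id)[entry_id]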
10 changes: 7 additions & 3 deletions nomad/archive/required.py
@@ -30,6 +30,7 @@
Package
from .query import ArchiveQueryError, to_json, _query_archive_key_pattern, _extract_key_and_index, _extract_child
from .storage import ArchiveReader, ArchiveList, ArchiveError, ArchiveDict
+from .storage_v2 import ArchiveDict as NewArchiveDict
from ..datamodel.context import parse_path, ServerContext


@@ -443,10 +444,13 @@ def _apply_required(
        if archive_item is None:
            return None  # type: ignore

-        archive_item = to_json(archive_item)
        result: dict = {}

-        if isinstance(archive_item, dict) and 'm_def' in archive_item:
+        # avoid the bug in the old reader that primitive key-value is not included in toc
+        if isinstance(archive_item, ArchiveDict):
+            archive_item = to_json(archive_item)
+
+        if isinstance(archive_item, (dict, NewArchiveDict)) and 'm_def' in archive_item:
            dataset = dataset.replace(definition=self._resolve_definition(
                dataset.upload_id, archive_item['m_def'].split('@')[0], dataset.archive_root))
            result['m_def'] = archive_item['m_def']
@@ -459,7 +463,7 @@ def _apply_required(
            return self._resolve_refs(dataset.definition, archive_item, dataset)

        if directive in ['*', 'include']:
-            return archive_item
+            return to_json(archive_item)

        raise ArchiveQueryError(f'unknown directive {required}')

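In _apply_required, only items coming from the old reader (ArchiveDict) are now converted eagerly, because their toc can miss primitive key-value pairs; items from the new reader (NewArchiveDict) and plain dicts are left alone and only converted when a '*' or 'include' directive returns them. A condensed sketch of that guard as a hypothetical helper, reusing the names from the diff:

# example (not part of the commit)
def normalize_item(archive_item):
    # old-reader objects: convert up front to work around the incomplete toc
    if isinstance(archive_item, ArchiveDict):
        return to_json(archive_item)
    # new-reader objects and plain dicts: keep lazy, convert only on full inclusion
    return archive_item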
8 changes: 1 addition & 7 deletions nomad/archive/storage.py
@@ -15,7 +15,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-import os
from typing import Iterable, Any, Tuple, Dict, BinaryIO, Union, List, cast
from io import BytesIO, BufferedReader
from collections.abc import Mapping, Sequence
@@ -26,7 +25,7 @@
import struct
import json

-from nomad import utils, config
+from nomad import utils
from nomad.config import archive

__packer = msgpack.Packer(autoreset=True, use_bin_type=True)
@@ -552,11 +551,6 @@ def read_archive(file_or_path: Union[str, BytesIO], **kwargs) -> ArchiveReader:
'''
from .storage_v2 import ArchiveWriter as ArchiveWriterNew, ArchiveReader as ArchiveReaderNew

-    # if the file is smaller than the threshold, just read it into memory to avoid multiple small reads
-    if isinstance(file_or_path, str) and os.path.getsize(file_or_path) < 4 * config.archive.read_buffer_size:
-        with open(file_or_path, 'rb') as f:
-            file_or_path = BytesIO(f.read())
-
    if isinstance(file_or_path, str):
        with open(file_or_path, 'rb') as f:
            magic = f.read(ArchiveWriterNew.magic_len)
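With this change, read_archive no longer buffers small files into memory on its own; a caller that still wants that behaviour can buffer explicitly before opening the archive. A rough sketch under that assumption, with a made-up path and threshold:

# example (not part of the commit)
import os
from io import BytesIO
from nomad.archive.storage import read_archive

path = 'archive.msg'  # made-up path
threshold = 4 * 1024 * 1024  # arbitrary threshold in bytes

source = path
if os.path.getsize(path) < threshold:
    with open(path, 'rb') as f:
        source = BytesIO(f.read())

with read_archive(source) as archive:
    ...  # work with the archive here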
77 changes: 77 additions & 0 deletions nomad/cli/admin/uploads.py
@@ -268,6 +268,83 @@ def _query_uploads(
return final_query, proc.Upload.objects(final_query)


@uploads.command(help='Export the archive data of selected uploads')
@click.argument('UPLOADS', nargs=-1)
@click.option('--required', type=str, help='The "required" specification in JSON format')
@click.option('-o', '--output', type=str, help='The file to write data to')
@click.pass_context
def export(ctx, uploads, required, output: str):
    import sys
    import json
    from nomad.processing import Entry
    from nomad.utils import get_logger
    from nomad.files import UploadFiles
    from nomad.archive import ArchiveQueryError, RequiredReader
    import time
    import zipfile

    logger = get_logger(__name__)

    if not output:
        logger.error('no output given')
        sys.exit(1)

    if not output.endswith('.zip'):
        logger.error('only zip output is supported')
        sys.exit(1)

    output_file = zipfile.ZipFile(output, 'w', allowZip64=True)

    def write(entry_id, archive_data):
        archive_json = json.dumps(archive_data)
        output_file.writestr(f'{entry_id}.json', archive_json, compress_type=zipfile.ZIP_DEFLATED)

    _, uploads = _query_uploads(uploads, **ctx.obj.uploads_kwargs)

    try:
        required_data = json.loads(required)
    except Exception as e:
        logger.error('could not parse required', exc_info=e)
        sys.exit(1)

    try:
        required_reader = RequiredReader(required_data)
    except Exception as e:
        logger.error('could not validate required', exc_info=e)
        sys.exit(1)

    def get_rss():
        # despite the name, this returns wall-clock time; it is only used to report elapsed seconds below
        return time.time()

    start_time = get_rss()

    upload_count = 0
    total_count = 0
    for upload in uploads:
        upload_id = upload.upload_id
        upload_files = UploadFiles.get(upload_id)
        upload_count += 1
        entry_ids = list(entry.entry_id for entry in Entry.objects(upload_id=upload_id))
        entry_count = 0
        for entry_id in entry_ids:
            entry_count += 1
            total_count += 1
            try:
                archive = upload_files.read_archive(entry_id, use_blocked_toc=False)
                archive_data = required_reader.read(archive, entry_id, upload_id)
                write(entry_id, archive_data)
            except ArchiveQueryError as e:
                logger.error('could not read archive', exc_info=e, entry_id=entry_id)
            except KeyError as e:
                logger.error('missing archive', exc_info=e, entry_id=entry_id)

            if total_count % 100 == 0:
                print(f'{upload_count:5}/{len(uploads)} {entry_count:5}/{len(entry_ids)} {total_count:5} {get_rss() - start_time} {upload_id}')

        upload_files.close()

    output_file.close()


@uploads.command(help='List selected uploads')
@click.argument('UPLOADS', nargs=-1)
@click.option('-e', '--entries', is_flag=True, help='Show details about entries.')
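For reference, a hedged usage sketch of the new export command. The command path (nomad admin uploads export), the root cli import location, the required spec and the upload id are all assumptions for illustration, not taken from the merge request:

# example (not part of the commit)
from click.testing import CliRunner

from nomad.cli import cli  # assumed location of the root click group

runner = CliRunner()
result = runner.invoke(cli, [
    'admin', 'uploads', 'export',
    '--required', '{"metadata": "*"}',  # made-up required spec
    '--output', 'archives.zip',
    'some_upload_id',  # made-up upload id
])
print(result.exit_code, result.output)

The equivalent shell invocation would look something like: nomad admin uploads export --required '{"metadata": "*"}' -o archives.zip some_upload_id.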
