From bf9ee3cefb4c07c698079ef00ea4f285220de1c9 Mon Sep 17 00:00:00 2001
From: Pablo Panero
Date: Fri, 21 Jan 2022 14:28:19 +0100
Subject: [PATCH 1/4] datastreams: implement asynchronous writer

---
 invenio_vocabularies/datastreams/tasks.py   | 25 +++++++++++++
 invenio_vocabularies/datastreams/writers.py | 19 ++++++++++
 tests/datastreams/conftest.py               | 11 ++++--
 tests/datastreams/test_datastreams_tasks.py | 34 +++++++++++++++++
 tests/datastreams/test_writers.py           | 41 ++++++++++++++++++++-
 5 files changed, 125 insertions(+), 5 deletions(-)
 create mode 100644 invenio_vocabularies/datastreams/tasks.py
 create mode 100644 tests/datastreams/test_datastreams_tasks.py

diff --git a/invenio_vocabularies/datastreams/tasks.py b/invenio_vocabularies/datastreams/tasks.py
new file mode 100644
index 00000000..9407c051
--- /dev/null
+++ b/invenio_vocabularies/datastreams/tasks.py
@@ -0,0 +1,25 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2022 CERN.
+#
+# Invenio-Vocabularies is free software; you can redistribute it and/or
+# modify it under the terms of the MIT License; see LICENSE file for more
+# details.
+
+"""Data Streams Celery tasks."""
+
+from celery import shared_task
+
+from ..datastreams import StreamEntry
+from ..datastreams.factories import WriterFactory
+
+
+@shared_task(ignore_result=True)
+def write_entry(writer, entry):
+    """Write an entry.
+
+    :param writer: writer configuration as accepted by the WriterFactory.
+    :param entry: dictionary, StreamEntry is not serializable.
+    """
+    writer = WriterFactory.create(config=writer)
+    writer.write(StreamEntry(entry))
diff --git a/invenio_vocabularies/datastreams/writers.py b/invenio_vocabularies/datastreams/writers.py
index d33c7763..abb63dca 100644
--- a/invenio_vocabularies/datastreams/writers.py
+++ b/invenio_vocabularies/datastreams/writers.py
@@ -20,6 +20,7 @@
 
 from .datastreams import StreamEntry
 from .errors import WriterError
+from .tasks import write_entry
 
 
 class BaseWriter(ABC):
@@ -106,3 +107,21 @@ def write(self, stream_entry, *args, **kwargs):
             yaml.safe_dump([stream_entry.entry], file, allow_unicode=True)
 
         return stream_entry
+
+
+class AsyncWriter(BaseWriter):
+    """Writes the entries asynchronously (celery task)."""
+
+    def __init__(self, writer, *args, **kwargs):
+        """Constructor.
+
+        :param writer: writer to use.
+        """
+        self._writer = writer
+        super().__init__(*args, **kwargs)
+
+    def write(self, stream_entry, *args, **kwargs):
+        """Launches a celery task to write an entry."""
+        write_entry.delay(self._writer, stream_entry.entry)
+
+        return stream_entry
diff --git a/tests/datastreams/conftest.py b/tests/datastreams/conftest.py
index f9c01b98..9c31960c 100644
--- a/tests/datastreams/conftest.py
+++ b/tests/datastreams/conftest.py
@@ -18,6 +18,8 @@
 
 import pytest
 
+from invenio_vocabularies.config import VOCABULARIES_DATASTREAM_READERS, \
+    VOCABULARIES_DATASTREAM_TRANSFORMERS, VOCABULARIES_DATASTREAM_WRITERS
 from invenio_vocabularies.datastreams.errors import TransformerError, WriterError
 from invenio_vocabularies.datastreams.readers import BaseReader, JsonReader, ZipReader
 from invenio_vocabularies.datastreams.transformers import BaseTransformer
@@ -75,12 +77,15 @@ def write(self, stream_entry, *args, **kwargs):
 def app_config(app_config):
     """Mimic an instance's configuration."""
     app_config["VOCABULARIES_DATASTREAM_READERS"] = {
-        "json": JsonReader,
+        **VOCABULARIES_DATASTREAM_READERS,
         "test": TestReader,
-        "zip": ZipReader,
     }
-    app_config["VOCABULARIES_DATASTREAM_TRANSFORMERS"] = {"test": TestTransformer}
+    app_config["VOCABULARIES_DATASTREAM_TRANSFORMERS"] = {
+        **VOCABULARIES_DATASTREAM_TRANSFORMERS,
+        "test": TestTransformer
+    }
     app_config["VOCABULARIES_DATASTREAM_WRITERS"] = {
+        **VOCABULARIES_DATASTREAM_WRITERS,
         "test": TestWriter,
         "fail": FailingTestWriter,
     }
diff --git a/tests/datastreams/test_datastreams_tasks.py b/tests/datastreams/test_datastreams_tasks.py
new file mode 100644
index 00000000..d907422f
--- /dev/null
+++ b/tests/datastreams/test_datastreams_tasks.py
@@ -0,0 +1,34 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2021-2022 CERN.
+#
+# Invenio-Vocabularies is free software; you can redistribute it and/or
+# modify it under the terms of the MIT License; see LICENSE file for more
+# details.
+
+"""Data Streams tasks tests."""
+
+from pathlib import Path
+
+import yaml
+
+from invenio_vocabularies.datastreams import StreamEntry
+from invenio_vocabularies.datastreams.tasks import write_entry
+
+
+def test_write_entry(app):
+    filepath = 'writer_test.yaml'
+    yaml_writer_config = {
+        "type": "yaml",
+        "args": {
+            "filepath": filepath
+        }
+    }
+    entry = {"key_one": [{"inner_one": 1}]}
+    write_entry(yaml_writer_config, entry)
+
+    filepath = Path(filepath)
+    with open(filepath) as file:
+        assert yaml.safe_load(file) == [entry]
+
+    filepath.unlink()
diff --git a/tests/datastreams/test_writers.py b/tests/datastreams/test_writers.py
index ec9cafc8..d25d2c2f 100644
--- a/tests/datastreams/test_writers.py
+++ b/tests/datastreams/test_writers.py
@@ -1,6 +1,6 @@
 # -*- coding: utf-8 -*-
 #
-# Copyright (C) 2021 CERN.
+# Copyright (C) 2021-2022 CERN.
 #
 # Invenio-Vocabularies is free software; you can redistribute it and/or
 # modify it under the terms of the MIT License; see LICENSE file for more
 # details.
@@ -16,7 +16,12 @@
 
 from invenio_vocabularies.datastreams import StreamEntry
 from invenio_vocabularies.datastreams.errors import WriterError
-from invenio_vocabularies.datastreams.writers import ServiceWriter, YamlWriter
+from invenio_vocabularies.datastreams.writers import AsyncWriter, \
+    ServiceWriter, YamlWriter
+
+##
+# Service Writer
+##
 
 
 def test_service_writer_non_existing(lang_type, lang_data, service, identity):
@@ -67,6 +72,10 @@ def test_service_writer_update_non_existing(lang_type, lang_data, service, ident
 
     assert dict(record, **updated_lang) == record
 
+##
+# YAML Writer
+##
+
 def test_yaml_writer():
     filepath = Path("writer_test.yaml")
 
@@ -80,3 +89,31 @@ def test_yaml_writer():
         assert yaml.safe_load(file) == test_output
 
     filepath.unlink()
+
+##
+# Async Writer
+##
+
+
+def test_async_writer(app):
+    filepath = 'writer_test.yaml'
+    yaml_writer_config = {
+        "type": "yaml",
+        "args": {
+            "filepath": filepath
+        }
+    }
+    async_writer = AsyncWriter(yaml_writer_config)
+
+    test_output = [
+        {"key_one": [{"inner_one": 1}]},
+        {"key_two": [{"inner_two": "two"}]}
+    ]
+    for output in test_output:
+        async_writer.write(stream_entry=StreamEntry(output))
+
+    filepath = Path(filepath)
+    with open(filepath) as file:
+        assert yaml.safe_load(file) == test_output
+
+    filepath.unlink()
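For context before the follow-up patches: the new writer is configuration
driven end to end. A minimal sketch of using it directly (not part of the
patch; it assumes a configured Celery app, and the YAML target path and the
entry are illustrative):

    from invenio_vocabularies.datastreams import StreamEntry
    from invenio_vocabularies.datastreams.writers import AsyncWriter

    # Any writer config accepted by the WriterFactory can be wrapped.
    yaml_writer_config = {"type": "yaml", "args": {"filepath": "out.yaml"}}
    writer = AsyncWriter(yaml_writer_config)

    # Returns immediately; the actual write happens in the write_entry task.
    writer.write(StreamEntry({"id": "eng", "title": {"en": "English"}}))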
From 68ce854594a2232f65f9139efb5dd92921fb3e02 Mon Sep 17 00:00:00 2001
From: Javier Romero Castro
Date: Fri, 19 Jul 2024 12:24:26 +0200
Subject: [PATCH 2/4] datastreams: minor fixes

---
 invenio_vocabularies/cli.py                 |  5 ++++-
 invenio_vocabularies/config.py              |  3 ++-
 tests/datastreams/conftest.py               |  9 +++++---
 tests/datastreams/test_datastreams_tasks.py | 10 ++------
 tests/datastreams/test_writers.py           | 23 +++++++++-----------
 5 files changed, 24 insertions(+), 26 deletions(-)

diff --git a/invenio_vocabularies/cli.py b/invenio_vocabularies/cli.py
index a9e1e55a..3743675b 100644
--- a/invenio_vocabularies/cli.py
+++ b/invenio_vocabularies/cli.py
@@ -101,7 +101,10 @@ def update(vocabulary, filepath=None, origin=None):
     config = vc.get_config(filepath, origin)
 
     for w_conf in config["writers"]:
-        w_conf["args"]["update"] = True
+        if w_conf["type"] == "async":
+            w_conf["args"]["writer"]["args"]["update"] = True
+        else:
+            w_conf["args"]["update"] = True
 
     success, errored, filtered = _process_vocab(config)
 
diff --git a/invenio_vocabularies/config.py b/invenio_vocabularies/config.py
index 27ac282d..98a1d7fa 100644
--- a/invenio_vocabularies/config.py
+++ b/invenio_vocabularies/config.py
@@ -24,7 +24,7 @@
     ZipReader,
 )
 from .datastreams.transformers import XMLTransformer
-from .datastreams.writers import ServiceWriter, YamlWriter
+from .datastreams.writers import AsyncWriter, ServiceWriter, YamlWriter
 from .resources import VocabulariesResourceConfig
 from .services.config import VocabulariesServiceConfig
 
@@ -134,6 +134,7 @@
 VOCABULARIES_DATASTREAM_WRITERS = {
     "service": ServiceWriter,
     "yaml": YamlWriter,
+    "async": AsyncWriter,
 }
 """Data Streams writers."""
 
diff --git a/tests/datastreams/conftest.py b/tests/datastreams/conftest.py
index 9c31960c..f727ed30 100644
--- a/tests/datastreams/conftest.py
+++ b/tests/datastreams/conftest.py
@@ -18,8 +18,11 @@
 
 import pytest
 
-from invenio_vocabularies.config import VOCABULARIES_DATASTREAM_READERS, \
-    VOCABULARIES_DATASTREAM_TRANSFORMERS, VOCABULARIES_DATASTREAM_WRITERS
+from invenio_vocabularies.config import (
+    VOCABULARIES_DATASTREAM_READERS,
+    VOCABULARIES_DATASTREAM_TRANSFORMERS,
+    VOCABULARIES_DATASTREAM_WRITERS,
+)
 from invenio_vocabularies.datastreams.errors import TransformerError, WriterError
 from invenio_vocabularies.datastreams.readers import BaseReader, JsonReader, ZipReader
 from invenio_vocabularies.datastreams.transformers import BaseTransformer
@@ -82,7 +85,7 @@ def app_config(app_config):
     }
     app_config["VOCABULARIES_DATASTREAM_TRANSFORMERS"] = {
         **VOCABULARIES_DATASTREAM_TRANSFORMERS,
-        "test": TestTransformer
+        "test": TestTransformer,
     }
     app_config["VOCABULARIES_DATASTREAM_WRITERS"] = {
         **VOCABULARIES_DATASTREAM_WRITERS,
diff --git a/tests/datastreams/test_datastreams_tasks.py b/tests/datastreams/test_datastreams_tasks.py
index d907422f..e478a05a 100644
--- a/tests/datastreams/test_datastreams_tasks.py
+++ b/tests/datastreams/test_datastreams_tasks.py
@@ -12,18 +12,12 @@
 
 import yaml
 
-from invenio_vocabularies.datastreams import StreamEntry
 from invenio_vocabularies.datastreams.tasks import write_entry
 
 
 def test_write_entry(app):
-    filepath = 'writer_test.yaml'
-    yaml_writer_config = {
-        "type": "yaml",
-        "args": {
-            "filepath": filepath
-        }
-    }
+    filepath = "writer_test.yaml"
+    yaml_writer_config = {"type": "yaml", "args": {"filepath": filepath}}
     entry = {"key_one": [{"inner_one": 1}]}
     write_entry(yaml_writer_config, entry)
 
diff --git a/tests/datastreams/test_writers.py b/tests/datastreams/test_writers.py
index d25d2c2f..1f70cb57 100644
--- a/tests/datastreams/test_writers.py
+++ b/tests/datastreams/test_writers.py
@@ -16,8 +16,11 @@
 
 from invenio_vocabularies.datastreams import StreamEntry
 from invenio_vocabularies.datastreams.errors import WriterError
-from invenio_vocabularies.datastreams.writers import AsyncWriter, \
-    ServiceWriter, YamlWriter
+from invenio_vocabularies.datastreams.writers import (
+    AsyncWriter,
+    ServiceWriter,
+    YamlWriter,
+)
 
 ##
 # Service Writer
@@ -72,6 +75,7 @@ def test_service_writer_update_non_existing(lang_type, lang_data, service, ident
 
     assert dict(record, **updated_lang) == record
 
+
 ##
 # YAML Writer
 ##
@@ -90,25 +94,18 @@ def test_yaml_writer():
 
     filepath.unlink()
 
+
 ##
 # Async Writer
 ##
 
 
 def test_async_writer(app):
-    filepath = 'writer_test.yaml'
-    yaml_writer_config = {
-        "type": "yaml",
-        "args": {
-            "filepath": filepath
-        }
-    }
+    filepath = "writer_test.yaml"
+    yaml_writer_config = {"type": "yaml", "args": {"filepath": filepath}}
     async_writer = AsyncWriter(yaml_writer_config)
 
-    test_output = [
-        {"key_one": [{"inner_one": 1}]},
-        {"key_two": [{"inner_two": "two"}]}
-    ]
+    test_output = [{"key_one": [{"inner_one": 1}]}, {"key_two": [{"inner_two": "two"}]}]
     for output in test_output:
         async_writer.write(stream_entry=StreamEntry(output))
From 7f5d38e923de92ef5a2be6d39995e84fd1734205 Mon Sep 17 00:00:00 2001
From: Javier Romero Castro
Date: Fri, 19 Jul 2024 13:43:28 +0200
Subject: [PATCH 3/4] names: add orcid public data sync

* Adds delete all values of a vocab to CLI
---
 invenio_vocabularies/cli.py                  |  15 ++-
 invenio_vocabularies/config.py               |  11 ++
 .../contrib/names/datastreams.py             | 120 ++++++++++++++++--
 .../datastreams/datastreams.py               |  54 ++++++--
 invenio_vocabularies/datastreams/readers.py  |   6 +-
 invenio_vocabularies/datastreams/tasks.py    |  16 ++-
 invenio_vocabularies/datastreams/writers.py  |  52 +++++++-
 invenio_vocabularies/services/tasks.py       |   8 +-
 setup.cfg                                    |   1 +
 tests/datastreams/test_datastreams_tasks.py  |  14 +-
 10 files changed, 264 insertions(+), 33 deletions(-)
diff --git a/invenio_vocabularies/cli.py b/invenio_vocabularies/cli.py
index 3743675b..b7d91f5e 100644
--- a/invenio_vocabularies/cli.py
+++ b/invenio_vocabularies/cli.py
@@ -143,18 +143,27 @@ def convert(vocabulary, filepath=None, origin=None, target=None, num_samples=Non
     type=click.STRING,
     help="Identifier of the vocabulary item to delete.",
 )
-@click.option("--all", is_flag=True, default=False, help="Not supported yet.")
+@click.option("--all", is_flag=True, default=False, help="Delete all the items of the vocabulary.")
 @with_appcontext
 def delete(vocabulary, identifier, all):
     """Delete all items or a specific one of the vocabulary."""
-    if not id and not all:
+    if not identifier and not all:
         click.secho("An identifier or the --all flag must be present.", fg="red")
         exit(1)
+
     vc = get_vocabulary_config(vocabulary)
     service = vc.get_service()
     if identifier:
         try:
-            if service.delete(identifier, system_identity):
+            if service.delete(system_identity, identifier):
                 click.secho(f"{identifier} deleted from {vocabulary}.", fg="green")
         except (PIDDeletedError, PIDDoesNotExistError):
             click.secho(f"PID {identifier} not found.")
+    elif all:
+        items = service.scan(system_identity)
+        for item in items.hits:
+            try:
+                if service.delete(system_identity, item["id"]):
+                    click.secho(f"{item['id']} deleted from {vocabulary}.", fg="green")
+            except (PIDDeletedError, PIDDoesNotExistError):
+                click.secho(f"PID {item['id']} not found.")
diff --git a/invenio_vocabularies/config.py b/invenio_vocabularies/config.py
index 98a1d7fa..a1fdaa64 100644
--- a/invenio_vocabularies/config.py
+++ b/invenio_vocabularies/config.py
@@ -155,3 +155,14 @@
     "sort": ["name", "count"],
 }
 """Vocabulary type search configuration."""
+
+VOCABULARIES_ORCID_ACCESS_KEY = "TODO"
+"""ORCID access key to access the s3 bucket."""
+VOCABULARIES_ORCID_SECRET_KEY = "TODO"
+"""ORCID secret key to access the s3 bucket."""
+VOCABULARIES_ORCID_SUMMARIES_BUCKET = "v3.0-summaries"
+"""ORCID summaries bucket name."""
+VOCABULARIES_ORCID_SYNC_MAX_WORKERS = 32
+"""ORCID max number of simultaneous workers/connections."""
+VOCABULARIES_ORCID_SYNC_DAYS = 1
+"""ORCID number of days to sync."""
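An instance overrides these settings in its invenio.cfg. A minimal sketch
with placeholder values (the credentials shown are not real):

    # invenio.cfg
    VOCABULARIES_ORCID_ACCESS_KEY = "<s3-access-key>"
    VOCABULARIES_ORCID_SECRET_KEY = "<s3-secret-key>"
    VOCABULARIES_ORCID_SUMMARIES_BUCKET = "v3.0-summaries"
    VOCABULARIES_ORCID_SYNC_MAX_WORKERS = 32
    VOCABULARIES_ORCID_SYNC_DAYS = 1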
diff --git a/invenio_vocabularies/contrib/names/datastreams.py b/invenio_vocabularies/contrib/names/datastreams.py
index ce050f3e..4f6c1e93 100644
--- a/invenio_vocabularies/contrib/names/datastreams.py
+++ b/invenio_vocabularies/contrib/names/datastreams.py
@@ -8,15 +8,115 @@
 
 """Names datastreams, transformers, writers and readers."""
 
-from invenio_access.permissions import system_identity
+import io
+import tarfile
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from datetime import datetime, timedelta
+
+import s3fs
+from flask import current_app
 from invenio_records.dictutils import dict_lookup
 
 from ...datastreams.errors import TransformerError
-from ...datastreams.readers import SimpleHTTPReader
+from ...datastreams.readers import BaseReader, SimpleHTTPReader
 from ...datastreams.transformers import BaseTransformer
 from ...datastreams.writers import ServiceWriter
 
 
+class OrcidDataSyncReader(BaseReader):
+    """ORCiD Data Sync Reader."""
+
+    def _fetch_orcid_data(self, orcid_to_sync, fs, bucket):
+        """Fetches a single ORCiD record from S3."""
+        # The ORCiD file key is located in a folder whose name corresponds
+        # to the last three digits of the ORCiD
+        suffix = orcid_to_sync[-3:]
+        key = f"{suffix}/{orcid_to_sync}.xml"
+        try:
+            with fs.open(f"s3://{bucket}/{key}", "rb") as f:
+                file_response = f.read()
+                return file_response
+        except Exception as e:
+            # TODO: log
+            return None
+
+    def _process_lambda_file(self, fileobj):
+        """Process the ORCiD lambda file and yield the ORCiDs to sync.
+
+        The decoded fileobj looks like the following (last_modified is the
+        fourth column, matching ``elements[3]`` below):
+        orcid,path,date_created,last_modified
+        0000-0001-5109-3700,http://orcid.org/0000-0001-5109-3700,2014-08-02 15:00:00.000,2021-08-02 15:00:00.000
+
+        Yield ORCiDs to sync until the last sync date is reached.
+        """
+        date_format = "%Y-%m-%d %H:%M:%S.%f"
+        date_format_no_millis = "%Y-%m-%d %H:%M:%S"
+
+        last_sync = datetime.now() - timedelta(days=current_app.config["VOCABULARIES_ORCID_SYNC_DAYS"])
+
+        file_content = fileobj.read().decode("utf-8")
+
+        for line in file_content.splitlines()[1:]:  # Skip the header line
+            elements = line.split(",")
+            orcid = elements[0]
+
+            # Lambda file is ordered by last modified date
+            last_modified_str = elements[3]
+            try:
+                last_modified_date = datetime.strptime(last_modified_str, date_format)
+            except ValueError:
+                last_modified_date = datetime.strptime(
+                    last_modified_str, date_format_no_millis
+                )
+
+            if last_modified_date >= last_sync:
+                yield orcid
+            else:
+                break
+
+    def _iter(self, orcids, fs):
+        """Iterates over the ORCiD records yielding each one."""
+        with ThreadPoolExecutor(
+            max_workers=current_app.config["VOCABULARIES_ORCID_SYNC_MAX_WORKERS"]
+        ) as executor:
+            futures = [
+                executor.submit(
+                    self._fetch_orcid_data,
+                    orcid,
+                    fs,
+                    current_app.config["VOCABULARIES_ORCID_SUMMARIES_BUCKET"],
+                )
+                for orcid in orcids
+            ]
+            for future in as_completed(futures):
+                result = future.result()
+                if result is not None:
+                    yield result
+
+    def read(self, item=None, *args, **kwargs):
+        """Stream the ORCiD lambda file, process it to get the ORCiDs to sync and yield their data."""
+        fs = s3fs.S3FileSystem(
+            key=current_app.config["VOCABULARIES_ORCID_ACCESS_KEY"],
+            secret=current_app.config["VOCABULARIES_ORCID_SECRET_KEY"],
+        )
+        # Read the file from S3
+        with fs.open("s3://orcid-lambda-file/last_modified.csv.tar", "rb") as f:
+            tar_content = f.read()
+
+        orcids_to_sync = []
+        # Opens tar file and process it
+        with tarfile.open(fileobj=io.BytesIO(tar_content)) as tar:
+            # Iterate over each member (file or directory) in the tar file
+            for member in tar.getmembers():
+                # Extract the file
+                extracted_file = tar.extractfile(member)
+                if extracted_file:
+                    # Process the file and get the ORCiDs to sync
+                    orcids_to_sync.extend(self._process_lambda_file(extracted_file))
+
+        yield from self._iter(orcids_to_sync, fs)
+
+
 class OrcidHTTPReader(SimpleHTTPReader):
     """ORCiD HTTP Reader."""
 
@@ -42,6 +142,8 @@ def apply(self, stream_entry, **kwargs):
         name = person.get("name")
 
         if name is None:
             raise TransformerError(f"Name not found in ORCiD entry.")
+        if name.get("family-name") is None:
+            raise TransformerError(f"Family name not found in ORCiD entry.")
 
         entry = {
             "id": orcid_id,
@@ -89,6 +191,7 @@ def _entry_id(self, entry):
 
 VOCABULARIES_DATASTREAM_READERS = {
     "orcid-http": OrcidHTTPReader,
+    "orcid-data-sync": OrcidDataSyncReader,
 }
 
 
@@ -107,22 +210,23 @@ def _entry_id(self, entry):
 DATASTREAM_CONFIG = {
     "readers": [
         {
-            "type": "tar",
-            "args": {
-                "regex": "\\.xml$",
-            },
+            "type": "orcid-data-sync",
         },
         {"type": "xml"},
     ],
     "transformers": [{"type": "orcid"}],
     "writers": [
         {
-            "type": "names-service",
+            "type": "async",
             "args": {
-                "identity": system_identity,
+                "writer": {
+                    "type": "names-service",
+                }
             },
         }
     ],
+    "batch_size": 1000,
+    "write_many": True,
 }
 """ORCiD Data Stream configuration.
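A quick sketch of exercising the new reader on its own (not part of the
patch; it must run inside an application context, since the S3 credentials
and bucket name are read from the app config):

    from invenio_vocabularies.contrib.names.datastreams import OrcidDataSyncReader

    reader = OrcidDataSyncReader()
    # Yields the raw XML summary (bytes) of every ORCiD record modified
    # within the last VOCABULARIES_ORCID_SYNC_DAYS days.
    for raw_xml in reader.read():
        print(len(raw_xml))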
diff --git a/invenio_vocabularies/datastreams/datastreams.py b/invenio_vocabularies/datastreams/datastreams.py
index 3fc2d1e4..459a9744 100644
--- a/invenio_vocabularies/datastreams/datastreams.py
+++ b/invenio_vocabularies/datastreams/datastreams.py
@@ -14,11 +14,17 @@ class StreamEntry:
     """Object to encapsulate streams processing."""
 
-    def __init__(self, entry, errors=None):
-        """Constructor."""
+    def __init__(self, entry, errors=None, op_type=None):
+        """Constructor for the StreamEntry class.
+
+        :param entry (object): The entry object, usually a record dict.
+        :param errors (list, optional): List of errors. Defaults to None.
+        :param op_type (str, optional): The operation type. Defaults to None.
+        """
         self.entry = entry
         self.filtered = False
         self.errors = errors or []
+        self.op_type = op_type
 
 
 class DataStream:
@@ -39,15 +45,9 @@ def filter(self, stream_entry, *args, **kwargs):
         """Checks if an stream_entry should be filtered out (skipped)."""
         return False
 
-    def process(self, *args, **kwargs):
-        """Iterates over the entries.
-
-        Uses the reader to get the raw entries and transforms them.
-        It will iterate over the `StreamEntry` objects returned by
-        the reader, apply the transformations and yield the result of
-        writing it.
-        """
-        for stream_entry in self.read():
+    def process_batch(self, batch, write_many=False):
+        """Transform and write a batch of stream entries."""
+        transformed_entries = []
+        for stream_entry in batch:
             if stream_entry.errors:
                 yield stream_entry  # reading errors
             else:
                 transformed_entry = self.transform(stream_entry)
                 if transformed_entry.errors:
                     yield transformed_entry
                 elif self.filter(transformed_entry):
                     transformed_entry.filtered = True
                     yield transformed_entry
                 else:
-                    yield self.write(transformed_entry)
+                    transformed_entries.append(transformed_entry)
+        if transformed_entries:
+            if write_many:
+                yield from self.batch_write(transformed_entries)
+            else:
+                yield from (self.write(entry) for entry in transformed_entries)
+
+    def process(self, batch_size=100, write_many=False, *args, **kwargs):
+        """Iterates over the entries.
+
+        Uses the reader to get the raw entries and transforms them.
+        It will iterate over the `StreamEntry` objects returned by
+        the reader, apply the transformations and yield the result of
+        writing it.
+        """
+
+        batch = []
+        for stream_entry in self.read():
+            batch.append(stream_entry)
+            if len(batch) >= batch_size:
+                yield from self.process_batch(batch, write_many=write_many)
+                batch = []
+
+        # Process any remaining entries in the last batch
+        if batch:
+            yield from self.process_batch(batch, write_many=write_many)
 
     def read(self):
         """Recursively read the entries."""
@@ -107,6 +132,11 @@ def write(self, stream_entry, *args, **kwargs):
 
         return stream_entry
 
+    def batch_write(self, stream_entries, *args, **kwargs):
+        """Write the stream entries using the configured writers.
+
+        Errors are handled in the service layer.
+        """
+        for writer in self._writers:
+            yield from writer.write_many(stream_entries)
+
     def total(self, *args, **kwargs):
         """The total of entries obtained from the origin."""
         raise NotImplementedError()
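The batching contract is easiest to see from the caller's side. A minimal
sketch, reusing the ORCiD configuration from the previous file (the
batch_size and write_many values are illustrative):

    from invenio_vocabularies.contrib.names.datastreams import DATASTREAM_CONFIG
    from invenio_vocabularies.datastreams.factories import DataStreamFactory

    ds = DataStreamFactory.create(
        readers_config=DATASTREAM_CONFIG["readers"],
        transformers_config=DATASTREAM_CONFIG.get("transformers"),
        writers_config=DATASTREAM_CONFIG["writers"],
    )
    # Entries are buffered and flushed every batch_size entries; with
    # write_many=True each batch goes to the writers' write_many() in a
    # single call instead of one write() per entry.
    for result in ds.process(batch_size=1000, write_many=True):
        if result.errors:
            print(result.errors)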
diff --git a/invenio_vocabularies/datastreams/readers.py b/invenio_vocabularies/datastreams/readers.py
index 736c44ca..cbef525e 100644
--- a/invenio_vocabularies/datastreams/readers.py
+++ b/invenio_vocabularies/datastreams/readers.py
@@ -21,7 +21,7 @@
 import requests
 import yaml
 from lxml import etree
-from lxml.html import parse as html_parse
+from lxml.html import fromstring
 
 from .errors import ReaderError
 from .xml import etree_to_dict
@@ -226,8 +226,8 @@ class XMLReader(BaseReader):
     def _iter(self, fp, *args, **kwargs):
         """Read and parse an XML file to dict."""
         # NOTE: We parse HTML, to skip XML validation and strip XML namespaces
-        xml_tree = html_parse(fp).getroot()
-        record = etree_to_dict(xml_tree)["html"]["body"].get("record")
+        xml_tree = fromstring(fp)
+        record = etree_to_dict(xml_tree).get("record")
 
         if not record:
             raise ReaderError(f"Record not found in XML entry.")
diff --git a/invenio_vocabularies/datastreams/tasks.py b/invenio_vocabularies/datastreams/tasks.py
index 9407c051..0f33d040 100644
--- a/invenio_vocabularies/datastreams/tasks.py
+++ b/invenio_vocabularies/datastreams/tasks.py
@@ -15,11 +15,23 @@
 
 
 @shared_task(ignore_result=True)
-def write_entry(writer, entry):
+def write_entry(writer_config, entry):
     """Write an entry.
 
-    :param writer: writer configuration as accepted by the WriterFactory.
+    :param writer_config: writer configuration as accepted by the WriterFactory.
     :param entry: dictionary, StreamEntry is not serializable.
     """
-    writer = WriterFactory.create(config=writer)
+    writer = WriterFactory.create(config=writer_config)
     writer.write(StreamEntry(entry))
+
+
+@shared_task(ignore_result=True)
+def write_many_entry(writer_config, entries):
+    """Write many entries.
+
+    :param writer_config: writer configuration as accepted by the WriterFactory.
+    :param entries: list of dictionaries, StreamEntry is not serializable.
+    """
+    writer = WriterFactory.create(config=writer_config)
+    stream_entries = [StreamEntry(entry) for entry in entries]
+    writer.write_many(stream_entries)
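For reference, the tasks can be exercised without a datastream. A small
sketch mirroring the tests (the file path and entries are illustrative):

    from invenio_vocabularies.datastreams.tasks import write_many_entry

    yaml_writer_config = {"type": "yaml", "args": {"filepath": "out.yaml"}}
    # Synchronous call, as in the tests; use .delay(...) to enqueue it on a
    # Celery worker instead.
    write_many_entry(yaml_writer_config, [{"id": "eng"}, {"id": "fra"}])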
diff --git a/invenio_vocabularies/datastreams/writers.py b/invenio_vocabularies/datastreams/writers.py
index abb63dca..1d6f6883 100644
--- a/invenio_vocabularies/datastreams/writers.py
+++ b/invenio_vocabularies/datastreams/writers.py
@@ -20,12 +20,17 @@
 
 from .datastreams import StreamEntry
 from .errors import WriterError
-from .tasks import write_entry
+from .tasks import write_entry, write_many_entry
 
 
 class BaseWriter(ABC):
     """Base writer."""
 
+    def __init__(self, *args, **kwargs):
+        """Base initialization logic."""
+        # Add any base initialization here if needed
+        pass
+
     @abstractmethod
     def write(self, stream_entry, *args, **kwargs):
         """Writes the input stream entry to the target output.
@@ -36,6 +41,15 @@ def write(self, stream_entry, *args, **kwargs):
         """
         pass
 
+    def write_many(self, stream_entries, *args, **kwargs):
+        """Writes the input stream entries to the target output.
+
+        :returns: a list of StreamEntry, the result of writing the entries.
+            Raises WriterError in case of errors.
+        """
+        pass
+
 
 class ServiceWriter(BaseWriter):
     """Writes the entries to an RDM instance using a Service object."""
@@ -86,6 +100,23 @@ def write(self, stream_entry, *args, **kwargs):
             # TODO: Check if we can get the error message easier
             raise WriterError([{"InvalidRelationValue": err.args[0]}])
 
+    def write_many(self, stream_entries, *args, **kwargs):
+        """Create or update many entries using the service."""
+        entries = [entry.entry for entry in stream_entries]
+        entries_with_id = [(self._entry_id(entry), entry) for entry in entries]
+        records = self._service.create_or_update_many(self._identity, entries_with_id)
+        stream_entries_processed = []
+        for op_type, record, errors in records:
+            if errors == []:
+                stream_entries_processed.append(
+                    StreamEntry(entry=record, op_type=op_type)
+                )
+            else:
+                stream_entries_processed.append(
+                    StreamEntry(entry=record, errors=errors, op_type=op_type)
+                )
+
+        return stream_entries_processed
+
 
 class YamlWriter(BaseWriter):
     """Writes the entries to a YAML file."""
@@ -107,7 +138,14 @@ def write(self, stream_entry, *args, **kwargs):
             yaml.safe_dump([stream_entry.entry], file, allow_unicode=True)
 
         return stream_entry
-
+
+    def write_many(self, stream_entries, *args, **kwargs):
+        """Append the entries to the YAML file."""
+        with open(self._filepath, "a") as file:
+            yaml.safe_dump(
+                [stream_entry.entry for stream_entry in stream_entries],
+                file,
+                allow_unicode=True,
+            )
+
+        return stream_entries
 
 
 class AsyncWriter(BaseWriter):
     """Writes the entries asynchronously (celery task)."""
@@ -117,11 +155,19 @@ def __init__(self, writer, *args, **kwargs):
 
         :param writer: writer to use.
         """
-        self._writer = writer
         super().__init__(*args, **kwargs)
+        self._writer = writer
 
     def write(self, stream_entry, *args, **kwargs):
         """Launches a celery task to write an entry."""
         write_entry.delay(self._writer, stream_entry.entry)
 
         return stream_entry
+
+    def write_many(self, stream_entries, *args, **kwargs):
+        """Launches a celery task to write many entries."""
+        write_many_entry.delay(
+            self._writer, [stream_entry.entry for stream_entry in stream_entries]
+        )
+
+        return stream_entries
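Note that BaseWriter.write_many defaults to a stub that returns None, so any
writer meant to be used with write_many=True has to override it. A minimal
sketch of a custom writer (hypothetical, for illustration only):

    from invenio_vocabularies.datastreams.writers import BaseWriter

    class PrintWriter(BaseWriter):
        """Toy writer that prints entries instead of persisting them."""

        def write(self, stream_entry, *args, **kwargs):
            print(stream_entry.entry)
            return stream_entry

        def write_many(self, stream_entries, *args, **kwargs):
            for stream_entry in stream_entries:
                print(stream_entry.entry)
            return stream_entries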
diff --git a/invenio_vocabularies/services/tasks.py b/invenio_vocabularies/services/tasks.py
index 059fb8b6..629ad0dc 100644
--- a/invenio_vocabularies/services/tasks.py
+++ b/invenio_vocabularies/services/tasks.py
@@ -15,8 +15,14 @@
 
 
 @shared_task(ignore_result=True)
-def process_datastream(config):
+def process_datastream(stream):
     """Process a datastream from config."""
+    vc_config = get_vocabulary_config(stream)
+    config = vc_config.get_config()
+
+    if not config:
+        raise ValueError("Invalid stream configuration")
+
     ds = DataStreamFactory.create(
         readers_config=config["readers"],
         transformers_config=config.get("transformers"),
         writers_config=config["writers"],
diff --git a/setup.cfg b/setup.cfg
index eb426a95..3ad2bfdf 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -32,6 +32,7 @@ install_requires =
     invenio-administration>=2.0.0,<3.0.0
     lxml>=4.5.0
     PyYAML>=5.4.1
+    s3fs>=2024.6.1
 
 [options.extras_require]
 oaipmh =
diff --git a/tests/datastreams/test_datastreams_tasks.py b/tests/datastreams/test_datastreams_tasks.py
index e478a05a..01d66de7 100644
--- a/tests/datastreams/test_datastreams_tasks.py
+++ b/tests/datastreams/test_datastreams_tasks.py
@@ -12,7 +12,7 @@
 
 import yaml
 
-from invenio_vocabularies.datastreams.tasks import write_entry
+from invenio_vocabularies.datastreams.tasks import write_entry, write_many_entry
 
 
 def test_write_entry(app):
@@ -26,3 +26,15 @@ def test_write_entry(app):
         assert yaml.safe_load(file) == [entry]
 
     filepath.unlink()
+
+
+def test_write_many_entry(app):
+    filepath = "writer_test.yaml"
+    yaml_writer_config = {"type": "yaml", "args": {"filepath": filepath}}
+    entries = [{"key_one": [{"inner_one": 1}]}, {"key_two": [{"inner_two": 2}]}]
+    write_many_entry(yaml_writer_config, entries)
+
+    filepath = Path(filepath)
+    with open(filepath) as file:
+        assert yaml.safe_load(file) == entries
+
+    filepath.unlink()
\ No newline at end of file

From 00535e0a4c03eb7e58ee16d80d97da9a167bb134 Mon Sep 17 00:00:00 2001
From: Javier Romero Castro
Date: Fri, 19 Jul 2024 15:43:13 +0200
Subject: [PATCH 4/4] datastreams: add logging

---
 invenio_vocabularies/cli.py               | 21 ++++++-
 .../datastreams/datastreams.py            |  9 ++-
 invenio_vocabularies/datastreams/tasks.py | 24 ++++++--
 invenio_vocabularies/services/tasks.py    | 59 ++++++++++++-----
 4 files changed, 91 insertions(+), 22 deletions(-)

diff --git a/invenio_vocabularies/cli.py b/invenio_vocabularies/cli.py
index b7d91f5e..cd8a7391 100644
--- a/invenio_vocabularies/cli.py
+++ b/invenio_vocabularies/cli.py
@@ -13,6 +13,7 @@
 import click
 from flask.cli import with_appcontext
 from invenio_access.permissions import system_identity
+from invenio_logging.structlog import LoggerFactory
 from invenio_pidstore.errors import PIDDeletedError, PIDDoesNotExistError
 
 from .datastreams import DataStreamFactory
@@ -31,22 +32,38 @@ def _process_vocab(config, num_samples=None):
         transformers_config=config.get("transformers"),
         writers_config=config["writers"],
     )
-
+    cli_logger = LoggerFactory.get_logger("cli")
+    cli_logger.info("Starting processing")
     success, errored, filtered = 0, 0, 0
     left = num_samples or -1
-    for result in ds.process():
+    batch_size = config.get("batch_size", 1000)
+    write_many = config.get("write_many", False)
+
+    for result in ds.process(batch_size=batch_size, write_many=write_many):
         left = left - 1
         if result.filtered:
             filtered += 1
+            cli_logger.info("Filtered", entry=result.entry, operation=result.op_type)
         if result.errors:
             for err in result.errors:
                 click.secho(err, fg="red")
+                cli_logger.error(
+                    "Error",
+                    entry=result.entry,
+                    operation=result.op_type,
+                    errors=result.errors,
+                )
             errored += 1
         else:
             success += 1
+            cli_logger.info("Success", entry=result.entry, operation=result.op_type)
         if left == 0:
             click.secho(f"Number of samples reached {num_samples}", fg="green")
             break
 
+    cli_logger.info(
+        "Finished processing", success=success, errored=errored, filtered=filtered
+    )
+
     return success, errored, filtered
""" + if not logger: + logger = LoggerFactory.get_logger("datastreams") batch = [] + logger.info( + f"Start reading datastream with batch_size={batch_size} and write_many={write_many}" + ) for stream_entry in self.read(): batch.append(stream_entry) if len(batch) >= batch_size: diff --git a/invenio_vocabularies/datastreams/tasks.py b/invenio_vocabularies/datastreams/tasks.py index 0f33d040..5fb890d8 100644 --- a/invenio_vocabularies/datastreams/tasks.py +++ b/invenio_vocabularies/datastreams/tasks.py @@ -9,29 +9,43 @@ """Data Streams Celery tasks.""" from celery import shared_task +from invenio_logging.structlog import LoggerFactory from ..datastreams import StreamEntry from ..datastreams.factories import WriterFactory -@shared_task(ignore_result=True) +@shared_task(ignore_result=True, logger=None) def write_entry(writer_config, entry): """Write an entry. :param writer: writer configuration as accepted by the WriterFactory. :param entry: dictionary, StreamEntry is not serializable. """ + if not logger: + logger = LoggerFactory.get_logger("write_entry") writer = WriterFactory.create(config=writer_config) - writer.write(StreamEntry(entry)) - + stream_entry_processed = writer.write(StreamEntry(entry)) + if stream_entry_processed.errors: + logger.error("Error writing entry", entry=entry, errors=stream_entry_processed.errors) + else: + logger.info("Entry written", entry=entry) @shared_task(ignore_result=True) -def write_many_entry(writer_config, entries): +def write_many_entry(writer_config, entries, logger=None): """Write many entries. :param writer: writer configuration as accepted by the WriterFactory. :param entry: lisf ot dictionaries, StreamEntry is not serializable. """ + if not logger: + logger = LoggerFactory.get_logger("write_many_entry") writer = WriterFactory.create(config=writer_config) stream_entries = [StreamEntry(entry) for entry in entries] - writer.write_many(stream_entries) + stream_entries_processed = writer.write_many(stream_entries) + errored = [entry for entry in stream_entries_processed if entry.errors] + succeeded = len(stream_entries_processed) - len(errored) + logger.info("Entries written", succeeded=succeeded) + if errored: + for entry in errored: + logger.error("Error writing entry", entry=entry.entry, errors=entry.errors) diff --git a/invenio_vocabularies/services/tasks.py b/invenio_vocabularies/services/tasks.py index 629ad0dc..7a93fc4d 100644 --- a/invenio_vocabularies/services/tasks.py +++ b/invenio_vocabularies/services/tasks.py @@ -9,6 +9,7 @@ from celery import shared_task from flask import current_app +from invenio_logging.structlog import LoggerFactory from ..datastreams.factories import DataStreamFactory from ..factories import get_vocabulary_config @@ -17,22 +18,52 @@ @shared_task(ignore_result=True) def process_datastream(stream): """Process a datastream from config.""" - vc_config = get_vocabulary_config(stream) - config = vc_config.get_config() + try: + stream_logger = LoggerFactory.get_logger("datastreams-" + stream) + stream_logger.info("Starting processing") + vc_config = get_vocabulary_config(stream) + config = vc_config.get_config() - if not config: - raise ValueError("Invalid stream configuration") + if not config: + stream_logger.error("Invalid stream configuration") + raise ValueError("Invalid stream configuration") - ds = DataStreamFactory.create( - readers_config=config["readers"], - transformers_config=config.get("transformers"), - writers_config=config["writers"], - ) - - for result in ds.process(): - if result.errors: - for err in 
+            logger.error("Error writing entry", entry=entry.entry, errors=entry.errors)
diff --git a/invenio_vocabularies/services/tasks.py b/invenio_vocabularies/services/tasks.py
index 629ad0dc..7a93fc4d 100644
--- a/invenio_vocabularies/services/tasks.py
+++ b/invenio_vocabularies/services/tasks.py
@@ -9,6 +9,7 @@
 
 from celery import shared_task
 from flask import current_app
+from invenio_logging.structlog import LoggerFactory
 
 from ..datastreams.factories import DataStreamFactory
 from ..factories import get_vocabulary_config
@@ -17,22 +18,52 @@
 @shared_task(ignore_result=True)
 def process_datastream(stream):
     """Process a datastream from config."""
-    vc_config = get_vocabulary_config(stream)
-    config = vc_config.get_config()
+    stream_logger = LoggerFactory.get_logger("datastreams-" + stream)
+    try:
+        stream_logger.info("Starting processing")
+        vc_config = get_vocabulary_config(stream)
+        config = vc_config.get_config()
 
-    if not config:
-        raise ValueError("Invalid stream configuration")
+        if not config:
+            stream_logger.error("Invalid stream configuration")
+            raise ValueError("Invalid stream configuration")
 
-    ds = DataStreamFactory.create(
-        readers_config=config["readers"],
-        transformers_config=config.get("transformers"),
-        writers_config=config["writers"],
-    )
-
-    for result in ds.process():
-        if result.errors:
-            for err in result.errors:
-                current_app.logger.error(err)
+        ds = DataStreamFactory.create(
+            readers_config=config["readers"],
+            transformers_config=config.get("transformers"),
+            writers_config=config["writers"],
+        )
+        stream_logger.info("Datastream created")
+        stream_logger.info("Processing Datastream")
+        success, errored, filtered = 0, 0, 0
+        for result in ds.process(
+            batch_size=config.get("batch_size", 100),
+            write_many=config.get("write_many", False),
+            logger=stream_logger,
+        ):
+            if result.filtered:
+                filtered += 1
+                stream_logger.info(
+                    "Filtered", entry=result.entry, operation=result.op_type
+                )
+            if result.errors:
+                errored += 1
+                stream_logger.error(
+                    "Error",
+                    entry=result.entry,
+                    operation=result.op_type,
+                    errors=result.errors,
+                )
+            else:
+                success += 1
+                stream_logger.info(
+                    "Success", entry=result.entry, operation=result.op_type
+                )
+        stream_logger.info(
+            "Finished processing", success=success, errored=errored, filtered=filtered
+        )
+    except Exception as e:
+        stream_logger.exception("Error processing stream", error=e)
 
 
 @shared_task()
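With the logging in place, a sync is triggered like any other vocabulary
datastream; a sketch, assuming "names" is a stream id known to
get_vocabulary_config:

    from invenio_vocabularies.services.tasks import process_datastream

    process_datastream.delay("names")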