diff --git a/invenio_vocabularies/cli.py b/invenio_vocabularies/cli.py index d50a9d13..aded3193 100644 --- a/invenio_vocabularies/cli.py +++ b/invenio_vocabularies/cli.py @@ -101,9 +101,14 @@ def update(vocabulary, filepath=None, origin=None): for w_conf in config["writers"]: if w_conf["type"] == "async": - w_conf["args"]["writer"]["args"]["update"] = True + w_conf_update = w_conf["args"]["writer"] else: - w_conf["args"]["update"] = True + w_conf_update = w_conf + + if "args" in w_conf_update: + w_conf_update["args"]["update"] = True + else: + w_conf_update["args"] = {"update": True} success, errored, filtered = _process_vocab(config) diff --git a/invenio_vocabularies/config.py b/invenio_vocabularies/config.py index f92880c6..e749d947 100644 --- a/invenio_vocabularies/config.py +++ b/invenio_vocabularies/config.py @@ -43,8 +43,17 @@ } """"Generic identifier schemes, usable by other vocabularies.""" + +def is_pic(val): + """Test if argument is a Participant Identification Code (PIC).""" + if len(val) != 9: + return False + return val.isdigit() + + VOCABULARIES_AFFILIATION_SCHEMES = { **VOCABULARIES_IDENTIFIER_SCHEMES, + "pic": {"label": _("PIC"), "validator": is_pic}, } """Affiliations allowed identifier schemes.""" diff --git a/invenio_vocabularies/contrib/affiliations/datastreams.py b/invenio_vocabularies/contrib/affiliations/datastreams.py index 381bce2b..dd20c278 100644 --- a/invenio_vocabularies/contrib/affiliations/datastreams.py +++ b/invenio_vocabularies/contrib/affiliations/datastreams.py @@ -10,8 +10,9 @@ """Affiliations datastreams, transformers, writers and readers.""" from flask import current_app -from invenio_i18n import lazy_gettext as _ +from ...datastreams.errors import TransformerError, WriterError +from ...datastreams.transformers import BaseTransformer from ...datastreams.writers import ServiceWriter from ..common.ror.datastreams import RORTransformer @@ -46,16 +47,78 @@ def __init__( ) +class OpenAIREOrganizationTransformer(BaseTransformer): + 
"""OpenAIRE Organization Transformer.""" + + def apply(self, stream_entry, **kwargs): + """Applies the transformation to the stream entry.""" + record = stream_entry.entry + + if "id" not in record: + raise TransformerError([f"No id for: {record}"]) + + if not record["id"].startswith("openorgs____::"): + raise TransformerError([f"Not valid OpenAIRE OpenOrgs id for: {record}"]) + + if "pid" not in record: + raise TransformerError([f"No alternative identifiers for: {record}"]) + + organization = {} + + for pid in record["pid"]: + if pid["scheme"] == "ROR": + organization["id"] = pid["value"].removeprefix("https://ror.org/") + elif pid["scheme"] == "PIC": + organization["identifiers"] = [ + { + "scheme": "pic", + "identifier": pid["value"], + } + ] + + stream_entry.entry = organization + return stream_entry + + +class OpenAIREAffiliationsServiceWriter(ServiceWriter): + """OpenAIRE Affiliations service writer.""" + + def __init__(self, *args, **kwargs): + """Constructor.""" + service_or_name = kwargs.pop("service_or_name", "affiliations") + # Here we only update and we do not insert, since OpenAIRE data is used to augment existing affiliations + # (with PIC identifiers) and is not used to create new affiliations. 
+ super().__init__( + service_or_name=service_or_name, insert=False, update=True, *args, **kwargs + ) + + def _entry_id(self, entry): + """Get the id from an entry.""" + return entry["id"] + + def write(self, stream_entry, *args, **kwargs): + """Writes the input entry using a given service.""" + entry = stream_entry.entry + + return super().write(stream_entry, *args, **kwargs) + + def write_many(self, stream_entries, *args, **kwargs): + """Writes the input entries using a given service.""" + return super().write_many(stream_entries, *args, **kwargs) + + VOCABULARIES_DATASTREAM_READERS = {} """Affiliations datastream readers.""" VOCABULARIES_DATASTREAM_WRITERS = { "affiliations-service": AffiliationsServiceWriter, + "openaire-affiliations-service": OpenAIREAffiliationsServiceWriter, } """Affiliations datastream writers.""" VOCABULARIES_DATASTREAM_TRANSFORMERS = { "ror-affiliations": AffiliationsRORTransformer, + "openaire-organization": OpenAIREOrganizationTransformer, } """Affiliations datastream transformers.""" @@ -90,3 +153,34 @@ def __init__( An origin is required for the reader. 
""" + +DATASTREAM_CONFIG_OPENAIRE = { + "readers": [ + {"type": "openaire-http", "args": {"tar_href": "/organization.tar"}}, + { + "type": "tar", + "args": { + "regex": "\\.json.gz$", + "mode": "r", + }, + }, + {"type": "gzip"}, + {"type": "jsonl"}, + ], + "transformers": [ + { + "type": "openaire-organization", + }, + ], + "writers": [ + { + "type": "async", + "args": { + "writer": { + "type": "openaire-affiliations-service", + } + }, + } + ], +} +"""Alternative Data Stream configuration for OpenAIRE Affiliations.""" diff --git a/invenio_vocabularies/contrib/awards/awards.py b/invenio_vocabularies/contrib/awards/awards.py index c111e7b1..3440c3b1 100644 --- a/invenio_vocabularies/contrib/awards/awards.py +++ b/invenio_vocabularies/contrib/awards/awards.py @@ -18,24 +18,35 @@ from invenio_records.dumpers import SearchDumper from invenio_records.dumpers.indexedat import IndexedAtDumperExt from invenio_records.dumpers.relations import RelationDumperExt -from invenio_records.systemfields import RelationsField +from invenio_records.systemfields import MultiRelationsField from invenio_records_resources.factories.factory import RecordTypeFactory -from invenio_records_resources.records.systemfields import ModelPIDField, PIDRelation +from invenio_records_resources.records.systemfields import ( + ModelPIDField, + PIDListRelation, + PIDRelation, +) from invenio_records_resources.resources.records.headers import etag_headers from ...services.permissions import PermissionPolicy from ..funders.api import Funder +from ..subjects.api import Subject from .config import AwardsSearchOptions, service_components from .schema import AwardSchema from .serializer import AwardL10NItemSchema -award_relations = RelationsField( +award_relations = MultiRelationsField( funders=PIDRelation( "funder", keys=["name"], pid_field=Funder.pid, cache_key="funder", - ) + ), + subjects=PIDListRelation( + "subjects", + keys=["subject", "scheme", "identifiers", "props"], + pid_field=Subject.pid, + 
cache_key="subjects", + ), ) record_type = RecordTypeFactory( diff --git a/invenio_vocabularies/contrib/awards/datastreams.py b/invenio_vocabularies/contrib/awards/datastreams.py index f4109bf0..b6197183 100644 --- a/invenio_vocabularies/contrib/awards/datastreams.py +++ b/invenio_vocabularies/contrib/awards/datastreams.py @@ -11,70 +11,19 @@ import io import requests +from flask import current_app from invenio_access.permissions import system_identity from invenio_i18n import lazy_gettext as _ -from ...datastreams.errors import ReaderError, TransformerError +from invenio_vocabularies.datastreams.errors import ReaderError + +from ...datastreams.errors import TransformerError from ...datastreams.readers import BaseReader from ...datastreams.transformers import BaseTransformer from ...datastreams.writers import ServiceWriter from .config import awards_ec_ror_id, awards_openaire_funders_mapping -class OpenAIREProjectHTTPReader(BaseReader): - """OpenAIRE Project HTTP Reader returning an in-memory binary stream of the latest OpenAIRE Graph Dataset project tar file.""" - - def _iter(self, fp, *args, **kwargs): - raise NotImplementedError( - "OpenAIREProjectHTTPReader downloads one file and therefore does not iterate through items" - ) - - def read(self, item=None, *args, **kwargs): - """Reads the latest OpenAIRE Graph Dataset project tar file from Zenodo and yields an in-memory binary stream of it.""" - if item: - raise NotImplementedError( - "OpenAIREProjectHTTPReader does not support being chained after another reader" - ) - - if self._origin == "full": - # OpenAIRE Graph Dataset - api_url = "https://zenodo.org/api/records/3516917" - elif self._origin == "diff": - # OpenAIRE Graph dataset: new collected projects - api_url = "https://zenodo.org/api/records/6419021" - else: - raise ReaderError("The --origin option should be either 'full' or 'diff'") - - # Call the signposting `linkset+json` endpoint for the Concept DOI (i.e. latest version) of the OpenAIRE Graph Dataset. 
- # See: https://github.com/inveniosoftware/rfcs/blob/master/rfcs/rdm-0071-signposting.md#provide-an-applicationlinksetjson-endpoint - headers = {"Accept": "application/linkset+json"} - api_resp = requests.get(api_url, headers=headers) - api_resp.raise_for_status() - - # Extract the Landing page Link Set Object located as the first (index 0) item. - landing_page_linkset = api_resp.json()["linkset"][0] - - # Extract the URL of the only project tar file linked to the record. - landing_page_project_tar_items = [ - item - for item in landing_page_linkset["item"] - if item["type"] == "application/x-tar" - and item["href"].endswith("/project.tar") - ] - if len(landing_page_project_tar_items) != 1: - raise ReaderError( - f"Expected 1 project tar item but got {len(landing_page_project_tar_items)}" - ) - file_url = landing_page_project_tar_items[0]["href"] - - # Download the project tar file and fully load the response bytes content in memory. - # The bytes content are then wrapped by a BytesIO to be file-like object (as required by `tarfile.open`). - # Using directly `file_resp.raw` is not possible since `tarfile.open` requires the file-like object to be seekable. - file_resp = requests.get(file_url) - file_resp.raise_for_status() - yield io.BytesIO(file_resp.content) - - class AwardsServiceWriter(ServiceWriter): """Funders service writer.""" @@ -125,10 +74,7 @@ def apply(self, stream_entry, **kwargs): funding = next(iter(record.get("funding", [])), None) if funding: - funding_stream_id = funding.get("funding_stream", {}).get("id", "") - # Example funding stream ID: `EC::HE::HORIZON-AG-UN`. We need the `EC` - # string, i.e. the second "part" of the identifier. 
- program = next(iter(funding_stream_id.split("::")[1:2]), "") + program = funding.get("fundingStream", {}).get("id", "") if program: award["program"] = program @@ -172,20 +118,170 @@ def apply(self, stream_entry, **kwargs): return stream_entry +class CORDISProjectHTTPReader(BaseReader): + """CORDIS Project HTTP Reader returning an in-memory binary stream of the latest CORDIS project zip file for a given programme (HE, H2020 or FP7).""" + + def _iter(self, fp, *args, **kwargs): + raise NotImplementedError( + "CORDISProjectHTTPReader downloads one file and therefore does not iterate through items" + ) + + def read(self, item=None, *args, **kwargs): + """Reads the latest CORDIS project zip file for the configured origin (HE, H2020 or FP7) and yields an in-memory binary stream of it.""" + if item: + raise NotImplementedError( + "CORDISProjectHTTPReader does not support being chained after another reader" + ) + + if self._origin == "HE": + file_url = "https://cordis.europa.eu/data/cordis-HORIZONprojects-xml.zip" + elif self._origin == "H2020": + file_url = "https://cordis.europa.eu/data/cordis-h2020projects-xml.zip" + elif self._origin == "FP7": + file_url = "https://cordis.europa.eu/data/cordis-fp7projects-xml.zip" + else: + raise ReaderError( + "The --origin option should be either 'HE' (for Horizon Europe) or 'H2020' (for Horizon 2020) or 'FP7'" + ) + + # Download the ZIP file and fully load the response bytes content in memory. + # The bytes content are then wrapped by a BytesIO to be file-like object (as required by `zipfile.ZipFile`). + # Using directly `file_resp.raw` is not possible since `zipfile.ZipFile` requires the file-like object to be seekable. 
+ file_resp = requests.get(file_url) + file_resp.raise_for_status() + yield io.BytesIO(file_resp.content) + + +class CORDISProjectTransformer(BaseTransformer): + """Transforms a CORDIS project record into an award record.""" + + def apply(self, stream_entry, **kwargs): + """Applies the transformation to the stream entry.""" + record = stream_entry.entry + award = {} + + # Here `id` is the project ID, which will be used to attach the update to the existing project. + award["id"] = ( + f"{current_app.config['VOCABULARIES_AWARDS_EC_ROR_ID']}::{record['id']}" + ) + + categories = record.get("relations", {}).get("categories", {}).get("category") + if categories: + if isinstance(categories, dict): + categories = [categories] + + award["subjects"] = [ + {"id": f"euroscivoc:{vocab_id}"} + for category in categories + if category.get("@classification") == "euroSciVoc" + and (vocab_id := category["code"].split("/")[-1]).isdigit() + ] + + organizations = ( + record.get("relations", {}).get("associations", {}).get("organization") + ) + if organizations: + # Projects with a single organization are not wrapped in a list, + # so we do this here to be able to iterate over it. + organizations = ( + organizations if isinstance(organizations, list) else [organizations] + ) + award["organizations"] = [] + for organization in organizations: + # Some organizations in FP7 projects do not have a "legalname" key, + # for instance the 14th participant in "SAGE" https://cordis.europa.eu/project/id/999902. + # In this case, fully skip the organization entry. + if "legalname" not in organization: + continue + + organization_data = { + "organization": organization["legalname"], + } + + # Some organizations in FP7 projects do not have an "id" key (the PIC identifier), + # for instance "AIlGreenVehicles" in "MOTORBRAIN" https://cordis.europa.eu/project/id/270693. + # In this case, still store the name but skip the identifier part. 
+ if "id" in organization: + organization_data.update( + { + "scheme": "pic", + "id": organization["id"], + } + ) + + award["organizations"].append(organization_data) + + stream_entry.entry = award + return stream_entry + + +class CORDISAwardsServiceWriter(ServiceWriter): + """CORDIS Awards service writer.""" + + def __init__(self, *args, **kwargs): + """Constructor.""" + service_or_name = kwargs.pop("service_or_name", "awards") + # Here we only update and we do not insert, since CORDIS data is used to augment existing awards + # (with subjects and organizations information) and is not used to create new awards. + super().__init__( + service_or_name=service_or_name, insert=False, update=True, *args, **kwargs + ) + + def _entry_id(self, entry): + """Get the id from an entry.""" + return entry["id"] + + VOCABULARIES_DATASTREAM_READERS = { - "openaire-project-http": OpenAIREProjectHTTPReader, + "cordis-project-http": CORDISProjectHTTPReader, } VOCABULARIES_DATASTREAM_TRANSFORMERS = { "openaire-award": OpenAIREProjectTransformer, + "cordis-award": CORDISProjectTransformer, } """ORCiD Data Streams transformers.""" VOCABULARIES_DATASTREAM_WRITERS = { "awards-service": AwardsServiceWriter, + "cordis-awards-service": CORDISAwardsServiceWriter, } """ORCiD Data Streams transformers.""" +DATASTREAM_CONFIG_CORDIS = { + "readers": [ + {"type": "cordis-project-http"}, + { + "type": "zip", + "args": { + "regex": "\\.xml$", + "mode": "r", + }, + }, + { + "type": "xml", + "args": { + "root_element": "project", + }, + }, + ], + "transformers": [ + {"type": "cordis-award"}, + ], + "writers": [ + { + "type": "cordis-awards-service", + "args": { + "identity": system_identity, + }, + } + ], +} +"""Data Stream configuration. + +An origin is required for the reader. 
+""" + DATASTREAM_CONFIG = { "readers": [ { diff --git a/invenio_vocabularies/contrib/awards/jsonschemas/awards/award-v1.0.0.json b/invenio_vocabularies/contrib/awards/jsonschemas/awards/award-v1.0.0.json index bdc2cfa1..ef0610f0 100644 --- a/invenio_vocabularies/contrib/awards/jsonschemas/awards/award-v1.0.0.json +++ b/invenio_vocabularies/contrib/awards/jsonschemas/awards/award-v1.0.0.json @@ -42,6 +42,61 @@ }, "program": { "type": "string" + }, + "subjects": { + "description": "Award's subjects.", + "type": "array", + "properties": { + "id": { + "$ref": "local://definitions-v1.0.0.json#/identifier" + }, + "scheme": { + "description": "Identifier of the subject scheme.", + "$ref": "local://definitions-v1.0.0.json#/identifier" + }, + "subject": { + "description": "Human readable label.", + "type": "string" + }, + "props": { + "type": "object", + "patternProperties": { + "^.*$": { + "type": "string" + } + } + }, + "identifiers": { + "description": "Alternate identifiers for the subject.", + "type": "array", + "items": { + "$ref": "local://definitions-v2.0.0.json#/identifiers_with_scheme" + }, + "uniqueItems": true + } + } + }, + "organizations": { + "description": "Award's organizations.", + "type": "array", + "items": { + "type": "object", + "additionalProperties": false, + "properties": { + "scheme": { + "description": "Identifier of the organization scheme.", + "$ref": "local://definitions-v1.0.0.json#/identifier" + }, + "id": { + "description": "Identifier of the organization for the given scheme.", + "$ref": "local://definitions-v1.0.0.json#/identifier" + }, + "organization": { + "description": "Human readable label.", + "type": "string" + } + } + } } } } diff --git a/invenio_vocabularies/contrib/awards/mappings/os-v1/awards/award-v1.0.0.json b/invenio_vocabularies/contrib/awards/mappings/os-v1/awards/award-v1.0.0.json index 90ec8049..0063b057 100644 --- a/invenio_vocabularies/contrib/awards/mappings/os-v1/awards/award-v1.0.0.json +++ 
b/invenio_vocabularies/contrib/awards/mappings/os-v1/awards/award-v1.0.0.json @@ -58,12 +58,55 @@ "acronym": { "type": "keyword", "fields": { - "text": { "type": "text"} + "text": { "type": "text" } } }, "program": { "type": "keyword" }, + "subjects": { + "properties": { + "@v": { + "type": "keyword" + }, + "id": { + "type": "keyword" + }, + "props": { + "type": "object", + "dynamic": "true" + }, + "subject": { + "type": "keyword" + }, + "scheme": { + "type": "keyword" + }, + "identifiers": { + "properties": { + "identifier": { + "type": "keyword" + }, + "scheme": { + "type": "keyword" + } + } + } + } + }, + "organizations": { + "properties": { + "scheme": { + "type": "keyword" + }, + "id": { + "type": "keyword" + }, + "organization": { + "type": "keyword" + } + } + }, "funder": { "type": "object", "properties": { diff --git a/invenio_vocabularies/contrib/awards/mappings/os-v2/awards/award-v1.0.0.json b/invenio_vocabularies/contrib/awards/mappings/os-v2/awards/award-v1.0.0.json index 90ec8049..0063b057 100644 --- a/invenio_vocabularies/contrib/awards/mappings/os-v2/awards/award-v1.0.0.json +++ b/invenio_vocabularies/contrib/awards/mappings/os-v2/awards/award-v1.0.0.json @@ -58,12 +58,55 @@ "acronym": { "type": "keyword", "fields": { - "text": { "type": "text"} + "text": { "type": "text" } } }, "program": { "type": "keyword" }, + "subjects": { + "properties": { + "@v": { + "type": "keyword" + }, + "id": { + "type": "keyword" + }, + "props": { + "type": "object", + "dynamic": "true" + }, + "subject": { + "type": "keyword" + }, + "scheme": { + "type": "keyword" + }, + "identifiers": { + "properties": { + "identifier": { + "type": "keyword" + }, + "scheme": { + "type": "keyword" + } + } + } + } + }, + "organizations": { + "properties": { + "scheme": { + "type": "keyword" + }, + "id": { + "type": "keyword" + }, + "organization": { + "type": "keyword" + } + } + }, "funder": { "type": "object", "properties": { diff --git 
a/invenio_vocabularies/contrib/awards/mappings/v7/awards/award-v1.0.0.json b/invenio_vocabularies/contrib/awards/mappings/v7/awards/award-v1.0.0.json index 90ec8049..0063b057 100644 --- a/invenio_vocabularies/contrib/awards/mappings/v7/awards/award-v1.0.0.json +++ b/invenio_vocabularies/contrib/awards/mappings/v7/awards/award-v1.0.0.json @@ -58,12 +58,55 @@ "acronym": { "type": "keyword", "fields": { - "text": { "type": "text"} + "text": { "type": "text" } } }, "program": { "type": "keyword" }, + "subjects": { + "properties": { + "@v": { + "type": "keyword" + }, + "id": { + "type": "keyword" + }, + "props": { + "type": "object", + "dynamic": "true" + }, + "subject": { + "type": "keyword" + }, + "scheme": { + "type": "keyword" + }, + "identifiers": { + "properties": { + "identifier": { + "type": "keyword" + }, + "scheme": { + "type": "keyword" + } + } + } + } + }, + "organizations": { + "properties": { + "scheme": { + "type": "keyword" + }, + "id": { + "type": "keyword" + }, + "organization": { + "type": "keyword" + } + } + }, "funder": { "type": "object", "properties": { diff --git a/invenio_vocabularies/contrib/awards/schema.py b/invenio_vocabularies/contrib/awards/schema.py index 487bafaf..9eb77fca 100644 --- a/invenio_vocabularies/contrib/awards/schema.py +++ b/invenio_vocabularies/contrib/awards/schema.py @@ -17,13 +17,24 @@ from ...services.schema import ( BaseVocabularySchema, + ContribVocabularyRelationSchema, ModePIDFieldVocabularyMixin, i18n_strings, ) from ..funders.schema import FunderRelationSchema +from ..subjects.schema import SubjectRelationSchema from .config import award_schemes +class AwardOrganizationRelationSchema(ContribVocabularyRelationSchema): + """Schema to define an organization relation in an award.""" + + ftf_name = "organization" + parent_field_name = "organizations" + organization = SanitizedUnicode() + scheme = SanitizedUnicode() + + class AwardSchema(BaseVocabularySchema, ModePIDFieldVocabularyMixin): """Award schema.""" @@ -46,6 
+57,10 @@ class AwardSchema(BaseVocabularySchema, ModePIDFieldVocabularyMixin): program = SanitizedUnicode() + subjects = fields.List(fields.Nested(SubjectRelationSchema)) + + organizations = fields.List(fields.Nested(AwardOrganizationRelationSchema)) + id = SanitizedUnicode( validate=validate.Length(min=1, error=_("PID cannot be blank.")) ) diff --git a/invenio_vocabularies/contrib/awards/serializer.py b/invenio_vocabularies/contrib/awards/serializer.py index 32aa1188..4b7704d7 100644 --- a/invenio_vocabularies/contrib/awards/serializer.py +++ b/invenio_vocabularies/contrib/awards/serializer.py @@ -12,6 +12,9 @@ from invenio_vocabularies.resources import L10NString +from ..subjects.schema import SubjectRelationSchema +from .schema import AwardOrganizationRelationSchema + class IdentifierSchema(Schema): """Identifier scheme.""" @@ -37,4 +40,8 @@ class AwardL10NItemSchema(Schema): acronym = fields.String(dump_only=True) program = fields.String(dump_only=True) funder = fields.Nested(FunderRelationSchema, dump_only=True) + subjects = fields.List(fields.Nested(SubjectRelationSchema), dump_only=True) identifiers = fields.List(fields.Nested(IdentifierSchema), dump_only=True) + organizations = fields.List( + fields.Nested(AwardOrganizationRelationSchema), dump_only=True + ) diff --git a/invenio_vocabularies/contrib/common/openaire/__init__.py b/invenio_vocabularies/contrib/common/openaire/__init__.py new file mode 100644 index 00000000..a1cf1934 --- /dev/null +++ b/invenio_vocabularies/contrib/common/openaire/__init__.py @@ -0,0 +1,9 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2024 CERN. +# +# Invenio-Vocabularies is free software; you can redistribute it and/or +# modify it under the terms of the MIT License; see LICENSE file for more +# details. 
+ +"""OpenAIRE-related module.""" diff --git a/invenio_vocabularies/contrib/common/openaire/datastreams.py b/invenio_vocabularies/contrib/common/openaire/datastreams.py new file mode 100644 index 00000000..95dba01c --- /dev/null +++ b/invenio_vocabularies/contrib/common/openaire/datastreams.py @@ -0,0 +1,84 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2024 CERN. +# +# Invenio-Vocabularies is free software; you can redistribute it and/or +# modify it under the terms of the MIT License; see LICENSE file for more +# details. + +"""OpenAIRE-related Datastreams Readers/Writers/Transformers module.""" + +import io + +import requests + +from invenio_vocabularies.datastreams.errors import ReaderError +from invenio_vocabularies.datastreams.readers import BaseReader + + +class OpenAIREHTTPReader(BaseReader): + """OpenAIRE HTTP Reader returning an in-memory binary stream of the latest OpenAIRE Graph Dataset tar file of a given type.""" + + def __init__(self, origin=None, mode="r", tar_href=None, *args, **kwargs): + """Constructor.""" + self.tar_href = tar_href + super().__init__(origin, mode, *args, **kwargs) + + def _iter(self, fp, *args, **kwargs): + raise NotImplementedError( + "OpenAIREHTTPReader downloads one file and therefore does not iterate through items" + ) + + def read(self, item=None, *args, **kwargs): + """Reads the latest OpenAIRE Graph Dataset tar file of a given type from Zenodo and yields an in-memory binary stream of it.""" + if item: + raise NotImplementedError( + "OpenAIREHTTPReader does not support being chained after another reader" + ) + + if self._origin == "full": + # OpenAIRE Graph Dataset + api_url = "https://zenodo.org/api/records/3516917" + elif self._origin == "diff": + # OpenAIRE Graph dataset: new collected projects + api_url = "https://zenodo.org/api/records/6419021" + else: + raise ReaderError("The --origin option should be either 'full' or 'diff'") + + # Call the signposting `linkset+json` endpoint for the Concept DOI (i.e. 
latest version) of the OpenAIRE Graph Dataset. + # See: https://github.com/inveniosoftware/rfcs/blob/master/rfcs/rdm-0071-signposting.md#provide-an-applicationlinksetjson-endpoint + headers = {"Accept": "application/linkset+json"} + api_resp = requests.get(api_url, headers=headers) + api_resp.raise_for_status() + + # Extract the Landing page Link Set Object located as the first (index 0) item. + landing_page_linkset = api_resp.json()["linkset"][0] + + # Extract the URL of the only tar file matching `tar_href` linked to the record. + landing_page_matching_tar_items = [ + item + for item in landing_page_linkset["item"] + if item["type"] == "application/x-tar" + and item["href"].endswith(self.tar_href) + ] + if len(landing_page_matching_tar_items) != 1: + raise ReaderError( + f"Expected 1 tar item matching {self.tar_href} but got {len(landing_page_matching_tar_items)}" + ) + file_url = landing_page_matching_tar_items[0]["href"] + + # Download the matching tar file and fully load the response bytes content in memory. + # The bytes content are then wrapped by a BytesIO to be file-like object (as required by `tarfile.open`). + # Using directly `file_resp.raw` is not possible since `tarfile.open` requires the file-like object to be seekable. 
+ file_resp = requests.get(file_url) + file_resp.raise_for_status() + yield io.BytesIO(file_resp.content) + + +VOCABULARIES_DATASTREAM_READERS = { + "openaire-http": OpenAIREHTTPReader, +} + +VOCABULARIES_DATASTREAM_TRANSFORMERS = {} + +VOCABULARIES_DATASTREAM_WRITERS = {} diff --git a/invenio_vocabularies/contrib/names/datastreams.py b/invenio_vocabularies/contrib/names/datastreams.py index d682526b..9929d173 100644 --- a/invenio_vocabularies/contrib/names/datastreams.py +++ b/invenio_vocabularies/contrib/names/datastreams.py @@ -243,7 +243,12 @@ def _entry_id(self, entry): "regex": "\\.xml$", }, }, - {"type": "xml"}, + { + "type": "xml", + "args": { + "root_element": "record", + }, + }, ], "transformers": [{"type": "orcid"}], "writers": [ @@ -266,7 +271,12 @@ def _entry_id(self, entry): { "type": "orcid-data-sync", }, - {"type": "xml"}, + { + "type": "xml", + "args": { + "root_element": "record", + }, + }, ], "transformers": [{"type": "orcid"}], "writers": [ diff --git a/invenio_vocabularies/contrib/subjects/euroscivoc/datastreams.py b/invenio_vocabularies/contrib/subjects/euroscivoc/datastreams.py index 64aa59e6..6bab5bb3 100644 --- a/invenio_vocabularies/contrib/subjects/euroscivoc/datastreams.py +++ b/invenio_vocabularies/contrib/subjects/euroscivoc/datastreams.py @@ -85,7 +85,7 @@ def _get_notation(self, subject, rdf_graph): def _get_labels(self, subject, rdf_graph): """Extract prefLabel and altLabel languages for a subject.""" labels = { - label.language: label.value + label.language: label.value.capitalize() for _, _, label in rdf_graph.triples( (subject, self.SKOS_CORE.prefLabel, None) ) @@ -94,7 +94,7 @@ def _get_labels(self, subject, rdf_graph): for _, _, label in rdf_graph.triples( (subject, self.SKOS_CORE.altLabel, None) ): - labels.setdefault(label.language, label.value) + labels.setdefault(label.language, label.value.capitalize()) return labels def _find_parents(self, subject, rdf_graph): diff --git a/invenio_vocabularies/datastreams/readers.py 
b/invenio_vocabularies/datastreams/readers.py index 4f017eb0..0f817bf3 100644 --- a/invenio_vocabularies/datastreams/readers.py +++ b/invenio_vocabularies/datastreams/readers.py @@ -224,19 +224,30 @@ def _iter(self, fp, *args, **kwargs): class XMLReader(BaseReader): """XML reader.""" + def __init__(self, root_element=None, *args, **kwargs): + """Constructor.""" + self.root_element = root_element + super().__init__(*args, **kwargs) + def _iter(self, fp, *args, **kwargs): """Read and parse an XML file to dict.""" # NOTE: We parse HTML, to skip XML validation and strip XML namespaces record = None try: xml_tree = fromstring(fp) - record = etree_to_dict(xml_tree).get("record") + xml_dict = etree_to_dict(xml_tree) except Exception as e: xml_tree = html_parse(fp).getroot() - record = etree_to_dict(xml_tree)["html"]["body"].get("record") + xml_dict = etree_to_dict(xml_tree)["html"]["body"] - if not record: - raise ReaderError(f"Record not found in XML entry.") + if self.root_element: + record = xml_dict.get(self.root_element) + if not record: + raise ReaderError( + f"Root element '{self.root_element}' not found in XML entry." + ) + else: + record = xml_dict yield record diff --git a/invenio_vocabularies/datastreams/transformers.py b/invenio_vocabularies/datastreams/transformers.py index d4274a68..822cb91a 100644 --- a/invenio_vocabularies/datastreams/transformers.py +++ b/invenio_vocabularies/datastreams/transformers.py @@ -32,6 +32,11 @@ def apply(self, stream_entry, *args, **kwargs): class XMLTransformer(BaseTransformer): """XML transformer.""" + def __init__(self, root_element=None, *args, **kwargs): + """Initializes the transformer.""" + self.root_element = root_element + super().__init__(*args, **kwargs) + @classmethod def _xml_to_etree(cls, xml): """Converts XML to a lxml etree.""" @@ -43,10 +48,16 @@ def apply(self, stream_entry, **kwargs): Requires the root element to be named "record". 
""" xml_tree = self._xml_to_etree(stream_entry.entry) - record = etree_to_dict(xml_tree)["html"]["body"].get("record") - - if not record: - raise TransformerError(f"Record not found in XML entry.") + xml_dict = etree_to_dict(xml_tree)["html"]["body"] + + if self.root_element: + record = xml_dict.get(self.root_element) + if not record: + raise TransformerError( + f"Root element '{self.root_element}' not found in XML entry." + ) + else: + record = xml_dict stream_entry.entry = record return stream_entry diff --git a/invenio_vocabularies/datastreams/writers.py b/invenio_vocabularies/datastreams/writers.py index 84084599..6c70facc 100644 --- a/invenio_vocabularies/datastreams/writers.py +++ b/invenio_vocabularies/datastreams/writers.py @@ -13,10 +13,11 @@ import yaml from invenio_access.permissions import system_identity -from invenio_pidstore.errors import PIDAlreadyExists +from invenio_pidstore.errors import PIDAlreadyExists, PIDDoesNotExistError from invenio_records.systemfields.relations.errors import InvalidRelationValue from invenio_records_resources.proxies import current_service_registry from marshmallow import ValidationError +from sqlalchemy.exc import NoResultFound from .datastreams import StreamEntry from .errors import WriterError @@ -55,12 +56,15 @@ def write_many(self, stream_entries, *args, **kwargs): class ServiceWriter(BaseWriter): """Writes the entries to an RDM instance using a Service object.""" - def __init__(self, service_or_name, *args, identity=None, update=False, **kwargs): + def __init__( + self, service_or_name, *args, identity=None, insert=True, update=False, **kwargs + ): """Constructor. :param service_or_name: a service instance or a key of the service registry. :param identity: access identity. + :param insert: if True it will insert records which do not exist. :param update: if True it will update records if they exist. 
""" if isinstance(service_or_name, str): @@ -68,6 +72,7 @@ def __init__(self, service_or_name, *args, identity=None, update=False, **kwargs self._service = service_or_name self._identity = identity or system_identity + self._insert = insert self._update = update super().__init__(*args, **kwargs) @@ -79,20 +84,47 @@ def _entry_id(self, entry): def _resolve(self, id_): return self._service.read(self._identity, id_) + def _do_update(self, entry): + vocab_id = self._entry_id(entry) + current = self._resolve(vocab_id) + combined_dict = current.to_dict() + + # Update fields from entry + for key, value in entry.items(): + if key in combined_dict: + if isinstance(combined_dict[key], list) and isinstance(value, list): + combined_dict[key].extend( + item for item in value if item not in combined_dict[key] + ) + else: + combined_dict[key] = value + else: + combined_dict[key] = value + + return StreamEntry( + self._service.update(self._identity, vocab_id, combined_dict) + ) + def write(self, stream_entry, *args, **kwargs): """Writes the input entry using a given service.""" entry = stream_entry.entry + try: - try: - return StreamEntry(self._service.create(self._identity, entry)) - except PIDAlreadyExists: - if not self._update: - raise WriterError([f"Vocabulary entry already exists: {entry}"]) - vocab_id = self._entry_id(entry) - current = self._resolve(vocab_id) - updated = dict(current.to_dict(), **entry) - return StreamEntry( - self._service.update(self._identity, vocab_id, updated) + if self._insert: + try: + return StreamEntry(self._service.create(self._identity, entry)) + except PIDAlreadyExists: + if not self._update: + raise WriterError([f"Vocabulary entry already exists: {entry}"]) + return self._do_update(entry) + elif self._update: + try: + return self._do_update(entry) + except (NoResultFound, PIDDoesNotExistError): + raise WriterError([f"Vocabulary entry does not exist: {entry}"]) + else: + raise WriterError( + ["Writer wrongly configured to not insert and to not 
update"] ) except ValidationError as err: diff --git a/invenio_vocabularies/factories.py b/invenio_vocabularies/factories.py index 11135b1a..de5a2f8f 100644 --- a/invenio_vocabularies/factories.py +++ b/invenio_vocabularies/factories.py @@ -16,7 +16,13 @@ from .contrib.affiliations.datastreams import ( DATASTREAM_CONFIG as affiliations_ds_config, ) +from .contrib.affiliations.datastreams import ( + DATASTREAM_CONFIG_OPENAIRE as affiliations_openaire_ds_config, +) from .contrib.awards.datastreams import DATASTREAM_CONFIG as awards_ds_config +from .contrib.awards.datastreams import ( + DATASTREAM_CONFIG_CORDIS as awards_cordis_ds_config, +) from .contrib.funders.datastreams import DATASTREAM_CONFIG as funders_ds_config from .contrib.names.datastreams import DATASTREAM_CONFIG as names_ds_config from .contrib.subjects.datastreams import DATASTREAM_CONFIG as subjects_ds_config @@ -84,6 +90,17 @@ def get_service(self): raise NotImplementedError("Service not implemented for Awards") +class AwardsCordisVocabularyConfig(VocabularyConfig): + """Awards Vocabulary Config.""" + + config = awards_cordis_ds_config + vocabulary_name = "awards:cordis" + + def get_service(self): + """Get the service for the vocabulary.""" + raise NotImplementedError("Service not implemented for CORDIS Awards") + + class AffiliationsVocabularyConfig(VocabularyConfig): """Affiliations Vocabulary Config.""" @@ -95,13 +112,26 @@ def get_service(self): raise NotImplementedError("Service not implemented for Affiliations") +class AffiliationsOpenAIREVocabularyConfig(VocabularyConfig): + """OpenAIRE Affiliations Vocabulary Config.""" + + config = affiliations_openaire_ds_config + vocabulary_name = "affiliations:openaire" + + def get_service(self): + """Get the service for the vocabulary.""" + raise NotImplementedError("Service not implemented for OpenAIRE Affiliations") + + def get_vocabulary_config(vocabulary): """Factory function to get the appropriate Vocabulary Config.""" vocab_config = { "names": 
NamesVocabularyConfig, "funders": FundersVocabularyConfig, "awards": AwardsVocabularyConfig, + "awards:cordis": AwardsCordisVocabularyConfig, "affiliations": AffiliationsVocabularyConfig, + "affiliations:openaire": AffiliationsOpenAIREVocabularyConfig, "subjects": SubjectsVocabularyConfig, } return vocab_config.get(vocabulary, VocabularyConfig)() diff --git a/tests/contrib/affiliations/conftest.py b/tests/contrib/affiliations/conftest.py index c428e297..5a3392e8 100644 --- a/tests/contrib/affiliations/conftest.py +++ b/tests/contrib/affiliations/conftest.py @@ -33,6 +33,32 @@ def affiliation_full_data(): } +@pytest.fixture(scope="function") +def affiliation_openaire_data(): + """Full affiliation data.""" + return { + "acronym": "CERN", + "id": "01ggx4157", + "identifiers": [{"identifier": "999988133", "scheme": "pic"}], + "name": "Test affiliation", + "title": {"en": "Test affiliation", "es": "Afiliacion de test"}, + "country": "CH", + "country_name": "Switzerland", + "location_name": "Geneva", + "status": "active", + "types": ["facility", "funder"], + } + + +@pytest.fixture(scope="function") +def openaire_affiliation_full_data(): + """Full OpenAIRE affiliation data.""" + return { + "id": "01ggx4157", + "identifiers": [{"identifier": "999988133", "scheme": "pic"}], + } + + @pytest.fixture(scope="module") def service(): """Affiliations service object.""" diff --git a/tests/contrib/affiliations/test_affiliations_datastreams.py b/tests/contrib/affiliations/test_affiliations_datastreams.py index 243b540d..7c8e1df0 100644 --- a/tests/contrib/affiliations/test_affiliations_datastreams.py +++ b/tests/contrib/affiliations/test_affiliations_datastreams.py @@ -18,10 +18,12 @@ from invenio_vocabularies.contrib.affiliations.config import affiliation_schemes from invenio_vocabularies.contrib.affiliations.datastreams import ( AffiliationsServiceWriter, + OpenAIREAffiliationsServiceWriter, + OpenAIREOrganizationTransformer, ) from 
invenio_vocabularies.contrib.common.ror.datastreams import RORTransformer from invenio_vocabularies.datastreams import StreamEntry -from invenio_vocabularies.datastreams.errors import WriterError +from invenio_vocabularies.datastreams.errors import TransformerError, WriterError @pytest.fixture(scope="module") @@ -49,11 +51,13 @@ def expected_from_ror_json(): def test_ror_transformer(app, dict_ror_entry, expected_from_ror_json): + """Test RORTransformer to ensure it transforms ROR entries correctly.""" transformer = RORTransformer(vocab_schemes=affiliation_schemes) assert expected_from_ror_json == transformer.apply(dict_ror_entry).entry def test_affiliations_service_writer_create(app, search_clear, affiliation_full_data): + """Test AffiliationsServiceWriter for creating a new affiliation.""" writer = AffiliationsServiceWriter() affiliation_rec = writer.write(StreamEntry(affiliation_full_data)) affiliation_dict = affiliation_rec.entry.to_dict() @@ -66,6 +70,7 @@ def test_affiliations_service_writer_create(app, search_clear, affiliation_full_ def test_affiliations_service_writer_duplicate( app, search_clear, affiliation_full_data ): + """Test AffiliationsServiceWriter for handling duplicate entries.""" writer = AffiliationsServiceWriter() affiliation_rec = writer.write(stream_entry=StreamEntry(affiliation_full_data)) Affiliation.index.refresh() # refresh index to make changes live @@ -82,6 +87,7 @@ def test_affiliations_service_writer_duplicate( def test_affiliations_service_writer_update_existing( app, search_clear, affiliation_full_data, service ): + """Test updating an existing affiliation using AffiliationsServiceWriter.""" # create vocabulary writer = AffiliationsServiceWriter(update=True) orig_affiliation_rec = writer.write(stream_entry=StreamEntry(affiliation_full_data)) @@ -89,9 +95,11 @@ def test_affiliations_service_writer_update_existing( # update vocabulary updated_affiliation = deepcopy(affiliation_full_data) updated_affiliation["name"] = "Updated Name" + 
# check changes vocabulary _ = writer.write(stream_entry=StreamEntry(updated_affiliation)) affiliation_rec = service.read(system_identity, orig_affiliation_rec.entry.id) + affiliation_dict = affiliation_rec.to_dict() # needed while the writer resolves from ES @@ -105,6 +113,7 @@ def test_affiliations_service_writer_update_existing( def test_affiliations_service_writer_update_non_existing( app, search_clear, affiliation_full_data, service ): + """Test creating a new affiliation when updating a non-existing entry.""" # vocabulary item not created, call update directly updated_affiliation = deepcopy(affiliation_full_data) updated_affiliation["name"] = "New name" @@ -118,3 +127,141 @@ def test_affiliations_service_writer_update_non_existing( # not-ideal cleanup affiliation_rec._record.delete(force=True) + + +@pytest.fixture() +def dict_openaire_organization_entry(): + """An example entry from OpenAIRE organization Data Dump.""" + return StreamEntry( + { + "alternativeNames": [ + "European Organization for Nuclear Research", + "Organisation européenne pour la recherche nucléaire", + "CERN", + ], + "country": {"code": "CH", "label": "Switzerland"}, + "id": "openorgs____::47efb6602225236c0b207761a8b3a21c", + "legalName": "European Organization for Nuclear Research", + "legalShortName": "CERN", + "pid": [ + {"scheme": "mag_id", "value": "67311998"}, + {"scheme": "ISNI", "value": "000000012156142X"}, + {"scheme": "Wikidata", "value": "Q42944"}, + {"scheme": "ROR", "value": "https://ror.org/01ggx4157"}, + {"scheme": "PIC", "value": "999988133"}, + {"scheme": "OrgReg", "value": "INT1011"}, + {"scheme": "ISNI", "value": "000000012156142X"}, + {"scheme": "FundRef", "value": "100012470"}, + {"scheme": "GRID", "value": "grid.9132.9"}, + {"scheme": "OrgRef", "value": "37351"}, + ], + "websiteUrl": "http://home.web.cern.ch/", + } + ) + + +@pytest.fixture(scope="module") +def expected_from_openaire_json(): + return { + "id": "01ggx4157", + "identifiers": [{"identifier": "999988133", 
"scheme": "pic"}], + } + + +def test_openaire_organization_transformer( + app, dict_openaire_organization_entry, expected_from_openaire_json +): + """Test OpenAIREOrganizationTransformer for transforming OpenAIRE entries.""" + transformer = OpenAIREOrganizationTransformer() + assert ( + expected_from_openaire_json + == transformer.apply(dict_openaire_organization_entry).entry + ) + + +def test_openaire_affiliations_service_writer( + app, + search_clear, + affiliation_full_data, + openaire_affiliation_full_data, + service, +): + """Test writing and updating an OpenAIRE affiliation entry.""" + # create vocabulary with original service writer + orig_writer = AffiliationsServiceWriter() + + orig_affiliation_rec = orig_writer.write(StreamEntry(affiliation_full_data)) + + orig_affiliation_dict = orig_affiliation_rec.entry.to_dict() + Affiliation.index.refresh() # refresh index to make changes live + + # update vocabulary and check changes vocabulary with OpenAIRE service writer + writer = OpenAIREAffiliationsServiceWriter() + + _ = writer.write(StreamEntry(openaire_affiliation_full_data)) + Affiliation.index.refresh() # refresh index to make changes live + affiliation_rec = service.read(system_identity, orig_affiliation_rec.entry.id) + affiliation_dict = affiliation_rec.to_dict() + + assert _.entry.id == orig_affiliation_rec.entry.id + + # updating fields changing from one update to the other + orig_affiliation_dict["revision_id"] = affiliation_dict["revision_id"] + orig_affiliation_dict["updated"] = affiliation_dict["updated"] + # Adding the extra identifier coming from OpenAIRE + orig_affiliation_dict["identifiers"].extend( + openaire_affiliation_full_data["identifiers"] + ) + + assert dict(orig_affiliation_dict) == affiliation_dict + + # not-ideal cleanup + affiliation_rec._record.delete(force=True) + + +def test_openaire_affiliations_transformer_non_openorgs( + app, dict_openaire_organization_entry +): + """Test error handling for non-OpenOrgs ID in OpenAIRE 
transformer.""" + transformer = OpenAIREOrganizationTransformer() + + updated_organization_entry = deepcopy(dict_openaire_organization_entry.entry) + updated_organization_entry["id"] = "pending_org_::627931d047132a4e20dbc4a882eb9a35" + + with pytest.raises(TransformerError) as err: + transformer.apply(StreamEntry(updated_organization_entry)) + + expected_error = [ + f"Not valid OpenAIRE OpenOrgs id for: {updated_organization_entry}" + ] + assert expected_error in err.value.args + + +def test_openaire_affiliations_transformer_no_id(app, dict_openaire_organization_entry): + """Test error handling when missing ID in OpenAIRE transformer.""" + transformer = OpenAIREOrganizationTransformer() + + updated_organization_entry = deepcopy(dict_openaire_organization_entry.entry) + updated_organization_entry.pop("id", None) + + with pytest.raises(TransformerError) as err: + transformer.apply(StreamEntry(updated_organization_entry)) + + expected_error = [f"No id for: {updated_organization_entry}"] + assert expected_error in err.value.args + + +def test_openaire_affiliations_transformer_no_alternative_identifiers( + app, dict_openaire_organization_entry +): + """Test error handling when missing alternative identifiers in OpenAIRE transformer.""" + transformer = OpenAIREOrganizationTransformer() + + updated_organization_entry = deepcopy(dict_openaire_organization_entry.entry) + updated_organization_entry.pop("pid", None) + + with pytest.raises(TransformerError) as err: + transformer.apply(StreamEntry(updated_organization_entry)) + + expected_error = [f"No alternative identifiers for: {updated_organization_entry}"] + assert expected_error in err.value.args diff --git a/tests/contrib/awards/test_awards_datastreams.py b/tests/contrib/awards/test_awards_datastreams.py index b6555de9..182811ce 100644 --- a/tests/contrib/awards/test_awards_datastreams.py +++ b/tests/contrib/awards/test_awards_datastreams.py @@ -8,9 +8,7 @@ """Awards datastreams tests.""" -import io from copy import 
deepcopy -from unittest.mock import patch import pytest from invenio_access.permissions import system_identity @@ -18,11 +16,12 @@ from invenio_vocabularies.contrib.awards.api import Award from invenio_vocabularies.contrib.awards.datastreams import ( AwardsServiceWriter, - OpenAIREProjectHTTPReader, + CORDISProjectTransformer, OpenAIREProjectTransformer, ) from invenio_vocabularies.datastreams import StreamEntry -from invenio_vocabularies.datastreams.errors import ReaderError, WriterError +from invenio_vocabularies.datastreams.errors import WriterError +from invenio_vocabularies.datastreams.readers import XMLReader @pytest.fixture(scope="function") @@ -34,7 +33,7 @@ def dict_award_entry(): "enddate": "2010-09-30", "funding": [ { - "funding_stream": { + "fundingStream": { "description": "Directorate for Geosciences - Division of " "Ocean Sciences", "id": "NSF::GEO/OAD::GEO/OCE", @@ -66,7 +65,7 @@ def dict_award_entry_ec(): "enddate": "2025-12-31", "funding": [ { - "funding_stream": { + "fundingStream": { "description": "Test stream", "id": "TST::test::test", }, @@ -93,10 +92,10 @@ def expected_from_award_json(): "id": "021nxhr62::0751743", "identifiers": [{"identifier": "https://test.com", "scheme": "url"}], "number": "0751743", + "program": "NSF::GEO/OAD::GEO/OCE", "title": {"en": "Test title"}, "funder": {"id": "021nxhr62"}, "acronym": "TA", - "program": "GEO/OAD", } @@ -111,131 +110,78 @@ def expected_from_award_json_ec(): "title": {"en": "Test title"}, "funder": {"id": "00k4n6c32"}, "acronym": "TS", - "program": "test", + "program": "TST::test::test", } -API_JSON_RESPONSE_CONTENT = { - "linkset": [ - { - "anchor": "https://example.com/records/10488385", - "item": [ - { - "href": "https://example.com/records/10488385/files/organization.tar", - "type": "application/x-tar", - }, - { - "href": "https://example.com/records/10488385/files/project.tar", - "type": "application/x-tar", - }, - ], - }, - { - "anchor": "https://example.com/api/records/10488385", - 
"describes": [ - {"href": "https://example.com/records/10488385", "type": "text/html"} - ], - "type": "application/dcat+xml", - }, - ] -} - -API_JSON_RESPONSE_CONTENT_WRONG_NUMBER_PROJECT_TAR_ITEMS_ERROR = { - "linkset": [ - { - "anchor": "https://example.com/records/10488385", - "item": [ - { - "href": "https://example.com/records/10488385/files/organization.tar", - "type": "application/x-tar", - }, - { - "href": "https://example.com/records/10488385/files/project.tar", - "type": "application/x-tar", - }, - { - "href": "https://example.com/another/project.tar", - "type": "application/x-tar", - }, - ], - }, - { - "anchor": "https://example.com/api/records/10488385", - "describes": [ - {"href": "https://example.com/records/10488385", "type": "text/html"} - ], - "type": "application/dcat+xml", - }, - ] -} - -DOWNLOAD_FILE_BYTES_CONTENT = b"The content of the file" - - -class MockResponse: - content = DOWNLOAD_FILE_BYTES_CONTENT - - def __init__(self, api_json_response_content): - self.api_json_response_content = api_json_response_content - - def json(self, **kwargs): - return self.api_json_response_content - - def raise_for_status(self): - pass - - -@pytest.fixture(scope="function") -def download_file_bytes_content(): - return DOWNLOAD_FILE_BYTES_CONTENT - - -@patch( - "requests.get", - side_effect=lambda url, headers=None: MockResponse(API_JSON_RESPONSE_CONTENT), +CORDIS_PROJECT_XML = bytes( + """ + + en + en + 264556 + 101117736 + Time2SWITCH + + 10.3030/101117736 + + + + + en + 1908489 + 999979888 + ATU37675002 + TECHNISCHE UNIVERSITAET WIEN + TU WIEN + Institute of Electrodynamics, Microwave and Circuit Engineer + + + + en + de,en,es,fr,it,pl + HES + Higher or Secondary Education Establishments + /Higher or Secondary Education Establishments + + + + + + + + en + de,en,es,fr,it,pl + /21/39/225 + oncology + /medical and health sciences/clinical medicine/oncology + + + + + """, + encoding="utf-8", ) -def test_openaire_project_http_reader(_, 
download_file_bytes_content): - reader = OpenAIREProjectHTTPReader(origin="full") - results = [] - for entry in reader.read(): - results.append(entry) - - assert len(results) == 1 - assert isinstance(results[0], io.BytesIO) - assert results[0].read() == download_file_bytes_content - - -@patch( - "requests.get", - side_effect=lambda url, headers=None: MockResponse( - API_JSON_RESPONSE_CONTENT_WRONG_NUMBER_PROJECT_TAR_ITEMS_ERROR - ), -) -def test_openaire_project_http_reader_wrong_number_tar_items_error(_): - reader = OpenAIREProjectHTTPReader(origin="full") - with pytest.raises(ReaderError): - next(reader.read()) - - -def test_openaire_project_http_reader_unsupported_origin_option(): - reader = OpenAIREProjectHTTPReader(origin="unsupported_origin_option") - with pytest.raises(ReaderError): - next(reader.read()) - - -def test_openaire_project_http_reader_item_not_implemented(): - reader = OpenAIREProjectHTTPReader() - with pytest.raises(NotImplementedError): - next(reader.read("A fake item")) -def test_openaire_project_http_reader_iter_not_implemented(): - reader = OpenAIREProjectHTTPReader() - with pytest.raises(NotImplementedError): - reader._iter("A fake file pointer") +@pytest.fixture(scope="module") +def expected_from_cordis_project_xml(): + return { + "id": "00k4n6c32::101117736", + "subjects": [{"id": "euroscivoc:225"}], + "organizations": [ + { + "id": "999979888", + "scheme": "pic", + "organization": "TECHNISCHE UNIVERSITAET WIEN", + } + ], + } def test_awards_transformer(app, dict_award_entry, expected_from_award_json): + """Test the OpenAIREProjectTransformer's output against expected award data.""" transformer = OpenAIREProjectTransformer() assert expected_from_award_json == transformer.apply(dict_award_entry).entry @@ -243,6 +189,7 @@ def test_awards_transformer(app, dict_award_entry, expected_from_award_json): def test_awards_service_writer_create( app, search_clear, example_funder_ec, award_full_data ): + """Verify creation of an award record and 
match it with expected data.""" awards_writer = AwardsServiceWriter() award_rec = awards_writer.write(StreamEntry(award_full_data)) award_dict = award_rec.entry.to_dict() @@ -255,12 +202,9 @@ def test_awards_service_writer_create( def test_awards_funder_id_not_exist( - app, - search_clear, - example_funder, - example_funder_ec, - award_full_data_invalid_id, + app, search_clear, example_funder, example_funder_ec, award_full_data_invalid_id ): + """Ensure writing an award with an invalid funder ID raises an error.""" awards_writer = AwardsServiceWriter() with pytest.raises(WriterError) as err: awards_writer.write(StreamEntry(award_full_data_invalid_id)) @@ -278,6 +222,7 @@ def test_awards_funder_id_not_exist( def test_awards_funder_id_not_exist_no_funders( app, search_clear, award_full_data_invalid_id ): + """Check error on writing an award with no valid funders.""" awards_writer = AwardsServiceWriter() with pytest.raises(WriterError) as err: awards_writer.write(StreamEntry(award_full_data_invalid_id)) @@ -300,7 +245,9 @@ def test_awards_transformer_ec_functionality( expected_from_award_json, expected_from_award_json_ec, ): + """Test transformer output for standard and EC-specific awards.""" transformer = OpenAIREProjectTransformer() + assert expected_from_award_json == transformer.apply(dict_award_entry).entry assert expected_from_award_json_ec == transformer.apply(dict_award_entry_ec).entry @@ -308,6 +255,7 @@ def test_awards_transformer_ec_functionality( def test_awards_service_writer_duplicate( app, search_clear, example_funder_ec, award_full_data ): + """Verify error on attempting to create a duplicate award.""" writer = AwardsServiceWriter() award_rec = writer.write(stream_entry=StreamEntry(award_full_data)) Award.index.refresh() # refresh index to make changes live @@ -324,20 +272,17 @@ def test_awards_service_writer_duplicate( def test_awards_service_writer_update_existing( app, search_clear, example_funder_ec, award_full_data, service ): - # create 
vocabulary + """Check updating an existing award record with new data.""" writer = AwardsServiceWriter(update=True) orig_award_rec = writer.write(stream_entry=StreamEntry(award_full_data)) Award.index.refresh() # refresh index to make changes live - # update vocabulary updated_award = deepcopy(award_full_data) updated_award["title"] = {"en": "New Test title"} - # check changes vocabulary _ = writer.write(stream_entry=StreamEntry(updated_award)) award_rec = service.read(system_identity, orig_award_rec.entry.id) award_dict = award_rec.to_dict() updated_award["funder"]["name"] = example_funder_ec["name"] - # needed while the writer resolves from ES assert _.entry.id == orig_award_rec.entry.id assert dict(award_dict, **updated_award) == award_dict @@ -348,10 +293,9 @@ def test_awards_service_writer_update_existing( def test_awards_service_writer_update_non_existing( app, search_clear, example_funder_ec, award_full_data, service ): - # vocabulary item not created, call update directly + """Test updating a non-existing award, creating a new record.""" updated_award = deepcopy(award_full_data) updated_award["title"] = {"en": "New Test title"} - # check changes vocabulary writer = AwardsServiceWriter(update=True) award_rec = writer.write(stream_entry=StreamEntry(updated_award)) award_rec = service.read(system_identity, award_rec.entry.id) @@ -362,3 +306,14 @@ def test_awards_service_writer_update_non_existing( # not-ideal cleanup award_rec._record.delete(force=True) + + +def test_awards_cordis_transformer(expected_from_cordis_project_xml): + """Validate transformation of CORDIS project XML to expected format.""" + reader = XMLReader() + award = next(reader.read(CORDIS_PROJECT_XML)) + + cordis_transformer = CORDISProjectTransformer() + transformed_award_data = cordis_transformer.apply(StreamEntry(award["project"])) + + assert transformed_award_data.entry == expected_from_cordis_project_xml diff --git a/tests/contrib/common/openaire/test_openaire_datastreams.py 
b/tests/contrib/common/openaire/test_openaire_datastreams.py new file mode 100644 index 00000000..372b1dc6 --- /dev/null +++ b/tests/contrib/common/openaire/test_openaire_datastreams.py @@ -0,0 +1,136 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2024 CERN. +# +# Invenio-Vocabularies is free software; you can redistribute it and/or +# modify it under the terms of the MIT License; see LICENSE file for more +# details. + +"""OpenAIRE-related Datastreams Readers/Writers/Transformers tests.""" + +import io +from unittest.mock import patch + +import pytest + +from invenio_vocabularies.contrib.common.openaire.datastreams import OpenAIREHTTPReader +from invenio_vocabularies.datastreams.errors import ReaderError + +API_JSON_RESPONSE_CONTENT = { + "linkset": [ + { + "anchor": "https://example.com/records/10488385", + "item": [ + { + "href": "https://example.com/records/10488385/files/organization.tar", + "type": "application/x-tar", + }, + { + "href": "https://example.com/records/10488385/files/project.tar", + "type": "application/x-tar", + }, + ], + }, + { + "anchor": "https://example.com/api/records/10488385", + "describes": [ + {"href": "https://example.com/records/10488385", "type": "text/html"} + ], + "type": "application/dcat+xml", + }, + ] +} + +API_JSON_RESPONSE_CONTENT_WRONG_NUMBER_PROJECT_TAR_ITEMS_ERROR = { + "linkset": [ + { + "anchor": "https://example.com/records/10488385", + "item": [ + { + "href": "https://example.com/records/10488385/files/organization.tar", + "type": "application/x-tar", + }, + { + "href": "https://example.com/records/10488385/files/project.tar", + "type": "application/x-tar", + }, + { + "href": "https://example.com/another/project.tar", + "type": "application/x-tar", + }, + ], + }, + { + "anchor": "https://example.com/api/records/10488385", + "describes": [ + {"href": "https://example.com/records/10488385", "type": "text/html"} + ], + "type": "application/dcat+xml", + }, + ] +} + +DOWNLOAD_FILE_BYTES_CONTENT = b"The content of the file" 
+ + +class MockResponse: + content = DOWNLOAD_FILE_BYTES_CONTENT + + def __init__(self, api_json_response_content): + self.api_json_response_content = api_json_response_content + + def json(self, **kwargs): + return self.api_json_response_content + + def raise_for_status(self): + pass + + +@pytest.fixture(scope="function") +def download_file_bytes_content(): + return DOWNLOAD_FILE_BYTES_CONTENT + + +@patch( + "requests.get", + side_effect=lambda url, headers=None: MockResponse(API_JSON_RESPONSE_CONTENT), +) +def test_openaire_http_reader(_, download_file_bytes_content): + reader = OpenAIREHTTPReader(origin="full", tar_href="/project.tar") + results = [] + for entry in reader.read(): + results.append(entry) + + assert len(results) == 1 + assert isinstance(results[0], io.BytesIO) + assert results[0].read() == download_file_bytes_content + + +@patch( + "requests.get", + side_effect=lambda url, headers=None: MockResponse( + API_JSON_RESPONSE_CONTENT_WRONG_NUMBER_PROJECT_TAR_ITEMS_ERROR + ), +) +def test_openaire_http_reader_wrong_number_tar_items_error(_): + reader = OpenAIREHTTPReader(origin="full", tar_href="/project.tar") + with pytest.raises(ReaderError): + next(reader.read()) + + +def test_openaire_http_reader_unsupported_origin_option(): + reader = OpenAIREHTTPReader(origin="unsupported_origin_option") + with pytest.raises(ReaderError): + next(reader.read()) + + +def test_openaire_http_reader_item_not_implemented(): + reader = OpenAIREHTTPReader() + with pytest.raises(NotImplementedError): + next(reader.read("A fake item")) + + +def test_openaire_http_reader_iter_not_implemented(): + reader = OpenAIREHTTPReader() + with pytest.raises(NotImplementedError): + reader._iter("A fake file pointer") diff --git a/tests/contrib/subjects/euroscivoc/test_subjects_euroscivoc_datastream.py b/tests/contrib/subjects/euroscivoc/test_subjects_euroscivoc_datastream.py index c8f62373..a638aff1 100644 --- a/tests/contrib/subjects/euroscivoc/test_subjects_euroscivoc_datastream.py +++ 
b/tests/contrib/subjects/euroscivoc/test_subjects_euroscivoc_datastream.py @@ -108,12 +108,12 @@ def expected_from_rdf_pref_label_with_parent(): "scheme": "EuroSciVoc", "subject": "Satellite radio", "title": { - "it": "radio satellitare", - "pl": "radio satelitarne", - "fr": "radio satellite", - "es": "radio por satélite", + "it": "Radio satellitare", + "pl": "Radio satelitarne", + "fr": "Radio satellite", + "es": "Radio por satélite", "de": "Satellitenfunk", - "en": "satellite radio", + "en": "Satellite radio", }, "props": {"parents": "euroscivoc:1225"}, "identifiers": [ @@ -127,7 +127,7 @@ def expected_from_rdf_pref_label_with_parent(): "id": "euroscivoc:1225", "scheme": "EuroSciVoc", "subject": "Radio channel", - "title": {"en": "radio channel"}, + "title": {"en": "Radio channel"}, "props": {}, "identifiers": [ { @@ -145,7 +145,7 @@ def expected_from_rdf_alt_label_without_parent(): "id": "euroscivoc:1717", "scheme": "EuroSciVoc", "subject": "Broadcastingsatellite service", - "title": {"en": "broadcastingsatellite service"}, + "title": {"en": "Broadcastingsatellite service"}, "props": {}, "identifiers": [ { diff --git a/tests/datastreams/test_transformers.py b/tests/datastreams/test_transformers.py index aab05aae..764bde63 100644 --- a/tests/datastreams/test_transformers.py +++ b/tests/datastreams/test_transformers.py @@ -18,8 +18,10 @@ @pytest.fixture(scope="module") def expected_from_xml(): return { - "field_one": "value", - "multi_field": {"some": "value", "another": "value too"}, + "record": { + "field_one": "value", + "multi_field": {"some": "value", "another": "value too"}, + } } @@ -55,6 +57,7 @@ def test_bad_xml_transformer(): ) ) - transformer = XMLTransformer() + transformer = XMLTransformer(root_element="field_two") + with pytest.raises(TransformerError): transformer.apply(bytes_xml_entry) diff --git a/tests/datastreams/test_writers.py b/tests/datastreams/test_writers.py index 04a1b1eb..8456b22f 100644 --- a/tests/datastreams/test_writers.py +++ 
b/tests/datastreams/test_writers.py @@ -76,6 +76,28 @@ def test_service_writer_update_non_existing(lang_type, lang_data, service, ident assert dict(record, **updated_lang) == record +def test_writer_wrong_config_no_insert_no_update( + lang_type, lang_data, service, identity +): + writer = ServiceWriter(service, identity=identity, insert=False, update=False) + + with pytest.raises(WriterError) as err: + writer.write(stream_entry=StreamEntry(lang_data)) + + expected_error = ["Writer wrongly configured to not insert and to not update"] + assert expected_error in err.value.args + + +def test_writer_no_insert(lang_type, lang_data, service, identity): + writer = ServiceWriter(service, identity=identity, insert=False, update=True) + + with pytest.raises(WriterError) as err: + writer.write(stream_entry=StreamEntry(lang_data)) + + expected_error = [f"Vocabulary entry does not exist: {lang_data}"] + assert expected_error in err.value.args + + ## # YAML Writer ##