Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate subjects in awards datastream #402

Merged
merged 11 commits into from
Oct 3, 2024
9 changes: 7 additions & 2 deletions invenio_vocabularies/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,14 @@ def update(vocabulary, filepath=None, origin=None):

for w_conf in config["writers"]:
if w_conf["type"] == "async":
w_conf["args"]["writer"]["args"]["update"] = True
w_conf_update = w_conf["args"]["writer"]
else:
w_conf["args"]["update"] = True
w_conf_update = w_conf

if "args" in w_conf_update:
w_conf_update["args"]["update"] = True
else:
w_conf_update["args"] = {"update": True}

alejandromumo marked this conversation as resolved.
Show resolved Hide resolved
success, errored, filtered = _process_vocab(config)

Expand Down
9 changes: 9 additions & 0 deletions invenio_vocabularies/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,17 @@
}
""""Generic identifier schemes, usable by other vocabularies."""


def is_pic(val):
alejandromumo marked this conversation as resolved.
Show resolved Hide resolved
"""Test if argument is a Participant Identification Code (PIC)."""
if len(val) != 9:
return False
return val.isdigit()


VOCABULARIES_AFFILIATION_SCHEMES = {
**VOCABULARIES_IDENTIFIER_SCHEMES,
"pic": {"label": _("PIC"), "validator": is_pic},
}
"""Affiliations allowed identifier schemes."""

Expand Down
96 changes: 95 additions & 1 deletion invenio_vocabularies/contrib/affiliations/datastreams.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@
"""Affiliations datastreams, transformers, writers and readers."""

from flask import current_app
from invenio_i18n import lazy_gettext as _

from ...datastreams.errors import TransformerError, WriterError
from ...datastreams.transformers import BaseTransformer
from ...datastreams.writers import ServiceWriter
from ..common.ror.datastreams import RORTransformer

Expand Down Expand Up @@ -46,16 +47,78 @@ def __init__(
)


class OpenAIREOrganizationTransformer(BaseTransformer):
"""OpenAIRE Organization Transformer."""

def apply(self, stream_entry, **kwargs):
"""Applies the transformation to the stream entry."""
record = stream_entry.entry

if "id" not in record:
raise TransformerError([f"No id for: {record}"])

if not record["id"].startswith("openorgs____::"):
raise TransformerError([f"Not valid OpenAIRE OpenOrgs id for: {record}"])

if "pid" not in record:
raise TransformerError([f"No alternative identifiers for: {record}"])

organization = {}

for pid in record["pid"]:
if pid["scheme"] == "ROR":
organization["id"] = pid["value"].removeprefix("https://ror.org/")
elif pid["scheme"] == "PIC":
organization["identifiers"] = [
{
"scheme": "pic",
"identifier": pid["value"],
}
]
0einstein0 marked this conversation as resolved.
Show resolved Hide resolved

stream_entry.entry = organization
return stream_entry


class OpenAIREAffiliationsServiceWriter(ServiceWriter):
"""OpenAIRE Affiliations service writer."""

def __init__(self, *args, **kwargs):
"""Constructor."""
service_or_name = kwargs.pop("service_or_name", "affiliations")
# Here we only update and we do not insert, since OpenAIRE data is used to augment existing affiliations
# (with PIC identifiers) and is not used to create new affiliations.
super().__init__(
service_or_name=service_or_name, insert=False, update=True, *args, **kwargs
)

def _entry_id(self, entry):
"""Get the id from an entry."""
return entry["id"]

def write(self, stream_entry, *args, **kwargs):
"""Writes the input entry using a given service."""
entry = stream_entry.entry

return super().write(stream_entry, *args, **kwargs)

def write_many(self, stream_entries, *args, **kwargs):
"""Writes the input entries using a given service."""
return super().write_many(stream_entries, *args, **kwargs)
0einstein0 marked this conversation as resolved.
Show resolved Hide resolved


VOCABULARIES_DATASTREAM_READERS = {}
"""Affiliations datastream readers."""

VOCABULARIES_DATASTREAM_WRITERS = {
"affiliations-service": AffiliationsServiceWriter,
"openaire-affiliations-service": OpenAIREAffiliationsServiceWriter,
}
"""Affiliations datastream writers."""

VOCABULARIES_DATASTREAM_TRANSFORMERS = {
"ror-affiliations": AffiliationsRORTransformer,
"openaire-organization": OpenAIREOrganizationTransformer,
}
"""Affiliations datastream transformers."""

Expand Down Expand Up @@ -90,3 +153,34 @@ def __init__(

An origin is required for the reader.
"""

DATASTREAM_CONFIG_OPENAIRE = {
"readers": [
{"type": "openaire-http", "args": {"tar_href": "/organization.tar"}},
{
"type": "tar",
"args": {
"regex": "\\.json.gz$",
"mode": "r",
},
},
{"type": "gzip"},
{"type": "jsonl"},
],
"transformers": [
{
"type": "openaire-organization",
},
],
"writers": [
{
"type": "async",
"args": {
"writer": {
"type": "openaire-affiliations-service",
}
},
}
],
}
"""Alternative Data Stream configuration for OpenAIRE Affiliations."""
19 changes: 15 additions & 4 deletions invenio_vocabularies/contrib/awards/awards.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,35 @@
from invenio_records.dumpers import SearchDumper
from invenio_records.dumpers.indexedat import IndexedAtDumperExt
from invenio_records.dumpers.relations import RelationDumperExt
from invenio_records.systemfields import RelationsField
from invenio_records.systemfields import MultiRelationsField
from invenio_records_resources.factories.factory import RecordTypeFactory
from invenio_records_resources.records.systemfields import ModelPIDField, PIDRelation
from invenio_records_resources.records.systemfields import (
ModelPIDField,
PIDListRelation,
PIDRelation,
)
from invenio_records_resources.resources.records.headers import etag_headers

from ...services.permissions import PermissionPolicy
from ..funders.api import Funder
from ..subjects.api import Subject
from .config import AwardsSearchOptions, service_components
from .schema import AwardSchema
from .serializer import AwardL10NItemSchema

award_relations = RelationsField(
award_relations = MultiRelationsField(
funders=PIDRelation(
"funder",
keys=["name"],
pid_field=Funder.pid,
cache_key="funder",
)
),
subjects=PIDListRelation(
"subjects",
keys=["subject", "scheme", "identifiers", "props"],
pid_field=Subject.pid,
cache_key="subjects",
),
)

record_type = RecordTypeFactory(
Expand Down
Loading