Skip to content

Commit

Permalink
jobs: register RoR affiliation job
Browse files Browse the repository at this point in the history
  • Loading branch information
kpsherva committed Aug 23, 2024
1 parent e8e9843 commit 1a9a132
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 5 deletions.
16 changes: 12 additions & 4 deletions invenio_vocabularies/contrib/common/ror/datastreams.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@


class RORHTTPReader(BaseReader):
"""ROR HTTP Reader returning an in-memory binary stream of the latest ROR data dump ZIP file."""
"""
ROR HTTP Reader returning an in-memory
binary stream of the latest ROR data dump ZIP file.
"""

def __init__(self, origin=None, mode="r", since=None, *args, **kwargs):
"""Constructor."""
Expand Down Expand Up @@ -54,7 +57,10 @@ def _get_last_dump_date(self, linksets):
)

def read(self, item=None, *args, **kwargs):
"""Reads the latest ROR data dump ZIP file from Zenodo and yields an in-memory binary stream of it."""
"""
Reads the latest ROR data dump ZIP file from
Zenodo and yields an in-memory binary stream of it.
"""
if item:
raise NotImplementedError(
"RORHTTPReader does not support being chained after another reader"
Expand Down Expand Up @@ -91,8 +97,10 @@ def read(self, item=None, *args, **kwargs):
raise ReaderError(f"Expected 1 ZIP item but got {len(zip_files)}")

# Download the ZIP file and fully load the response bytes content in memory.
# The bytes content are then wrapped by a BytesIO to be file-like object (as required by `zipfile.ZipFile`).
# Using directly `file_resp.raw` is not possible since `zipfile.ZipFile` requires the file-like object to be seekable.
# The bytes content is then wrapped in a BytesIO to provide a
# file-like object (as required by `zipfile.ZipFile`).
# Using directly `file_resp.raw` is not possible since
# `zipfile.ZipFile` requires the file-like object to be seekable.
file_resp = requests.get(file_url)
file_resp.raise_for_status()
yield io.BytesIO(file_resp.content)
Expand Down
77 changes: 77 additions & 0 deletions invenio_vocabularies/jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import datetime
import typing

from invenio_jobs.jobs import RegisteredTask

from invenio_vocabularies.services.tasks import process_datastream
from marshmallow import EXCLUDE, Schema, fields, validate, post_load, types
from marshmallow_utils.fields import SanitizedUnicode, TZDateTime
from datetime import timezone


class ArgsSchema(Schema):
    """Schema describing the arguments accepted by the datastream jobs."""

    # Optional starting point of the run, handled as a UTC datetime in ISO format.
    since = TZDateTime(
        format="iso",
        timezone=timezone.utc,
        metadata={
            "description": (
                "YYYY-MM-DD HH:mm format. "
                "Leave field empty if it should continue since last successful run"
            ),
        },
    )

    # Field flagged as hidden via its metadata; it falls back to the same
    # constant value both when loading and when dumping.
    type = fields.String(
        load_default="ArgsSchemaAPI",
        dump_default="ArgsSchemaAPI",
        metadata={"type": "hidden"},
    )


class ProcessDataStreamRegisteredTask(RegisteredTask):
    """Generic registered task wrapping the ``process_datastream`` task.

    This class only declares the shared task metadata; subclasses must
    override :meth:`build_task_arguments` to provide the datastream
    configuration.
    """

    arguments_schema = ArgsSchema
    task = process_datastream
    id = "process_datastream"
    title = "Generic Process Data Stream task"

    @classmethod
    def build_task_arguments(cls, job_obj, since=None, custom_args=None, **kwargs):
        """Build the arguments passed to the task.

        Must be implemented by subclasses.

        :param job_obj: job object the task arguments are built for.
        :param since: optional starting point for the run.
        :param custom_args: optional user-provided arguments.
        :raises NotImplementedError: always, in this base class.
        """
        # ``NotImplemented`` is a comparison sentinel, not an exception;
        # raising it fails with a confusing ``TypeError``.
        raise NotImplementedError


class ProcessRORAffiliationsRegisteredTask(ProcessDataStreamRegisteredTask):
    """Process ROR affiliations datastream registered task."""

    description = "Process ROR affiliations"
    title = "Load ROR affiliations"

    @classmethod
    def build_task_arguments(cls, job_obj, since=None, custom_args=None, **kwargs):
        """Build the datastream configuration for loading ROR affiliations.

        :param job_obj: job object; ``job_obj.last_runs["success"]`` is read
            to find the last successful run when ``since`` is not given.
        :param since: optional starting point. When provided it is used
            as-is; when ``None`` it falls back to the start of the last
            successful run or, failing that, to the current time.
        :param custom_args: when truthy, returned unchanged instead of the
            generated configuration.
        :returns: ``custom_args`` if given, otherwise a dict with a
            ``"config"`` key holding readers, writers and transformers.
        """
        if custom_args:
            return custom_args

        # Only compute a fallback when the caller did not supply ``since``;
        # an explicit value must not be overwritten.
        if since is None:
            last_success = job_obj.last_runs["success"]
            if last_success:
                since = last_success.started_at
            else:
                # NOTE(review): this is a naive local datetime, while the
                # arguments schema works in UTC -- confirm what the reader
                # expects before making it timezone-aware.
                since = datetime.datetime.now()

        return {
            "config": {
                "readers": [
                    {
                        "args": {"since": since},
                        "type": "ror-http",
                    },
                    {"args": {"regex": "_schema_v2\\.json$"}, "type": "zip"},
                    {"type": "json"},
                ],
                "writers": [
                    {
                        "args": {
                            "writer": {
                                "type": "affiliations-service",
                                "args": {"update": True},
                            }
                        },
                        "type": "async",
                    }
                ],
                "transformers": [{"type": "ror-affiliations"}],
            }
        }
3 changes: 2 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ invenio_i18n.translations =
invenio_vocabularies = invenio_vocabularies
invenio_celery.tasks =
invenio_vocabularies_services = invenio_vocabularies.services.tasks

invenio_jobs.jobs =
process_datastream = invenio_vocabularies.jobs:ProcessRORAffiliationsRegisteredTask

[build_sphinx]
source-dir = docs/
Expand Down

0 comments on commit 1a9a132

Please sign in to comment.