[Needs discussion] names: adds logging #357

Open · wants to merge 4 commits into master
invenio_vocabularies/cli.py (41 changes: 35 additions & 6 deletions)
@@ -13,6 +13,7 @@
import click
from flask.cli import with_appcontext
from invenio_access.permissions import system_identity
from invenio_logging.structlog import LoggerFactory
from invenio_pidstore.errors import PIDDeletedError, PIDDoesNotExistError

from .datastreams import DataStreamFactory
@@ -31,22 +32,38 @@ def _process_vocab(config, num_samples=None):
transformers_config=config.get("transformers"),
writers_config=config["writers"],
)

cli_logger = LoggerFactory.get_logger("cli")
cli_logger.info("Starting processing")
success, errored, filtered = 0, 0, 0
left = num_samples or -1
for result in ds.process():
batch_size = config.get("batch_size", 1000)
write_many = config.get("write_many", False)

for result in ds.process(batch_size=batch_size, write_many=write_many):
left = left - 1
if result.filtered:
filtered += 1
cli_logger.info("Filtered", entry=result.entry, operation=result.op_type)
if result.errors:
for err in result.errors:
click.secho(err, fg="red")
cli_logger.error(
"Error",
entry=result.entry,
operation=result.op_type,
errors=result.errors,
)
errored += 1
else:
success += 1
cli_logger.info("Success", entry=result.entry, operation=result.op_type)
if left == 0:
click.secho(f"Number of samples reached {num_samples}", fg="green")
break
cli_logger.info(
"Finished processing", success=success, errored=errored, filtered=filtered
)

return success, errored, filtered


@@ -101,7 +118,10 @@ def update(vocabulary, filepath=None, origin=None):
config = vc.get_config(filepath, origin)

for w_conf in config["writers"]:
w_conf["args"]["update"] = True
if w_conf["type"] == "async":
w_conf["args"]["writer"]["args"]["update"] = True
else:
w_conf["args"]["update"] = True

success, errored, filtered = _process_vocab(config)

@@ -140,18 +160,27 @@ def convert(vocabulary, filepath=None, origin=None, target=None, num_samples=None):
type=click.STRING,
help="Identifier of the vocabulary item to delete.",
)
@click.option("--all", is_flag=True, default=False, help="Not supported yet.")
@click.option("--all", is_flag=True, default=False)
@with_appcontext
def delete(vocabulary, identifier, all):
"""Delete all items or a specific one of the vocabulary."""
if not id and not all:
if not identifier and not all:
click.secho("An identifier or the --all flag must be present.", fg="red")
exit(1)

vc = get_vocabulary_config(vocabulary)
service = vc.get_service()
if identifier:
try:
if service.delete(identifier, system_identity):
if service.delete(system_identity, identifier):
click.secho(f"{identifier} deleted from {vocabulary}.", fg="green")
except (PIDDeletedError, PIDDoesNotExistError):
click.secho(f"PID {identifier} not found.")
elif all:
items = service.scan(system_identity)
for item in items.hits:
try:
if service.delete(system_identity, item["id"]):
click.secho(f"{item['id']} deleted from {vocabulary}.", fg="green")
except (PIDDeletedError, PIDDoesNotExistError):
click.secho(f"PID {item['id']} not found.")
invenio_vocabularies/config.py (14 changes: 13 additions & 1 deletion)
@@ -24,7 +24,7 @@
ZipReader,
)
from .datastreams.transformers import XMLTransformer
from .datastreams.writers import ServiceWriter, YamlWriter
from .datastreams.writers import AsyncWriter, ServiceWriter, YamlWriter
from .resources import VocabulariesResourceConfig
from .services.config import VocabulariesServiceConfig

@@ -134,6 +134,7 @@
VOCABULARIES_DATASTREAM_WRITERS = {
"service": ServiceWriter,
"yaml": YamlWriter,
"async": AsyncWriter,
}
"""Data Streams writers."""

@@ -154,3 +155,14 @@
"sort": ["name", "count"],
}
"""Vocabulary type search configuration."""

VOCABULARIES_ORCID_ACCESS_KEY = "TODO"
"""ORCID access key to access the s3 bucket."""
VOCABULARIES_ORCID_SECRET_KEY = "TODO"
"""ORCID secret key to access the s3 bucket."""
VOCABULARIES_ORCID_SUMMARIES_BUCKET = "v3.0-summaries"
"""ORCID summaries bucket name."""
VOCABULARIES_ORCID_SYNC_MAX_WORKERS = 32
"""ORCID max number of simultaneous workers/connections."""
VOCABULARIES_ORCID_SYNC_DAYS = 1
"""ORCID number of days to sync."""
invenio_vocabularies/contrib/names/datastreams.py (120 changes: 112 additions & 8 deletions)
@@ -8,15 +8,115 @@

"""Names datastreams, transformers, writers and readers."""

from invenio_access.permissions import system_identity
import io
import tarfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta

import s3fs
from flask import current_app
from invenio_records.dictutils import dict_lookup

from ...datastreams.errors import TransformerError
from ...datastreams.readers import SimpleHTTPReader
from ...datastreams.readers import BaseReader, SimpleHTTPReader
from ...datastreams.transformers import BaseTransformer
from ...datastreams.writers import ServiceWriter


class OrcidDataSyncReader(BaseReader):
"""ORCiD Data Sync Reader."""

def _fetch_orcid_data(self, orcid_to_sync, fs, bucket):
"""Fetches a single ORCiD record from S3."""
# The ORCiD file key is located in a folder whose name corresponds to the last three digits of the ORCiD
suffix = orcid_to_sync[-3:]
key = f"{suffix}/{orcid_to_sync}.xml"
try:
with fs.open(f"s3://{bucket}/{key}", "rb") as f:
file_response = f.read()
return file_response
except Exception as e:
# TODO: log
return None

def _process_lambda_file(self, fileobj):
"""Process the ORCiD lambda file and returns a list of ORCiDs to sync.

The decoded fileobj looks like the following:
orcid,last_modified,created
0000-0001-5109-3700,2021-08-02 15:00:00.000,2021-08-02 15:00:00.000

Yield ORCiDs to sync until the last sync date is reached.
"""
date_format = "%Y-%m-%d %H:%M:%S.%f"
date_format_no_millis = "%Y-%m-%d %H:%M:%S"

last_sync = datetime.now() - timedelta(days=current_app.config["VOCABULARIES_ORCID_SYNC_DAYS"])

file_content = fileobj.read().decode("utf-8")

for line in file_content.splitlines()[1:]: # Skip the header line
elements = line.split(",")
orcid = elements[0]

# Lambda file is ordered by last modified date
last_modified_str = elements[3]
try:
last_modified_date = datetime.strptime(last_modified_str, date_format)
except ValueError:
last_modified_date = datetime.strptime(
last_modified_str, date_format_no_millis
)

if last_modified_date >= last_sync:
yield orcid
else:
break

def _iter(self, orcids, fs):
"""Iterates over the ORCiD records yielding each one."""

with ThreadPoolExecutor(
max_workers=current_app.config["VOCABULARIES_ORCID_SYNC_MAX_WORKERS"]
) as executor:
futures = [
executor.submit(
self._fetch_orcid_data,
orcid,
fs,
current_app.config["VOCABULARIES_ORCID_SUMMARIES_BUCKET"],
)
for orcid in orcids
]
for future in as_completed(futures):
result = future.result()
if result is not None:
yield result

def read(self, item=None, *args, **kwargs):
"""Streams the ORCiD lambda file, process it to get the ORCiDS to sync and yields it's data."""
fs = s3fs.S3FileSystem(
key=current_app.config["VOCABULARIES_ORCID_ACCESS_KEY"],
secret=current_app.config["VOCABULARIES_ORCID_SECRET_KEY"],
)
# Read the file from S3
with fs.open("s3://orcid-lambda-file/last_modified.csv.tar", "rb") as f:
tar_content = f.read()

orcids_to_sync = []
# Opens tar file and process it
with tarfile.open(fileobj=io.BytesIO(tar_content)) as tar:
# Iterate over each member (file or directory) in the tar file
for member in tar.getmembers():
# Extract the file
extracted_file = tar.extractfile(member)
if extracted_file:
# Process the file and get the ORCiDs to sync
orcids_to_sync.extend(self._process_lambda_file(extracted_file))

yield from self._iter(orcids_to_sync, fs)


class OrcidHTTPReader(SimpleHTTPReader):
"""ORCiD HTTP Reader."""

@@ -42,6 +142,8 @@ def apply(self, stream_entry, **kwargs):
name = person.get("name")
if name is None:
raise TransformerError(f"Name not found in ORCiD entry.")
if name.get("family-name") is None:
raise TransformerError(f"Family name not found in ORCiD entry.")

entry = {
"id": orcid_id,
@@ -89,6 +191,7 @@ def _entry_id(self, entry):

VOCABULARIES_DATASTREAM_READERS = {
"orcid-http": OrcidHTTPReader,
"orcid-data-sync": OrcidDataSyncReader,
}


@@ -107,22 +210,23 @@ def _entry_id(self, entry):
DATASTREAM_CONFIG = {
"readers": [
{
"type": "tar",
"args": {
"regex": "\\.xml$",
},
"type": "orcid-data-sync",
},
{"type": "xml"},
],
"transformers": [{"type": "orcid"}],
"writers": [
{
"type": "names-service",
"type": "async",
"args": {
"identity": system_identity,
"writer": {
"type": "names-service",
}
},
}
],
"batch_size": 1000,
"write_many": True,
}
"""ORCiD Data Stream configuration.

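A sketch of how this configuration is consumed, mirroring the `_process_vocab()` helper in cli.py above (it assumes a Flask application context so `current_app.config` resolves, and that the ORCID S3 credentials are configured):

from invenio_vocabularies.datastreams import DataStreamFactory

ds = DataStreamFactory.create(
    readers_config=DATASTREAM_CONFIG["readers"],
    transformers_config=DATASTREAM_CONFIG.get("transformers"),
    writers_config=DATASTREAM_CONFIG["writers"],
)
for result in ds.process(
    batch_size=DATASTREAM_CONFIG.get("batch_size", 1000),
    write_many=DATASTREAM_CONFIG.get("write_many", False),
):
    if result.errors:
        print(result.errors)  # per-entry errors, as reported in _process_vocab()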
invenio_vocabularies/datastreams/datastreams.py (61 changes: 49 additions & 12 deletions)
@@ -8,17 +8,25 @@

"""Base data stream."""

from invenio_logging.structlog import LoggerFactory

from .errors import ReaderError, TransformerError, WriterError


class StreamEntry:
"""Object to encapsulate streams processing."""

def __init__(self, entry, errors=None):
"""Constructor."""
def __init__(self, entry, errors=None, op_type=None):
"""Constructor for the StreamEntry class.

:param entry (object): The entry object, usually a record dict.
:param errors (list, optional): List of errors. Defaults to None.
:param op_type (str, optional): The operation type. Defaults to None.
"""
self.entry = entry
self.filtered = False
self.errors = errors or []
self.op_type = op_type


class DataStream:
@@ -39,15 +47,9 @@ def filter(self, stream_entry, *args, **kwargs):
"""Checks if an stream_entry should be filtered out (skipped)."""
return False

def process(self, *args, **kwargs):
"""Iterates over the entries.

Uses the reader to get the raw entries and transforms them.
It will iterate over the `StreamEntry` objects returned by
the reader, apply the transformations and yield the result of
writing it.
"""
for stream_entry in self.read():
def process_batch(self, batch, write_many=False):
"""Transform, filter and write a batch of stream entries, yielding each result."""
transformed_entries = []
for stream_entry in batch:
if stream_entry.errors:
yield stream_entry # reading errors
else:
@@ -58,7 +60,37 @@ def process(self, *args, **kwargs):
transformed_entry.filtered = True
yield transformed_entry
else:
yield self.write(transformed_entry)
transformed_entries.append(transformed_entry)
if transformed_entries:
if write_many:
yield from self.batch_write(transformed_entries)
else:
yield from (self.write(entry) for entry in transformed_entries)

def process(self, batch_size=100, write_many=False, logger=None, *args, **kwargs):
"""Iterates over the entries.

Uses the reader to get the raw entries and transforms them.
It will iterate over the `StreamEntry` objects returned by
the reader, apply the transformations and yield the result of
writing it.
"""
if not logger:
logger = LoggerFactory.get_logger("datastreams")

batch = []
logger.info(
f"Start reading datastream with batch_size={batch_size} and write_many={write_many}"
)
for stream_entry in self.read():
batch.append(stream_entry)
if len(batch) >= batch_size:
yield from self.process_batch(batch, write_many=write_many)
batch = []

# Process any remaining entries in the last batch
if batch:
yield from self.process_batch(batch, write_many=write_many)

def read(self):
"""Recursively read the entries."""
@@ -107,6 +139,11 @@ def write(self, stream_entry, *args, **kwargs):

return stream_entry

def batch_write(self, stream_entries, *args, **kwargs):
"""Apply the transformations to an stream_entry. Errors are handler in the service layer."""
for writer in self._writers:
yield from writer.write_many(stream_entries)

def total(self, *args, **kwargs):
"""The total of entries obtained from the origin."""
raise NotImplementedError()
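Based on the `batch_write()` call above, any writer combined with `write_many=True` is expected to provide a `write_many()` method that takes a list of `StreamEntry` objects and returns (or yields) the processed entries. A hypothetical minimal writer illustrating that contract, not part of this PR:

class EchoWriter:
    """Hypothetical writer sketch showing the write_many() contract."""

    def write_many(self, stream_entries, *args, **kwargs):
        for stream_entry in stream_entries:
            # A real writer (e.g. ServiceWriter or AsyncWriter) would persist
            # or enqueue the entry here and attach any errors to it.
            yield stream_entry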