Commit 20678a4

Merge branch 'master' into feat/test-framework

2 parents 3173a66 + cee1b6f

18 files changed: +399, -198 lines changed

server/preprocessing/other-scripts/metrics.R

Lines changed: 49 additions & 34 deletions
@@ -5,9 +5,50 @@ library("plyr")
 mlog <- getLogger("metrics")
 
 
-enrich_metadata_metrics <- function(metadata) {
+enrich_metadata_metrics <- function(metadata, metrics_sources=c("altmetric", "crossref")) {
   start.time <- Sys.time()
 
+  original_sorting <- metadata$id
+
+  if ("altmetric" %in% metrics_sources) {
+    metadata <- add_altmetrics(metadata)
+  }
+  if ("crossref" %in% metrics_sources) {
+    metadata <- add_citations(metadata)
+  }
+
+  # Remove duplicate lines - TODO: check for root of this problem
+  metadata <- unique(metadata)
+
+  # restore original sorting
+  metadata <- metadata[match(original_sorting, metadata$id), ]
+
+  end.time <- Sys.time()
+  time.taken <- end.time - start.time
+  mlog$info(paste("vis_id:", .GlobalEnv$VIS_ID, "Time taken:", time.taken, sep = " "))
+
+  return(metadata)
+}
+
+get_altmetrics <- function(dois) {
+  valid_dois <- unique(dois[which(dois != "")])
+  results <- data.frame()
+  for (doi in valid_dois) {
+    tryCatch(
+      {
+        metrics <- altmetric_data(altmetrics(doi = doi, apikey = ""))
+        results <- rbind.fill(results, metrics)
+      },
+      error = function(err) {
+        mlog$debug(gsub("[\r\n]", "", paste(err, doi, sep = " ")))
+      }
+    )
+  }
+  return(results)
+}
+
+add_altmetrics <- function(metadata) {
   results <- get_altmetrics(metadata$doi)
   requested_metrics <- c(
     "cited_by_wikipedia_count",
@@ -36,45 +77,19 @@ enrich_metadata_metrics <- function(metadata) {
     # merge the metadata with the results of the altmetrics
     # don't remove any rows from the metadata, just add the altmetrics to the
     # output
-    output <- merge(x = metadata, y = results, by = "doi", all.x = TRUE, all.y = FALSE)
+    result <- merge(x = metadata, y = results, by = "doi", all.x = TRUE, all.y = FALSE)
   } else {
     for (metric in requested_metrics) {
       metadata[[metric]] <- NA
     }
     mlog$info("No altmetrics found for any paper in this dataset.")
-    output <- metadata
+    result <- metadata
   }
-  output <- add_citations(output)
-
-  # Remove duplicate lines - TODO: check for root of this problem
-  output <- unique(output)
-
-  end.time <- Sys.time()
-  time.taken <- end.time - start.time
-  mlog$info(paste("vis_id:", .GlobalEnv$VIS_ID, "Time taken:", time.taken, sep = " "))
-
-  return(output)
-}
-
-get_altmetrics <- function(dois) {
-  valid_dois <- unique(dois[which(dois != "")])
-  results <- data.frame()
-  for (doi in valid_dois) {
-    tryCatch(
-      {
-        metrics <- altmetric_data(altmetrics(doi = doi, apikey = ""))
-        results <- rbind.fill(results, metrics)
-      },
-      error = function(err) {
-        mlog$debug(gsub("[\r\n]", "", paste(err, doi, sep = " ")))
-      }
-    )
-  }
-  return(results)
+  return(result)
 }
 
-add_citations <- function(output) {
-  dois <- output$doi
+add_citations <- function(metadata) {
+  dois <- metadata$doi
   valid_dois <- unique(dois[which(dois != "")])
 
   cc <- tryCatch(
@@ -87,6 +102,6 @@ add_citations <- function(output) {
     }
   )
   names(cc)[names(cc) == "count"] <- "citation_count"
-  output <- merge(x = output, y = cc, by = "doi", all.x = TRUE)
-  return(output)
+  result <- merge(x = metadata, y = cc, by = "doi", all.x = TRUE)
+  return(result)
 }

server/preprocessing/other-scripts/run_metrics.R

Lines changed: 7 additions & 1 deletion
@@ -47,6 +47,12 @@ if (!is.null(params$lang_id)) {
   lang_id <- 'all'
 }
 
+if (!is.null(params$metrics_sources)) {
+  metrics_sources <- params$metrics_sources
+} else {
+  metrics_sources <- c("altmetric", "crossref")
+}
+
 source('metrics.R')
 
 registerDoParallel(detectCores(all.tests = FALSE, logical = TRUE)-1)
@@ -57,7 +63,7 @@ tryCatch({
   if ("doi" %in% names(metadata)) {
     # only enrich metadata with metrics if at least one DOI is present
     if (!all(is.na(metadata$doi))) {
-      output <- enrich_metadata_metrics(metadata)
+      output <- enrich_metadata_metrics(metadata, metrics_sources)
    }
  } else {
    mlog$warn("No DOIs found in metadata")
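
The new metrics_sources value reaches run_metrics.R through the params object that the metrics worker pipes to the R runner on stdin (see the execute_search diff below, where the payload is built as {"params": ..., "metadata": ...}). A minimal sketch of such a payload, assuming the worker forwards params unchanged; the query, service, and source values are purely illustrative:

import json

# Illustrative payload for the R runner's stdin; "metrics_sources" is the
# new, optional field. When it is absent, run_metrics.R falls back to
# both sources, c("altmetric", "crossref").
payload = json.dumps({
    "params": {
        "q": "covid-19",                   # illustrative query
        "service": "pubmed",               # illustrative service id
        "metrics_sources": ["altmetric"],  # skip the Crossref lookup
    },
    "metadata": "[]",                      # JSON-encoded metadata records
})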

server/services/snapshot/data-config_pubmed.js

Lines changed: 11 additions & 2 deletions
@@ -3,14 +3,23 @@ var data_config = {
   mode: "search_repos",
 
   service: "pubmed",
+  bubble_min_scale: 1.2,
+  bubble_max_scale: 1,
+  paper_min_scale: 1,
+  paper_max_scale: 1,
+  showLanguage: true,
 
-  title: "",
+  // Configuring papers scaling
   base_unit: "citations",
+  initial_sort: "citations",
+  scale_by: "citations",
+
+  title: "",
   use_area_uri: true,
   show_multiples: false,
   show_dropdown: false,
   preview_type: "pdf",
-  sort_options: ["readers", "title", "authors", "year"],
+  sort_options: ["citations", "title", "authors", "year"],
   is_force_areas: true,
   language: "eng_pubmed",
   area_force_alpha: 0.015,

server/workers/common/common/utils.py

Lines changed: 154 additions & 5 deletions
@@ -1,13 +1,16 @@
+import re
 import os
 import json
 import time
 import uuid
-from dateutil.parser import parse
-from datetime import timedelta
-import re
 import redis
-import pandas as pd
 import pathlib
+import numpy as np
+import pandas as pd
+from datetime import timedelta
+from dateutil.parser import parse
+from typing import Dict, List, Union
+from typing_extensions import Literal
 
 
 redis_config = {
@@ -29,7 +32,7 @@ def get_key(store, key, timeout=180):
     result = {
         "k": key,
         "status": "error",
-        "error": "timeout"
+        "error": "timeout"
     }
     while tries <= max_tries:
         res = store.get(key+"_output")
@@ -119,3 +122,149 @@ def get_nested_value(data, keys, default=None):
     if data is None:
         return default
     return data
+
+
+def push_metadata_to_queue(
+    redis_store: redis.Redis,
+    params: Dict[str, Union[str, List[str]]],
+    metadata: pd.DataFrame,
+    source_list: List[str]
+) -> str:
+    """
+    Send metadata for processing to the Redis queue and return the request_id.
+
+    :param redis_store: Object of the Redis store.
+    :param params: Request params.
+    :param metadata: DataFrame with default metadata.
+    :param source_list: Defines from which services additional metadata will be received (available values: "crossref", "altmetric").
+    :return: request_id for retrieving the request result.
+    """
+    # Check that only valid values are specified in the source list
+    check_metadata_enrichment_source(source_list)
+
+    # Create a new unique request identifier that is later used to retrieve the result
+    request_id = str(uuid.uuid4())
+
+    # Specify from which sources to obtain information
+    params["metrics_sources"] = source_list
+
+    # Payload object creation
+    task_data = json.dumps({
+        "id": request_id,
+        "params": params,
+        "metadata": metadata.to_json(orient="records"),
+    })
+
+    # Push the request to Redis and return the request id
+    redis_store.rpush("metrics", task_data)
+    return request_id
+
+
+def check_metadata_enrichment_source(source_list: List[str]) -> None:
+    """
+    Check that only valid values are specified in the source list.
+
+    :param source_list: List of sources from which metadata will be enriched.
+    :return: None.
+    """
+    if not all(source in ("crossref", "altmetric") for source in source_list):
+        raise ValueError("Source list must contain only 'crossref' or 'altmetric'")
+
+
+def fetch_enriched_metadata(redis_store: redis.Redis, request_id: str, timeout: int = 600) -> pd.DataFrame:
+    """
+    Get enriched metadata from Redis.
+
+    :param redis_store: Object of the Redis store.
+    :param request_id: Unique identifier of the request.
+    :param timeout: Time to wait for the result (default: 600 seconds).
+    :return: Enriched DataFrame with metadata.
+    """
+    # Get the result of the metadata enrichment from Redis
+    result = get_key(redis_store, request_id, timeout)
+    return pd.DataFrame(result["input_data"])
+
+
+def get_metadata_columns_for_source(source_list: List[str]) -> List[str]:
+    """
+    Return the required metadata columns for the given sources.
+
+    :param source_list: List of sources from which metadata is received.
+    :return: List of required metadata columns.
+    """
+    # Check that only valid values are specified in the source list
+    check_metadata_enrichment_source(source_list)
+
+    # Define the required metadata columns per source and return them
+    result = []
+
+    if "crossref" in source_list:
+        result.extend(["citation_count"])
+
+    if "altmetric" in source_list:
+        result.extend([
+            "cited_by_wikipedia_count",
+            "cited_by_msm_count",
+            "cited_by_policies_count",
+            "cited_by_patents_count",
+            "cited_by_accounts_count",
+            "cited_by_fbwalls_count",
+            "cited_by_feeds_count",
+            "cited_by_gplus_count",
+            "cited_by_rdts_count",
+            "cited_by_qna_count",
+            "cited_by_tweeters_count",
+            "cited_by_videos_count"
+        ])
+
+    return result
+
+
+def ensure_required_columns(metadata: pd.DataFrame, source_list: List[str]) -> pd.DataFrame:
+    """
+    Ensure that all required columns are present, adding missing ones with NaN values.
+
+    :param metadata: DataFrame with metadata.
+    :param source_list: List of sources from which metadata is received.
+    :return: Updated DataFrame.
+    """
+    # Check that only valid values are specified in the source list
+    check_metadata_enrichment_source(source_list)
+
+    # Get the metadata columns that must be received from the source(s)
+    columns = get_metadata_columns_for_source(source_list)
+    for column in columns:
+        if column not in metadata.columns:
+            metadata[column] = np.NaN
+
+    return metadata
+
+
+def enrich_metadata(
+    redis: redis.Redis,
+    params: Dict[str, Union[str, List[str]]],
+    metadata: pd.DataFrame,
+    source_list: List[str],
+) -> pd.DataFrame:
+    """
+    Enrich metadata by adding citation and metrics information via the Redis queue.
+
+    :param redis: Object of the Redis store.
+    :param params: Params of the request.
+    :param metadata: DataFrame with default metadata.
+    :param source_list: Defines from which services additional metadata will be received (available values: "crossref", "altmetric").
+    :return: Enriched DataFrame with metadata.
+    """
+    # Check that only valid values are specified in the source list
+    check_metadata_enrichment_source(source_list)
+
+    # Create a request to the metrics worker for metadata enrichment
+    # and get a request_id for receiving the result later
+    request_id = push_metadata_to_queue(redis, params, metadata, source_list)
+
+    # Get the result of the metadata enrichment from the metrics worker
+    enriched_metadata = fetch_enriched_metadata(redis, request_id)
+
+    # Ensure that all required columns are present, adding missing ones with NaN
+    enriched_metadata = ensure_required_columns(enriched_metadata, source_list)
+    return enriched_metadata
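
Taken together, these helpers give any worker a synchronous enrichment call over the Redis queue. A minimal usage sketch, assuming a local Redis instance, that the module is importable as common.utils given the repo layout, and illustrative params and DOIs:

import redis
import pandas as pd

from common.utils import enrich_metadata

# Illustrative setup: a local Redis and a two-row metadata frame.
redis_store = redis.Redis(host="localhost", port=6379, db=0)
metadata = pd.DataFrame({
    "id": ["p1", "p2"],
    "doi": ["10.1371/journal.pone.0000000", ""],  # hypothetical DOIs
})

# Enrich from Crossref only; ensure_required_columns guarantees that
# "citation_count" exists afterwards, NaN where nothing came back.
enriched = enrich_metadata(
    redis_store,
    params={"service": "pubmed"},  # illustrative request params
    metadata=metadata,
    source_list=["crossref"],
)
print(enriched[["id", "citation_count"]])

push_metadata_to_queue and fetch_enriched_metadata can also be called separately when a worker wants to overlap other work with the enrichment round-trip instead of blocking on the result.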

server/workers/metrics/src/metrics.py

Lines changed: 7 additions & 12 deletions
@@ -1,10 +1,11 @@
 import time
 import json
-import subprocess
 import logging
+import subprocess
 from common.r_wrapper import RWrapper
-from common.decorators import error_logging_aspect
 from common.rate_limiter import RateLimiter
+from common.decorators import error_logging_aspect
+
 
 formatter = logging.Formatter(
     fmt='%(asctime)s %(levelname)-8s %(message)s',
@@ -35,15 +36,13 @@ def next_item(self):
     @error_logging_aspect(log_level=logging.ERROR)
     def execute_search(self, params: dict, metadata: str) -> dict:
         command = [
-            self.command,
-            self.runner,
-            self.wd,
-            params.get('q'),
+            self.command,
+            self.runner,
+            self.wd,
+            params.get('q'),
             params.get('service')
         ]
 
-        self.logger.debug(f"Executing command: {command}")
-
         data = {
             "params": params,
             "metadata": metadata
@@ -59,8 +58,6 @@ def execute_search(self, params: dict, metadata: str) -> dict:
         )
         stdout, stderr = proc.communicate(json.dumps(data))
 
-        self.logger.debug(f"Stdout: {stdout}")
-
         output = [line for line in stdout.split('\n') if line]
         errors = [line for line in stderr.split('\n') if line]
 
@@ -69,8 +66,6 @@ def execute_search(self, params: dict, metadata: str) -> dict:
 
         raw_metadata = json.loads(output[-2])
 
-        self.logger.debug(f"Raw metadata: {raw_metadata}")
-
         if isinstance(raw_metadata, dict) and raw_metadata.get('status') == "error":
             return raw_metadata
 
