Parallel processing for ScienceDirect and Scopus APIs
star-nox committed Mar 4, 2024
1 parent f024116 commit c3c5a8f
Showing 2 changed files with 154 additions and 73 deletions.
196 changes: 125 additions & 71 deletions ai_ta_backend/journal_ingest.py
@@ -46,7 +46,7 @@ def getFromDoi(doi: str, course_name: str):
downloadSpringerFulltext(doi=doi, course_name=course_name)
elif 'elsevier' in publisher:
# download from elsevier
downloadElsevierFulltextFromDoi(doi=doi, course_name=course_name)
downloadElsevierFulltextFromId(id=doi, id_type='doi', course_name=course_name)
else:
print("No direct openaccess link found. Searching PubMed...")

@@ -120,41 +120,56 @@ def downloadSpringerFulltext(issn=None, subject=None, journal=None, title=None,
The initial API response returns a list of articles with metadata.
"""
# create directory to store files
directory = os.path.join(os.getcwd(), 'springer_papers')
if not os.path.exists(directory):
os.makedirs(directory)

# set headers
api_url = "http://api.springernature.com/openaccess/json?q="
headers = {'Accept': 'application/json'}

# form the query URL based on the input parameters received
if doi:
# query by doi
query_str = "doi:" + doi
elif issn:
# query by issn
query_str = "issn:" + issn
elif journal:
# query by journal title
journal = "%22" + journal.replace(" ", "%20") + "%22"
query_str = "journal:" + journal
elif title:
# query by article title
title = "%22" + title.replace(" ", "%20") + "%22"
query_str = "title:" + title
print("Title: ", title)
elif subject:
# query by subject
query_str = "subject:" + subject
else:
return "No query parameters provided"

main_url = api_url + query_str + "&api_key=" + str(SPRINGER_API_KEY) + "&s=301"
main_url = api_url + query_str + "&api_key=" + str(SPRINGER_API_KEY)
print("Full URL: ", main_url)


response = requests.get(main_url)
response = requests.get(main_url, headers=headers)
print("Status: ", response.status_code)

if response.status_code != 200:
return "Error: " + str(response.status_code) + " - " + response.text

data = response.json()
print("Total records: ", len(data['records']))
# check for total number of records
total_records = int(data['result'][0]['total'])   # Springer returns these counts as JSON strings, so cast before comparing
print("Total records: ", total_records)
current_records = int(data['result'][0]['recordsDisplayed'])

while current_records < total_records:
# check if nextPage exists
if 'nextPage' in data:
next_page_url = data['nextPage']
else:
next_page_url = None

# multi-process all records in current page
# write a separate function to download articles
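# A sketch of the intended fan-out, mirroring the Scopus path below
# (downloadSpringerArticle is a hypothetical helper, not yet in this repo):
# with concurrent.futures.ProcessPoolExecutor() as executor:
#     futures = [executor.submit(downloadSpringerArticle, record, course_name)
#                for record in data['records']]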


while 'nextPage' in data:
# extract current page data
@@ -261,6 +276,10 @@ def downloadElsevierFulltextFromId(id: str, id_type: str, course_name: str):
"""
This function downloads an article from Elsevier for a given identifier (DOI, PII, EID, or PubMed ID).
Args:
id: DOI, PII, EID, or Pubmed ID
id_type: type of ID - doi, pii, eid, pubmed_id
course_name: course name
"""

# create directory to store files
@@ -283,7 +302,7 @@ def downloadElsevierFulltextFromId(id: str, id_type: str, course_name: str):
return "No query parameters provided"

response = requests.get(url, headers=headers)
print("Response content type: ", response.headers)
#print("Response content type: ", response.headers)
if response.status_code != 200:
return "Error in download function: " + str(response.status_code) + " - " + response.text

@@ -323,7 +342,7 @@ def searchScopusArticles(course: str, search_str: str, title: str, pub: str, sub
# log start time
start_time = time.monotonic()

# create directory to store files
# create directory to store files locally
directory = os.path.join(os.getcwd(), 'elsevier_papers')
if not os.path.exists(directory):
os.makedirs(directory)
@@ -371,7 +390,7 @@ def searchScopusArticles(course: str, search_str: str, title: str, pub: str, sub
next_page_url = link['@href']
break

# multi-process all records in this page
# multi-process all records in this page - extract PII and call download function on it
records = search_response['search-results']['entry']
with concurrent.futures.ProcessPoolExecutor() as executor:
results = [executor.submit(downloadElsevierFulltextFromId, record['pii'], 'pii', course) for record in records]
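# Note: these futures are collected but never awaited; iterating them with
# concurrent.futures.as_completed() and calling .result() would surface
# download errors instead of dropping them silently.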
@@ -393,8 +412,9 @@ def searchScopusArticles(course: str, search_str: str, title: str, pub: str, sub
else:
search_response = response.json()

exit()
# after all records are downloaded, upload to supabase bucket

# after all records are downloaded, upload to supabase bucket
supabase_bucket_path = "publications/elsevier_journals/cell_host_and_mircobe"
try:
for root, directories, files in os.walk(directory):
for file in files:
@@ -403,8 +423,7 @@ def searchScopusArticles(course: str, search_str: str, title: str, pub: str, sub
upload_path = "elsevier_papers/" + file
try:
with open(filepath, "rb") as f:
res = SUPABASE_CLIENT.storage.from_("publications/elsevier_journals/cell_host_and_mircobe").upload(file=f, path=upload_path, file_options={"content-type": "application/pdf"})
print("Upload response: ", res)
res = SUPABASE_CLIENT.storage.from_(supabase_bucket_path).upload(file=f, path=upload_path, file_options={"content-type": "application/pdf"})
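# Assumption: Supabase bucket names can't contain "/" like a nested path; if this
# upload fails, the likely fix is .from_("publications") with
# path="elsevier_journals/cell_host_and_mircobe/" + file.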
except Exception as e:
print("Error: ", e)

@@ -419,76 +438,111 @@ def searchScopusArticles(course: str, search_str: str, title: str, pub: str, sub
return "success"


def searchScienceDirectArticles(course: str, query: str, title: str, pub: str):
def searchScienceDirectArticles(course_name: str, search_str: str, article_title: str, publication_title: str):
"""
This function is used for a text-based search in ScienceDirect.
This function is used for a text-based search in ScienceDirect (Elsevier service).
1. Use the ScienceDirect API to search for articles matching the input parameters (the search endpoint requires a PUT request).
2. Extract the PII from each result and call downloadElsevierFulltextFromId() to download the full text to a local directory.
3. Upload the files to a supabase bucket.
Args:
course: course name
query: search query
title: article title
journal: journal title
search_str: free text search query - will search against all text fields in articles
article_title: title of the article or book chapter
publication_title: title of journal or book
"""
# log start time
start_time = time.monotonic()

# create directory to store files locally
directory = os.path.join(os.getcwd(), 'elsevier_papers')
if not os.path.exists(directory):
os.makedirs(directory)

# create payload for API request
data = {
"filter": {
"openAccess": True
},
"display": {
"offset": 0,
"show": 50
"offset": 430,
"show": 10
}
}

# read parameters from request
if query:
data["qs"] = query
if title:
data["title"] = title
if pub:
data["pub"] = pub

url = "https://api.elsevier.com/content/search/sciencedirect?"
headers = {'X-ELS-APIKey': ELSEVIER_API_KEY, 'Accept':'application/json'}

# read parameters from request - use quotation marks for exact match, need at least one of title, qs or author.
if search_str:
data["qs"] = search_str
if article_title:
data["title"] = "\"" + article_title + "\""
if publication_title:
data["pub"] = "\"" + publication_title + "\""
if not search_str:
data["qs"] = "\"" + publication_title + "\""

#data = json.dumps(data)
#data = json.loads(data)
print("Data: ", data)

url = "https://api.elsevier.com/content/search/sciencedirect"

headers = {'X-ELS-APIKey': ELSEVIER_API_KEY, 'Accept':'application/json'}  # read the key from env config; never commit a literal API key
response = requests.put(url, headers=headers, json=data)
print("Status: ", response.status_code)

if response.status_code != 200:
return "Error: " + str(response.status_code) + " - " + response.text

response_data = response.json()
results = response_data['results']
total_results = response_data['resultsFound']
print("Total results: ", total_results)
current_results = len(results)

# iterate through results and extract doi and pii
for result in results:
doi = result['doi']
#pii = result['pii']
if doi:
downloadElsevierFulltextFromId(id=doi, id_type='doi', course_name=course)
# elif pii:
# # download with pii
# pass

# paginate through results if total > current
while current_results < total_results:
total_records = response_data['resultsFound']
print("Total records: ", total_records)
current_records = 430
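# Assumption: 430 matches the hard-coded "offset" in the payload above, so this
# run resumes a crawl that previously stopped after 430 records.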

while current_records < total_records:
# iterate through results and extract pii
if 'results' not in response_data:
print("response_data: ", response.text)
print("headers: ", response.headers)
break
records = response_data['results']

data["display"]["offset"] += current_results
with concurrent.futures.ProcessPoolExecutor() as executor:
results = [executor.submit(downloadElsevierFulltextFromId, record['pii'], 'pii', course_name) for record in records]

current_records += len(records)
print("Current records: ", current_records)

# update the offset parameter in data and call the API again
data["display"]["offset"] = current_records  # set, don't +=: current_records already counts from the initial offset
response = requests.put(url, headers=headers, json=data)
print("Status: ", response.status_code)
response_data = response.json()
results = response_data['results']
current_results += len(results)
print("Current results: ", current_results)

# iterate through results and extract doi and pii
for result in results:
doi = result['doi']
#pii = result['pii']
if doi:
downloadElsevierFulltextFromId(id=doi, id_type='doi', course_name=course)
# elif pii:
# # download with pii
# pass

if response.status_code != 200:
return "Error: " + str(response.status_code) + " - " + response.text

response_data = response.json()

print(f"⏰ Total Download Time: {(time.monotonic() - start_time):.2f} seconds")

# after all records are downloaded, upload to supabase bucket
# supabase_bucket_path = "publications/elsevier_journals/trends_in_microbiology"
# try:
# for root, directories, files in os.walk(directory):
# for file in files:
# filepath = os.path.join(root, file)
# print("Uploading: ", file)
# upload_path = "elsevier_papers/" + file
# try:
# with open(filepath, "rb") as f:
# res = SUPABASE_CLIENT.storage.from_(supabase_bucket_path).upload(file=f, path=upload_path, file_options={"content-type": "application/pdf"})
# except Exception as e:
# print("Error: ", e)

# # remove local files
# shutil.rmtree(directory)
# except Exception as e:
# print("Error: ", e)

# log end time
print(f"⏰ Total Runtime: {(time.monotonic() - start_time):.2f} seconds")

return "success"


31 changes: 29 additions & 2 deletions ai_ta_backend/main.py
@@ -33,7 +33,7 @@
from ai_ta_backend.journal_ingest import (get_arxiv_fulltext, downloadSpringerFulltext,
downloadElsevierFulltextFromId, getFromDoi,
downloadPubmedArticles, searchPubmedArticlesWithEutils,
searchScopusArticles)
searchScopusArticles, searchScienceDirectArticles)

# Sentry.io error logging
sentry_sdk.init(
@@ -832,7 +832,7 @@ def getPubmedArticleWithEutils():
return response

@app.route('/getScopusArticles', methods=['GET'])
def getScopusArticle() -> Response:
def getScopusArticles() -> Response:
"""
Download full-text article from Scopus
"""
@@ -859,6 +859,33 @@ def getScopusArticle() -> Response:
response.headers.add('Access-Control-Allow-Origin', '*')
return response

@app.route('/getScienceDirectArticles', methods=['GET'])
def getScienceDirectArticles() -> Response:
"""
Download full-text articles from ScienceDirect
"""
course_name = request.args.get('course_name', default='', type=str)
article_title = request.args.get('article_title', default='', type=str)
journal_title = request.args.get('journal_title', default='', type=str)
search_str = request.args.get('search_str', default='', type=str)


print("In /getScienceDirectArticles")

if (article_title == '' and journal_title == '' and search_str == '') or course_name == '':
# proper web error "400 Bad request"
abort(
400,
description=
f"Missing required parameters: 'article_title', 'journal_title' or 'search_str' and 'course_name' must be provided."
)

fulltext = searchScienceDirectArticles(course_name, search_str, article_title, journal_title)

response = jsonify(fulltext)
response.headers.add('Access-Control-Allow-Origin', '*')
return response


@app.route('/resource-report', methods=['GET'])
def resource_report() -> Response:
