data ingest working
zzeppozz committed Nov 17, 2023
1 parent e8245e2 commit 59f457a
Showing 7 changed files with 273 additions and 66 deletions.
6 changes: 6 additions & 0 deletions .ipynb_checkpoints/Untitled-checkpoint.ipynb
@@ -0,0 +1,6 @@
{
"cells": [],
"metadata": {},
"nbformat": 4,
"nbformat_minor": 5
}
6 changes: 6 additions & 0 deletions Untitled.ipynb
@@ -0,0 +1,6 @@
{
"cells": [],
"metadata": {},
"nbformat": 4,
"nbformat_minor": 5
}
81 changes: 80 additions & 1 deletion _sphinx_config/pages/aws_experiments.rst
@@ -1,6 +1,8 @@
########################
AWS workflow experiments
########################

***************
AWS Batch
***************

@@ -94,3 +96,80 @@ Batch
* Command: Define the command that should be executed within the container.

* Submit AWS Batch Job
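
A minimal boto3 sketch of submitting a Batch job from Python; the job name, queue,
job definition, and command below are placeholder assumptions, not the workflow's
actual resources::

  """Sketch: submit an AWS Batch job with an overridden container command."""
  import boto3

  batch = boto3.client("batch", region_name="us-east-1")

  # Job name, queue, job definition, and command are placeholders for the
  # resources defined in the steps above.
  response = batch.submit_job(
      jobName="bison-data-ingest",
      jobQueue="bison-job-queue",
      jobDefinition="bison-job-definition",
      containerOverrides={"command": ["python", "process_gbif.py"]},
  )
  print(response["jobId"])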

******************************
Glue - Interactive Development
******************************

`AWS Glue Studio with Jupyter
<https://docs.aws.amazon.com/glue/latest/dg/create-notebook-job.html>`_

`Local development with Jupyter
<https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format-parquet-home.html>`_


Problem/Solution
--------------------
Interactive Samples fail with a "File Not Found" error for public GBIF data.

* Create a database in AWS Glue for metadata about project inputs.

* In that database, create a table for each data input using a Glue Crawler.
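
A minimal boto3 sketch of these two steps; the crawler name, IAM role, and S3 path
are placeholder assumptions, while bison-metadata is the database named later on
this page::

  """Sketch: create a Glue Data Catalog database and an S3 crawler."""
  import boto3

  glue = boto3.client("glue", region_name="us-east-1")

  # Database for metadata about project inputs.
  glue.create_database(DatabaseInput={"Name": "bison-metadata"})

  # Crawler that creates one table per data input found under the S3 path.
  # The role and path below are placeholders, not the project's actual values.
  glue.create_crawler(
      Name="bison-input-crawler",
      Role="AWSGlueServiceRole-bison",
      DatabaseName="bison-metadata",
      Targets={"S3Targets": [{"Path": "s3://bison-321942852011-us-east-1/raw_data/"}]},
  )
  glue.start_crawler(Name="bison-input-crawler")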

Problem/Solution
--------------------

Interactive Data Preview fails for public and private data.

* Use AWS Glue DataBrew to visually examine the data.

* First, add the dataset to the Glue Data Catalog.
  "A table is the metadata definition that represents your data, including its
  schema. A table can be used as a source or target in a job definition."

* Next, add the dataset to Glue DataBrew, as sketched below.
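
A minimal boto3 sketch of registering a Data Catalog table as a DataBrew dataset;
the dataset name is an assumption, and the table name is the one produced by the
GBIF crawler described below::

  """Sketch: register an existing Glue Data Catalog table with DataBrew."""
  import boto3

  databrew = boto3.client("databrew", region_name="us-east-1")

  # The dataset points at a table already created in the Glue Data Catalog.
  databrew.create_dataset(
      Name="gbif-odr-occurrence",
      Input={
          "DataCatalogInputDefinition": {
              "DatabaseName": "bison-metadata",
              "TableName": "gbif-odr-occurrence_parquet",
          }
      },
  )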

Problem/Solution
--------------------
When adding a dataset in AWS Glue DataBrew and creating a connection to RDS, no
tables are shown in the bison-metadata database.


***************************
BISON AWS data/tools
***************************

* Amazon RDS, PostgreSQL, bison-db-test

* Create a JDBC connection from the Crawler, then change it to an Amazon RDS connection with include path bison-db-test/%

* AWS Glue Data Catalog

  * bison-metadata Database, populated by
  * AWS Glue Crawlers, which crawl data to create tables of metadata/schema

    * GBIF Crawler crawls the GBIF Open Data Registry 11-2023 --> gbif-odr-occurrence_parquet table
    * BISON RDS Crawler crawls Amazon RDS bison-db-test --> bison-ref-?? table
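
A minimal boto3 sketch of the BISON RDS Crawler configuration; the crawler name,
IAM role, and Glue connection name are placeholder assumptions, and the include
path follows the <database>/% convention noted above::

  """Sketch: create a Glue crawler that catalogs the RDS database over JDBC."""
  import boto3

  glue = boto3.client("glue", region_name="us-east-1")

  # "bison-rds-connection" stands in for the JDBC/Amazon RDS connection
  # created for bison-db-test; role and crawler names are also placeholders.
  glue.create_crawler(
      Name="bison-rds-crawler",
      Role="AWSGlueServiceRole-bison",
      DatabaseName="bison-metadata",
      Targets={
          "JdbcTargets": [
              {"ConnectionName": "bison-rds-connection", "Path": "bison-db-test/%"}
          ]
      },
  )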

Problem: the BISON RDS Crawler fails with permission errors

"Crawler Error:
com.amazonaws.services.secretsmanager.model.AWSSecretsManagerException: Service
Principal: glue.amazonaws.com is not authorized to perform:
secretsmanager:GetSecretValue on resource: admin_bison-db-test because no
identity-based policy allows the secretsmanager:GetSecretValue action
(Service: AWSSecretsManager; Status Code: 400; Error Code:
AccessDeniedException; Request ID: bcb2d711-0c65-4976-815c-4bc3d6dd1e66; Proxy:
null). For more information, see Setting up IAM Permissions in the Developer
Guide (http://docs.aws.amazon.com/glue/latest/dg/getting-started-access.html)."
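
One way to grant this permission is to attach an inline policy to the Glue service
role; a minimal boto3 sketch, with a placeholder role name, account id, and secret
ARN (substitute the real AWSGlueServiceRole-* role and the ARN of the
admin_bison-db-test secret)::

  """Sketch: allow the Glue service role to read the connection's secret."""
  import json

  import boto3

  iam = boto3.client("iam")

  # Placeholder account id and secret ARN; not the project's actual values.
  policy = {
      "Version": "2012-10-17",
      "Statement": [
          {
              "Effect": "Allow",
              "Action": "secretsmanager:GetSecretValue",
              "Resource": (
                  "arn:aws:secretsmanager:us-east-1:123456789012:"
                  "secret:admin_bison-db-test-*"
              ),
          }
      ],
  }

  iam.put_role_policy(
      RoleName="AWSGlueServiceRole-bison",
      PolicyName="bison-get-db-secret",
      PolicyDocument=json.dumps(policy),
  )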

Added a Secrets Manager policy to AWSGlueServiceRole- and got a new error:
"Crawler Error:
Crawler cannot be started. Verify the permissions in the policies attached to
the IAM role defined in the crawler. "

Does the Glue Crawler only access S3?

Added a policy to my user according to the instructions in Step 3 of
https://docs.aws.amazon.com/glue/latest/dg/configure-iam-for-glue.html, verbatim -
Error:
81 changes: 46 additions & 35 deletions _sphinx_config/pages/year5.rst
@@ -89,37 +89,9 @@ Data constants
SCINAME_FLD, SCIAUTHOR_FLD, RANK_FLD, ASSESSMENT_FLD, TAXON_AUTHORITY_FLD.


GBIF data options
--------------------

**Option 1:** Get a current version of GBIF data from the GBIF portal.

* Create a user account on the GBIF website, then log in and
* request the data by putting the following URL in a browser:
  https://www.gbif.org/occurrence/search?country=US&has_coordinate=true&has_geospatial_issue=false&occurrence_status=present
* Adding a restriction to occurrence data identified to species or a lower rank
  will reduce the amount of data that will be filtered out.

Verify that the occurrence.txt file contains GBIF-annotated records; it will be the
primary input file. The primary input file contains fieldnames in its first line,
and all fields listed as values of GBIF class attributes (attribute names ending in
_FLD or _KEY) should be among them.

The query will request a download, which will take some time for GBIF to assemble.
GBIF will send an email with a link for downloading the Darwin Core Archive, a
very large zipped file. Only the occurrence.txt file is required for data processing.
Rename the file with the date for clarity on what data is being used. Use
the following pattern gbif_yyyy-mm-dd.csv so that interim data filenames can be
created and parsed consistently. Note the underscore (_) between 'gbif' and the date, and
the dash (-) between date elements.

::

unzip <dwca zipfile> occurrence.txt
mv occurrence.txt gbif_2023-08-23.csv

**Option 2:** Use the GBIF Open Data Registry on AWS S3. The data contains a subset of
Darwin Core fields. More information is in `GBIF Ingestion`_ below.

GBIF Data
--------------
GBIF data is ingested within the workflow; see `GBIF Data Ingestion`_ below.

Data constants
^^^^^^^^^^^^^^^^
@@ -229,17 +201,56 @@ different data storage and processing strategies - they each have speed and cost
and cons.


GBIF Ingestion
---------------------------
* **Option 1:** Subset GBIF Open Data Registry to Bison S3 bucket, serverless, script
`glue_bison_subset_gbif.py <../../scripts/glue_bison_subset_gbif.py>`_
GBIF Data Ingestion
--------------------


* **Option 1:** Using **S3 Select**, subset the GBIF Open Data Registry into the BISON
  S3 bucket, serverless, with the script `glue_bison_subset_gbif.py
  <../../scripts/glue_bison_subset_gbif.py>`_. See the `AWS S3 Select documentation
  <https://docs.aws.amazon.com/AmazonS3/latest/userguide/selecting-content-from-objects.html>`_
  and this `blog post
  <https://aws.amazon.com/blogs/storage/querying-data-without-servers-or-databases-using-amazon-s3-select/>`_
  (an S3 Select sketch follows this list).
* **Option 2:** Query the GBIF portal manually, then initiate an EC2 Spot instance to
  download and subset the data, saving it to S3. The script that downloads, subsets, and
  uploads data from the EC2 Spot instance is installed on the instance at creation:
  `user_data_for_ec2spot.py <../../scripts/user_data_for_ec2spot.py>`_. The script that
  builds and instantiates the EC2 Spot instance is `gbif_to_s3.py
  <../../scripts/gbif_to_s3.py>`_.
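
A minimal boto3 sketch of the S3 Select call shape only; S3 Select operates on a
single object per request, and the object key, selected columns, and filter below
are placeholder assumptions (the linked Glue script is what performs the actual
subset)::

  """Sketch: query one GBIF occurrence parquet object with S3 Select."""
  import boto3

  s3 = boto3.client("s3", region_name="us-east-1")

  # The key is a placeholder for one of the many parquet objects under
  # occurrence/<yyyy-mm-01>/occurrence.parquet/ in the GBIF Open Data Registry.
  response = s3.select_object_content(
      Bucket="gbif-open-data-us-east-1",
      Key="occurrence/2023-11-01/occurrence.parquet/000000",
      ExpressionType="SQL",
      Expression=(
          "SELECT s.countrycode, s.occurrencestatus, s.taxonrank "
          "FROM S3Object s "
          "WHERE s.countrycode = 'US' AND s.occurrencestatus = 'PRESENT'"
      ),
      InputSerialization={"Parquet": {}},
      OutputSerialization={"CSV": {}},
  )

  # Results arrive as an event stream of CSV-encoded record chunks.
  for event in response["Payload"]:
      if "Records" in event:
          print(event["Records"]["Payload"].decode("utf-8"), end="")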


GBIF data options
--------------------

**Option 1:** Get a current version of GBIF data from the GBIF portal.

* Create a user account on the GBIF website, then log in and
* request the data by putting the following URL in a browser:
  https://www.gbif.org/occurrence/search?country=US&has_coordinate=true&has_geospatial_issue=false&occurrence_status=present
* Adding a restriction to occurrence data identified to species or a lower rank
  will reduce the amount of data that will be filtered out.

Verify that the occurrence.txt file contains GBIF-annotated records; it will be the
primary input file. The primary input file contains fieldnames in its first line,
and all fields listed as values of GBIF class attributes (attribute names ending in
_FLD or _KEY) should be among them.

The query will request a download, which will take some time for GBIF to assemble.
GBIF will send an email with a link for downloading the Darwin Core Archive, a
very large zipped file. Only the occurrence.txt file is required for data processing.
Rename the file with the date for clarity on what data is being used. Use
the following pattern gbif_yyyy-mm-dd.csv so that interim data filenames can be
created and parsed consistently. Note the underscore (_) between 'gbif' and the date, and
the dash (-) between date elements.

::

unzip <dwca zipfile> occurrence.txt
mv occurrence.txt gbif_2023-08-23.csv
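
A minimal sketch, assuming only the gbif_yyyy-mm-dd.csv pattern above, of how such
interim filenames can be created and parsed consistently (the helper names are
hypothetical)::

  """Sketch: build and parse interim filenames following gbif_yyyy-mm-dd.csv."""
  from datetime import datetime


  def gbif_filename(date: datetime) -> str:
      """Return a filename such as gbif_2023-08-23.csv for a download date."""
      return f"gbif_{date.strftime('%Y-%m-%d')}.csv"


  def gbif_date(filename: str) -> datetime:
      """Recover the download date from a filename built by gbif_filename."""
      return datetime.strptime(filename, "gbif_%Y-%m-%d.csv")


  print(gbif_filename(datetime(2023, 8, 23)))     # gbif_2023-08-23.csv
  print(gbif_date("gbif_2023-08-23.csv").date())  # 2023-08-23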

**Option 2:** Use the GBIF Open Data Registry on AWS S3. The data contains a subset of
Darwin Core fields. More information is in `GBIF Data Ingestion`_ above.


Reference Data
-----------------
Reference data consists of US-RIIS data and geospatial data for intersections.
105 changes: 75 additions & 30 deletions scripts/glue_bison_subset_gbif.py
@@ -1,30 +1,28 @@
"""Glue script to pull data subset from AWS GBIF Open Data Registry and write to S3."""
import datetime as DT
import sys
from awsglue.transforms import Filter
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
import datetime as DT
from pyspark import SparkConf
from pyspark.context import SparkContext
import sys
from awsglue.context import GlueContext
from awsglue.job import Job

region = "us-east-1"
bison_path = f"s3://bison-321942852011-{region}"
gbif_path = f"s3://gbif-open-data-{region}/occurrence"
bison_bucket = "s3://bison-321942852011-us-east-1/"
gbif_bucket = "s3://gbif-open-data-us-east-1/"

n = DT.datetime.now()
# GBIF Open Data Registry snapshots are dated the first of the month, zero-padded.
datastr = f"{n.year}-{n.month:02d}-01"

gbif_s3_fullname = f"{gbif_path}/{datastr}/occurrence.parquet/"
bison_s3_fullname = f"{bison_path}/raw_data/gbif_{datastr}.parquet/"
bison_s3_fullname = f"{bison_bucket}/raw_data/gbif_{datastr}.parquet/"
gbif_s3_fullname = f"{gbif_bucket}/occurrence/{datastr}/occurrence.parquet/"

args = getResolvedOptions(sys.argv, ["JOB_NAME"])

conf = SparkConf()
conf.set("spark.sql.legacy.parquet.int96RebaseModeInRead", "CORRECTED")
conf.set("spark.sql.legacy.parquet.int96RebaseModeInWrite", "CORRECTED")
conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "CORRECTED")
conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED")
conf.set("spark.sql.parquet.int96RebaseModeInRead", "CORRECTED")
conf.set("spark.sql.parquet.int96RebaseModeInWrite", "CORRECTED")
conf.set("spark.sql.parquet.datetimeRebaseModeInRead", "CORRECTED")
conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "CORRECTED")

sc = SparkContext.getOrCreate(conf=conf)
glueContext = GlueContext(sc)
@@ -33,34 +31,81 @@
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Input from GBIF S3 bucket
gbif_full_data = glueContext.create_dynamic_frame.from_options(
# # Small test 5000 record dataset
# bison_s3_fullname = f"{bison_path}/raw_data/gbif_5k_{datastr}.parquet/"
# gbif_full_data = glueContext.create_sample_dynamic_frame_from_options(
# connection_type="s3",
# connection_options={
# "paths": [ gbif_s3_fullname ],
# "recurse": True
# },
# num=5000,
# format="parquet",
# transformation_ctx="S3bucket_node_gbif"
# )

# Full dataset
gbif_dynf = glueContext.create_dynamic_frame.from_options(
format_options={},
connection_type="s3",
format="parquet",
connection_options={
"paths": [gbif_s3_fullname],
"recurse": True,
# 'groupFiles': 'inPartition',
# 'groupSize': '1073741824'
},
transformation_ctx="S3bucket_node_gbif",
# transformation_ctx="S3bucket_node_gbif",
)
print(f"Read GBIF {gbif_s3_fullname} with {gbif_dynf.count()} records.")

# Create filtered DynamicFrame with custom lambda to filter records
gbif_subset = Filter.apply(
frame=gbif_full_data,
f=lambda x: x["countrycode"] == "US" and x["occurrencestatus"] == "PRESENT" and x["taxonrank"] in ["species", "subspecies", "variety", "form", "infraspecific_name", "infrasubspecific_name"] and "COORDINATE_INVALID" not in x["issue"] and "COORDINATE_OUT_OF_RANGE" not in x["issue"] and "COORDINATE_REPROJECTION_FAILED" not in x["issue"] and "COORDINATE_REPROJECTION_SUSPICIOUS" not in x["issue"])
gbif_filtered_dynf = gbif_dynf.filter(
f=lambda x: x["countrycode"] == "US" and x["occurrencestatus"] == "PRESENT" and x["taxonrank"] in ["SPECIES", "SUBSPECIES", "VARIETY", "FORM", "INFRASPECIFIC_NAME", "INFRASUBSPECIFIC_NAME"])
print(f"Filtered GBIF to dynamic frame with {gbif_filtered_dynf.count()} records.")

# Output to BISON S3 bucket
bison_full_data = glueContext.write_dynamic_frame.from_options(
frame=gbif_subset,
glueContext.write_dynamic_frame.from_options(
frame=gbif_filtered_dynf,
connection_type="s3",
connection_options={"path": bison_s3_fullname},
format="parquet",
connection_options={
"path": bison_s3_fullname,
"compression": "snappy",
"partitionKeys": []
},
transformation_ctx="S3bucket_node_bison",
format_options={
"useGlueParquetWriter": True,
"compression": "snappy"
}
)
print(f"Wrote dynamic frame to {bison_s3_fullname}.")

# # Create filtered DynamicFrame with custom lambda to filter records
# gbif_subset = Filter.apply(
# frame = gbif_full_data,
# f=lambda row: (
# bool(bool(re.match("US", row["countrycode"]))
# and bool(re.match("PRESENT", row["occurrencestatus"]))
# and bool(
# re.match("SPECIES", row["taxonrank"])
# or bool(re.match("SUBSPECIES", row["taxonrank"]))
# or bool(re.match("VARIETY", row["taxonrank"]))
# or bool(re.match("FORM", row["taxonrank"]))
# or bool(re.match("INFRASPECIFIC_NAME", row["taxonrank"]))
# or bool(re.match("INFRASUBSPECIFIC_NAME", row["taxonrank"]))
# )
# )
# ))


# # Output to BISON S3 bucket
# bison_full_data = glueContext.write_dynamic_frame.from_options(
# frame=gbif_subset,
# connection_type="s3",
# connection_options={
# "path": bison_s3_fullname,
# },
# format="parquet",
# format_options={
# "useGlueParquetWriter": True,
# "compression": "snappy"
# },
# transformation_ctx="S3bucket_node_bison",
# )

job.commit()