Escape special chars to fix json parsing issue breaking ETL #260

Open · wants to merge 1 commit into master
22 changes: 19 additions & 3 deletions .secrets.baseline
@@ -3,7 +3,7 @@
"files": null,
"lines": null
},
"generated_at": "2020-07-17T20:10:13Z",
"generated_at": "2021-06-28T22:02:53Z",
"plugins_used": [
{
"name": "AWSKeyDetector"
@@ -68,18 +68,34 @@
],
"Pipfile.lock": [
{
"hashed_secret": "02fa51340819a4f9ec82417c54803d425223d707",
"hashed_secret": "9896b32b0f97d1353b28277d29c91cad9b5584de",
"is_verified": false,
"line_number": 4,
"type": "Hex High Entropy String"
},
{
"hashed_secret": "f5f0ab1675c0f791c8c158f599126da62021c9f0",
"is_verified": false,
"line_number": 473,
"line_number": 578,
"type": "Hex High Entropy String"
}
],
"docker-compose.yml": [
{
"hashed_secret": "e727d1464ae12436e899a726da5b2f11d8381b26",
"is_verified": false,
"line_number": 89,
"type": "Secret Keyword"
}
],
"tests/gen3/tube/compose-creds.json": [
{
"hashed_secret": "afc848c316af1a89d49826c5ae9d00ed769415f3",
"is_verified": false,
"line_number": 5,
"type": "Secret Keyword"
}
],
"tests/gen3/tube/creds.json": [
{
"hashed_secret": "afc848c316af1a89d49826c5ae9d00ed769415f3",
31 changes: 31 additions & 0 deletions README.md
@@ -136,3 +136,34 @@ The required field to be specified for each property is `name`. All `injecting_p
A `collector` etlMapping can use the special built-in `source_node` property to record which node in the dictionary the entity comes from. For example, if a data file comes from the `reference_file` node of the dictionary, the value of `source_node` will be `"reference_file"`.

This is useful when ETL-ing data files: in many dictionaries, data files can live on any of several nodes, and it is often helpful to know which node each data file came from. For example, PFB export of data files in portal (https://github.com/uc-cdis/data-portal/pull/729) relies on `source_node` to tell the `pelican-export` job where in the dictionary to search for data files.

## Testing

All services required to run a local ETL are provided in `docker-compose.yml`.

First, raise the virtual memory limit that Elasticsearch requires:

`sudo sysctl -w vm.max_map_count=262144`

Then, running

`docker-compose up --force-recreate --build`

will start an ETL.

By default, unit tests are set up to ensure correctness against a provided sheepdog dump.
Running `docker-compose exec tube python -m pytest tests/` will run unit tests against the completed ETL.
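
Elasticsearch is published on `localhost:9200` by the compose file, so a quick sanity check such as `curl localhost:9200/_cat/indices?v` (illustrative, not part of the test suite) can confirm that indices were created before running the tests.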

Optionally, set any of:

`export ETL_MAPPING=./{yourETLMapping}.yml`

`export DICTIONARY_URL="your_dictionary_url"`

`export METADATA_DB=./{your_sheepdog_db_dump}.sql`

to run the ETL against non-default data. To acquire a sheepdog dump for a dev commons, run
`ssh {you}@cdistest.csoc < tests/dump_sheepdog.txt > out.sql`
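
For example, to run the ETL against the dump acquired above (the file name is illustrative):

`export METADATA_DB=./out.sql`

`docker-compose up --force-recreate --build`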

Note, however, that many unit tests are written to the spec of default data
and may fail even after a successful ETL of any other data.
2 changes: 1 addition & 1 deletion dev-requirements.txt
@@ -3,6 +3,6 @@ pytest-cov==2.5.1
codacy-coverage
mock==2.0.0
moto==1.1.24
-e git+https://github.com/uc-cdis/[email protected]#egg=cdisutilstest-0.2.5
-e git+https://github.com/uc-cdis/[email protected]#egg=cdisutilstest
elasticsearch==6.3.1
elasticsearch-dsl==6.2.1
115 changes: 115 additions & 0 deletions docker-compose.yml
@@ -0,0 +1,115 @@
version: '3'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch-oss:6.4.2
    container_name: elasticsearch
    environment:
      - cluster.name=elasticsearch-cluster
      - bootstrap.memory_lock=false
      - "ES_JAVA_OPTS=-Xms1g -Xmx1g"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    ports:
      - 9200:9200
      - 9300:9300
    networks:
      - devnet
    healthcheck:
      test: ["CMD-SHELL", "curl --silent --fail localhost:9200/_cluster/health || exit 1"]
      interval: 30s
      timeout: 30s
      retries: 10
  tube:
    build: .
    command: >
      bash -c "
      sleep 30 &&
      pip install -r dev-requirements.txt &&
      python run_config.py &&
      python run_etl.py -s import &&
      python run_etl.py -s transform &&
      echo 'ETL COMPLETE' &&
      while :; do sleep 1; done
      "
    networks:
      - devnet
    environment:
      - "DICTIONARY_URL=${DICTIONARY_URL:-https://s3.amazonaws.com/dictionary-artifacts/ndhdictionary/master/schema.json}"
      - ES_URL=elasticsearch
      - ES_INDEX_NAME=etl
      - HADOOP_URL=hdfs://spark:9000
      - HADOOP_HOST=spark
    volumes:
      - "${ETL_MAPPING:-./tests/gen3/tube/etlMapping.yaml}:/usr/share/gen3/tube/etlMapping.yaml"
      - ./tests/gen3/tube/compose-creds.json:/usr/share/gen3/tube/creds.json
      - ./tests/gen3/tube/user.yaml:/usr/share/gen3/tube/user.yaml
      - ./dev-requirements.txt:/tmp/dev-requirements.txt
    depends_on:
      elasticsearch:
        condition: service_healthy
      spark:
        condition: service_healthy
  spark:
    image: "quay.io/cdis/gen3-spark:master"
    command: bash -c "
      python run_config.py &&
      hdfs namenode -format &&
      hdfs --daemon start namenode &&
      hdfs --daemon start datanode &&
      yarn --daemon start resourcemanager &&
      yarn --daemon start nodemanager &&
      hdfs dfsadmin -safemode leave &&
      hdfs dfs -mkdir /result &&
      echo READY &&
      touch /tmp/ready.flag &&
      while true; do sleep 5; done
      "
    expose:
      - 22
      - 8030
      - 8031
      - 8032
      - 9000
    networks:
      - devnet
    environment:
      - HADOOP_URL=hdfs://0.0.0.0:9000
      - HADOOP_HOST=0.0.0.0
    mem_limit: 1g
    healthcheck:
      test: bash -c "[ -f /tmp/ready.flag ]"
      interval: 2s
      retries: 100
  db:
    image: postgres
    restart: always
    environment:
      POSTGRES_PASSWORD: mypass
      POSTGRES_HOST_AUTH_METHOD: trust
      POSTGRES_DB: metadata_db
    networks:
      - devnet
    expose:
      - 5432
    healthcheck:
      test: [ "CMD-SHELL", "pg_isready -U postgres" ]
      interval: 1s
      retries: 100
  db_init:
    image: postgres
    depends_on:
      db:
        condition: service_healthy
    volumes:
      - "${METADATA_DB:-./tests/metadata_db.sql}:/tmp/metadata_db.sql"
    networks:
      - devnet
    command: >
      bash -c "
      (dropdb -h db -U postgres metadata_db || true) && createdb -h db -U postgres metadata_db &&
      psql -h db -d metadata_db -U postgres -f /tmp/metadata_db.sql
      "
networks:
  devnet:
1 change: 1 addition & 0 deletions requirements.txt
@@ -1,6 +1,7 @@
cdislogging==1.0.0
cryptography>=2.1.2
dictionaryutils==3.0.2
gdcdictionary==1.2.0
elasticsearch==6.3.1
elasticsearch-dsl==6.2.1
gen3datamodel==3.0.2
12 changes: 12 additions & 0 deletions tests/dump_sheepdog.txt
@@ -0,0 +1,12 @@
# Usage: ssh {you}@cdistest.csoc < dump_sheepdog.txt > dump.sql

DB_HOST=$(cat Gen3Secrets/creds.json | jq -r .sheepdog.db_host)
DB_USERNAME=$(cat Gen3Secrets/creds.json | jq -r .sheepdog.db_username)
DB_PASSWORD=$(cat Gen3Secrets/creds.json | jq -r .sheepdog.db_password)
DB_DATABASE=$(cat Gen3Secrets/creds.json | jq -r .sheepdog.db_database)

PGPASSWORD=$DB_PASSWORD \
pg_dump \
-h $DB_HOST \
-U $DB_USERNAME \
-d $DB_DATABASE \
7 changes: 7 additions & 0 deletions tests/gen3/tube/compose-creds.json
@@ -0,0 +1,7 @@
{
  "db_host": "db",
  "db_port": "5432",
  "db_username": "postgres",
  "db_password": "postgres",
  "db_database": "metadata_db"
}
20 changes: 5 additions & 15 deletions tube/etl/indexers/base/lambdas.py
@@ -6,25 +6,15 @@
def extract_metadata(str_value):
    """
    Get all fields in _props (strs[3] in hadoop file) and node_id field (strs[4])
-    :param str_value:
+    :param str_value: plaintext dump of one row in HDFS file
    :return:
    """
-    origin_value = str_value
-    str_value = str_value.replace("'", "###")
-    str_value = str_value.replace('\\""', "##")
-    strs = ast.literal_eval(str_value.replace('""', "'"))
+    _, __, ___, props_text, node_id = ast.literal_eval(str_value)
    try:
-        props = json.loads(
-            strs[3].replace("'", '"').replace("###", "'").replace("##", '\\"'),
-            strict=False,
-        )
+        props = json.loads(props_text, strict=False)
    except Exception as ex:
-        raise Exception(
-            "ERROR IN SPARK: origin: {}, after replacing: {}".format(
-                origin_value, str_value
-            )
-        )
+        raise Exception("ERROR IN SPARK when parsing _props json {}".format(props_text))
-    return tuple([strs[4], props])
+    return tuple([node_id, props])


def extract_link(str_value):
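
For reference, a minimal sketch of the new parsing flow in `extract_metadata`. The row text below is made up purely for illustration; real rows come out of the Sqoop import, but the new code assumes each line evaluates as a Python literal whose fourth field is the `_props` JSON text and whose fifth field is the node id.

```python
import ast
import json

# Illustrative only: a made-up row in the shape the new extract_metadata expects --
# a Python-literal tuple whose fourth field is the _props JSON and fifth is node_id.
sample_row = (
    '("2021-06-28", "experiment", "[]", '
    '\'{"submitter_id": "exp-01", "project_id": "test-proj"}\', '
    '"node-uuid-1")'
)

_, __, ___, props_text, node_id = ast.literal_eval(sample_row)
props = json.loads(props_text, strict=False)

assert node_id == "node-uuid-1"
assert props["submitter_id"] == "exp-01"
```

The previous implementation tried to survive embedded quotes with a chain of `replace()` workarounds; with `--escaped-by` now set on the Sqoop import (next file), quotes inside `_props` arrive escaped and the literal/JSON parse can be done directly.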
4 changes: 4 additions & 0 deletions tube/importers/sql_to_hdfs.py
@@ -41,6 +41,8 @@ def import_all_tables_from_sql(cls, jdbc, username, password, output_dir, m):
            "temp",
            "--enclosed-by",
            '"',
+            "--escaped-by",
+            "\\",
            "--exclude-tables",
            "transaction_documents,transaction_logs,transaction_snapshots,_voided_edges,_voided_nodes",
            "--map-column-java",
@@ -76,6 +78,8 @@ def import_table_from_sql(cls, tb, jdbc, username, password, output_dir, m):
            "temp",
            "--enclosed-by",
            '"',
+            "--escaped-by",
+            "\\",
            "--map-column-java",
            "_props=String,acl=String,_sysan=String,{}".format(optional_fields),
        ]
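
A rough sketch, not Sqoop's actual implementation, of what combining `--enclosed-by '"'` with the new `--escaped-by '\'` does to a `_props` column: embedded double quotes are backslash-escaped rather than colliding with the enclosing quotes, which is what lets the simplified parser in `lambdas.py` read the JSON back out.

```python
# Rough sketch of the effect of the two Sqoop options on one column value;
# Sqoop's real field formatting is more involved than this.
def enclose_and_escape(value, enclose='"', escape="\\"):
    # Escape any escape characters first, then the enclosing character.
    value = value.replace(escape, escape + escape)
    value = value.replace(enclose, escape + enclose)
    return enclose + value + enclose

print(enclose_and_escape('{"submitter_id": "exp-01"}'))
# prints: "{\"submitter_id\": \"exp-01\"}"
```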