From 65ff0631602705d7598ae4de4048c3cf72e205e9 Mon Sep 17 00:00:00 2001
From: James McLaughlin
Date: Wed, 11 Sep 2024 21:44:40 +0100
Subject: [PATCH] add display types to nodes based on rarest type property

---
 04_index/grebi_index/src/main.rs             | 16 ++++++-
 05_materialise/grebi_materialise/src/main.rs | 47 +++++++++++++++----
 .../grebi_make_neo_csv/src/main.rs           | 13 ++++-
 .../grebi_make_solr/src/main.rs              |  1 +
 .../subgraph_configs/ebi_full_monarch.json   | 17 +------
 grebi_shared/src/slice_merged_entity.rs      | 11 ++++-
 nextflow/01_create_subgraph.nf               | 28 +++++------
 scripts/dataload_saturos.sh                  |  2 +-
 8 files changed, 89 insertions(+), 46 deletions(-)

diff --git a/04_index/grebi_index/src/main.rs b/04_index/grebi_index/src/main.rs
index 3606657..e030e25 100644
--- a/04_index/grebi_index/src/main.rs
+++ b/04_index/grebi_index/src/main.rs
@@ -53,6 +53,7 @@ fn main() {

     let mut entity_props_to_count:HashMap<Vec<u8>,i64> = HashMap::new();
     let mut edge_props_to_count:HashMap<Vec<u8>,i64> = HashMap::new();
+    let mut types_to_count:HashMap<Vec<u8>,i64> = HashMap::new();

     let mut all_names:BTreeSet<Vec<u8>> = BTreeSet::new();
     let mut all_ids:BTreeSet<Vec<u8>> = BTreeSet::new();
@@ -106,7 +107,9 @@ fn main() {
                 w_count = entity_props_to_count.get_mut(prop_key);
             let count:&mut i64 = w_count.unwrap();

-            if prop_key.eq(b"grebi:type") || prop_key.eq(b"grebi:datasources") || prop_key.eq(b"id") {
+            let is_type = prop_key.eq(b"grebi:type");
+
+            if is_type || prop_key.eq(b"grebi:datasources") || prop_key.eq(b"id") {
                 metadata_writer.write_all(r#",""#.as_bytes()).unwrap();
                 metadata_writer.write_all(&prop_key).unwrap();
                 metadata_writer.write_all(r#"":["#.as_bytes()).unwrap();
@@ -118,6 +121,10 @@ fn main() {
                     } else {
                         metadata_writer.write_all(r#","#.as_bytes()).unwrap();
                     }
+                    if is_type && val.kind == JsonTokenType::StartString {
+                        let typecount = types_to_count.entry(val.value[1..val.value.len()-1].to_vec()).or_insert(0);
+                        *typecount += 1;
+                    }
                     metadata_writer.write_all(val.value).unwrap();
                 }
             }
@@ -218,7 +225,12 @@ fn main() {
             return (String::from_utf8(k.to_vec()).unwrap(), json!({
                 "count": v
             }))
-        }).collect::<HashMap<String,serde_json::Value>>()
+        }).collect::<HashMap<String,serde_json::Value>>(),
+        "types": types_to_count.iter().map(|(k,v)| {
+            return (String::from_utf8(k.to_vec()).unwrap(), json!({
+                "count": v
+            }))
+        }).collect::<HashMap<String,serde_json::Value>>(),
    })).unwrap().as_bytes()).unwrap();

     for name in all_names {
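[Illustration only, not part of the patch.] The indexer now counts every string value of grebi:type and writes the totals into its summary JSON next to the existing per-property counts. A minimal sketch of the resulting shape, built with serde_json; the type names and counts below are invented for the example:

    use serde_json::json;

    // Hypothetical fragment of the indexer's summary.json after this
    // change; real type CURIEs/IRIs and counts will differ.
    fn example_types_section() -> serde_json::Value {
        json!({
            "types": {
                "biolink:Gene":    { "count": 78000 },
                "biolink:Disease": { "count": 21000 }
            }
        })
    }

The materialise step below reads this section back in order to rank an entity's types by how often they occur across the whole subgraph.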
diff --git a/05_materialise/grebi_materialise/src/main.rs b/05_materialise/grebi_materialise/src/main.rs
index 3d41ff9..e1db6bb 100644
--- a/05_materialise/grebi_materialise/src/main.rs
+++ b/05_materialise/grebi_materialise/src/main.rs
@@ -46,6 +46,9 @@ struct Args {
     #[arg(long)]
     in_metadata_jsonl: String,

+    #[arg(long)]
+    in_summary_json: String,
+
     #[arg(long)]
     out_edges_jsonl: String,

@@ -111,6 +114,14 @@ fn main() -> std::io::Result<()> {

     let node_metadata = load_metadata_mapping_table::load_metadata_mapping_table(&args.in_metadata_jsonl);

+    let mut types_to_count:HashMap<Vec<u8>,i64> = HashMap::new();
+    {
+        let summary_json:Map<String,Value> = serde_json::from_reader(File::open(&args.in_summary_json).unwrap()).unwrap();
+        for (k, v) in summary_json["types"].as_object().unwrap() {
+            types_to_count.insert(k.as_bytes().to_vec(), v.as_object().unwrap()["count"].as_i64().unwrap());
+        }
+    }
+
     let stdin = io::stdin().lock();
     let mut reader = BufReader::new(stdin);

@@ -149,20 +160,31 @@ fn main() -> std::io::Result<()> {
             eprintln!("... written {} nodes", n_nodes);
         }

+        let mut rarest_type:Option<Vec<u8>> = None;
+        let mut rarest_type_count:i64 = std::i64::MAX;
+
         sliced.props.iter().for_each(|prop| {

             let prop_key = prop.key;

-            if prop_key.eq(b"grebi:type") {
-                for val in &prop.values {
-                    if val.kind == JsonTokenType::StartString {
-                        let buf = &val.value.to_vec();
-                        let str = JsonParser::parse(&buf).string();
-                        all_types.insert(str.to_vec());
-                    }
-                }
+            if prop_key.eq(b"grebi:type") {
+                for val in &prop.values {
+                    if val.kind == JsonTokenType::StartString {
+                        let buf = &val.value.to_vec();
+                        let str = JsonParser::parse(&buf).string();
+                        all_types.insert(str.to_vec());
+
+                        let count = types_to_count.get(str);
+                        if count.is_some() {
+                            if *count.unwrap() < rarest_type_count {
+                                rarest_type = Some(str.to_vec());
+                                rarest_type_count = *count.unwrap();
+                            }
+                        }
+                    }
+                }

-            }
+            }

             all_entity_props.insert(prop_key.to_vec());
@@ -187,6 +209,11 @@ fn main() -> std::io::Result<()> {
         };

         nodes_writer.write_all(&line[0..line.len()-1] /* skip closing bracket */).unwrap();
+        if rarest_type.is_some() {
+            nodes_writer.write_all(b",\"grebi:displayType\":\"").unwrap();
+            nodes_writer.write_all(&rarest_type.unwrap()).unwrap();
+            nodes_writer.write_all(b"\"").unwrap();
+        }
         nodes_writer.write_all(b",\"_refs\":").unwrap();
         nodes_writer.write_all(serde_json::to_string(&_refs).unwrap().as_bytes()).unwrap();
         nodes_writer.write_all(b"}\n").unwrap();
@@ -225,7 +252,7 @@ fn main() -> std::io::Result<()> {
     summary_writer.write_all(serde_json::to_string_pretty(&json!({
         "entity_prop_defs": entity_prop_defs,
         "edge_prop_defs": edge_prop_defs,
-        "type_defs": type_defs,
+        "types": type_defs,
         "edges": edge_summary
     })).unwrap().as_bytes()).unwrap();
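Sketched in isolation, the rule the materialiser now applies is: among an entity's grebi:type values, the display type is the one with the lowest subgraph-wide count from the index summary, with ties going to the first value seen. A self-contained sketch under those assumptions; the function and parameter names are invented here, not taken from the patch:

    use std::collections::HashMap;

    // Pick the rarest type for display: the grebi:type value with the
    // smallest global count. Types absent from the summary are skipped,
    // matching the patch's behaviour of ignoring unknown types.
    fn pick_display_type<'a>(
        entity_types: &[&'a [u8]],
        types_to_count: &HashMap<Vec<u8>, i64>,
    ) -> Option<&'a [u8]> {
        entity_types
            .iter()
            .filter_map(|t| types_to_count.get(*t).map(|count| (*t, *count)))
            .min_by_key(|&(_, count)| count)
            .map(|(t, _)| t)
    }

The intuition appears to be that very common types (an ontology class, for example) say little about an entity, while its rarest type is usually its most specific one.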
"./configs/datasource_configs/mondo_efo.json", - "./configs/datasource_configs/hett_pesticides_appril.json", - "./configs/datasource_configs/hett_pesticides_eu.json", - "./configs/datasource_configs/hett_pesticides_gb.json", - "./configs/datasource_configs/aopwiki.json", - "./configs/datasource_configs/chembl.json" + "./configs/datasource_configs/reactome.json" ] } diff --git a/grebi_shared/src/slice_merged_entity.rs b/grebi_shared/src/slice_merged_entity.rs index 5dc5060..57fe99f 100644 --- a/grebi_shared/src/slice_merged_entity.rs +++ b/grebi_shared/src/slice_merged_entity.rs @@ -23,7 +23,8 @@ pub struct SlicedEntity<'a> { pub datasources:Vec<&'a [u8]>, pub subgraph:&'a [u8], pub props:Vec>, - pub _refs:Option<&'a [u8]> + pub _refs:Option<&'a [u8]>, + pub display_type:Option<&'a [u8]> } impl<'a> SlicedEntity<'a> { @@ -34,6 +35,7 @@ impl<'a> SlicedEntity<'a> { let mut props:Vec = Vec::new(); let mut entity_datasources:Vec<&[u8]> = Vec::new(); + let mut display_type:Option<&[u8]> = None; let mut _refs:Option<&[u8]> = None; // { @@ -62,6 +64,11 @@ impl<'a> SlicedEntity<'a> { let prop_key = parser.name(); + if prop_key == b"grebi:displayType" { + display_type = Some(&parser.value()); + continue; + } + if prop_key == b"_refs" { _refs = Some(&parser.value()); continue; @@ -106,7 +113,7 @@ impl<'a> SlicedEntity<'a> { - return SlicedEntity { id, datasources: entity_datasources, subgraph, props, _refs }; + return SlicedEntity { id, datasources: entity_datasources, subgraph, props, display_type, _refs }; } diff --git a/nextflow/01_create_subgraph.nf b/nextflow/01_create_subgraph.nf index b6011f4..59b005f 100644 --- a/nextflow/01_create_subgraph.nf +++ b/nextflow/01_create_subgraph.nf @@ -27,14 +27,15 @@ workflow { Channel.value(config.bytes_per_merged_file)) indexed = index(merged.collect()) - materialise(merged.flatten(), indexed.metadata_jsonl, Channel.value(config.exclude_edges + config.identifier_props), Channel.value(config.exclude_self_referential_edges + config.identifier_props), groups_txt) - merge_summary_jsons(indexed.prop_summary_json.collect() + materialise.out.edge_summary.collect()) + + materialise(merged.flatten(), indexed.metadata_jsonl, indexed.summary_json, Channel.value(config.exclude_edges + config.identifier_props), Channel.value(config.exclude_self_referential_edges + config.identifier_props), groups_txt) + merge_summary_jsons(indexed.summary_json.collect() + materialise.out.mat_summary.collect()) materialised_nodes_and_edges = materialise.out.nodes.collect() + materialise.out.edges.collect() rocks_db = create_rocks(materialised_nodes_and_edges) - neo_input_dir = prepare_neo(indexed.prop_summary_json, materialise.out.nodes, materialise.out.edges) + neo_input_dir = prepare_neo(indexed.summary_json, materialise.out.nodes, materialise.out.edges) ids_csv = create_neo_ids_csv(indexed.ids_txt) neo_db = create_neo( @@ -208,7 +209,7 @@ process index { output: path("metadata.jsonl"), emit: metadata_jsonl - path("prop_summary.json"), emit: prop_summary_json + path("summary.json"), emit: summary_json path("names.txt"), emit: names_txt path("ids_${params.subgraph}.txt"), emit: ids_txt @@ -220,7 +221,7 @@ process index { | ${params.home}/target/release/grebi_index \ --subgraph-name ${params.subgraph} \ --out-metadata-jsonl-path metadata.jsonl \ - --out-summary-json-path prop_summary.json \ + --out-summary-json-path summary.json \ --out-names-txt names.txt \ --out-ids-txt ids_${params.subgraph}.txt """ @@ -237,6 +238,7 @@ process materialise { input: path(merged_filename) 
diff --git a/nextflow/01_create_subgraph.nf b/nextflow/01_create_subgraph.nf
index b6011f4..59b005f 100644
--- a/nextflow/01_create_subgraph.nf
+++ b/nextflow/01_create_subgraph.nf
@@ -27,14 +27,15 @@ workflow {
         Channel.value(config.bytes_per_merged_file))

     indexed = index(merged.collect())
-    materialise(merged.flatten(), indexed.metadata_jsonl, Channel.value(config.exclude_edges + config.identifier_props), Channel.value(config.exclude_self_referential_edges + config.identifier_props), groups_txt)
-    merge_summary_jsons(indexed.prop_summary_json.collect() + materialise.out.edge_summary.collect())
+
+    materialise(merged.flatten(), indexed.metadata_jsonl, indexed.summary_json, Channel.value(config.exclude_edges + config.identifier_props), Channel.value(config.exclude_self_referential_edges + config.identifier_props), groups_txt)
+    merge_summary_jsons(indexed.summary_json.collect() + materialise.out.mat_summary.collect())

     materialised_nodes_and_edges = materialise.out.nodes.collect() + materialise.out.edges.collect()
     rocks_db = create_rocks(materialised_nodes_and_edges)
-    neo_input_dir = prepare_neo(indexed.prop_summary_json, materialise.out.nodes, materialise.out.edges)
+    neo_input_dir = prepare_neo(indexed.summary_json, materialise.out.nodes, materialise.out.edges)
     ids_csv = create_neo_ids_csv(indexed.ids_txt)

     neo_db = create_neo(
@@ -208,7 +209,7 @@ process index {

     output:
     path("metadata.jsonl"), emit: metadata_jsonl
-    path("prop_summary.json"), emit: prop_summary_json
+    path("summary.json"), emit: summary_json
     path("names.txt"), emit: names_txt
     path("ids_${params.subgraph}.txt"), emit: ids_txt

@@ -220,7 +221,7 @@ process index {
     | ${params.home}/target/release/grebi_index \
         --subgraph-name ${params.subgraph} \
         --out-metadata-jsonl-path metadata.jsonl \
-        --out-summary-json-path prop_summary.json \
+        --out-summary-json-path summary.json \
         --out-names-txt names.txt \
         --out-ids-txt ids_${params.subgraph}.txt
     """
@@ -237,6 +238,7 @@ process materialise {

     input:
     path(merged_filename)
     path(metadata_jsonl)
+    path(index_summary_json)
     val(exclude)
     val(exclude_self_referential)
     path(groups_txt)
@@ -244,7 +246,7 @@ process materialise {
     output:
     path("materialised_nodes_${task.index}.jsonl"), emit: nodes
     path("materialised_edges_${task.index}.jsonl"), emit: edges
-    path("edge_summary_${task.index}.json"), emit: edge_summary
+    path("mat_summary_${task.index}.json"), emit: mat_summary

     script:
     """
@@ -253,9 +255,10 @@ process materialise {
     cat ${merged_filename} \
     | ${params.home}/target/release/grebi_materialise \
         --in-metadata-jsonl ${metadata_jsonl} \
+        --in-summary-json ${index_summary_json} \
         --groups-txt ${groups_txt} \
         --out-edges-jsonl materialised_edges_${task.index}.jsonl \
-        --out-summary-json edge_summary_${task.index}.json \
+        --out-summary-json mat_summary_${task.index}.json \
         --exclude ${exclude.iterator().join(",")} \
         --exclude-self-referential ${exclude_self_referential.iterator().join(",")} \
         > materialised_nodes_${task.index}.jsonl
@@ -266,9 +269,6 @@ process merge_summary_jsons {
     cache "lenient"
     memory "8 GB"
     time "1h"
-    //time { 1.hour + 8.hour * (task.attempt-1) }
-    //errorStrategy { task.exitStatus in 137..140 ? 'retry' : 'terminate' }
-    //maxRetries 5

     publishDir "${params.tmp}/${params.config}/${params.subgraph}", overwrite: true

@@ -321,7 +321,7 @@ process prepare_neo {

     publishDir "${params.tmp}/${params.config}/${params.subgraph}/neo4j_csv", overwrite: true

     input:
-    path(prop_summary_json)
+    path(summary_json)
     path(nodes_jsonl)
     path(edges_jsonl)

     """
     #!/usr/bin/env bash
     set -Eeuo pipefail
     ${params.home}/target/release/grebi_make_neo_csv \
-        --in-summary-jsons ${prop_summary_json} \
+        --in-summary-jsons ${summary_json} \
         --in-nodes-jsonl ${nodes_jsonl} \
         --in-edges-jsonl ${edges_jsonl} \
         --out-nodes-csv-path neo_nodes_${params.subgraph}_${task.index}.csv \
@@ -435,7 +435,7 @@ process create_solr_nodes_core {
     set -Eeuo pipefail
     python3 ${params.home}/06_prepare_db_import/make_solr_config.py \
         --subgraph-name ${params.subgraph} \
-        --in-summary-json ${params.tmp}/${params.config}/${params.subgraph}/prop_summary.json \
+        --in-summary-json ${params.tmp}/${params.config}/${params.subgraph}/summary.json \
         --in-template-config-dir ${params.home}/06_prepare_db_import/solr_config_template \
         --out-config-dir solr_config
     python3 ${params.home}/07_create_db/solr/solr_import.slurm.py \
@@ -464,7 +464,7 @@ process create_solr_edges_core {
     set -Eeuo pipefail
     python3 ${params.home}/06_prepare_db_import/make_solr_config.py \
         --subgraph-name ${params.subgraph} \
-        --in-summary-json ${params.tmp}/${params.config}/${params.subgraph}/prop_summary.json \
+        --in-summary-json ${params.tmp}/${params.config}/${params.subgraph}/summary.json \
         --in-template-config-dir ${params.home}/06_prepare_db_import/solr_config_template \
         --out-config-dir solr_config
     python3 ${params.home}/07_create_db/solr/solr_import.slurm.py \
diff --git a/scripts/dataload_saturos.sh b/scripts/dataload_saturos.sh
index c4a80f1..d1ad0d5 100755
--- a/scripts/dataload_saturos.sh
+++ b/scripts/dataload_saturos.sh
@@ -1,7 +1,7 @@
 #!/bin/bash
 export GREBI_HOME=/home/james/grebi
 export GREBI_TMP=/data/grebi_tmp
-export GREBI_CONFIG=hett_only
+export GREBI_CONFIG=ebi
 export GREBI_IS_EBI=false
 export GREBI_TIMESTAMP=$(date +%Y_%m_%d__%H_%M)
 cd $GREBI_TMP