From 38a9c8c3b1425b2c730126f33cd3218fc992736b Mon Sep 17 00:00:00 2001 From: jamesamcl Date: Tue, 30 Jul 2024 00:47:52 +0100 Subject: [PATCH] Assign ids to everything not just equivalence groups (#7) --- .../src/write_associations.rs | 1 - .../grebi_ingest_gwas/src/write_studies.rs | 3 +- 01_ingest/grebi_ingest_ols/src/main.rs | 11 - 01_ingest/grebi_ingest_reactome/src/main.rs | 1 - 01_ingest/grebi_ingest_sssom/src/main.rs | 1 - .../grebi_normalise_prefixes/src/main.rs | 2 +- .../grebi_assign_ids/Cargo.toml | 0 .../grebi_assign_ids/build.rs | 0 .../grebi_assign_ids/src/main.rs | 51 +++-- .../grebi_extract_identifiers}/Cargo.toml | 2 +- .../grebi_extract_identifiers}/build.rs | 0 .../grebi_extract_identifiers}/src/main.rs | 52 ++--- .../grebi_identifiers2groups}/Cargo.toml | 2 +- .../grebi_identifiers2groups}/build.rs | 0 .../grebi_identifiers2groups/src/main.rs | 177 +++++++++++++++++ .../grebi_equivalences2groups/src/main.rs | 188 ------------------ Cargo.lock | 10 +- Cargo.toml | 6 +- .../subgraph_configs/ebi_full_monarch.json | 3 +- configs/subgraph_configs/hett.json | 3 +- configs/subgraph_configs/hra_kg.json | 3 +- configs/subgraph_configs/monarch.json | 3 +- grebi_shared/src/lib.rs | 22 -- nextflow/01_create_subgraph.nf | 28 +-- 24 files changed, 263 insertions(+), 306 deletions(-) rename {02_equivalences => 02_assign_ids}/grebi_assign_ids/Cargo.toml (100%) rename {02_equivalences => 02_assign_ids}/grebi_assign_ids/build.rs (100%) rename {02_equivalences => 02_assign_ids}/grebi_assign_ids/src/main.rs (80%) rename {02_equivalences/grebi_extract_equivalences => 02_assign_ids/grebi_extract_identifiers}/Cargo.toml (89%) rename {02_equivalences/grebi_equivalences2groups => 02_assign_ids/grebi_extract_identifiers}/build.rs (100%) rename {02_equivalences/grebi_extract_equivalences => 02_assign_ids/grebi_extract_identifiers}/src/main.rs (62%) rename {02_equivalences/grebi_equivalences2groups => 02_assign_ids/grebi_identifiers2groups}/Cargo.toml (90%) rename {02_equivalences/grebi_extract_equivalences => 02_assign_ids/grebi_identifiers2groups}/build.rs (100%) create mode 100644 02_assign_ids/grebi_identifiers2groups/src/main.rs delete mode 100644 02_equivalences/grebi_equivalences2groups/src/main.rs diff --git a/01_ingest/grebi_ingest_gwas/src/write_associations.rs b/01_ingest/grebi_ingest_gwas/src/write_associations.rs index fe3a013..4e576c0 100644 --- a/01_ingest/grebi_ingest_gwas/src/write_associations.rs +++ b/01_ingest/grebi_ingest_gwas/src/write_associations.rs @@ -5,7 +5,6 @@ use std::io::{BufWriter, self, BufReader, StdinLock, StdoutLock, Write}; use std::ptr::eq; use grebi_shared::prefix_map::PrefixMap; use grebi_shared::prefix_map::PrefixMapBuilder; -use grebi_shared::serialize_equivalence; use serde_json::{json, Value}; use crate::check_headers::check_headers; diff --git a/01_ingest/grebi_ingest_gwas/src/write_studies.rs b/01_ingest/grebi_ingest_gwas/src/write_studies.rs index 03e8e6c..60a952a 100644 --- a/01_ingest/grebi_ingest_gwas/src/write_studies.rs +++ b/01_ingest/grebi_ingest_gwas/src/write_studies.rs @@ -5,7 +5,6 @@ use std::io::{BufWriter, self, BufReader, StdinLock, StdoutLock, Write}; use std::ptr::eq; use grebi_shared::prefix_map::PrefixMap; use grebi_shared::prefix_map::PrefixMapBuilder; -use grebi_shared::serialize_equivalence; use serde_json::json; use crate::check_headers::check_headers; @@ -106,4 +105,4 @@ pub fn write_studies(csv_reader: &mut csv::Reader>,nodes_wr } } -} \ No newline at end of file +} diff --git a/01_ingest/grebi_ingest_ols/src/main.rs b/01_ingest/grebi_ingest_ols/src/main.rs index d78df6d..59882dc 100644 --- a/01_ingest/grebi_ingest_ols/src/main.rs +++ b/01_ingest/grebi_ingest_ols/src/main.rs @@ -6,7 +6,6 @@ use std::ptr::eq; use clap::Parser; use grebi_shared::prefix_map::PrefixMap; use grebi_shared::prefix_map::PrefixMapBuilder; -use grebi_shared::serialize_equivalence; use struson::reader::{JsonReader, JsonStreamReader, ValueType}; use serde_json::Value; use serde_json::Map; @@ -146,16 +145,6 @@ fn read_ontology(json: &mut JsonStreamReader>>, output_n } -const EQUIV_PREDICATES :[&str;2]= [ - "owl:equivalentClass", - "owl:equivalentProperty", - // "owl:sameAs", - // "skos:exactMatch", - // "oboinowl:hasAlternativeId", - // "uniprot:replaces", - // "iao:0100001" // -> replacement term -]; - fn read_entities(json: &mut JsonStreamReader>>, output_nodes: &mut BufWriter, datasource:&String, grebitype:&str, defining_only:bool) { json.begin_array().unwrap(); while json.has_next().unwrap() { diff --git a/01_ingest/grebi_ingest_reactome/src/main.rs b/01_ingest/grebi_ingest_reactome/src/main.rs index 073da61..4239e33 100644 --- a/01_ingest/grebi_ingest_reactome/src/main.rs +++ b/01_ingest/grebi_ingest_reactome/src/main.rs @@ -7,7 +7,6 @@ use std::env; use clap::Parser; use grebi_shared::prefix_map::PrefixMap; use grebi_shared::prefix_map::PrefixMapBuilder; -use grebi_shared::serialize_equivalence; use serde_json::json; use serde_json::Value; diff --git a/01_ingest/grebi_ingest_sssom/src/main.rs b/01_ingest/grebi_ingest_sssom/src/main.rs index a9ff4e6..c7cb96c 100644 --- a/01_ingest/grebi_ingest_sssom/src/main.rs +++ b/01_ingest/grebi_ingest_sssom/src/main.rs @@ -6,7 +6,6 @@ use std::ptr::eq; use clap::Parser; use grebi_shared::prefix_map::PrefixMap; use grebi_shared::prefix_map::PrefixMapBuilder; -use grebi_shared::serialize_equivalence; use serde_json::json; use serde_yaml; diff --git a/01_ingest/grebi_normalise_prefixes/src/main.rs b/01_ingest/grebi_normalise_prefixes/src/main.rs index 8682ccc..63713db 100644 --- a/01_ingest/grebi_normalise_prefixes/src/main.rs +++ b/01_ingest/grebi_normalise_prefixes/src/main.rs @@ -7,7 +7,7 @@ use std::{env, io}; use std::io::{BufRead, BufReader }; use std::io::{Write, BufWriter}; -use grebi_shared::{get_subject, find_strings, serialize_equivalence, json_parser, json_lexer}; +use grebi_shared::{get_subject, find_strings, json_parser, json_lexer}; use grebi_shared::prefix_map::PrefixMap; use grebi_shared::prefix_map::PrefixMapBuilder; diff --git a/02_equivalences/grebi_assign_ids/Cargo.toml b/02_assign_ids/grebi_assign_ids/Cargo.toml similarity index 100% rename from 02_equivalences/grebi_assign_ids/Cargo.toml rename to 02_assign_ids/grebi_assign_ids/Cargo.toml diff --git a/02_equivalences/grebi_assign_ids/build.rs b/02_assign_ids/grebi_assign_ids/build.rs similarity index 100% rename from 02_equivalences/grebi_assign_ids/build.rs rename to 02_assign_ids/grebi_assign_ids/build.rs diff --git a/02_equivalences/grebi_assign_ids/src/main.rs b/02_assign_ids/grebi_assign_ids/src/main.rs similarity index 80% rename from 02_equivalences/grebi_assign_ids/src/main.rs rename to 02_assign_ids/grebi_assign_ids/src/main.rs index 8ac2b7f..b597827 100644 --- a/02_equivalences/grebi_assign_ids/src/main.rs +++ b/02_assign_ids/grebi_assign_ids/src/main.rs @@ -18,7 +18,7 @@ use grebi_shared::find_strings; struct Args { #[arg(long)] - add_prefix: String, // used to prepend the subgraph name like hra_kg:g: + identifier_properties:String, #[arg(long)] groups_txt: String, @@ -34,9 +34,15 @@ static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; fn main() { let args = Args::parse(); - let preserve_fields:HashSet> = args.preserve_field.iter().map(|x| x.as_bytes().to_vec()).collect(); - let add_prefix = args.add_prefix; + + let mut id_props:HashSet> = HashSet::new(); + for prop in args.identifier_properties.split(",") { + id_props.insert(prop.as_bytes().to_vec()); + } + + + let preserve_fields:HashSet> = args.preserve_field.iter().map(|x| x.as_bytes().to_vec()).collect(); let id_to_group:HashMap, Vec> = { @@ -95,29 +101,32 @@ fn main() { while json.peek().kind != JsonTokenType::EndObject { let prop_key = json.name(); - if prop_key == b"id" { - id = Some(json.string()); + // any of the IDs will do, we only need one + // as all identifiers map to the same group + // + if id_props.contains(prop_key) { + // TODO handle the same cases as the id extraction does + if json.peek().kind == JsonTokenType::StartArray { + json.begin_array(); + id = Some(json.string()); + } else { + id = Some(json.string()); + } + break; } else { json.value(); // skip } } let group = id_to_group.get(id.unwrap()); - if group.is_some() { - - // the subject mapped to an equivalence group - writer.write_all("{\"grebi:nodeId\":\"".as_bytes()).unwrap(); - writer.write_all(add_prefix.as_bytes()).unwrap(); - writer.write_all(group.unwrap().as_slice()).unwrap(); - writer.write_all("\"".as_bytes()).unwrap(); - } else { - // the subject did not map to an equivalence group - writer.write_all("{\"grebi:nodeId\":\"".as_bytes()).unwrap(); - writer.write_all(add_prefix.as_bytes()).unwrap(); - writer.write_all(id.unwrap()).unwrap(); - writer.write_all("\"".as_bytes()).unwrap(); + if !group.is_some() { + panic!("could not find identifier group for id: {}", String::from_utf8(id.unwrap().to_vec()).unwrap()); } + writer.write_all("{\"grebi:nodeId\":\"".as_bytes()).unwrap(); + writer.write_all(group.unwrap().as_slice()).unwrap(); + writer.write_all("\"".as_bytes()).unwrap(); + json.rewind(); while json.peek().kind != JsonTokenType::EndObject { @@ -129,7 +138,6 @@ fn main() { } else { let name_group = id_to_group.get(name); if name_group.is_some() { - writer.write_all(add_prefix.as_bytes()).unwrap(); writer.write_all(name_group.unwrap()).unwrap(); } else { writer.write_all(name).unwrap(); @@ -140,7 +148,7 @@ fn main() { if name.eq(b"id") || preserve_fields.contains(name) { writer.write_all(json.value()).unwrap(); } else { - write_value(&mut writer, json.value(), &id_to_group, &add_prefix); + write_value(&mut writer, json.value(), &id_to_group); } } @@ -151,7 +159,7 @@ fn main() { } -fn write_value(writer:&mut BufWriter, value:&[u8], id_to_group:&HashMap, Vec>, add_prefix:&str) { +fn write_value(writer:&mut BufWriter, value:&[u8], id_to_group:&HashMap, Vec>) { let string_locations = find_strings(&value); @@ -174,7 +182,6 @@ fn write_value(writer:&mut BufWriter, value:&[u8], id_to_group:& let pv_group = id_to_group.get(str); if pv_group.is_some() { - writer.write_all(add_prefix.as_bytes()).unwrap(); writer.write_all(pv_group.unwrap()).unwrap(); } else { writer.write_all(str).unwrap(); diff --git a/02_equivalences/grebi_extract_equivalences/Cargo.toml b/02_assign_ids/grebi_extract_identifiers/Cargo.toml similarity index 89% rename from 02_equivalences/grebi_extract_equivalences/Cargo.toml rename to 02_assign_ids/grebi_extract_identifiers/Cargo.toml index 4e54207..7e33761 100644 --- a/02_equivalences/grebi_extract_equivalences/Cargo.toml +++ b/02_assign_ids/grebi_extract_identifiers/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "grebi_extract_equivalences" +name = "grebi_extract_identifiers" version = "0.1.0" edition = "2021" diff --git a/02_equivalences/grebi_equivalences2groups/build.rs b/02_assign_ids/grebi_extract_identifiers/build.rs similarity index 100% rename from 02_equivalences/grebi_equivalences2groups/build.rs rename to 02_assign_ids/grebi_extract_identifiers/build.rs diff --git a/02_equivalences/grebi_extract_equivalences/src/main.rs b/02_assign_ids/grebi_extract_identifiers/src/main.rs similarity index 62% rename from 02_equivalences/grebi_extract_equivalences/src/main.rs rename to 02_assign_ids/grebi_extract_identifiers/src/main.rs index 3267bb3..d19c361 100644 --- a/02_equivalences/grebi_extract_equivalences/src/main.rs +++ b/02_assign_ids/grebi_extract_identifiers/src/main.rs @@ -7,7 +7,7 @@ use std::{env, io}; use std::io::{BufRead, BufReader }; use std::io::{Write, BufWriter}; -use grebi_shared::{get_subject, find_strings, serialize_equivalence, json_parser, json_lexer}; +use grebi_shared::{get_subject, find_strings, json_parser, json_lexer}; use clap::Parser; @@ -22,7 +22,7 @@ static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; struct Args { #[arg(long)] - equivalence_properties:String + identifier_properties:String } fn main() { @@ -36,13 +36,13 @@ fn main() { let stdout = io::stdout().lock(); let mut writer = BufWriter::new(stdout); - let mut equiv_props:HashSet> = HashSet::new(); + let mut id_props:HashSet> = HashSet::new(); let mut n_total = 0; let args = Args::parse(); - for prop in args.equivalence_properties.split(",") { - equiv_props.insert(prop.as_bytes().to_vec()); + for prop in args.identifier_properties.split(",") { + id_props.insert(prop.as_bytes().to_vec()); } loop { @@ -54,32 +54,15 @@ fn main() { } let mut json = JsonParser::parse(&line); - - - let mut id:Option<&[u8]> = None; json.begin_object(); - json.mark(); - while json.peek().kind != JsonTokenType::EndObject { - let name = json.name(); - if name.eq("id".as_bytes()) { - id = Some(json.string()); - break; - } else { - json.value(); // skip - } - } - json.rewind(); - - if id.is_none() { - panic!("Missing id field in JSON: {}", String::from_utf8(line).unwrap()); - } + let mut wrote_any = false; while json.peek().kind != JsonTokenType::EndObject { let k = json.name(); - if !equiv_props.contains(k) { + if !id_props.contains(k) { json.value(); // skip continue; } @@ -88,24 +71,33 @@ fn main() { json.begin_array(); while json.peek().kind != JsonTokenType::EndArray { if json.peek().kind == JsonTokenType::StartString { - let serialized = serialize_equivalence(id.unwrap(), json.string()); - if serialized.is_some() { - writer.write_all(&serialized.unwrap()).unwrap(); + if wrote_any { + writer.write_all(b"\t").unwrap(); + } else { + wrote_any = true; } + writer.write_all(&json.string()).unwrap(); } else { json.value(); // skip } } json.end_array(); } else if json.peek().kind == JsonTokenType::StartString { - let serialized = serialize_equivalence(id.unwrap(), json.string()); - if serialized.is_some() { - writer.write_all(&serialized.unwrap()).unwrap(); + if wrote_any { + writer.write_all(b"\t").unwrap(); + } else { + wrote_any = true; } + writer.write_all(&json.string()).unwrap(); } else { json.value(); // skip } } + if !wrote_any { + panic!("no identifiers found in object {}", String::from_utf8_lossy(&line)); + } + + writer.write_all(b"\n").unwrap(); n_total = n_total + 1; diff --git a/02_equivalences/grebi_equivalences2groups/Cargo.toml b/02_assign_ids/grebi_identifiers2groups/Cargo.toml similarity index 90% rename from 02_equivalences/grebi_equivalences2groups/Cargo.toml rename to 02_assign_ids/grebi_identifiers2groups/Cargo.toml index ebc8665..ec77587 100644 --- a/02_equivalences/grebi_equivalences2groups/Cargo.toml +++ b/02_assign_ids/grebi_identifiers2groups/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "grebi_equivalences2groups" +name = "grebi_identifiers2groups" version = "0.1.0" edition = "2021" diff --git a/02_equivalences/grebi_extract_equivalences/build.rs b/02_assign_ids/grebi_identifiers2groups/build.rs similarity index 100% rename from 02_equivalences/grebi_extract_equivalences/build.rs rename to 02_assign_ids/grebi_identifiers2groups/build.rs diff --git a/02_assign_ids/grebi_identifiers2groups/src/main.rs b/02_assign_ids/grebi_identifiers2groups/src/main.rs new file mode 100644 index 0000000..c991bdb --- /dev/null +++ b/02_assign_ids/grebi_identifiers2groups/src/main.rs @@ -0,0 +1,177 @@ + +use std::collections::{HashSet, HashMap, BTreeMap}; +use std::{env, io}; +use csv; +use rusqlite::Connection; +use bloomfilter::Bloom; +use clap::Parser; +use std::io::{BufRead, BufReader }; +use std::io::{Write, BufWriter}; + +#[derive(clap::Parser, Debug)] +#[command(author, version, about, long_about = None)] +struct Args { + + #[arg(long)] + add_group: Vec, + + #[arg(long)] + add_prefix: String // used to prepend the subgraph name like hra_kg:g: +} + + +fn main() { + + let mut group_to_entities:BTreeMap>> = BTreeMap::new(); + let mut entity_to_group:BTreeMap, u64> = BTreeMap::new(); + + let mut next_group_id:u64 = 1; + + let args = Args::parse(); + let add_group:Vec = args.add_group; + for group in add_group { + let entries:HashSet> = group.split(",").map(|s| s.as_bytes().to_vec()).collect(); + let gid = next_group_id; + next_group_id = next_group_id + 1; + for id in &entries { + entity_to_group.insert(id.to_vec(), gid); + } + group_to_entities.insert(gid, entries); + } + + let start_time = std::time::Instant::now(); + let mut n = 0; + + let stdin = io::stdin(); + let handle = stdin.lock(); + let mut reader = BufReader::new(handle); + + let mut stdout = io::stdout().lock(); + let mut writer = BufWriter::new(stdout); + + loop { + let mut line: Vec = Vec::new(); + reader.read_until(b'\n', &mut line).unwrap(); + + n = n + 1; + if n % 1000000 == 0 { + eprintln!("...{} lines in {} seconds", n, start_time.elapsed().as_secs()); + } + + + if line.len() == 0 { + break; + } + if line[line.len() - 1] == b'\n' { + line.pop(); + } + + let mut ids:Vec> = line.split(|&byte| byte == b'\t').map(|id| id.to_vec()).collect(); + + let mut target_group:u64 = 0; + for id in &ids { + let g = entity_to_group.get(id); + if g.is_some() { + target_group = *g.unwrap(); + break; + } + } + + if target_group != 0 { + // at least one of the ids already had a group; + // put everything else into it + for id in &ids { + let g2 = entity_to_group.get(id); + if g2.is_some() && *g2.unwrap() != target_group { + // this id already had a group different to ours + let entities_in_b = group_to_entities.remove(&g2.unwrap()).unwrap(); + for e in entities_in_b.clone() { + entity_to_group.insert(e, target_group); + } + let entities_in_a = group_to_entities.get_mut(&target_group).unwrap(); + entities_in_a.extend(entities_in_b); + } else { + // this id didn't have a group + entity_to_group.insert(id.to_vec(), target_group); + group_to_entities.get_mut(&target_group).unwrap().insert(id.to_vec()); + } + } + } else { + + // none of the ids had a group so we make a new one + target_group = next_group_id; + next_group_id = next_group_id + 1; + for id in &ids { + entity_to_group.insert(id.to_vec(), target_group); + } + group_to_entities.insert(target_group, ids.iter().map(|id| id.to_vec()).collect::>()); + } + } + + eprintln!("Loaded {} lines in {} seconds", n, start_time.elapsed().as_secs()); + + let start_time2 = std::time::Instant::now(); + let mut n2 = 0; + + + for group in group_to_entities { + + n2 = n2 + 1; + + // writer.write_all("group_".as_bytes()).unwrap(); + // writer.write_all(group.0.to_string().as_bytes()).unwrap(); + // writer.write_all("\t".as_bytes()).unwrap(); + + let mut sorted_ids:Vec<&Vec> = group.1.iter().collect(); + sorted_ids.sort_unstable_by(|a, b| id_score(a).cmp(&id_score(b))); + + let mut is_first_value = true; + + for entity in sorted_ids { + if is_first_value { + writer.write_all(&args.add_prefix.as_bytes()).unwrap(); + writer.write_all(entity.as_slice()).unwrap(); + writer.write_all("\t".as_bytes()).unwrap(); + is_first_value = false; + } else { + writer.write_all("\t".as_bytes()).unwrap(); + } + writer.write_all(entity.as_slice()).unwrap(); + } + + writer.write_all("\n".as_bytes()).unwrap(); + } + + eprintln!("Wrote {} groups in {} seconds", n2, start_time2.elapsed().as_secs()); + +} + + +// From the equivalence group, try to pick an ID which will be obvious in Neo4j. +// Prefer: +// - CURIEs +// - textual (readable) IDs rather than numeric +// - "grebi:" IDs always win (used to consolidate names on grebi:name etc.) +// lower score is better +// +fn id_score(id:&[u8]) -> i32 { + + if id.starts_with(b"grebi:") { + return i32::MIN; + } + + let mut score = 0; + + if id.contains(&b':') && !id.starts_with(b"http") { + score = score - 1000; // curie-like + } + + for c in id { + if c.is_ascii_alphabetic() { + score = score + 1; + } + } + + return score; +} + diff --git a/02_equivalences/grebi_equivalences2groups/src/main.rs b/02_equivalences/grebi_equivalences2groups/src/main.rs deleted file mode 100644 index 88f21cd..0000000 --- a/02_equivalences/grebi_equivalences2groups/src/main.rs +++ /dev/null @@ -1,188 +0,0 @@ - - -use std::collections::{HashSet, HashMap, BTreeMap}; -use std::{env, io}; -use csv; -use rusqlite::Connection; -use bloomfilter::Bloom; -use clap::Parser; -use std::io::{BufRead, BufReader }; -use std::io::{Write, BufWriter}; - -#[derive(clap::Parser, Debug)] -#[command(author, version, about, long_about = None)] -struct Args { - - #[arg(long)] - add_group: Vec -} - - -fn main() { - - let mut group_to_entities:BTreeMap>> = BTreeMap::new(); - let mut entity_to_group:BTreeMap, u64> = BTreeMap::new(); - - let mut next_group_id:u64 = 0; - - let args = Args::parse(); - let add_group:Vec = args.add_group; - for group in add_group { - let entries:HashSet> = group.split(",").map(|s| s.as_bytes().to_vec()).collect(); - let gid = next_group_id; - next_group_id = next_group_id + 1; - for id in &entries { - entity_to_group.insert(id.to_vec(), gid); - } - group_to_entities.insert(gid, entries); - } - - let start_time = std::time::Instant::now(); - let mut n = 0; - - let stdin = io::stdin(); - let handle = stdin.lock(); - let mut reader = BufReader::new(handle); - - let mut stdout = io::stdout().lock(); - let mut writer = BufWriter::new(stdout); - - loop { - let mut subject: Vec = Vec::new(); - reader.read_until(b'\t', &mut subject).unwrap(); - - if subject.len() == 0 { - break; - } - - subject.pop(); // remove \t - - let mut object: Vec = Vec::new(); - reader.read_until(b'\n', &mut object).unwrap(); - - object.pop(); // remove \n - - - n = n + 1; - if n % 1000000 == 0 { - eprintln!("...{} equivalences in {} seconds", n, start_time.elapsed().as_secs()); - } - - if subject == object { - continue; - } - - let group_a:Option = entity_to_group.get(&subject).cloned(); - let group_b:Option = entity_to_group.get(&object).cloned(); - - if group_a.is_some() { - // A has a group - let gA = group_a.unwrap(); - if group_b.is_some() { - // B has a group - let gB = group_b.unwrap(); - if gA == gB { - // already in the same group, nothing to do - continue - } - // A and B are in different groups - // merge B into A - let entities_in_b = group_to_entities.remove(&gB).unwrap(); - for e in entities_in_b.clone() { - entity_to_group.insert(e, gA); - } - let entities_in_a = group_to_entities.get_mut(&gA).unwrap(); - entities_in_a.extend(entities_in_b); - - } else { - // A has a group and B doesn't - // Put B in A's group - entity_to_group.insert(object.clone(), gA); - group_to_entities.get_mut(&gA).unwrap().insert(object); - } - } else { - // A does not have a group - if group_b.is_some() { - let gB = group_b.unwrap(); - // B has a group but A does not - // Put A in B's group - entity_to_group.insert(subject.clone(), gB); - group_to_entities.get_mut(&gB).unwrap().insert(subject); - } else { - // Neither A nor B have a group. - // Put both into a new group. - - let group_id = next_group_id; - next_group_id = next_group_id + 1; - - entity_to_group.insert(subject.clone(), group_id); - entity_to_group.insert(object.clone(), group_id); - group_to_entities.insert(group_id, HashSet::from([subject, object])); - } - } - } - - eprintln!("Loaded {} equivalences in {} seconds", n, start_time.elapsed().as_secs()); - - let start_time2 = std::time::Instant::now(); - let mut n2 = 0; - - - for group in group_to_entities { - - n2 = n2 + 1; - - // writer.write_all("group_".as_bytes()).unwrap(); - // writer.write_all(group.0.to_string().as_bytes()).unwrap(); - // writer.write_all("\t".as_bytes()).unwrap(); - - let mut sorted_ids:Vec<&Vec> = group.1.iter().collect(); - sorted_ids.sort_unstable_by(|a, b| id_score(a).cmp(&id_score(b))); - - let mut is_first_value = true; - - for entity in sorted_ids { - if is_first_value { - is_first_value = false; - } else { - writer.write_all("\t".as_bytes()).unwrap(); - } - writer.write_all(entity.as_slice()).unwrap(); - } - - writer.write_all("\n".as_bytes()).unwrap(); - } - - eprintln!("Wrote {} groups in {} seconds", n2, start_time2.elapsed().as_secs()); - -} - - -// From the equivalence group, try to pick an ID which will be obvious in Neo4j. -// Prefer: -// - CURIEs -// - textual (readable) IDs rather than numeric -// - "grebi:" IDs always win (used to consolidate names on grebi:name etc.) -// lower score is better -// -fn id_score(id:&[u8]) -> i32 { - - if id.starts_with(b"grebi:") { - return i32::MIN; - } - - let mut score = 0; - - if id.contains(&b':') && !id.starts_with(b"http") { - score = score - 1000; // curie-like - } - - for c in id { - if c.is_ascii_alphabetic() { - score = score - 1; - } - } - - return score; -} - diff --git a/Cargo.lock b/Cargo.lock index d906615..befd238 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -463,29 +463,29 @@ dependencies = [ ] [[package]] -name = "grebi_equivalences2groups" +name = "grebi_extract_identifiers" version = "0.1.0" dependencies = [ - "bloomfilter", "clap", "csv", "fasthash", "grebi_shared", + "jemallocator", "lmdb-zero", - "rusqlite", "serde_json", ] [[package]] -name = "grebi_extract_equivalences" +name = "grebi_identifiers2groups" version = "0.1.0" dependencies = [ + "bloomfilter", "clap", "csv", "fasthash", "grebi_shared", - "jemallocator", "lmdb-zero", + "rusqlite", "serde_json", ] diff --git a/Cargo.toml b/Cargo.toml index 9cbaec9..f4a0ca1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,9 +11,9 @@ members = [ "01_ingest/grebi_ingest_reactome", "01_ingest/grebi_ingest_kgx_edges", "01_ingest/grebi_normalise_prefixes", - "02_equivalences/grebi_extract_equivalences", - "02_equivalences/grebi_equivalences2groups", - "02_equivalences/grebi_assign_ids", + "02_assign_ids/grebi_extract_identifiers", + "02_assign_ids/grebi_identifiers2groups", + "02_assign_ids/grebi_assign_ids", "03_merge/grebi_merge", "04_index/grebi_index", "05_materialise/grebi_materialise", diff --git a/configs/subgraph_configs/ebi_full_monarch.json b/configs/subgraph_configs/ebi_full_monarch.json index ef0c647..a7ee39c 100644 --- a/configs/subgraph_configs/ebi_full_monarch.json +++ b/configs/subgraph_configs/ebi_full_monarch.json @@ -2,7 +2,8 @@ "id": "EBI_MONARCH", "name": "EBI Resources and MONARCH Initiative KG", "bytes_per_merged_file": 1073741824, - "equivalence_props": [ + "identifier_props": [ + "id", "owl:equivalentClass", "owl:equivalentProperty", "owl:sameAs", diff --git a/configs/subgraph_configs/hett.json b/configs/subgraph_configs/hett.json index c184160..7e24e2d 100644 --- a/configs/subgraph_configs/hett.json +++ b/configs/subgraph_configs/hett.json @@ -2,7 +2,8 @@ "id": "HETT", "name": "EMBL Human Ecosystems", "bytes_per_merged_file": 1073741824, - "equivalence_props": [ + "identifier_props": [ + "id", "owl:equivalentClass", "owl:equivalentProperty", "owl:sameAs", diff --git a/configs/subgraph_configs/hra_kg.json b/configs/subgraph_configs/hra_kg.json index 30d7e87..efc9982 100644 --- a/configs/subgraph_configs/hra_kg.json +++ b/configs/subgraph_configs/hra_kg.json @@ -2,7 +2,8 @@ "id": "HRA_KG", "name": "Human Reference Atlas KG", "bytes_per_merged_file": 1073741824, - "equivalence_props": [ + "identifier_props": [ + "id", "owl:equivalentClass", "owl:equivalentProperty", "owl:sameAs", diff --git a/configs/subgraph_configs/monarch.json b/configs/subgraph_configs/monarch.json index 46d04ad..f885b41 100644 --- a/configs/subgraph_configs/monarch.json +++ b/configs/subgraph_configs/monarch.json @@ -2,7 +2,8 @@ "id": "MONARCH", "name": "MONARCH Initiative KG", "bytes_per_merged_file": 1073741824, - "equivalence_props": [ + "identifier_props": [ + "id", "owl:equivalentClass", "owl:equivalentProperty", "owl:sameAs", diff --git a/grebi_shared/src/lib.rs b/grebi_shared/src/lib.rs index 7c05c0d..8ce91ed 100644 --- a/grebi_shared/src/lib.rs +++ b/grebi_shared/src/lib.rs @@ -130,28 +130,6 @@ pub fn get_subjects<'a>(json:&'a [u8])->Vec<&'a [u8]> { } -pub fn serialize_equivalence(subject:&[u8], object:&[u8]) -> Option> { - - if subject.eq(object) { - return None; - } - - let mut buf = Vec::with_capacity(subject.len() + object.len() + 2); - - if subject < object { - buf.extend(subject.iter().map(filter_newlines)); - buf.push(b'\t'); - buf.extend(object.iter().map(filter_newlines)); - } else { - buf.extend(object.iter().map(filter_newlines)); - buf.push(b'\t'); - buf.extend(subject.iter().map(filter_newlines)); - } - - buf.push(b'\n'); - return Some(buf); -} - fn filter_newlines(ch:&u8)->u8 { if *ch == b'\n' || *ch == b'\t' { return b' '; diff --git a/nextflow/01_create_subgraph.nf b/nextflow/01_create_subgraph.nf index d3782fb..400e461 100644 --- a/nextflow/01_create_subgraph.nf +++ b/nextflow/01_create_subgraph.nf @@ -16,9 +16,9 @@ workflow { files_listing = prepare() | splitText | map { row -> parseJson(row) } - ingest(files_listing, Channel.value(config.equivalence_props)) - groups_txt = build_equiv_groups(ingest.out.equivalences.collect(), Channel.value(config.additional_equivalence_groups)) - assigned = assign_ids(ingest.out.nodes, groups_txt).collect(flat: false) + ingest(files_listing, Channel.value(config.identifier_props)) + groups_txt = build_equiv_groups(ingest.out.identifiers.collect(), Channel.value(config.additional_equivalence_groups)) + assigned = assign_ids(ingest.out.nodes, groups_txt, Channel.value(config.identifier_props)).collect(flat: false) merged = merge_ingests( assigned, @@ -26,7 +26,7 @@ workflow { Channel.value(config.bytes_per_merged_file)) indexed = index(merged.collect()) - materialised = materialise(merged.flatten(), indexed.metadata_jsonl, Channel.value(config.exclude_edges + config.equivalence_props)) + materialised = materialise(merged.flatten(), indexed.metadata_jsonl, Channel.value(config.exclude_edges + config.identifier_props)) rocks_db = create_rocks(materialised.collect()) @@ -85,11 +85,11 @@ process ingest { input: val(file_listing) - val(equivalence_props) + val(identifier_props) output: tuple val(file_listing.datasource.name), path("nodes_${task.index}.jsonl.gz"), emit: nodes - path("equivalences_${task.index}.tsv"), emit: equivalences + path("identifiers_${task.index}.tsv"), emit: identifiers script: """ @@ -101,9 +101,9 @@ process ingest { --filename "${basename(file_listing.filename)}" \ ${buildIngestArgs(file_listing.ingest.ingest_args)} \ | ${params.home}/target/release/grebi_normalise_prefixes ${params.home}/prefix_maps/prefix_map_normalise.json \ - | tee >(${params.home}/target/release/grebi_extract_equivalences \ - --equivalence-properties ${equivalence_props.iterator().join(",")} \ - > equivalences_${task.index}.tsv) \ + | tee >(${params.home}/target/release/grebi_extract_identifiers \ + --identifier-properties ${identifier_props.iterator().join(",")} \ + > identifiers_${task.index}.tsv) \ | pigz --fast > nodes_${task.index}.jsonl.gz """ } @@ -114,7 +114,7 @@ process build_equiv_groups { time '23h' input: - path(equivalences_tsv) + path(identifiers_tsv) val(additional_equivalence_groups) output: @@ -124,8 +124,9 @@ process build_equiv_groups { """ #!/usr/bin/env bash set -Eeuo pipefail - cat ${equivalences_tsv} \ - | ${params.home}/target/release/grebi_equivalences2groups \ + cat ${identifiers_tsv} \ + | ${params.home}/target/release/grebi_identifiers2groups \ + --add-prefix ${params.subgraph}:g: \ ${buildAddEquivGroupArgs(additional_equivalence_groups)} \ > groups.txt """ @@ -141,6 +142,7 @@ process assign_ids { input: tuple(val(datasource_name), path(nodes_jsonl)) path groups_txt + val(identifier_props) output: tuple(val(datasource_name), path("nodes_with_ids.sorted.jsonl.gz")) @@ -151,7 +153,7 @@ process assign_ids { set -Eeuo pipefail zcat ${nodes_jsonl} \ | ${params.home}/target/release/grebi_assign_ids \ - --add-prefix ${params.subgraph}:g: \ + --identifier-properties ${identifier_props.iterator().join(",")} \ --groups-txt ${groups_txt} \ > nodes_with_ids.jsonl LC_ALL=C sort -o nodes_with_ids.sorted.jsonl nodes_with_ids.jsonl