Skip to content

Commit

Permalink
Merge pull request #147 from ZhongFuze/upstream/batch-upsert
Browse files Browse the repository at this point in the history
Optimize/batch-upsert
  • Loading branch information
nykma authored May 27, 2024
2 parents 225f725 + f64e451 commit 1fd76b9
Show file tree
Hide file tree
Showing 41 changed files with 4,937 additions and 219 deletions.
6 changes: 3 additions & 3 deletions config/main.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ url = "https://proof-service.next.id"
api_key = "x-api-key"

[upstream.aggregation_service]
url = "https://7x16bogxfb.execute-api.us-east-1.amazonaws.com/v1/identity/search"
url = "http://data-server-hostname/data_server"

[upstream.sybil_service]
url = "https://raw.githubusercontent.com/Uniswap/sybil-list/master/verified.json"

[upstream.keybase_service]
url = "https://keybase.io/_/api/1.0/user/lookup.json"
stable_url = "http://ec2-18-167-90-166.ap-east-1.compute.amazonaws.com/data_server/keybase"
stable_url = "http://data-server-hostname/data_server/keybase"

[upstream.knn3_service]
url = "https://mw.graphql.knn3.xyz/"
Expand Down Expand Up @@ -62,4 +62,4 @@ url = "https://indexer.crossbell.io/v1/graphql"
rpc_url = "https://api.mainnet-beta.solana.com"

[upstream.genome_api]
rpc_url = "http://ec2-18-167-90-166.ap-east-1.compute.amazonaws.com/data_server/genome"
rpc_url = "http://data-server-hostname/data_server/genome"
55 changes: 55 additions & 0 deletions src/config/tdb/migrations/LoadingJob_SocialGraph.gsql
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,61 @@ CREATE GRAPH SocialGraph (Identities, Proof_Forward, Proof_Backward, Contracts,

USE GRAPH SocialGraph

CREATE OR REPLACE QUERY insert_contract_connection(STRING edges_str) FOR GRAPH SocialGraph SYNTAX v2 {
JSONARRAY edges = parse_json_array(edges_str);
SumAccum<INT> @@created_edges;
INT array_size = edges.size();
FOREACH idx IN RANGE[0, array_size - 1] DO
JSONOBJECT edge_obj = edges.getJsonObject(idx);
STRING edge_type = edge_obj.getString("edge_type");
STRING from_id = edge_obj.getString("from_id");
STRING to_id = edge_obj.getString("to_id");
IF edge_type == "Hold_Contract" THEN
INSERT INTO Hold_Contract(FROM, TO, DISCRIMINATOR(source, transaction, id), uuid, created_at, updated_at, fetcher, expired_at)
VALUES (
from_id Identities,
to_id Contracts,
edge_obj.getString("source"),
edge_obj.getString("transaction"),
edge_obj.getString("id"),
edge_obj.getString("uuid"),
to_datetime(edge_obj.getString("created_at")),
now(),
edge_obj.getString("fetcher"),
to_datetime(edge_obj.getString("expired_at"))
);
@@created_edges += 1;
ELSE IF edge_type == "Reverse_Resolve_Contract" THEN
INSERT INTO Reverse_Resolve_Contract(FROM, TO, DISCRIMINATOR(source, system, name), uuid, updated_at, fetcher)
VALUES (
from_id Identities,
to_id Contracts,
edge_obj.getString("source"),
edge_obj.getString("system"),
edge_obj.getString("name"),
edge_obj.getString("uuid"),
now(),
edge_obj.getString("fetcher")
);
@@created_edges += 1;
ELSE IF edge_type == "Resolve_Contract" THEN
INSERT INTO Resolve_Contract(FROM, TO, DISCRIMINATOR(source, system, name), uuid, updated_at, fetcher)
VALUES (
from_id Contracts,
to_id Identities,
edge_obj.getString("source"),
edge_obj.getString("system"),
edge_obj.getString("name"),
edge_obj.getString("uuid"),
now(),
edge_obj.getString("fetcher")
);
@@created_edges += 1;
END;
END;
PRINT @@created_edges as created_edges;
}

CREATE OR REPLACE QUERY upsert_isolated_vertex(STRING vertex_str, INT updated_nanosecond) FOR GRAPH SocialGraph SYNTAX v2 {
TYPEDEF TUPLE< INT updated_nanosecond, STRING id > MinUpdatedTimeTuple;
JSONOBJECT from_v = parse_json_object(vertex_str);
Expand Down
4 changes: 2 additions & 2 deletions src/tigergraph/edge/hold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ impl Transfer for HoldRecord {
attributes_map
}

fn to_json_value(&self) -> Value {
fn to_json_value(&self) -> Map<String, Value> {
let mut map = Map::new();
map.insert("uuid".to_string(), json!(self.uuid));
map.insert("source".to_string(), json!(self.source));
Expand All @@ -246,7 +246,7 @@ impl Transfer for HoldRecord {
self.expired_at
.map_or(json!("1970-01-01 00:00:00"), |expired_at| json!(expired_at)),
);
Value::Object(map)
map
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/tigergraph/edge/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
pub mod hold;
pub mod part_of_identities_graph;
pub mod proof;
pub mod relation;
pub mod resolve;
pub use hold::{Hold, HoldRecord, HOLD_CONTRACT, HOLD_IDENTITY};
pub use part_of_identities_graph::{HyperEdge, HyperEdgeRecord, HYPER_EDGE, HYPER_EDGE_REVERSE};
pub use proof::{
Proof, ProofRecord, EDGE_NAME as PROOF_EDGE, REVERSE_EDGE_NAME as PROOF_REVERSE_EDGE,
};
Expand Down
174 changes: 174 additions & 0 deletions src/tigergraph/edge/part_of_identities_graph.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
use crate::{
error::Error,
tigergraph::{
edge::{Edge, EdgeRecord, EdgeWrapper, FromWithParams, Wrapper},
vertex::{IdentitiesGraph, Identity, Vertex, VertexRecord},
Attribute, Transfer,
},
};

use hyper::{client::HttpConnector, Client};
use serde::{Deserialize, Serialize};
use serde_json::value::{Map, Value};
use std::collections::HashMap;
use uuid::Uuid;

// always IdentitiesGraph -> Identities
pub const HYPER_EDGE: &str = "PartOfIdentitiesGraph_Reverse";
pub const HYPER_EDGE_REVERSE: &str = "PartOfIdentitiesGraph_Reverse";
pub const IS_DIRECTED: bool = true;

/// HyperEdge
#[derive(Clone, Deserialize, Serialize, Debug)]
pub struct HyperEdge {}

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct HyperEdgeRecord(pub EdgeRecord<HyperEdge>);

impl FromWithParams<HyperEdge> for EdgeRecord<HyperEdge> {
fn from_with_params(
e_type: String,
directed: bool,
from_id: String,
from_type: String,
to_id: String,
to_type: String,
attributes: HyperEdge,
) -> Self {
EdgeRecord {
e_type,
directed,
from_id,
from_type,
to_id,
to_type,
discriminator: None,
attributes,
}
}
}

impl From<EdgeRecord<HyperEdge>> for HyperEdgeRecord {
fn from(record: EdgeRecord<HyperEdge>) -> Self {
HyperEdgeRecord(record)
}
}

impl std::ops::Deref for HyperEdgeRecord {
type Target = EdgeRecord<HyperEdge>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl std::ops::DerefMut for HyperEdgeRecord {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

impl std::ops::Deref for EdgeRecord<HyperEdge> {
type Target = HyperEdge;

fn deref(&self) -> &Self::Target {
&self.attributes
}
}

impl std::ops::DerefMut for EdgeRecord<HyperEdge> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.attributes
}
}

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct HyperEdgeAttribute(HashMap<String, Attribute>);

// Implement the `From` trait for converting `HyperEdgeRecord` into a `HashMap<String, Attr>`.
impl Transfer for HyperEdgeRecord {
fn to_attributes_map(&self) -> HashMap<String, Attribute> {
let attributes_map = HashMap::new();
attributes_map
}

fn to_json_value(&self) -> Map<String, Value> {
let map = Map::new();
map
}
}

#[async_trait::async_trait]
impl Edge<IdentitiesGraph, Identity, HyperEdgeRecord> for HyperEdgeRecord {
fn e_type(&self) -> String {
self.e_type.clone()
}

fn directed(&self) -> bool {
// TODO: query from server is the best solution
self.directed.clone()
}

/// Find an edge by UUID.
async fn find_by_uuid(
_client: &Client<HttpConnector>,
_uuid: &Uuid,
) -> Result<Option<HyperEdgeRecord>, Error> {
todo!()
}

/// Find `EdgeRecord` by source and target
async fn find_by_from_to(
&self,
_client: &Client<HttpConnector>,
_from: &VertexRecord<IdentitiesGraph>,
_to: &VertexRecord<Identity>,
_filter: Option<HashMap<String, String>>,
) -> Result<Option<Vec<HyperEdgeRecord>>, Error> {
todo!()
}

/// Connect 2 vertex.
async fn connect(
&self,
_client: &Client<HttpConnector>,
_from: &IdentitiesGraph,
_to: &Identity,
) -> Result<(), Error> {
todo!()
}

/// notice this function is deprecated
async fn connect_reverse(
&self,
_client: &Client<HttpConnector>,
_from: &IdentitiesGraph,
_to: &Identity,
) -> Result<(), Error> {
todo!()
}
}

impl Wrapper<HyperEdgeRecord, IdentitiesGraph, Identity> for HyperEdge {
fn wrapper(
&self,
from: &IdentitiesGraph,
to: &Identity,
name: &str,
) -> EdgeWrapper<HyperEdgeRecord, IdentitiesGraph, Identity> {
let part_of = EdgeRecord::from_with_params(
name.to_string(),
IS_DIRECTED,
from.primary_key(),
from.vertex_type(),
to.primary_key(),
to.vertex_type(),
self.to_owned(),
);
EdgeWrapper {
edge: HyperEdgeRecord(part_of),
source: from.to_owned(),
target: to.to_owned(),
}
}
}
4 changes: 2 additions & 2 deletions src/tigergraph/edge/proof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ impl Transfer for ProofRecord {
attributes_map
}

fn to_json_value(&self) -> Value {
fn to_json_value(&self) -> Map<String, Value> {
let mut map = Map::new();
map.insert("uuid".to_string(), json!(self.uuid));
map.insert("source".to_string(), json!(self.source));
Expand All @@ -207,7 +207,7 @@ impl Transfer for ProofRecord {
);
map.insert("updated_at".to_string(), json!(self.updated_at));
map.insert("fetcher".to_string(), json!(self.fetcher));
Value::Object(map)
map
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/tigergraph/edge/resolve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,15 +187,15 @@ impl Transfer for ResolveRecord {
);
attributes_map
}
fn to_json_value(&self) -> Value {
fn to_json_value(&self) -> Map<String, Value> {
let mut map = Map::new();
map.insert("uuid".to_string(), json!(self.uuid));
map.insert("source".to_string(), json!(self.source));
map.insert("system".to_string(), json!(self.system));
map.insert("name".to_string(), json!(self.name));
map.insert("fetcher".to_string(), json!(self.fetcher));
map.insert("updated_at".to_string(), json!(self.updated_at));
Value::Object(map)
map
}
}

Expand Down
Loading

0 comments on commit 1fd76b9

Please sign in to comment.