Skip to content

Commit

Permalink
Merge pull request #148 from ZhongFuze/upstream/batch-upsert
Browse files Browse the repository at this point in the history
[!] Fix ethereum primary ens_domain `display_name`
  • Loading branch information
ZhongFuze authored May 28, 2024
2 parents 1fd76b9 + 62386af commit 7ff3353
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 43 deletions.
82 changes: 70 additions & 12 deletions src/tigergraph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ pub async fn batch_upsert(
client: &Client<HttpConnector>,
edges: Vec<EdgeWrapperEnum>,
) -> Result<(), Error> {
// let json_raw = serde_json::to_string(&edges).map_err(|err| Error::JSONParseError(err))?;
// trace!("edges = {}", json_raw);
let mut graph: UpsertGraph = BatchEdges(edges.clone()).into();
// let json_raw_2 = serde_json::to_string(&graph).map_err(|err| Error::JSONParseError(err))?;
// trace!("Graph upsert struct: {}", json_raw_2);
Expand Down Expand Up @@ -786,36 +788,92 @@ impl From<BatchEdges> for UpsertGraph {
.or_insert_with(HashMap::new)
.insert(target_vertex_id.clone(), edge_attributes);

// Helper function to merge vertex attributes
fn merge_vertex_attributes(
vertices_map: &mut HashMap<String, HashMap<String, HashMap<String, Attribute>>>,
vertex_type: &str,
vertex_id: &str,
new_attributes: HashMap<String, Attribute>,
) {
if let Some(existing_attributes) = vertices_map
.entry(vertex_type.to_string())
.or_insert_with(HashMap::new)
.get_mut(vertex_id)
{
for (key, new_attr) in new_attributes {
match key.as_str() {
"reverse" => {
if let Some(existing_attr) = existing_attributes.get_mut("reverse")
{
if let (Value::Bool(existing_val), Value::Bool(new_val)) =
(&existing_attr.value, new_attr.value)
{
existing_attr.value = json!(*existing_val || new_val);
}
} else {
existing_attributes.insert(key, new_attr);
}
}
"display_name" => {
if let Some(existing_attr) =
existing_attributes.get_mut("display_name")
{
if existing_attr.value == json!("")
&& new_attr.value != json!("")
{
existing_attr.value = new_attr.value;
}
} else {
existing_attributes.insert(key, new_attr);
}
}
_ => {
existing_attributes.insert(key, new_attr);
}
}
}
} else {
vertices_map
.get_mut(vertex_type)
.unwrap()
.insert(vertex_id.to_string(), new_attributes);
}
}

// downcast_ref is a method from the Any trait in Rust,
// which allows you to safely attempt to
// convert a reference to a trait object (&dyn Any)
// back into a reference to a specific concrete type (&T)
if let Some(source) = edge_wrapper_enum
.source()
.as_any()
.downcast_ref::<IdentitiesGraph>()
.downcast_ref::<Identity>()
{
vertices_map
.entry(source_vertex_type.clone())
.or_insert_with(HashMap::new)
.insert(source_vertex_id.clone(), source.to_attributes_map());
merge_vertex_attributes(
&mut vertices_map,
&source_vertex_type,
&source_vertex_id,
source.to_attributes_map(),
);
}

if let Some(target) = edge_wrapper_enum
.target()
.as_any()
.downcast_ref::<IdentitiesGraph>()
.downcast_ref::<Identity>()
{
vertices_map
.entry(target_vertex_type.clone())
.or_insert_with(HashMap::new)
.insert(target_vertex_id.clone(), target.to_attributes_map());
merge_vertex_attributes(
&mut vertices_map,
&target_vertex_type,
&target_vertex_id,
target.to_attributes_map(),
);
}

if let Some(source) = edge_wrapper_enum
.source()
.as_any()
.downcast_ref::<Identity>()
.downcast_ref::<IdentitiesGraph>()
{
vertices_map
.entry(source_vertex_type.clone())
Expand All @@ -826,7 +884,7 @@ impl From<BatchEdges> for UpsertGraph {
if let Some(target) = edge_wrapper_enum
.target()
.as_any()
.downcast_ref::<Identity>()
.downcast_ref::<IdentitiesGraph>()
{
vertices_map
.entry(target_vertex_type.clone())
Expand Down
30 changes: 20 additions & 10 deletions src/upstream/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ async fn test_fetch_all() -> Result<(), Error> {
// Some(5),
// )
// .await?;

// fetch_all(
// vec![Target::Identity(
// Platform::Ethereum,
Expand All @@ -39,6 +40,15 @@ async fn test_fetch_all() -> Result<(), Error> {
// )
// .await?;

fetch_all(
vec![Target::Identity(
Platform::Ethereum,
"0xd8da6bf26964af9d7eed9e03e53415d37aa96045".into(),
)],
Some(5),
)
.await?;

// fetch_all(
// vec![Target::NFT(
// Chain::Ethereum,
Expand All @@ -50,16 +60,16 @@ async fn test_fetch_all() -> Result<(), Error> {
// )
// .await?;

fetch_all(
vec![Target::NFT(
Chain::Ethereum,
ContractCategory::ENS,
ContractCategory::ENS.default_contract_address().unwrap(),
"vitalik.eth".to_string(),
)],
Some(5),
)
.await?;
// fetch_all(
// vec![Target::NFT(
// Chain::Ethereum,
// ContractCategory::ENS,
// ContractCategory::ENS.default_contract_address().unwrap(),
// "vitalik.eth".to_string(),
// )],
// Some(5),
// )
// .await?;

// fetch_all(
// vec![Target::Identity(Platform::CKB, "ckb1qzfhdsa4syv599s2s3nfrctwga70g0tu07n9gpnun9ydlngf5vsnwqggq7v6mzt3n8wv9y2n6h9z429ta0auek7v05yq0xdd39cenhxzj9fatj324z47h77vm0x869nu03m".into())],
Expand Down
49 changes: 28 additions & 21 deletions src/upstream/unstoppable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ async fn batch_fetch_by_wallet(target: &Target) -> Result<(TargetProcessedList,
let hold: Hold = Hold {
uuid: Uuid::new_v4(),
source: DataSource::UnstoppableDomains,
transaction: None,
transaction: Some("".to_string()),
id: item
.attributes
.meta
Expand Down Expand Up @@ -388,19 +388,23 @@ async fn fetch_domain(owners: &str, page: &str) -> Result<RecordsForOwnerRespons
})?;

// Parse response body
let result: RecordsForOwnerResponse = match parse_body(&mut body).await {
let result = match parse_body::<RecordsForOwnerResponse>(&mut body).await {
Ok(result) => result,
Err(_) => {
let err: BadResponse = parse_body(&mut body).await?;
let err_message = format!(
"UnstoppableDomains fetch error, Code: {}, Message: {}",
err.code, err.message
);
error!(err_message);
return Err(Error::General(
err_message,
lambda_http::http::StatusCode::INTERNAL_SERVER_ERROR,
));
match parse_body::<BadResponse>(&mut body).await {
Ok(bad) => {
let err_message = format!(
"UnstoppableDomains fetch error, Code: {}, Message: {}",
bad.code, bad.message
);
error!(err_message);
return Err(Error::General(
err_message,
lambda_http::http::StatusCode::INTERNAL_SERVER_ERROR,
));
}
Err(err) => return Err(err),
};
}
};
Ok(result)
Expand Down Expand Up @@ -591,16 +595,19 @@ async fn fetch_owner(domains: &str) -> Result<DomainResponse, Error> {

let result = match parse_body::<DomainResponse>(&mut resp).await {
Ok(result) => result,
Err(_) => {
let err: BadResponse = parse_body(&mut resp).await?;
let err_message = format!(
"UnstoppableDomains fetch | errCode: {}, errMessage: {}",
err.code, err.message
);
error!(err_message);
return Err(Error::General(err_message, resp.status()));
}
Err(_) => match parse_body::<BadResponse>(&mut resp).await {
Ok(bad) => {
let err_message = format!(
"UnstoppableDomains fetch | errCode: {}, errMessage: {}",
bad.code, bad.message
);
error!(err_message);
return Err(Error::General(err_message, resp.status()));
}
Err(err) => return Err(err),
},
};

Ok(result)
}

Expand Down

0 comments on commit 7ff3353

Please sign in to comment.