Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
[workspace]
resolver = "2"
members = [
"tap_aggregator",
"tap_core",
"tap_eip712_message",
"tap_graph",
"tap_integration_tests",
"tap_receipt",
"crates/aggregator",
"crates/core",
"crates/eip712_message",
"crates/graph",
"crates/integration_tests",
"crates/receipt",
]

[workspace.package]
Expand Down Expand Up @@ -46,11 +46,11 @@ serde = { version = "1.0.219", features = ["derive"] }
serde_json = { version = "1.0.140", features = ["raw_value"] }
strum = { version = "0.27.1", features = ["derive"] }
rstest = "0.25.0"
tap_aggregator = { version = "0.6.0", path = "tap_aggregator" }
tap_eip712_message = { version = "0.2.2", path = "tap_eip712_message" }
tap_core = { version = "6.0.0", path = "tap_core" }
tap_graph = { version = "0.3.4", path = "tap_graph" }
tap_receipt = { version = "1.1.3", path = "tap_receipt" }
graph_tally_aggregator = { version = "0.6.4", path = "crates/aggregator" }
graph_tally_eip712_message = { version = "0.2.3", path = "crates/eip712_message" }
graph_tally_core = { version = "6.0.3", path = "crates/core" }
graph_tally_graph = { version = "0.3.5", path = "crates/graph" }
graph_tally_receipt = { version = "1.1.4", path = "crates/receipt" }
thegraph-core = "0.15.1"
thiserror = "2.0.12"
tokio = { version = "1.44.2", features = ["macros", "signal"] }
Expand Down
File renamed without changes.
10 changes: 5 additions & 5 deletions tap_aggregator/Cargo.toml → crates/aggregator/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
[package]
name = "tap_aggregator"
name = "graph_tally_aggregator"
version = "0.6.4"
edition.workspace = true
rust-version.workspace = true
license.workspace = true
repository.workspace = true
readme = "README.md"
description = "A JSON-RPC service for the Timeline Aggregation Protocol that lets clients request an aggregate receipt from a list of individual receipts."
description = "A JSON-RPC service for Graph Tally that lets clients request an aggregate receipt from a list of individual receipts."

[[bin]]
name = "tap_aggregator"
name = "graph_tally_aggregator"
path = "src/main.rs"

[dependencies]
Expand All @@ -28,8 +28,8 @@ rdkafka.workspace = true
serde.workspace = true
serde_json.workspace = true
strum.workspace = true
tap_core.workspace = true
tap_graph.workspace = true
graph_tally_core.workspace = true
graph_tally_graph.workspace = true
thegraph-core = { workspace = true, features = ["alloy-eip712"] }
tokio.workspace = true
tonic.workspace = true
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::collections::HashSet;

use anyhow::{bail, Ok, Result};
use graph_tally_core::{receipt::WithUniqueId, signed_message::Eip712SignedMessage};
use graph_tally_graph::{Receipt, ReceiptAggregateVoucher};
use rayon::prelude::*;
use tap_core::{receipt::WithUniqueId, signed_message::Eip712SignedMessage};
use tap_graph::{Receipt, ReceiptAggregateVoucher};
use thegraph_core::alloy::{
dyn_abi::Eip712Domain,
primitives::{Address, FixedBytes},
Expand Down Expand Up @@ -45,7 +45,7 @@ pub fn check_and_aggregate_receipts(
receipt.message.data_service,
receipt.message.service_provider,
),
None => return Err(tap_core::Error::NoValidReceiptsForRavRequest.into()),
None => return Err(graph_tally_core::Error::NoValidReceiptsForRavRequest.into()),
};

// Check that the receipts all have the same collection id
Expand All @@ -64,29 +64,29 @@ pub fn check_and_aggregate_receipts(
let prev_data_service = previous_rav.message.dataService;
let prev_service_provider = previous_rav.message.serviceProvider;
if prev_id != collection_id {
return Err(tap_core::Error::RavCollectionIdMismatch {
return Err(graph_tally_core::Error::RavCollectionIdMismatch {
prev_id: format!("{prev_id:#X}"),
new_id: format!("{collection_id:#X}"),
}
.into());
}
if prev_payer != payer {
return Err(tap_core::Error::RavCollectionIdMismatch {
return Err(graph_tally_core::Error::RavCollectionIdMismatch {
prev_id: format!("{prev_id:#X}"),
new_id: format!("{collection_id:#X}"),
}
.into());
}

if prev_data_service != data_service {
return Err(tap_core::Error::RavCollectionIdMismatch {
return Err(graph_tally_core::Error::RavCollectionIdMismatch {
prev_id: format!("{prev_id:#X}"),
new_id: format!("{collection_id:#X}"),
}
.into());
}
if prev_service_provider != service_provider {
return Err(tap_core::Error::RavCollectionIdMismatch {
return Err(graph_tally_core::Error::RavCollectionIdMismatch {
prev_id: format!("{prev_id:#X}"),
new_id: format!("{collection_id:#X}"),
}
Expand Down Expand Up @@ -115,7 +115,7 @@ fn check_signature_is_from_one_of_addresses<M: SolStruct>(
) -> Result<()> {
let recovered_address = message.recover_signer(domain_separator)?;
if !accepted_addresses.contains(&recovered_address) {
bail!(tap_core::Error::InvalidRecoveredSigner {
bail!(graph_tally_core::Error::InvalidRecoveredSigner {
address: recovered_address,
});
}
Expand All @@ -132,16 +132,16 @@ fn check_collection_id(
for receipt in receipts.iter() {
let receipt = &receipt.message;
if receipt.collection_id != collection_id {
return Err(tap_core::Error::RavCollectionIdNotUniform.into());
return Err(graph_tally_core::Error::RavCollectionIdNotUniform.into());
}
if receipt.payer != payer {
return Err(tap_core::Error::RavCollectionIdNotUniform.into());
return Err(graph_tally_core::Error::RavCollectionIdNotUniform.into());
}
if receipt.data_service != data_service {
return Err(tap_core::Error::RavCollectionIdNotUniform.into());
return Err(graph_tally_core::Error::RavCollectionIdNotUniform.into());
}
if receipt.service_provider != service_provider {
return Err(tap_core::Error::RavCollectionIdNotUniform.into());
return Err(graph_tally_core::Error::RavCollectionIdNotUniform.into());
}
}
Ok(())
Expand All @@ -152,7 +152,7 @@ fn check_signatures_unique(receipts: &[Eip712SignedMessage<Receipt>]) -> Result<
for receipt in receipts.iter() {
let signature = receipt.unique_id();
if !receipt_signatures.insert(signature) {
return Err(tap_core::Error::DuplicateReceiptSignature(format!(
return Err(graph_tally_core::Error::DuplicateReceiptSignature(format!(
"{:?}",
receipt.unique_id()
))
Expand All @@ -170,7 +170,7 @@ fn check_receipt_timestamps(
for receipt in receipts.iter() {
let receipt = &receipt.message;
if previous_rav.message.timestampNs >= receipt.timestamp_ns {
return Err(tap_core::Error::ReceiptTimestampLowerThanRav {
return Err(graph_tally_core::Error::ReceiptTimestampLowerThanRav {
rav_ts: previous_rav.message.timestampNs,
receipt_ts: receipt.timestamp_ns,
}
Expand All @@ -184,9 +184,9 @@ fn check_receipt_timestamps(

#[cfg(test)]
mod tests {
use graph_tally_core::{signed_message::Eip712SignedMessage, tap_eip712_domain};
use graph_tally_graph::{Receipt, ReceiptAggregateVoucher};
use rstest::*;
use tap_core::{signed_message::Eip712SignedMessage, tap_eip712_domain};
use tap_graph::{Receipt, ReceiptAggregateVoucher};
use thegraph_core::alloy::{
dyn_abi::Eip712Domain,
primitives::{address, fixed_bytes, Address, Bytes, FixedBytes},
Expand Down
File renamed without changes.
34 changes: 17 additions & 17 deletions tap_aggregator/src/grpc.rs → crates/aggregator/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ pub mod uint128 {

pub mod v2 {
use anyhow::anyhow;
use tap_core::signed_message::Eip712SignedMessage;
use graph_tally_core::signed_message::Eip712SignedMessage;
use thegraph_core::alloy::primitives::Bytes;

tonic::include_proto!("tap_aggregator.v2");

impl TryFrom<self::Receipt> for tap_graph::Receipt {
impl TryFrom<self::Receipt> for graph_tally_graph::Receipt {
type Error = anyhow::Error;
fn try_from(receipt: self::Receipt) -> Result<Self, Self::Error> {
Ok(Self {
Expand All @@ -38,7 +38,7 @@ pub mod v2 {
}
}

impl TryFrom<self::SignedReceipt> for tap_graph::SignedReceipt {
impl TryFrom<self::SignedReceipt> for graph_tally_graph::SignedReceipt {
type Error = anyhow::Error;
fn try_from(receipt: self::SignedReceipt) -> Result<Self, Self::Error> {
Ok(Self {
Expand All @@ -51,8 +51,8 @@ pub mod v2 {
}
}

impl From<tap_graph::Receipt> for self::Receipt {
fn from(value: tap_graph::Receipt) -> Self {
impl From<graph_tally_graph::Receipt> for self::Receipt {
fn from(value: graph_tally_graph::Receipt) -> Self {
Self {
collection_id: value.collection_id.as_slice().to_vec(),
timestamp_ns: value.timestamp_ns,
Expand All @@ -65,16 +65,16 @@ pub mod v2 {
}
}

impl From<tap_graph::SignedReceipt> for self::SignedReceipt {
fn from(value: tap_graph::SignedReceipt) -> Self {
impl From<graph_tally_graph::SignedReceipt> for self::SignedReceipt {
fn from(value: graph_tally_graph::SignedReceipt) -> Self {
Self {
message: Some(value.message.into()),
signature: value.signature.as_bytes().to_vec(),
}
}
}

impl TryFrom<self::SignedRav> for Eip712SignedMessage<tap_graph::ReceiptAggregateVoucher> {
impl TryFrom<self::SignedRav> for Eip712SignedMessage<graph_tally_graph::ReceiptAggregateVoucher> {
type Error = anyhow::Error;
fn try_from(voucher: self::SignedRav) -> Result<Self, Self::Error> {
Ok(Self {
Expand All @@ -87,16 +87,16 @@ pub mod v2 {
}
}

impl From<Eip712SignedMessage<tap_graph::ReceiptAggregateVoucher>> for self::SignedRav {
fn from(voucher: Eip712SignedMessage<tap_graph::ReceiptAggregateVoucher>) -> Self {
impl From<Eip712SignedMessage<graph_tally_graph::ReceiptAggregateVoucher>> for self::SignedRav {
fn from(voucher: Eip712SignedMessage<graph_tally_graph::ReceiptAggregateVoucher>) -> Self {
Self {
signature: voucher.signature.as_bytes().to_vec(),
message: Some(voucher.message.into()),
}
}
}

impl TryFrom<self::ReceiptAggregateVoucher> for tap_graph::ReceiptAggregateVoucher {
impl TryFrom<self::ReceiptAggregateVoucher> for graph_tally_graph::ReceiptAggregateVoucher {
type Error = anyhow::Error;
fn try_from(voucher: self::ReceiptAggregateVoucher) -> Result<Self, Self::Error> {
Ok(Self {
Expand All @@ -114,8 +114,8 @@ pub mod v2 {
}
}

impl From<tap_graph::ReceiptAggregateVoucher> for self::ReceiptAggregateVoucher {
fn from(voucher: tap_graph::ReceiptAggregateVoucher) -> Self {
impl From<graph_tally_graph::ReceiptAggregateVoucher> for self::ReceiptAggregateVoucher {
fn from(voucher: graph_tally_graph::ReceiptAggregateVoucher) -> Self {
Self {
collection_id: voucher.collectionId.to_vec(),
timestamp_ns: voucher.timestampNs,
Expand All @@ -130,8 +130,8 @@ pub mod v2 {

impl self::RavRequest {
pub fn new(
receipts: Vec<tap_graph::SignedReceipt>,
previous_rav: Option<tap_graph::SignedRav>,
receipts: Vec<graph_tally_graph::SignedReceipt>,
previous_rav: Option<graph_tally_graph::SignedRav>,
) -> Self {
Self {
receipts: receipts.into_iter().map(Into::into).collect(),
Expand All @@ -141,8 +141,8 @@ pub mod v2 {
}

impl self::RavResponse {
pub fn signed_rav(mut self) -> anyhow::Result<tap_graph::SignedRav> {
let signed_rav: tap_graph::SignedRav = self
pub fn signed_rav(mut self) -> anyhow::Result<graph_tally_graph::SignedRav> {
let signed_rav: graph_tally_graph::SignedRav = self
.rav
.take()
.ok_or(anyhow!("Couldn't find rav"))?
Expand Down
File renamed without changes.
4 changes: 2 additions & 2 deletions tap_aggregator/src/main.rs → crates/aggregator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use std::{collections::HashSet, str::FromStr, time::Duration};

use anyhow::Result;
use clap::Parser;
use graph_tally_aggregator::{metrics, server};
use graph_tally_core::tap_eip712_domain;
use log::{debug, info};
use tap_aggregator::{metrics, server};
use tap_core::tap_eip712_domain;
use thegraph_core::alloy::{
dyn_abi::Eip712Domain, primitives::Address, signers::local::PrivateKeySigner,
};
Expand Down
File renamed without changes.
12 changes: 6 additions & 6 deletions tap_aggregator/src/server.rs → crates/aggregator/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use std::{collections::HashSet, fmt::Debug, str::FromStr, time::Duration};

use anyhow::Result;
use axum::{error_handling::HandleError, routing::post_service, BoxError, Router};
use graph_tally_core::signed_message::Eip712SignedMessage;
use graph_tally_graph::{Receipt, ReceiptAggregateVoucher};
use hyper::StatusCode;
use jsonrpsee::{
proc_macros::rpc,
Expand All @@ -10,8 +12,6 @@ use jsonrpsee::{
use lazy_static::lazy_static;
use log::{error, info};
use prometheus::{register_counter, register_int_counter, Counter, IntCounter};
use tap_core::signed_message::Eip712SignedMessage;
use tap_graph::{Receipt, ReceiptAggregateVoucher};
use thegraph_core::alloy::{
dyn_abi::Eip712Domain, primitives::Address, signers::local::PrivateKeySigner,
};
Expand Down Expand Up @@ -80,7 +80,7 @@ lazy_static! {
///
/// Note that because of the way the `rpc` macro works, we cannot document the RpcServer trait here.
/// (So even this very docstring will not appear in the generated documentation...)
/// As a result, we document the JSON-RPC API in the `tap_aggregator/README.md` file.
/// As a result, we document the JSON-RPC API in the `graph_tally_aggregator/README.md` file.
/// Do not forget to update the documentation there if you make any changes to the JSON-RPC API.
#[rpc(server)]
pub trait Rpc {
Expand Down Expand Up @@ -203,7 +203,7 @@ impl v2::tap_aggregator_server::TapAggregator for RpcImpl {
request: Request<v2::RavRequest>,
) -> Result<Response<v2::RavResponse>, Status> {
let rav_request = request.into_inner();
let receipts: Vec<tap_graph::SignedReceipt> = rav_request
let receipts: Vec<graph_tally_graph::SignedReceipt> = rav_request
.receipts
.into_iter()
.map(TryFrom::try_from)
Expand Down Expand Up @@ -479,10 +479,10 @@ fn produce_kafka_records<K: Debug>(
mod tests {
use std::{collections::HashSet, str::FromStr, time::Duration};

use graph_tally_core::{signed_message::Eip712SignedMessage, tap_eip712_domain};
use graph_tally_graph::{Receipt, ReceiptAggregateVoucher};
use jsonrpsee::{core::client::ClientT, http_client::HttpClientBuilder, rpc_params};
use rstest::*;
use tap_core::{signed_message::Eip712SignedMessage, tap_eip712_domain};
use tap_graph::{Receipt, ReceiptAggregateVoucher};
use thegraph_core::alloy::{
dyn_abi::Eip712Domain,
primitives::{Address, FixedBytes},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::{collections::HashSet, str::FromStr, time::Duration};

use jsonrpsee::{core::client::ClientT, http_client::HttpClientBuilder, rpc_params};
use tap_aggregator::{
use graph_tally_aggregator::{
grpc::v2::{tap_aggregator_client::TapAggregatorClient, RavRequest},
jsonrpsee_helpers::JsonRpcResponse,
server,
};
use tap_core::{signed_message::Eip712SignedMessage, tap_eip712_domain};
use tap_graph::{Receipt, ReceiptAggregateVoucher};
use graph_tally_core::{signed_message::Eip712SignedMessage, tap_eip712_domain};
use graph_tally_graph::{Receipt, ReceiptAggregateVoucher};
use jsonrpsee::{core::client::ClientT, http_client::HttpClientBuilder, rpc_params};
use thegraph_core::alloy::{
primitives::{Address, FixedBytes},
signers::local::PrivateKeySigner,
Expand Down Expand Up @@ -67,11 +67,11 @@ async fn aggregation_test() {

let rav_request = RavRequest::new(receipts.clone(), None);
let res = client.aggregate_receipts(rav_request).await.unwrap();
let signed_rav: tap_graph::SignedRav = res.into_inner().signed_rav().unwrap();
let signed_rav: graph_tally_graph::SignedRav = res.into_inner().signed_rav().unwrap();

let sender_aggregator = HttpClientBuilder::default().build(&endpoint).unwrap();

let previous_rav: Option<tap_graph::SignedRav> = None;
let previous_rav: Option<graph_tally_graph::SignedRav> = None;

let response: JsonRpcResponse<Eip712SignedMessage<ReceiptAggregateVoucher>> = sender_aggregator
.request(
Expand Down
File renamed without changes.
Loading
Loading