Skip to content

Commit

Permalink
Add support for public extensions in Reports.
Browse files Browse the repository at this point in the history
  • Loading branch information
jhoyla committed Jan 8, 2025
1 parent f839d01 commit 5078b99
Show file tree
Hide file tree
Showing 9 changed files with 546 additions and 19 deletions.
258 changes: 258 additions & 0 deletions crates/daphne-server/tests/e2e/e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,10 @@ async fn leader_upload(version: DapVersion) {
report_metadata: ReportMetadata {
id: ReportId([1; 16]),
time: t.now,
public_extensions: match version {
DapVersion::Draft09 => None,
DapVersion::Latest => Some(Vec::new()),
},
},
public_share: b"public share".to_vec(),
encrypted_input_shares: [
Expand Down Expand Up @@ -533,6 +537,150 @@ async fn leader_upload_taskprov_wrong_version(version: DapVersion) {

async_test_versions!(leader_upload_taskprov_wrong_version);

#[tokio::test]
async fn leader_upload_taskprov_public() {
let version = DapVersion::Latest;
let t = TestRunner::default_with_version(version).await;
let client = t.http_client();
let hpke_config_list = t.get_hpke_configs(version, client).await.unwrap();

let (task_config, task_id, taskprov_advertisement) = DapTaskParameters {
version,
min_batch_size: 10,
query: DapBatchMode::TimeInterval,
leader_url: t.task_config.leader_url.clone(),
helper_url: t.task_config.helper_url.clone(),
..Default::default()
}
.to_config_with_taskprov(
b"cool task".to_vec(),
t.now,
daphne::roles::aggregator::TaskprovConfig {
hpke_collector_config: &t.taskprov_collector_hpke_receiver.config,
vdaf_verify_key_init: &t.taskprov_vdaf_verify_key_init,
},
)
.unwrap();

let mut report = task_config
.vdaf
.produce_report(
&hpke_config_list,
t.now + 1,
&task_id,
DapMeasurement::U32Vec(vec![1; 10]),
version,
)
.unwrap();
report.report_metadata.public_extensions = Some(vec![Extension::Taskprov]);
t.leader_request_expect_ok(
client,
&format!("tasks/{}/reports", task_id.to_base64url()),
&http::Method::POST,
DapMediaType::Report,
Some(
&taskprov_advertisement
.serialize_to_header_value(version)
.unwrap(),
),
report.get_encoded_with_param(&version).unwrap(),
)
.await
.unwrap();
}

#[tokio::test]
async fn leader_upload_taksprov_public_errors() {
let version = DapVersion::Latest;
let t = TestRunner::default_with_version(version).await;
let client = t.http_client();
let hpke_config_list = t.get_hpke_configs(version, client).await.unwrap();

let (task_config, task_id, taskprov_advertisement) = DapTaskParameters {
version,
min_batch_size: 10,
query: DapBatchMode::TimeInterval,
leader_url: t.task_config.leader_url.clone(),
helper_url: t.task_config.helper_url.clone(),
..Default::default()
}
.to_config_with_taskprov(
b"cool task".to_vec(),
t.now,
daphne::roles::aggregator::TaskprovConfig {
hpke_collector_config: &t.taskprov_collector_hpke_receiver.config,
vdaf_verify_key_init: &t.taskprov_vdaf_verify_key_init,
},
)
.unwrap();

// Repeated public extension
let mut report = task_config
.vdaf
.produce_report(
&hpke_config_list,
t.now + 1,
&task_id,
DapMeasurement::U32Vec(vec![1; 10]),
version,
)
.unwrap();
report.report_metadata.public_extensions = Some(vec![Extension::Taskprov, Extension::Taskprov]);
t.leader_request_expect_abort(
client,
None,
&format!("tasks/{}/reports", task_id.to_base64url()),
&http::Method::POST,
DapMediaType::Report,
Some(
&taskprov_advertisement
.serialize_to_header_value(version)
.unwrap(),
),
report.get_encoded_with_param(&version).unwrap(),
400,
"invalidMessage",
)
.await
.unwrap();

// Unsupported public extension
let mut report = task_config
.vdaf
.produce_report(
&hpke_config_list,
t.now + 1,
&task_id,
DapMeasurement::U32Vec(vec![1; 10]),
version,
)
.unwrap();
report.report_metadata.public_extensions = Some(vec![
Extension::Taskprov,
Extension::NotImplemented {
typ: 3,
payload: b"ignore".to_vec(),
},
]);
t.leader_request_expect_abort(
client,
None,
&format!("tasks/{}/reports", task_id.to_base64url()),
&http::Method::POST,
DapMediaType::Report,
Some(
&taskprov_advertisement
.serialize_to_header_value(version)
.unwrap(),
),
report.get_encoded_with_param(&version).unwrap(),
400,
"unsupportedExtension",
)
.await
.unwrap();
}

async fn internal_leader_process(version: DapVersion) {
let t = TestRunner::default_with_version(version).await;
let path = t.upload_path();
Expand Down Expand Up @@ -1348,6 +1496,116 @@ async fn leader_selected() {
.unwrap();
}

#[tokio::test]
async fn leader_collect_taskprov_repeated_abort() {
let version = DapVersion::Latest;
const DAP_TASKPROV_COLLECTOR_TOKEN: &str = "I-am-the-collector";
let t = TestRunner::default_with_version(version).await;
let batch_interval = t.batch_interval();

let client = t.http_client();
let hpke_config_list = t.get_hpke_configs(version, client).await.unwrap();

let (task_config, task_id, taskprov_advertisement) = DapTaskParameters {
version,
min_batch_size: 10,
query: DapBatchMode::TimeInterval,
leader_url: t.task_config.leader_url.clone(),
helper_url: t.task_config.helper_url.clone(),
..Default::default()
}
.to_config_with_taskprov(
b"cool task".to_vec(),
t.now,
daphne::roles::aggregator::TaskprovConfig {
hpke_collector_config: &t.taskprov_collector_hpke_receiver.config,
vdaf_verify_key_init: &t.taskprov_vdaf_verify_key_init,
},
)
.unwrap();

let path = TestRunner::upload_path_for_task(&task_id);
let method = &Method::POST;
// The reports are uploaded in the background.
let mut rng = thread_rng();
for _ in 0..t.task_config.min_batch_size {
let extensions = vec![Extension::Taskprov];
let now = rng.gen_range(TestRunner::report_interval(&batch_interval));
t.leader_request_expect_ok(
client,
&path,
method,
DapMediaType::Report,
Some(
&taskprov_advertisement
.serialize_to_header_value(version)
.unwrap(),
),
{
let mut report = task_config
.vdaf
.produce_report_with_extensions(
&hpke_config_list,
now,
&task_id,
DapMeasurement::U32Vec(vec![1; 10]),
extensions,
version,
)
.unwrap();
report.report_metadata.public_extensions = Some(vec![Extension::Taskprov]);
report.get_encoded_with_param(&version).unwrap()
},
)
.await
.unwrap();
}

let agg_param = DapAggregationParam::Empty;

// Get the collect URI.
let collect_req = CollectionReq {
query: Query::TimeInterval { batch_interval },
agg_param: agg_param.get_encoded().unwrap(),
};
let collect_uri = t
.leader_post_collect_using_token(
client,
DAP_TASKPROV_COLLECTOR_TOKEN,
Some(&taskprov_advertisement),
Some(&task_id),
collect_req.get_encoded_with_param(&t.version).unwrap(),
)
.await
.unwrap();
println!("collect_uri: {collect_uri}");

// Poll the collect URI before the CollectResp is ready.
let resp = t
.poll_collection_url_using_token(client, &collect_uri, DAP_TASKPROV_COLLECTOR_TOKEN)
.await
.unwrap();
#[expect(clippy::format_in_format_args)]
{
assert_eq!(
resp.status(),
202,
"response: {} {}",
format!("{resp:?}"),
resp.text().await.unwrap()
);
}

// The reports are aggregated in the background.
let agg_telem = t.internal_process(client).await.unwrap();
assert_eq!(
agg_telem.reports_processed, task_config.min_batch_size,
"reports processed"
);
assert_eq!(agg_telem.reports_aggregated, 0, "reports aggregated");
assert_eq!(agg_telem.reports_collected, 0, "reports collected");
}

async fn leader_collect_taskprov_ok(version: DapVersion) {
const DAP_TASKPROV_COLLECTOR_TOKEN: &str = "I-am-the-collector";
let t = TestRunner::default_with_version(version).await;
Expand Down
24 changes: 23 additions & 1 deletion crates/daphne/src/error/aborts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ pub enum DapAbort {
/// Unrecognized DAP task. Sent in response to a request indicating an unrecognized task ID.
#[error("unrecognizedTask")]
UnrecognizedTask { task_id: TaskId },

/// Unsupported Extension. Sent in response to a report upload with an unsupported extension.
#[error("unsupportedExtension")]
UnsupportedExtension { detail: String, task_id: TaskId },
}

impl DapAbort {
Expand All @@ -116,7 +120,8 @@ impl DapAbort {
| Self::InvalidBatchSize { detail, task_id }
| Self::BatchModeMismatch { detail, task_id }
| Self::UnauthorizedRequest { detail, task_id }
| Self::InvalidMessage { detail, task_id } => (
| Self::InvalidMessage { detail, task_id }
| Self::UnsupportedExtension { detail, task_id } => (
Some(task_id),
Some(detail),
None,
Expand Down Expand Up @@ -259,6 +264,20 @@ impl DapAbort {
})
}

pub fn unsupported_extension(
task_id: &TaskId,
unknown_extensions: &[u16],
) -> Result<Self, DapError> {
let detail = serde_json::to_string(&unknown_extensions);
match detail {
Ok(s) => Ok(Self::UnsupportedExtension {
detail: s,
task_id: *task_id,
}),
Err(x) => Err(fatal_error!(err = %x,)),
}
}

fn title_and_type(&self) -> (&'static str, Option<String>) {
let (title, dap_abort_type) = match self {
Self::BatchInvalid { .. } => ("Batch boundary check failed", Some(self.to_string())),
Expand Down Expand Up @@ -300,6 +319,9 @@ impl DapAbort {
Some(self.to_string()),
),
Self::BadRequest(..) => ("Bad request", None),
Self::UnsupportedExtension { .. } => {
("Unsupported extensions in report", Some(self.to_string()))
}
};

(
Expand Down
8 changes: 8 additions & 0 deletions crates/daphne/src/hpke.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,10 @@ mod test {
report_metadata: &ReportMetadata {
id: ReportId(rand::random()),
time: rand::random(),
public_extensions: match version {
DapVersion::Draft09 => None,
DapVersion::Latest => Some(Vec::new()),
},
},
};
let plaintext = b"plaintext";
Expand Down Expand Up @@ -703,6 +707,10 @@ mod test {
let report_metadata = &ReportMetadata {
id: ReportId(rand::random()),
time: rand::random(),
public_extensions: match version {
DapVersion::Draft09 => None,
DapVersion::Latest => Some(Vec::new()),
},
};
let public_share = &vec![rand::random(); (0..100).choose(&mut rand::thread_rng()).unwrap()];

Expand Down
Loading

0 comments on commit 5078b99

Please sign in to comment.