Skip to content

Commit

Permalink
feat: signed event validation
Browse files Browse the repository at this point in the history
  • Loading branch information
dav1do committed Sep 12, 2024
1 parent c78a262 commit 42d2ccb
Show file tree
Hide file tree
Showing 7 changed files with 487 additions and 24 deletions.
2 changes: 1 addition & 1 deletion event-svc/src/event/ordering_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ impl OrderingState {
/// Relies on `add_stream_event` to handle updating the internal state.
fn add_inserted_events(&mut self, events: Vec<DiscoveredEvent>) {
for ev in events {
let stream_cid = ev.stream_cid();
let stream_cid = ev.id;
let event = ev.into();
self.add_stream_event(stream_cid, event);
}
Expand Down
18 changes: 5 additions & 13 deletions event-svc/src/event/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use super::{
migration::Migrator,
order_events::OrderEvents,
ordering_task::{DeliverableTask, OrderingTask},
validator::{EventValidator, UnvalidatedEvent, ValidatedEvents},
validator::{SignedEventValidator, UnvalidatedEvent, ValidatedEvents},
};
use async_trait::async_trait;
use ceramic_core::{EventId, Network, SerializeExt};
Expand Down Expand Up @@ -222,7 +222,8 @@ impl EventService {
valid,
pending,
invalid,
} = EventValidator::validate_events(&self.pool, deliverable_req, to_validate).await?;
} = SignedEventValidator::validate_events(&self.pool, deliverable_req, to_validate)
.await?;
invalid_events.extend(invalid);
(valid, pending)
} else {
Expand Down Expand Up @@ -469,7 +470,7 @@ impl EventService {
self.send_discovered_event(DiscoveredEvent {
cid: *ev.cid(),
prev: ev.event().prev().copied(),
id: Some(*ev.event().id()),
id: *ev.event().id(),
known_deliverable: ev.deliverable(),
})
.await?;
Expand Down Expand Up @@ -520,16 +521,7 @@ pub(crate) struct DiscoveredEvent {
/// The prev event that this event builds on.
pub(crate) prev: Option<Cid>,
/// The Cid of the init event that identifies the stream this event belongs to.
pub(crate) id: Option<Cid>,
pub(crate) id: Cid,
/// Whether this event is known to already be deliverable.
pub(crate) known_deliverable: bool,
}

impl DiscoveredEvent {
pub(crate) fn stream_cid(&self) -> Cid {
match self.id {
None => self.cid, // init event
Some(id) => id,
}
}
}
61 changes: 54 additions & 7 deletions event-svc/src/event/validator/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ use ceramic_core::{Cid, EventId};
use ceramic_event::unvalidated;
use ipld_core::ipld::Ipld;
use recon::ReconItem;
use tokio::try_join;

use crate::{
event::{service::InvalidItem, DeliverableRequirement},
event::{service::InvalidItem, validator::signed::SignedValidator, DeliverableRequirement},
store::{EventInsertable, SqlitePool},
Result,
};

use super::grouped::{GroupedEvents, SignedEvents, Time, Unsigned};

#[derive(Debug)]
pub struct ValidatedEvents {
/// These events are valid
Expand Down Expand Up @@ -113,7 +116,7 @@ impl ValidatedEvents {
}

#[derive(Debug)]
pub struct EventValidator<'a> {
pub struct SignedEventValidator<'a> {
pool: &'a SqlitePool,
/// Whether to require the init event exists, and if we should check the the signature against
/// the init controller. True in all cases except a data migration.
Expand All @@ -122,7 +125,7 @@ pub struct EventValidator<'a> {
check_exp: bool,
}

impl<'a> EventValidator<'a> {
impl<'a> SignedEventValidator<'a> {
fn new(pool: &'a SqlitePool, deliverable_req: DeliverableRequirement) -> Self {
let (require_init_event, check_exp) = match deliverable_req {
DeliverableRequirement::Immediate => (true, true),
Expand All @@ -135,15 +138,59 @@ impl<'a> EventValidator<'a> {
check_exp,
}
}

pub(crate) async fn validate_events(
_pool: &'a SqlitePool,
_deliverable_req: DeliverableRequirement,
pool: &'a SqlitePool,
deliverable_req: DeliverableRequirement,
parsed_events: Vec<UnvalidatedEvent>,
) -> Result<ValidatedEvents> {
// let _validator = Self::new(pool, deliverable_req);
let validator = Self::new(pool, deliverable_req);

let mut validated = ValidatedEvents::new_with_expected_valid(parsed_events.len());
// partition the events by type of validation needed and delegate to validators
let grouped = GroupedEvents::from(parsed_events);

let (validated_signed, validated_time) = try_join!(
validator.validate_signed_events(grouped.signed, grouped.unsigned),
validator.validate_time_events(grouped.time)
)?;

validated.extend_with(validated_signed);
validated.extend_with(validated_time);

if !validated.invalid.is_empty() {
tracing::warn!(count=%validated.invalid.len(), "invalid events discovered");
}
Ok(validated)
}

async fn validate_signed_events(
&self,
events: SignedEvents,
unsigned: Vec<Unsigned>,
) -> Result<ValidatedEvents> {
let opts = if self.check_exp {
ceramic_validation::VerifyJwsOpts::default()
} else {
ceramic_validation::VerifyJwsOpts {
at_time: ceramic_validation::AtTime::SkipTimeChecks,
..Default::default()
}
};
SignedValidator::validate_events(
self.pool,
self.require_init_event,
&opts,
events,
unsigned,
)
.await
}

async fn validate_time_events(&self, events: Vec<Time>) -> Result<ValidatedEvents> {
// TODO: IMPLEMENT THIS
Ok(ValidatedEvents {
valid: parsed_events.into_iter().map(|t| t.into()).collect(),
valid: events.into_iter().map(|t| t.into_inner().into()).collect(),
pending: Vec::new(),
invalid: Vec::new(),
})
Expand Down
133 changes: 133 additions & 0 deletions event-svc/src/event/validator/grouped.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
use ceramic_event::unvalidated::{self, signed};
use ipld_core::ipld::Ipld;

use super::UnvalidatedEvent;

#[derive(Debug)]
pub enum ValidationNeeded {
SignedInit(SignedInit),
SignedData(SignedData),
Time(Time),
None(Unsigned),
}

impl From<UnvalidatedEvent> for ValidationNeeded {
fn from(value: UnvalidatedEvent) -> Self {
match value.event.as_ref() {
unvalidated::Event::Time(_) => Self::Time(Time(value)),
unvalidated::Event::Signed(signed) => match signed.payload() {
unvalidated::Payload::Data(_) => Self::SignedData(SignedData(value)),
unvalidated::Payload::Init(_) => Self::SignedInit(SignedInit(value)),
},
unvalidated::Event::Unsigned(_) => Self::None(Unsigned(value)),
}
}
}

/// This is a simple macro to create a tuple struct around UnvalidatedEvent and allow access
/// to the inner value. It's purpose is to support implementing `From<UnvalidatedEvent> for ValidationNeeded`
/// and get the correct structs that can be required in fn signatures, but still dereferenced as the
/// original event type without having to match on all the branches when we require it to be a specific variant.
macro_rules! unvalidated_wrapper {
($name: ident) => {
#[derive(Debug)]
pub(crate) struct $name(UnvalidatedEvent);

impl $name {
#[allow(dead_code)]
/// Get the inner tuple struct value
pub fn into_inner(self) -> UnvalidatedEvent {
self.0
}

#[allow(dead_code)]
/// Get the inner tuple struct value by reference
pub fn as_inner(&self) -> &UnvalidatedEvent {
&self.0
}
}
};
}

/// Create an as `as_signed` function that panics for unsigned events.
/// Will be derived on our signed structs to
macro_rules! as_signed {
($name: ident) => {
impl $name {
pub fn as_signed(&self) -> &signed::Event<Ipld> {
match self.0.event.as_ref() {
unvalidated::Event::Time(_) => unreachable!("time event is not signed"),
unvalidated::Event::Signed(s) => s,
unvalidated::Event::Unsigned(_) => unreachable!("unsigned event is not signed"),
}
}
}
};
}

impl Time {
#[allow(dead_code)]
pub fn as_time(&self) -> &unvalidated::TimeEvent {
match self.0.event.as_ref() {
unvalidated::Event::Time(t) => t,
unvalidated::Event::Signed(_) => unreachable!("signed event event is not time"),
unvalidated::Event::Unsigned(_) => unreachable!("unsigned event is not time"),
}
}
}

// Generate the tuple structs to use in the `ValidationNeeded` enum variants
unvalidated_wrapper!(SignedData);
unvalidated_wrapper!(SignedInit);
unvalidated_wrapper!(Time);
unvalidated_wrapper!(Unsigned);

// Add `as_signed(&self) -> &signed::Event<Ipld>` functions to the signed types
as_signed!(SignedData);
as_signed!(SignedInit);

#[derive(Debug)]
pub struct GroupedEvents {
pub time: Vec<Time>,
pub unsigned: Vec<Unsigned>,
pub signed: SignedEvents,
}

#[derive(Debug)]
pub struct SignedEvents {
pub data: Vec<SignedData>,
pub init: Vec<SignedInit>,
}

impl From<Vec<UnvalidatedEvent>> for GroupedEvents {
fn from(value: Vec<UnvalidatedEvent>) -> Self {
let mut grouped = GroupedEvents::new(value.len() / 2);

value
.into_iter()
.for_each(|v| grouped.add(ValidationNeeded::from(v)));
grouped
}
}

impl GroupedEvents {
fn new(capacity: usize) -> Self {
Self {
time: Vec::with_capacity(capacity),
unsigned: Vec::with_capacity(capacity),
signed: SignedEvents {
data: Vec::with_capacity(capacity),
init: Vec::with_capacity(capacity),
},
}
}

fn add(&mut self, new: ValidationNeeded) {
match new {
ValidationNeeded::SignedInit(v) => self.signed.init.push(v),
ValidationNeeded::SignedData(v) => self.signed.data.push(v),
ValidationNeeded::Time(v) => self.time.push(v),
ValidationNeeded::None(v) => self.unsigned.push(v),
}
}
}
5 changes: 3 additions & 2 deletions event-svc/src/event/validator/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#[allow(dead_code)]
mod event;
mod grouped;
mod signed;

// ValidatedEvent is only used in tests currently
#[allow(unused_imports)]
pub use event::{EventValidator, UnvalidatedEvent, ValidatedEvent, ValidatedEvents};
pub use event::{SignedEventValidator, UnvalidatedEvent, ValidatedEvent, ValidatedEvents};
Loading

0 comments on commit 42d2ccb

Please sign in to comment.