@@ -31,6 +31,7 @@ type SubscriptionFinder interface {
3131
3232type DeliveryPublisher interface {
3333 PublishEmailJob (ctx context.Context , job workertypes.EmailDeliveryJob ) error
34+ PublishWebhookJob (ctx context.Context , job workertypes.WebhookDeliveryJob ) error
3435}
3536
3637// SummaryParser abstracts the logic for parsing the event summary blob.
@@ -56,17 +57,19 @@ func (d *Dispatcher) ProcessEvent(ctx context.Context,
5657 metadata workertypes.DispatchEventMetadata , summary []byte ) error {
5758 slog .InfoContext (ctx , "processing event" , "event_id" , metadata .EventID , "search_id" , metadata .SearchID )
5859
59- // 1. Generate Delivery Jobs from Event Summary
60+ // 1. Generate Delivery Jobs from Event Summary.
6061 gen := & deliveryJobGenerator {
6162 finder : d .finder ,
6263 metadata : metadata ,
6364 // We pass the raw summary bytes down so it can be attached to the jobs
6465 // without needing to re-marshal the struct.
65- rawSummary : summary ,
66- emailJobs : nil ,
66+ rawSummary : summary ,
67+ emailJobs : nil ,
68+ webhookJobs : nil ,
6769 }
6870
6971 if err := d .parser (gen .rawSummary , gen ); err != nil {
72+
7073 return fmt .Errorf ("failed to parse event summary: %w" , err )
7174 }
7275
@@ -79,11 +82,11 @@ func (d *Dispatcher) ProcessEvent(ctx context.Context,
7982
8083 slog .InfoContext (ctx , "dispatching jobs" , "count" , totalJobs )
8184
82- // 2. Publish Delivery Jobs
85+ // 2. Publish Delivery Jobs.
8386 successCount := 0
8487 failCount := 0
8588
86- // Publish Email Jobs
89+ // Publish Email Jobs.
8790 for _ , job := range gen .emailJobs {
8891 if err := d .publisher .PublishEmailJob (ctx , job ); err != nil {
8992 slog .ErrorContext (ctx , "failed to publish email job" ,
@@ -94,8 +97,16 @@ func (d *Dispatcher) ProcessEvent(ctx context.Context,
9497 }
9598 }
9699
97- // TODO: Webhook jobs would be published here similarly
98- // https://github.com/GoogleChrome/webstatus.dev/issues/1859
100+ // Publish Webhook Jobs.
101+ for _ , job := range gen .webhookJobs {
102+ if err := d .publisher .PublishWebhookJob (ctx , job ); err != nil {
103+ slog .ErrorContext (ctx , "failed to publish webhook job" ,
104+ "subscription_id" , job .SubscriptionID , "error" , err )
105+ failCount ++
106+ } else {
107+ successCount ++
108+ }
109+ }
99110
100111 slog .InfoContext (ctx , "dispatch complete" ,
101112 "event_id" , metadata .EventID ,
@@ -112,10 +123,11 @@ func (d *Dispatcher) ProcessEvent(ctx context.Context,
112123
113124// deliveryJobGenerator implements workertypes.SummaryVisitor to generate jobs from V1 summaries.
114125type deliveryJobGenerator struct {
115- finder SubscriptionFinder
116- metadata workertypes.DispatchEventMetadata
117- rawSummary []byte
118- emailJobs []workertypes.EmailDeliveryJob
126+ finder SubscriptionFinder
127+ metadata workertypes.DispatchEventMetadata
128+ rawSummary []byte
129+ emailJobs []workertypes.EmailDeliveryJob
130+ webhookJobs []workertypes.WebhookDeliveryJob
119131}
120132
121133func (g * deliveryJobGenerator ) VisitV1 (s workertypes.EventSummary ) error {
@@ -127,6 +139,7 @@ func (g *deliveryJobGenerator) VisitV1(s workertypes.EventSummary) error {
127139 g .metadata .SearchID ,
128140 g .metadata .Frequency )
129141 if err != nil {
142+
130143 return fmt .Errorf ("failed to find subscribers: %w" , err )
131144 }
132145
@@ -144,7 +157,7 @@ func (g *deliveryJobGenerator) VisitV1(s workertypes.EventSummary) error {
144157 }
145158
146159 // 2. Filter & Create Jobs
147- // Iterate Emails
160+ // Iterate Emails.
148161 for _ , sub := range subscribers .Emails {
149162 if ! shouldNotifyV1 (sub .Triggers , s ) {
150163 continue
@@ -159,17 +172,28 @@ func (g *deliveryJobGenerator) VisitV1(s workertypes.EventSummary) error {
159172 })
160173 }
161174
162- // TODO: Iterate Webhooks when supported.
163- // https://github.com/GoogleChrome/webstatus.dev/issues/1859
175+ // Iterate Webhooks.
176+ for _ , sub := range subscribers .Webhooks {
177+ if ! shouldNotifyV1 (sub .Triggers , s ) {
178+ continue
179+ }
180+ g .webhookJobs = append (g .webhookJobs , workertypes.WebhookDeliveryJob {
181+ SubscriptionID : sub .SubscriptionID ,
182+ WebhookURL : sub .WebhookURL ,
183+ WebhookType : sub .WebhookType ,
184+ SummaryRaw : g .rawSummary ,
185+ Metadata : deliveryMetadata ,
186+ ChannelID : sub .ChannelID ,
187+ Triggers : sub .Triggers ,
188+ })
189+ }
164190
165191 return nil
166192}
167193
168194// JobCount returns the total number of delivery jobs generated.
169195func (g * deliveryJobGenerator ) JobCount () int {
170- // TODO: When we add Webhook jobs, sum them here too.
171-
172- return len (g .emailJobs )
196+ return len (g .emailJobs ) + len (g .webhookJobs )
173197}
174198
175199// shouldNotifyV1 determines if the V1 event summary matches any of the user's triggers.
0 commit comments