Skip to content

Commit

Permalink
as: Change paused webhook response to not acceptable
Browse files Browse the repository at this point in the history
  • Loading branch information
Vlad Vitan committed Sep 5, 2024
1 parent c5a237c commit a0c23c0
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 3 deletions.
4 changes: 2 additions & 2 deletions pkg/applicationserver/io/web/webhooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (w *webhooks) handleDown(
"webhook_id", hookID.WebhookId,
))

hook, err := w.registry.Get(ctx, hookID, []string{"format"})
hook, err := w.registry.Get(ctx, hookID, []string{"format", "paused"})
if err != nil {
webhandlers.Error(res, req, err)
return
Expand All @@ -203,7 +203,7 @@ func (w *webhooks) handleDown(
}
if hook.Paused {
logger.Debug("Webhook is paused")
res.WriteHeader(http.StatusAccepted)
res.WriteHeader(http.StatusNotAcceptable)
return
}
format, ok := formats[hook.Format]
Expand Down
86 changes: 85 additions & 1 deletion pkg/applicationserver/io/web/webhooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ func TestWebhooks(t *testing.T) {
})

//nolint:paralleltest
t.Run("Paused", func(t *testing.T) {
t.Run("PausedUplink", func(t *testing.T) {
_, ctx := test.New(t)

// Create an active webhook.
Expand Down Expand Up @@ -691,4 +691,88 @@ func TestWebhooks(t *testing.T) {
// Webhook was not received.
}
})

//nolint:paralleltest
t.Run("PausedDownlink", func(t *testing.T) {
is, isAddr, closeIS := mockis.New(ctx)
defer closeIS()

is.ApplicationRegistry().Add(ctx, registeredApplicationID, registeredApplicationKey,
ttnpb.Right_RIGHT_APPLICATION_SETTINGS_BASIC,
ttnpb.Right_RIGHT_APPLICATION_DEVICES_READ,
ttnpb.Right_RIGHT_APPLICATION_DEVICES_WRITE,
ttnpb.Right_RIGHT_APPLICATION_TRAFFIC_READ,
ttnpb.Right_RIGHT_APPLICATION_TRAFFIC_DOWN_WRITE)
conf := &component.Config{
ServiceBase: config.ServiceBase{
GRPC: config.GRPC{
Listen: ":0",
AllowInsecureForCredentials: true,
},
Cluster: cluster.Config{
IdentityServer: isAddr,
},
},
}

_, err := registry.Set(ctx, ids, nil,
func(_ *ttnpb.ApplicationWebhook) (*ttnpb.ApplicationWebhook, []string, error) {
return &ttnpb.ApplicationWebhook{
Ids: ids,
BaseUrl: "https://myapp.com/api/ttn/v3/{/appID,devID}",
Format: "json",
Paused: true,
UplinkMessage: &ttnpb.ApplicationWebhook_Message{Path: "/"},
},
[]string{
"ids.application_ids",
"ids.webhook_id",
"base_url",
"format",
"paused",
"uplink_message",
"field_mask",
}, nil
})
if err != nil {
t.Fatalf("Failed to set webhook in registry: %s", err)
}

sinkCh := make(chan *http.Request, 1)
testSink := mocksink.New(sinkCh)

ctx, cancel := context.WithCancel(ctx)
defer cancel()

c := componenttest.NewComponent(t, conf)
as := mock.NewServer(c)
w, err := web.NewWebhooks(ctx, as, registry, testSink, downlinks)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
c.RegisterWeb(w)
componenttest.StartComponent(t, c)
defer c.Close()

mustHavePeer(ctx, c, ttnpb.ClusterRole_ENTITY_REGISTRY)

// Check the status error code when scheduling downlink to a paused webhook.
url := fmt.Sprintf("/api/v3/as/applications/%s/webhooks/%s/devices/%s/down/replace",
ids.ApplicationIds.ApplicationId, ids.WebhookId, registeredDeviceID.DeviceId,
)
body := bytes.NewReader([]byte(`{"downlinks":[]}`))
req := httptest.NewRequest(http.MethodPost, url, body)
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", registeredApplicationKey))

res := httptest.NewRecorder()
c.ServeHTTP(res, req)
a.So(res.Code, should.Equal, http.StatusNotAcceptable)
downlinks, err := as.DownlinkQueueList(ctx, registeredDeviceID)
if !a.So(err, should.BeNil) {
t.FailNow()
}

a.So(downlinks, should.Resemble, []*ttnpb.ApplicationDownlink{})
})
}

0 comments on commit a0c23c0

Please sign in to comment.