Skip to content

Commit

Permalink
Merge pull request #7247 from TheThingsNetwork/feature/pause-applicat…
Browse files Browse the repository at this point in the history
…ion-webhooks

feature: Pause application webhooks
  • Loading branch information
vlasebian authored Sep 5, 2024
2 parents 5caad01 + a0c23c0 commit d259f3f
Show file tree
Hide file tree
Showing 14 changed files with 443 additions and 156 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ For details about compatibility between different releases, see the **Commitment
- `ListGatewaysRequest` and `ListEndDevicesRequest` RPCs have a new `Filter` field that supports an `updated_since` timestamp.
- Preparation for universal rights assigned to users.
- This requires a database schema migration (`ttn-lw-stack is-db migrate`).
- Option to pause application webhooks.

### Deprecated

Expand Down
1 change: 1 addition & 0 deletions api/ttn/lorawan/v3/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -2073,6 +2073,7 @@ The NATS provider settings.
| `service_data` | [`ApplicationWebhook.Message`](#ttn.lorawan.v3.ApplicationWebhook.Message) | | |
| `health_status` | [`ApplicationWebhookHealth`](#ttn.lorawan.v3.ApplicationWebhookHealth) | | |
| `field_mask` | [`google.protobuf.FieldMask`](#google.protobuf.FieldMask) | | |
| `paused` | [`bool`](#bool) | | Set to temporarily pause forwarding uplink data to this end point and receiving downlinks from this end point. |

#### Field Rules

Expand Down
8 changes: 8 additions & 0 deletions api/ttn/lorawan/v3/api.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -20844,6 +20844,10 @@
},
"field_mask": {
"type": "string"
},
"paused": {
"type": "boolean",
"description": "Set to temporarily pause forwarding uplink data to this end point and receiving downlinks from this end point."
}
}
},
Expand Down Expand Up @@ -20983,6 +20987,10 @@
},
"field_mask": {
"type": "string"
},
"paused": {
"type": "boolean",
"description": "Set to temporarily pause forwarding uplink data to this end point and receiving downlinks from this end point."
}
}
},
Expand Down
5 changes: 4 additions & 1 deletion api/ttn/lorawan/v3/applicationserver_web.proto
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,10 @@ message ApplicationWebhook {
reserved 23;
reserved "queue";

// next: 24
// Set to temporarily pause forwarding uplink data to this end point and receiving downlinks from this end point.
bool paused = 24;

// next: 25
}

message ApplicationWebhooks {
Expand Down
14 changes: 13 additions & 1 deletion pkg/applicationserver/io/web/webhooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ var webhookFanOutFieldMask = []string{
"join_accept",
"location_solved",
"service_data",
"paused",
"uplink_message",
"uplink_normalized",
}
Expand Down Expand Up @@ -135,6 +136,12 @@ func (w *webhooks) handleUp(ctx context.Context, msg *ttnpb.ApplicationUp) error
Health: hook.HealthStatus,
})
ctx = log.NewContextWithField(ctx, "hook", hook.Ids.WebhookId)

if hook.Paused {
log.FromContext(ctx).Debug("Webhook is paused")
continue
}

f := func(ctx context.Context) error {
req, err := NewRequest(ctx, w.downlinks, msg, hook)
if err != nil {
Expand Down Expand Up @@ -185,7 +192,7 @@ func (w *webhooks) handleDown(
"webhook_id", hookID.WebhookId,
))

hook, err := w.registry.Get(ctx, hookID, []string{"format"})
hook, err := w.registry.Get(ctx, hookID, []string{"format", "paused"})
if err != nil {
webhandlers.Error(res, req, err)
return
Expand All @@ -194,6 +201,11 @@ func (w *webhooks) handleDown(
webhandlers.Error(res, req, errWebhookNotFound.New())
return
}
if hook.Paused {
logger.Debug("Webhook is paused")
res.WriteHeader(http.StatusNotAcceptable)
return
}
format, ok := formats[hook.Format]
if !ok {
webhandlers.Error(res, req, errFormatNotFound.WithAttributes("format", hook.Format))
Expand Down
201 changes: 201 additions & 0 deletions pkg/applicationserver/io/web/webhooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func TestWebhooks(t *testing.T) {
ServiceData: &ttnpb.ApplicationWebhook_Message{
Path: tc.prefix + "service/data",
},
Paused: false,
FieldMask: ttnpb.FieldMask(
"correlation_ids",
"end_device_ids",
Expand All @@ -162,6 +163,7 @@ func TestWebhooks(t *testing.T) {
"up.downlink_sent",
"up.join_accept",
"up.location_solved",
"up.paused",
"up.service_data",
"up.uplink_message",
"up.uplink_normalized",
Expand All @@ -183,6 +185,7 @@ func TestWebhooks(t *testing.T) {
"join_accept",
"location_solved",
"service_data",
"paused",
"uplink_message",
"uplink_normalized",
}, nil
Expand Down Expand Up @@ -574,4 +577,202 @@ func TestWebhooks(t *testing.T) {
}
})
})

//nolint:paralleltest
t.Run("PausedUplink", func(t *testing.T) {
_, ctx := test.New(t)

// Create an active webhook.
_, err := registry.Set(ctx, ids, nil,
func(_ *ttnpb.ApplicationWebhook) (*ttnpb.ApplicationWebhook, []string, error) {
return &ttnpb.ApplicationWebhook{
Ids: ids,
BaseUrl: "https://myapp.com/api/ttn/v3/{/appID,devID}",
Format: "json",
Paused: false,
UplinkMessage: &ttnpb.ApplicationWebhook_Message{Path: "/"},
},
[]string{
"ids.application_ids",
"ids.webhook_id",
"base_url",
"format",
"paused",
"uplink_message",
"field_mask",
}, nil
})
if err != nil {
t.Fatalf("Failed to set webhook in registry: %s", err)
}

sinkCh := make(chan *http.Request, 1)
testSink := mocksink.New(sinkCh)

ctx, cancel := context.WithCancel(ctx)
defer cancel()

c := componenttest.NewComponent(t, &component.Config{})
componenttest.StartComponent(t, c)
defer c.Close()

as := mock.NewServer(c)
_, err = web.NewWebhooks(ctx, as, registry, testSink, downlinks)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}

message := &ttnpb.ApplicationUp{
EndDeviceIds: registeredDeviceID,
Up: &ttnpb.ApplicationUp_UplinkMessage{
UplinkMessage: &ttnpb.ApplicationUplink{
SessionKeyId: []byte{0x11},
FPort: 42,
FCnt: 42,
FrmPayload: []byte{0x1, 0x2, 0x3},
},
},
}

a := assertions.New(t)
err = as.Publish(ctx, message)
if !a.So(err, should.BeNil) {
t.FailNow()
}

var req *http.Request
select {
case req = <-sinkCh:
case <-time.After(timeout):
t.Fatal("Expected message but nothing received")
}

actualBody, err := stdio.ReadAll(req.Body)
if !a.So(err, should.BeNil) {
t.FailNow()
}
expectedBody, err := formatters.JSON.FromUp(message)
if !a.So(err, should.BeNil) {
t.FailNow()
}
a.So(actualBody, should.Resemble, expectedBody)

// Pause the webhook.
_, err = registry.Set(ctx, ids, nil,
func(_ *ttnpb.ApplicationWebhook) (*ttnpb.ApplicationWebhook, []string, error) {
return &ttnpb.ApplicationWebhook{
Ids: ids,
BaseUrl: "https://myapp.com/api/ttn/v3/{/appID,devID}",
Format: "json",
Paused: true,
},
[]string{
"ids.application_ids",
"ids.webhook_id",
"base_url",
"format",
"paused",
"field_mask",
}, nil
})
if err != nil {
t.Fatalf("Failed to set webhook in registry: %s", err)
}

err = as.Publish(ctx, message)
if !a.So(err, should.BeNil) {
t.FailNow()
}

select {
case req = <-sinkCh:
t.Fatalf("Did not expect message but received: %v", req)
case <-time.After(timeout):
// Webhook was not received.
}
})

//nolint:paralleltest
t.Run("PausedDownlink", func(t *testing.T) {
is, isAddr, closeIS := mockis.New(ctx)
defer closeIS()

is.ApplicationRegistry().Add(ctx, registeredApplicationID, registeredApplicationKey,
ttnpb.Right_RIGHT_APPLICATION_SETTINGS_BASIC,
ttnpb.Right_RIGHT_APPLICATION_DEVICES_READ,
ttnpb.Right_RIGHT_APPLICATION_DEVICES_WRITE,
ttnpb.Right_RIGHT_APPLICATION_TRAFFIC_READ,
ttnpb.Right_RIGHT_APPLICATION_TRAFFIC_DOWN_WRITE)
conf := &component.Config{
ServiceBase: config.ServiceBase{
GRPC: config.GRPC{
Listen: ":0",
AllowInsecureForCredentials: true,
},
Cluster: cluster.Config{
IdentityServer: isAddr,
},
},
}

_, err := registry.Set(ctx, ids, nil,
func(_ *ttnpb.ApplicationWebhook) (*ttnpb.ApplicationWebhook, []string, error) {
return &ttnpb.ApplicationWebhook{
Ids: ids,
BaseUrl: "https://myapp.com/api/ttn/v3/{/appID,devID}",
Format: "json",
Paused: true,
UplinkMessage: &ttnpb.ApplicationWebhook_Message{Path: "/"},
},
[]string{
"ids.application_ids",
"ids.webhook_id",
"base_url",
"format",
"paused",
"uplink_message",
"field_mask",
}, nil
})
if err != nil {
t.Fatalf("Failed to set webhook in registry: %s", err)
}

sinkCh := make(chan *http.Request, 1)
testSink := mocksink.New(sinkCh)

ctx, cancel := context.WithCancel(ctx)
defer cancel()

c := componenttest.NewComponent(t, conf)
as := mock.NewServer(c)
w, err := web.NewWebhooks(ctx, as, registry, testSink, downlinks)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
c.RegisterWeb(w)
componenttest.StartComponent(t, c)
defer c.Close()

mustHavePeer(ctx, c, ttnpb.ClusterRole_ENTITY_REGISTRY)

// Check the status error code when scheduling downlink to a paused webhook.
url := fmt.Sprintf("/api/v3/as/applications/%s/webhooks/%s/devices/%s/down/replace",
ids.ApplicationIds.ApplicationId, ids.WebhookId, registeredDeviceID.DeviceId,
)
body := bytes.NewReader([]byte(`{"downlinks":[]}`))
req := httptest.NewRequest(http.MethodPost, url, body)
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", registeredApplicationKey))

res := httptest.NewRecorder()
c.ServeHTTP(res, req)
a.So(res.Code, should.Equal, http.StatusNotAcceptable)
downlinks, err := as.DownlinkQueueList(ctx, registeredDeviceID)
if !a.So(err, should.BeNil) {
t.FailNow()
}

a.So(downlinks, should.Resemble, []*ttnpb.ApplicationDownlink{})
})
}
Loading

0 comments on commit d259f3f

Please sign in to comment.