Skip to content

Commit

Permalink
Merge pull request #7418 from TheThingsNetwork/feature/redis-default-…
Browse files Browse the repository at this point in the history
…pagination-limit

Add default pagination limit to Redis
  • Loading branch information
halimi authored Dec 9, 2024
2 parents 52748a4 + 8a527ba commit 1c52800
Show file tree
Hide file tree
Showing 19 changed files with 1,030 additions and 59 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ For details about compatibility between different releases, see the **Commitment

### Fixed

- Enforce default page limit on AS and NS List RPCs if a value is not provided in the request.

### Security

## [3.33.0] - unreleased
Expand Down
48 changes: 44 additions & 4 deletions cmd/ttn-lw-stack/commands/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,16 @@ func NewNetworkServerDownlinkTaskRedis(conf *Config) *redis.Client {
return redis.New(conf.Redis.WithNamespace("ns", "tasks"))
}

// NewNetworkServerMACSettingsProfileRegistryRedis instantiates a new redis client
// with the Network Server MAC Settings Profile Registry namespace.
func NewNetworkServerMACSettingsProfileRegistryRedis(conf *Config) *redis.Client {
redis.SetPaginationDefaults(redis.PaginationDefaults{
DefaultLimit: conf.NS.Pagination.DefaultLimit,
})

return redis.New(conf.Redis.WithNamespace("ns", "mac-settings-profiles"))
}

// NewIdentityServerTelemetryTaskRedis instantiates a new redis client
// with the Identity Server Telemetry Task namespace.
func NewIdentityServerTelemetryTaskRedis(conf *Config) *redis.Client {
Expand All @@ -91,6 +101,36 @@ func NewApplicationServerDeviceRegistryRedis(conf *Config) *redis.Client {
return NewComponentDeviceRegistryRedis(conf, "as")
}

// NewApplicationServerPubSubRegistryRedis instantiates a new redis client
// with the Application Server PubSub Registry namespace.
func NewApplicationServerPubSubRegistryRedis(conf *Config) *redis.Client {
redis.SetPaginationDefaults(redis.PaginationDefaults{
DefaultLimit: conf.AS.Pagination.DefaultLimit,
})

return redis.New(config.Redis.WithNamespace("as", "io", "pubsub"))
}

// NewApplicationServerPackagesRegistryRedis instantiates a new redis client
// with the Application Server Packages Registry namespace.
func NewApplicationServerPackagesRegistryRedis(conf *Config) *redis.Client {
redis.SetPaginationDefaults(redis.PaginationDefaults{
DefaultLimit: conf.AS.Pagination.DefaultLimit,
})

return redis.New(config.Redis.WithNamespace("as", "io", "applicationpackages"))
}

// NewApplicationServerWebhookRegistryRedis instantiates a new redis client
// with the Application Server Webhook Registry namespace.
func NewApplicationServerWebhookRegistryRedis(conf *Config) *redis.Client {
redis.SetPaginationDefaults(redis.PaginationDefaults{
DefaultLimit: conf.AS.Pagination.DefaultLimit,
})

return redis.New(config.Redis.WithNamespace("as", "io", "webhooks"))
}

// NewJoinServerDeviceRegistryRedis instantiates a new redis client
// with the Join Server Device Registry namespace.
func NewJoinServerDeviceRegistryRedis(conf *Config) *redis.Client {
Expand Down Expand Up @@ -343,7 +383,7 @@ var startCommand = &cobra.Command{
Redis: redis.New(config.Cache.Redis.WithNamespace("ns", "scheduled-downlinks")),
}
macSettingsProfiles := &nsredis.MACSettingsProfileRegistry{
Redis: redis.New(config.Redis.WithNamespace("ns", "mac-settings-profiles")),
Redis: NewNetworkServerMACSettingsProfileRegistryRedis(config),
LockTTL: defaultLockTTL,
}
if err := macSettingsProfiles.Init(ctx); err != nil {
Expand Down Expand Up @@ -379,7 +419,7 @@ var startCommand = &cobra.Command{
Redis: redis.New(config.Cache.Redis.WithNamespace("as", "traffic")),
}
pubsubRegistry := &asiopsredis.PubSubRegistry{
Redis: redis.New(config.Redis.WithNamespace("as", "io", "pubsub")),
Redis: NewApplicationServerPubSubRegistryRedis(config),
LockTTL: defaultLockTTL,
}
if err := pubsubRegistry.Init(ctx); err != nil {
Expand All @@ -388,7 +428,7 @@ var startCommand = &cobra.Command{
config.AS.PubSub.Registry = pubsubRegistry
applicationPackagesRegistry, err := asioapredis.NewApplicationPackagesRegistry(
ctx,
redis.New(config.Redis.WithNamespace("as", "io", "applicationpackages")),
NewApplicationServerPackagesRegistryRedis(config),
defaultLockTTL,
)
if err != nil {
Expand All @@ -397,7 +437,7 @@ var startCommand = &cobra.Command{
config.AS.Packages.Registry = applicationPackagesRegistry
if config.AS.Webhooks.Target != "" {
webhookRegistry := &asiowebredis.WebhookRegistry{
Redis: redis.New(config.Redis.WithNamespace("as", "io", "webhooks")),
Redis: NewApplicationServerWebhookRegistryRedis(config),
LockTTL: defaultLockTTL,
}
if err := webhookRegistry.Init(ctx); err != nil {
Expand Down
6 changes: 6 additions & 0 deletions pkg/applicationserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ type DownlinksConfig struct {
ConfirmationConfig ConfirmationConfig `name:"confirmation" description:"Configuration for confirmed downlink"`
}

// PaginationConfig represents the configuration for pagination.
type PaginationConfig struct {
DefaultLimit int64 `name:"default-limit" description:"Default limit for pagination"`
}

// Config represents the ApplicationServer configuration.
type Config struct {
LinkMode string `name:"link-mode" description:"Deprecated - mode to link applications to their Network Server (all, explicit)"`
Expand All @@ -123,6 +128,7 @@ type Config struct {
DeviceKEKLabel string `name:"device-kek-label" description:"Label of KEK used to encrypt device keys at rest"`
DeviceLastSeen LastSeenConfig `name:"device-last-seen" description:"End Device last seen batch update configuration"`
Downlinks DownlinksConfig `name:"downlinks" description:"Downlink configuration"`
Pagination PaginationConfig `name:"pagination" description:"Pagination configuration"`
}

func (c Config) toProto() *ttnpb.AsConfiguration {
Expand Down
7 changes: 5 additions & 2 deletions pkg/applicationserver/io/packages/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"go.thethings.network/lorawan-stack/v3/pkg/config"
"go.thethings.network/lorawan-stack/v3/pkg/errors"
mockis "go.thethings.network/lorawan-stack/v3/pkg/identityserver/mock"
ttnredis "go.thethings.network/lorawan-stack/v3/pkg/redis"
"go.thethings.network/lorawan-stack/v3/pkg/rpcmetadata"
"go.thethings.network/lorawan-stack/v3/pkg/ttnpb"
"go.thethings.network/lorawan-stack/v3/pkg/unique"
Expand Down Expand Up @@ -371,6 +372,8 @@ func TestAssociations(t *testing.T) {
t.Run("Pagination", func(t *testing.T) {
a := assertions.New(t)

ttnredis.SetPaginationDefaults(ttnredis.PaginationDefaults{DefaultLimit: 10})

for i := 1; i < 21; i++ {
association := &ttnpb.ApplicationPackageAssociation{
Ids: &ttnpb.ApplicationPackageAssociationIdentifiers{
Expand Down Expand Up @@ -424,8 +427,8 @@ func TestAssociations(t *testing.T) {
limit: 0,
page: 0,
portLow: 1,
portHigh: 20,
length: 20,
portHigh: 10,
length: 10,
},
} {
t.Run(fmt.Sprintf("limit:%v_page:%v", tc.limit, tc.page),
Expand Down
66 changes: 56 additions & 10 deletions pkg/applicationserver/io/pubsub/redis/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package redis

import (
"context"
"runtime/trace"
"time"

"github.com/redis/go-redis/v9"
Expand Down Expand Up @@ -132,19 +133,54 @@ func (r PubSubRegistry) Range(ctx context.Context, paths []string, f func(contex
func (r PubSubRegistry) List(ctx context.Context, ids *ttnpb.ApplicationIdentifiers, paths []string) ([]*ttnpb.ApplicationPubSub, error) {
var pbs []*ttnpb.ApplicationPubSub
appUID := unique.ID(ctx, ids)
err := ttnredis.FindProtos(ctx, r.Redis, r.appKey(appUID), r.makeUIDKeyFunc(appUID)).Range(func() (proto.Message, func() (bool, error)) {
pb := &ttnpb.ApplicationPubSub{}
return pb, func() (bool, error) {
pb, err := applyPubSubFieldMask(nil, pb, appendImplicitPubSubGetPaths(paths...)...)
uidKey := r.appKey(appUID)

opts := []ttnredis.FindProtosOption{}
limit, offset := ttnredis.PaginationLimitAndOffsetFromContext(ctx)
if limit != 0 {
opts = append(opts,
ttnredis.FindProtosSorted(true),
ttnredis.FindProtosWithOffsetAndCount(offset, limit),
)
}

rangeProtos := func(c redis.Cmdable) error {
return ttnredis.FindProtos(ctx, c, uidKey, r.makeUIDKeyFunc(appUID), opts...).Range(
func() (proto.Message, func() (bool, error)) {
pb := &ttnpb.ApplicationPubSub{}
return pb, func() (bool, error) {
pb, err := applyPubSubFieldMask(nil, pb, appendImplicitPubSubGetPaths(paths...)...)
if err != nil {
return false, err
}
pbs = append(pbs, pb)
return true, nil
}
})
}

defer trace.StartRegion(ctx, "list pubsub by application id").End()

var err error
if limit != 0 {
var lockerID string
lockerID, err = ttnredis.GenerateLockerID()
if err != nil {
return nil, err
}
err = ttnredis.LockedWatch(ctx, r.Redis, uidKey, lockerID, r.LockTTL, func(tx *redis.Tx) (err error) {
total, err := tx.SCard(ctx, uidKey).Result()
if err != nil {
return false, err
return err
}
pbs = append(pbs, pb)
return true, nil
}
})
ttnredis.SetPaginationTotal(ctx, total)
return rangeProtos(tx)
})
} else {
err = rangeProtos(r.Redis)
}
if err != nil {
return nil, err
return nil, ttnredis.ConvertError(err)
}
return pbs, nil
}
Expand Down Expand Up @@ -283,3 +319,13 @@ func (r PubSubRegistry) Set(ctx context.Context, ids *ttnpb.ApplicationPubSubIde
}
return pb, nil
}

// WithPagination returns a new context with pagination parameters.
func (PubSubRegistry) WithPagination(
ctx context.Context,
limit uint32,
page uint32,
total *int64,
) context.Context {
return ttnredis.NewContextWithPagination(ctx, int64(limit), int64(page), total)
}
Loading

0 comments on commit 1c52800

Please sign in to comment.