-
Notifications
You must be signed in to change notification settings - Fork 8
/
storage.go
677 lines (553 loc) · 19 KB
/
storage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
package storageredis
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"io/fs"
"os"
"path"
"strconv"
"strings"
"sync"
"time"
"github.com/bsm/redislock"
"github.com/caddyserver/certmagic"
"github.com/redis/go-redis/v9"
"go.uber.org/zap"
)
// Default configuration values used by New, and timing parameters used by
// the distributed lock implementation (Lock / Unlock).
const (
// Default Redis client type (single standalone server)
defaultClientType = "simple"
// Default Redis server host
defaultHost = "127.0.0.1"
// Default Redis server port
defaultPort = "6379"
// Default Redis server database number
defaultDb = 0
// Default prefix prepended to every Redis key
defaultKeyPrefix = "caddy"
// Whether values are compressed before storing, by default
defaultCompression = false
// Whether to connect to Redis via TLS, by default
defaultTLS = false
// Whether to skip verification of the server TLS certificate, by default
defaultTLSInsecure = true
// Default for the route-by-latency option of the cluster client
defaultRouteByLatency = false
// Default for the route-randomly option of the cluster client
defaultRouteRandomly = false
// Redis lock time-to-live
lockTTL = 5 * time.Second
// Delay between attempts to obtain a lock that is currently held
lockPollInterval = 1 * time.Second
// How frequently a held lock's TTL is refreshed; shorter than lockTTL
lockRefreshInterval = 3 * time.Second
)
// RedisStorage implements a Caddy storage backend for Redis.
// It supports Single (Standalone), Cluster, or Sentinel (Failover) Redis server configurations.
type RedisStorage struct {
// ClientType specifies the Redis client type. New sets it to "simple";
// "cluster" and "failover" select the cluster and failover clients.
ClientType string `json:"client_type"`
// Address holds the full address(es) of the Redis server(s). Example: "127.0.0.1:6379"
// If not defined, will be generated from Host and Port parameters.
// More than one address selects the cluster client.
Address []string `json:"address"`
// Host The Redis server hostname or IP address. Default: "127.0.0.1"
Host []string `json:"host"`
// Port The Redis server port number. Default: "6379"
Port []string `json:"port"`
// DB The Redis server database number. Default: 0
DB int `json:"db"`
// Timeout The Redis server timeout in whole seconds, given as a string.
// It is applied to the dial, read and write timeouts. When empty, no
// timeouts are set on the client options.
Timeout string `json:"timeout"`
// Username The username for authenticating with the Redis server. Default: "" (No authentication)
Username string `json:"username"`
// Password The password for authenticating with the Redis server. Default: "" (No authentication)
Password string `json:"password"`
// MasterName Only required when connecting to Redis via Sentinel (Failover mode). Default ""
MasterName string `json:"master_name"`
// KeyPrefix A string prefix that is prepended to Redis keys. Default: "caddy"
// Useful when the Redis server is used by multiple applications.
KeyPrefix string `json:"key_prefix"`
// EncryptionKey A key string used to symmetrically encrypt and decrypt data stored in Redis.
// The key must be exactly 32 characters, longer values will be truncated. Default: "" (No encryption)
EncryptionKey string `json:"encryption_key"`
// Compression Specifies whether values should be compressed before storing in Redis. Default: false
Compression bool `json:"compression"`
// TlsEnabled controls whether TLS will be used to connect to the Redis
// server. False by default.
TlsEnabled bool `json:"tls_enabled"`
// TlsInsecure controls whether the client will skip verification of the
// server certificate. See `InsecureSkipVerify` in `tls.Config` for details.
// True by default.
// https://pkg.go.dev/crypto/tls#Config
TlsInsecure bool `json:"tls_insecure"`
// TlsServerCertsPEM is a series of PEM encoded certificates that will be
// used by the client to validate trust in the Redis server's certificate
// instead of the system trust store. May not be specified alongside
// `TlsServerCertsPath`. See `x509.CertPool.AppendCertsFromPEM` for details.
// https://pkg.go.dev/crypto/x509#CertPool.AppendCertsFromPEM
TlsServerCertsPEM string `json:"tls_server_certs_pem"`
// TlsServerCertsPath is the path to a file containing a series of PEM
// encoded certificates that will be used by the client to validate trust in
// the Redis server's certificate instead of the system trust store. May not
// be specified alongside `TlsServerCertsPEM`. See
// `x509.CertPool.AppendCertsFromPEM` for details.
// https://pkg.go.dev/crypto/x509#CertPool.AppendCertsFromPEM
TlsServerCertsPath string `json:"tls_server_certs_path"`
// RouteByLatency Route commands by latency, only used in Cluster mode. Default: false
RouteByLatency bool `json:"route_by_latency"`
// RouteRandomly Route commands randomly, only used in Cluster mode. Default: false
RouteRandomly bool `json:"route_randomly"`
// client is the Redis client created by initRedisClient.
client redis.UniversalClient
// locker creates distributed locks on top of client.
locker *redislock.Client
// logger receives informational messages (e.g. from Repair).
logger *zap.SugaredLogger
// locks maps a prefixed lock key to the *redislock.Lock obtained by Lock,
// so that Unlock can release it.
locks *sync.Map
}
// StorageData is the envelope that is JSON-encoded and stored in Redis for
// every key written by Store.
type StorageData struct {
// Value is the stored payload, after optional compression and encryption.
Value []byte `json:"value"`
// Modified is the time at which the value was stored.
Modified time.Time `json:"modified"`
// Size is the length in bytes of the original value, before any
// compression or encryption was applied.
Size int64 `json:"size"`
// Compression is 1 when Value was stored in compressed form, 0 otherwise.
Compression int `json:"compression"`
// Encryption is 1 when Value was stored in encrypted form, 0 otherwise.
Encryption int `json:"encryption"`
}
// New returns a RedisStorage pre-populated with the package default values.
// Fields that are not listed here keep their zero value.
func New() *RedisStorage {
	storage := new(RedisStorage)

	storage.ClientType = defaultClientType
	storage.Host = []string{defaultHost}
	storage.Port = []string{defaultPort}
	storage.DB = defaultDb
	storage.KeyPrefix = defaultKeyPrefix
	storage.Compression = defaultCompression
	storage.TlsEnabled = defaultTLS
	storage.TlsInsecure = defaultTLSInsecure

	return storage
}
// initRedisClient builds the Redis client described by the storage
// configuration, verifies connectivity with a PING, and then creates the
// redislock client and the local lock registry.
//
// Client selection:
//   - ClientType "failover" with a non-empty MasterName: failover cluster client
//   - ClientType "cluster", or more than one address: cluster client
//   - anything else: simple standalone client
//
// If the connectivity check fails, the freshly created client is closed so
// that its connections are not leaked, rs.client is left untouched and the
// ping error is returned.
func (rs *RedisStorage) initRedisClient(ctx context.Context) error {
	// Configure options for all client types
	clientOpts := redis.UniversalOptions{
		Addrs:      rs.Address,
		MasterName: rs.MasterName,
		Username:   rs.Username,
		Password:   rs.Password,
		DB:         rs.DB,
	}

	// Configure timeout values if defined
	if rs.Timeout != "" {
		// Normally validated while the configuration is parsed; still checked
		// here so a malformed value is reported instead of silently becoming
		// a zero timeout.
		seconds, err := strconv.Atoi(rs.Timeout)
		if err != nil {
			return fmt.Errorf("Invalid timeout value %q: %w", rs.Timeout, err)
		}
		timeout := time.Duration(seconds) * time.Second
		clientOpts.DialTimeout = timeout
		clientOpts.ReadTimeout = timeout
		clientOpts.WriteTimeout = timeout
	}

	// Configure cluster routing options
	clientOpts.RouteByLatency = rs.RouteByLatency
	clientOpts.RouteRandomly = rs.RouteRandomly

	// Configure TLS support if enabled
	if rs.TlsEnabled {
		clientOpts.TLSConfig = &tls.Config{
			InsecureSkipVerify: rs.TlsInsecure,
		}

		hasPEM := len(rs.TlsServerCertsPEM) > 0
		hasPath := len(rs.TlsServerCertsPath) > 0
		if hasPEM && hasPath {
			return fmt.Errorf("Cannot specify TlsServerCertsPEM alongside TlsServerCertsPath")
		}

		if hasPEM || hasPath {
			// Inline PEM is used unless a file path was given instead
			pem := []byte(rs.TlsServerCertsPEM)
			if hasPath {
				var err error
				pem, err = os.ReadFile(rs.TlsServerCertsPath)
				if err != nil {
					return fmt.Errorf("Failed to load PEM server certs from file %s: %w", rs.TlsServerCertsPath, err)
				}
			}

			certPool := x509.NewCertPool()
			if !certPool.AppendCertsFromPEM(pem) {
				return fmt.Errorf("Failed to load PEM server certs")
			}
			clientOpts.TLSConfig.RootCAs = certPool
		}
	}

	// pingShards tests the connection to every shard of a cluster client
	pingShards := func(cc *redis.ClusterClient) error {
		return cc.ForEachShard(ctx, func(ctx context.Context, shard *redis.Client) error {
			return shard.Ping(ctx).Err()
		})
	}

	// Create appropriate Redis client type and test the connection
	var (
		client  redis.UniversalClient
		pingErr error
	)
	switch {
	case rs.ClientType == "failover" && clientOpts.MasterName != "":
		// Redis Failover Cluster client
		cc := redis.NewFailoverClusterClient(clientOpts.Failover())
		client = cc
		pingErr = pingShards(cc)
	case rs.ClientType == "cluster" || len(clientOpts.Addrs) > 1:
		// Redis Cluster client
		cc := redis.NewClusterClient(clientOpts.Cluster())
		client = cc
		pingErr = pingShards(cc)
	default:
		// Redis simple standalone client
		sc := redis.NewClient(clientOpts.Simple())
		client = sc
		pingErr = sc.Ping(ctx).Err()
	}

	if pingErr != nil {
		// Best-effort cleanup; the ping failure is the error worth reporting.
		_ = client.Close()
		return pingErr
	}
	rs.client = client

	// Create new redislock client and the registry of locks held locally
	rs.locker = redislock.New(rs.client)
	rs.locks = &sync.Map{}
	return nil
}
// Store saves value under key.
//
// The value is optionally compressed (the compressed form is kept only when it
// is smaller than the input) and optionally encrypted, then wrapped in a
// StorageData envelope, JSON-encoded and written to Redis without expiry. The
// directory index entry for the key is written before the value itself.
//
// Underlying errors are wrapped with %w so callers can inspect them with
// errors.Is / errors.As (for example to detect context cancellation).
func (rs RedisStorage) Store(ctx context.Context, key string, value []byte) error {
	// Size recorded in the envelope is that of the original, untransformed value
	size := len(value)
	compressionFlag, encryptionFlag := 0, 0

	// Compress value if compression enabled
	if rs.Compression {
		compressed, err := rs.compress(value)
		if err != nil {
			return fmt.Errorf("Unable to compress value for %s: %w", key, err)
		}
		// Keep the compressed form only when it actually saves space
		if len(compressed) < size {
			value = compressed
			compressionFlag = 1
		}
	}

	// Encrypt value if encryption enabled
	if rs.EncryptionKey != "" {
		encrypted, err := rs.encrypt(value)
		if err != nil {
			return fmt.Errorf("Unable to encrypt value for %s: %w", key, err)
		}
		value = encrypted
		encryptionFlag = 1
	}

	sd := &StorageData{
		Value:       value,
		Modified:    time.Now(),
		Size:        int64(size),
		Compression: compressionFlag,
		Encryption:  encryptionFlag,
	}
	jsonValue, err := json.Marshal(sd)
	if err != nil {
		return fmt.Errorf("Unable to marshal value for %s: %w", key, err)
	}

	prefixedKey := rs.prefixKey(key)

	// Create directory structure set for current key; the modification time
	// (Unix seconds) is used as the sorted-set score
	score := float64(sd.Modified.Unix())
	if err := rs.storeDirectoryRecord(ctx, prefixedKey, score, false, false); err != nil {
		return fmt.Errorf("Unable to create directory for key %s: %w", key, err)
	}

	// Store the key value in the Redis database
	if err := rs.client.Set(ctx, prefixedKey, jsonValue, 0).Err(); err != nil {
		return fmt.Errorf("Unable to set value for %s: %w", key, err)
	}
	return nil
}
// Load retrieves the value stored under key.
//
// The stored envelope is fetched with loadStorageData (whose error is returned
// unchanged), then the transformations applied by Store are undone in reverse
// order: decryption first, decompression second.
//
// Decrypt/uncompress errors are wrapped with %w so the cause stays inspectable.
func (rs RedisStorage) Load(ctx context.Context, key string) ([]byte, error) {
	sd, err := rs.loadStorageData(ctx, key)
	if err != nil {
		return nil, err
	}
	value := sd.Value

	// Decrypt value if encrypted
	if sd.Encryption > 0 {
		if value, err = rs.decrypt(value); err != nil {
			return nil, fmt.Errorf("Unable to decrypt value for %s: %w", key, err)
		}
	}

	// Uncompress value if compressed
	if sd.Compression > 0 {
		if value, err = rs.uncompress(value); err != nil {
			return nil, fmt.Errorf("Unable to uncompress value for %s: %w", key, err)
		}
	}

	return value, nil
}
// Delete removes key from the directory index and then deletes the key itself
// from Redis.
//
// Underlying errors are wrapped with %w so callers can inspect the cause.
func (rs RedisStorage) Delete(ctx context.Context, key string) error {
	prefixedKey := rs.prefixKey(key)

	// Remove current key from directory structure
	if err := rs.deleteDirectoryRecord(ctx, prefixedKey, false); err != nil {
		return fmt.Errorf("Unable to delete directory for key %s: %w", key, err)
	}

	// Remove the stored value
	if err := rs.client.Del(ctx, prefixedKey).Err(); err != nil {
		return fmt.Errorf("Unable to delete key %s: %w", key, err)
	}
	return nil
}
// Exists reports whether the prefixed form of key is present in Redis.
// Any error from the Redis command is not surfaced; in that case the
// reported count is the command's zero value and the result is false.
func (rs RedisStorage) Exists(ctx context.Context, key string) bool {
	// EXISTS yields the number of matching keys
	return rs.client.Exists(ctx, rs.prefixKey(key)).Val() > 0
}
// List returns the keys found under dir, using the sorted-set directory index.
//
// Members of the index that end in "/" denote sub-directories. When recursive
// is true those are descended into and only their contents are returned;
// otherwise they are returned as entries themselves (without the trailing
// slash). Returned keys are relative to the key prefix.
//
// The Redis error is wrapped with %w so callers can inspect the cause.
func (rs RedisStorage) List(ctx context.Context, dir string, recursive bool) ([]string, error) {
	var keyList []string
	currKey := rs.prefixKey(dir)

	// Obtain range of all direct children stored in the Sorted Set
	children, err := rs.client.ZRange(ctx, currKey, 0, -1).Result()
	if err != nil {
		return keyList, fmt.Errorf("Unable to get range on sorted set '%s': %w", currKey, err)
	}

	for _, child := range children {
		// Directory members carry a "/" suffix
		name := strings.TrimSuffix(child, "/")
		isDir := name != child

		// Reconstruct the full path of the child key
		fullPathKey := path.Join(dir, name)

		if !recursive || !isDir {
			keyList = append(keyList, fullPathKey)
			continue
		}

		// Recursively traverse the child directory
		childKeys, err := rs.List(ctx, fullPathKey, recursive)
		if err != nil {
			return keyList, err
		}
		keyList = append(keyList, childKeys...)
	}

	return keyList, nil
}
// Stat returns metadata about key taken from its stored envelope: the
// modification time and the original value size. The entry is always reported
// as terminal. Errors from loadStorageData are returned unchanged.
func (rs RedisStorage) Stat(ctx context.Context, key string) (certmagic.KeyInfo, error) {
	var info certmagic.KeyInfo

	sd, err := rs.loadStorageData(ctx, key)
	if err != nil {
		return info, err
	}

	info.Key = key
	info.Modified = sd.Modified
	info.Size = sd.Size
	info.IsTerminal = true
	return info, nil
}
// Lock obtains the distributed lock identified by name, blocking until it is
// acquired, an unexpected error occurs, or ctx is done.
//
// While the lock is held by someone else, acquisition is retried every
// lockPollInterval. Once obtained, the lock is recorded in rs.locks (so Unlock
// can find it) and a background goroutine refreshes its TTL every
// lockRefreshInterval. That goroutine exits when a refresh fails or when ctx
// is done. (Presumably a refresh fails once the lock has been released by
// Unlock - confirm against redislock.)
//
// Unexpected errors are wrapped with %w; ctx.Err() is returned when ctx ends
// while waiting.
func (rs *RedisStorage) Lock(ctx context.Context, name string) error {
	key := rs.prefixLock(name)

	for {
		// try to obtain lock
		lock, err := rs.locker.Obtain(ctx, key, lockTTL, &redislock.Options{})

		// lock successfully obtained
		if err == nil {
			// store the lock in sync.map, needed for unlocking
			rs.locks.Store(key, lock)

			// keep the lock fresh as long as we hold it
			go func(ctx context.Context, lock *redislock.Lock) {
				for {
					// refresh the Redis lock; stop on the first failure
					if err := lock.Refresh(ctx, lockTTL, nil); err != nil {
						return
					}

					select {
					case <-time.After(lockRefreshInterval):
					case <-ctx.Done():
						return
					}
				}
			}(ctx, lock)

			return nil
		}

		// check for unexpected error; errors.Is also matches a wrapped sentinel
		if !errors.Is(err, redislock.ErrNotObtained) {
			return fmt.Errorf("Unable to obtain lock for %s: %w", key, err)
		}

		// lock already exists, wait and try again until cancelled
		select {
		case <-time.After(lockPollInterval):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
// Unlock releases the distributed lock identified by name if this instance
// holds it. The lock is removed from rs.locks before being released in Redis.
// Unlocking a name that is not recorded locally is a no-op.
//
// The release error is wrapped with %w so callers can inspect the cause.
func (rs *RedisStorage) Unlock(ctx context.Context, name string) error {
	key := rs.prefixLock(name)

	// load and delete lock from sync.Map
	entry, loaded := rs.locks.LoadAndDelete(key)
	if !loaded {
		return nil
	}

	// only *redislock.Lock values are stored by Lock; anything else is ignored
	lock, ok := entry.(*redislock.Lock)
	if !ok {
		return nil
	}

	// release the Redis lock
	if err := lock.Release(ctx); err != nil {
		return fmt.Errorf("Unable to release lock for %s: %w", key, err)
	}
	return nil
}
// Repair rebuilds and prunes the sorted-set directory index.
//
// When called for the root directory (dir == ""), every key matching the key
// prefix is scanned; for each plain string key whose envelope can be loaded,
// the directory index entries are (re)created. Keys whose data cannot be
// loaded are logged and skipped.
//
// Then, for dir and recursively for each of its sub-directories, index members
// whose underlying key no longer exists are removed.
//
// A failing existence check or removal aborts the repair with an error rather
// than being treated as "key missing", so a transient Redis failure cannot
// cause valid index entries to be dropped.
func (rs *RedisStorage) Repair(ctx context.Context, dir string) error {
	currKey := rs.prefixKey(dir)

	// Perform recursive full key scan only from the root directory
	if dir == "" {
		const scanCount int64 = 500
		var cursor uint64

		for {
			// Scan for keys matching the search query and iterate until all found
			keys, nextCursor, err := rs.client.Scan(ctx, cursor, currKey+"*", scanCount).Result()
			if err != nil {
				return fmt.Errorf("Unable to scan path %s: %w", currKey, err)
			}

			for _, key := range keys {
				// Proceed only if key type is regular string value
				if rs.client.Type(ctx, key).Val() != "string" {
					continue
				}

				// Load the Storage Data struct to obtain modified time
				trimmedKey := rs.trimKey(key)
				sd, err := rs.loadStorageData(ctx, trimmedKey)
				if err != nil {
					rs.logger.Infof("Unable to load storage data for key '%s'", trimmedKey)
					continue
				}

				// Repair directory structure set for current key
				score := float64(sd.Modified.Unix())
				if err := rs.storeDirectoryRecord(ctx, key, score, true, false); err != nil {
					return fmt.Errorf("Unable to repair directory index for key '%s': %w", trimmedKey, err)
				}
			}

			// A zero cursor marks the end of the scan
			if nextCursor == 0 {
				break
			}
			cursor = nextCursor
		}
	}

	// Obtain range of all direct children stored in the Sorted Set
	children, err := rs.client.ZRange(ctx, currKey, 0, -1).Result()
	if err != nil {
		return fmt.Errorf("Unable to get range on sorted set '%s': %w", currKey, err)
	}

	for _, k := range children {
		// Directory keys will have a "/" suffix
		trimmedKey := strings.TrimSuffix(k, "/")

		// Reconstruct the full path of child key
		fullPathKey := path.Join(dir, trimmedKey)

		// Query Redis directly so that a command failure is not mistaken for
		// a missing key
		count, err := rs.client.Exists(ctx, rs.prefixKey(fullPathKey)).Result()
		if err != nil {
			return fmt.Errorf("Unable to check existence of key '%s': %w", fullPathKey, err)
		}

		// Remove key from set if it does not exist
		if count == 0 {
			if err := rs.client.ZRem(ctx, currKey, k).Err(); err != nil {
				return fmt.Errorf("Unable to remove record '%s' from directory '%s': %w", k, currKey, err)
			}
			rs.logger.Infof("Removed non-existant record '%s' from directory '%s'", k, currKey)
			continue
		}

		// If current key is a directory, recursively traverse it
		if k != trimmedKey {
			if err := rs.Repair(ctx, fullPathKey); err != nil {
				return err
			}
		}
	}

	return nil
}
// trimKey strips the configured key prefix, and then one leading "/",
// from key.
func (rs *RedisStorage) trimKey(key string) string {
	withoutPrefix := strings.TrimPrefix(key, rs.KeyPrefix)
	return strings.TrimPrefix(withoutPrefix, "/")
}
// prefixKey returns key joined onto the configured key prefix as a
// slash-separated path (path.Join also cleans the result).
func (rs *RedisStorage) prefixKey(key string) string {
	prefixed := path.Join(rs.KeyPrefix, key)
	return prefixed
}
// prefixLock returns the prefixed Redis key used for the lock named key;
// lock keys live under the "locks" directory below the key prefix.
func (rs *RedisStorage) prefixLock(key string) string {
	lockPath := path.Join("locks", key)
	return rs.prefixKey(lockPath)
}
// loadStorageData fetches and decodes the StorageData envelope stored under
// key (the key prefix is applied here).
//
// Returns fs.ErrNotExist when Redis reports the key as missing (redis.Nil) or
// when the stored value is empty. Any other Redis error is returned wrapped,
// as is a JSON decoding failure.
//
// The error is examined before the data: on a failed command the returned
// byte slice is empty, so checking the data first would report every Redis
// failure (connection loss, wrong key type, ...) as "does not exist".
func (rs RedisStorage) loadStorageData(ctx context.Context, key string) (*StorageData, error) {
	data, err := rs.client.Get(ctx, rs.prefixKey(key)).Bytes()
	switch {
	case errors.Is(err, redis.Nil):
		return nil, fs.ErrNotExist
	case err != nil:
		return nil, fmt.Errorf("Unable to get data for %s: %w", key, err)
	case len(data) == 0:
		return nil, fs.ErrNotExist
	}

	sd := &StorageData{}
	if err := json.Unmarshal(data, sd); err != nil {
		return nil, fmt.Errorf("Unable to unmarshal value for %s: %w", key, err)
	}
	return sd, nil
}
// storeDirectoryRecord records key in the directory index: the base name of
// key is added (with the given score) to the Redis sorted set named after its
// parent directory, enabling fast and efficient traversal in List().
//
// key is an already-prefixed Redis key. baseIsDir marks the base name as a
// directory, which is stored with a trailing "/". Parent directories are
// indexed recursively until one is found to be already present, or the top
// level is reached. With repair set, the walk always continues to the top
// level and newly added entries are logged.
//
// An error from any level of the walk is returned to the caller.
func (rs RedisStorage) storeDirectoryRecord(ctx context.Context, key string, score float64, repair, baseIsDir bool) error {
	// Extract parent directory and base (file) names from key
	dir, base := rs.splitDirectoryKey(key, baseIsDir)

	// Reached the top-level directory. dir == key only happens at a path
	// root ("/"), where continuing would recurse forever.
	if dir == "." || dir == key {
		return nil
	}

	// Insert "base" value into Set "dir"
	added, err := rs.client.ZAdd(ctx, dir, redis.Z{Score: score, Member: base}).Result()
	if err != nil {
		return fmt.Errorf("Unable to add %s to Set %s: %w", base, dir, err)
	}

	// Zero means base was already a member, so the parents are assumed to be
	// indexed already; outside of repair mode the walk stops here.
	if added <= 0 && !repair {
		return nil
	}
	if added > 0 && repair {
		rs.logger.Infof("Repaired index for record '%s' in directory '%s'", base, dir)
	}

	// Recursively index the parent directory
	return rs.storeDirectoryRecord(ctx, dir, score, repair, true)
}
// deleteDirectoryRecord removes key from the directory index: the base name of
// key is removed from the Redis sorted set named after its parent directory.
//
// key is an already-prefixed Redis key. baseIsDir marks the base name as a
// directory (stored with a trailing "/"). When the removal leaves the parent
// set non-existent (removing the last member deletes the set), the parent is
// in turn removed from its own parent, recursively, until a non-empty
// directory or the top level is reached.
//
// A failing existence check is returned as an error rather than being read as
// "set is gone", so a transient Redis failure cannot unlink a directory that
// still has members. Errors from the recursive walk are returned as well.
func (rs RedisStorage) deleteDirectoryRecord(ctx context.Context, key string, baseIsDir bool) error {
	dir, base := rs.splitDirectoryKey(key, baseIsDir)

	// Reached the top-level directory. dir == key only happens at a path
	// root ("/"), where continuing would recurse forever.
	if dir == "." || dir == key {
		return nil
	}

	// Remove "base" value from Set "dir"
	if err := rs.client.ZRem(ctx, dir, base).Err(); err != nil {
		return fmt.Errorf("Unable to remove %s from Set %s: %w", base, dir, err)
	}

	// Check if Set "dir" still exists (removing the last item deletes the set)
	remaining, err := rs.client.Exists(ctx, dir).Result()
	if err != nil {
		return fmt.Errorf("Unable to check existence of Set %s: %w", dir, err)
	}
	if remaining > 0 {
		return nil
	}

	// The directory is now empty: remove it from its own parent
	return rs.deleteDirectoryRecord(ctx, dir, true)
}
// splitDirectoryKey splits key into its parent directory and base name using
// slash-separated path rules. When baseIsDir is set, a "/" is appended to the
// base name to mark it as a directory.
func (rs RedisStorage) splitDirectoryKey(key string, baseIsDir bool) (string, string) {
	parent, name := path.Dir(key), path.Base(key)
	if !baseIsDir {
		return parent, name
	}
	// Trailing slash indicates a directory entry
	return parent, name + "/"
}
// String returns the JSON encoding of the storage configuration with
// non-empty Password and EncryptionKey values replaced by "REDACTED".
// The receiver is a copy, so the redaction does not affect the caller's value.
func (rs RedisStorage) String() string {
	const redacted = `REDACTED`

	if len(rs.Password) > 0 {
		rs.Password = redacted
	}
	if len(rs.EncryptionKey) > 0 {
		rs.EncryptionKey = redacted
	}

	// A marshalling failure is deliberately ignored; the result is then
	// whatever bytes Marshal returned alongside the error.
	encoded, _ := json.Marshal(rs)
	return string(encoded)
}
// GetClient exposes the Redis client held by this storage, so that other
// modules can talk to the same Redis instance.
//
// The result is typed "any" for forward-compatibility with new versions of
// go-redis; callers usually need to type-assert it to redis.UniversalClient.
func (rs *RedisStorage) GetClient() any {
	var client any = rs.client
	return client
}