Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement: add context management support to storages #1569

Open
wants to merge 29 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
8d47776
arangodb: add support for context management
efectn Feb 9, 2025
4b05c26
clickhouse: add support for context management
efectn Feb 9, 2025
3fe9c03
dynamodb: add support for context management
efectn Feb 9, 2025
9c0e237
etcd: add support for context management
efectn Feb 9, 2025
de1450c
minio: add support for context management
efectn Feb 9, 2025
3b355ed
mongodb: add support for context management
efectn Feb 9, 2025
260c220
mssql: add support for context management
efectn Feb 9, 2025
fe89536
mysql: add support for context management
efectn Feb 9, 2025
b384945
nats: add support for context management
efectn Feb 10, 2025
2ecc712
postgres: add support for context management
efectn Feb 10, 2025
e7b9b3c
redis: add support for context management
efectn Feb 10, 2025
901d02f
rueidis: add support for context management
efectn Feb 10, 2025
32ee49b
s3: add support for context management
efectn Feb 10, 2025
96a5570
scylladb: add support for context management
efectn Feb 10, 2025
fc5565d
sqlite3: add support for context management
efectn Feb 10, 2025
2dcc594
valkey: add support for context management
efectn Feb 11, 2025
d365f8a
azureblob: add support for context management
efectn Feb 11, 2025
9f1466a
cloudflarekv: add support for context management
efectn Feb 11, 2025
09e0dd0
cloudflarekv: make context functional for test_module
efectn Feb 11, 2025
e57389b
Merge branch 'main' into add-withcontext
efectn Mar 4, 2025
fbea5b5
couchbase: add support for context management
efectn Mar 4, 2025
d935df4
Merge branch 'add-withcontext' of https://github.com/gofiber/storage …
efectn Mar 4, 2025
714501b
Merge branch 'main' into add-withcontext
ReneWerner87 Mar 19, 2025
97ca918
Merge branch 'main' into add-withcontext
gaby Mar 27, 2025
5fb13db
chore: use testcontainers properly in nats
mdelapenya Mar 28, 2025
b2da531
chore: use testcontainers properly in dynamodb
mdelapenya Mar 28, 2025
c15dcd7
chore: use testcontainers properly in mysql
mdelapenya Mar 28, 2025
0c93caa
chore: use testcontainers properly in postgres
mdelapenya Mar 28, 2025
76a2fc7
chore: use testcontainers properly in mongodb
mdelapenya Mar 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions arangodb/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,13 @@ A ArangoDB storage driver using `arangodb/go-driver` and [arangodb/go-driver](ht
### Signatures
```go
func New(config ...Config) Storage
func (s *Storage) GetWithContext(ctx context.Context, key string) ([]byte, error)
func (s *Storage) Get(key string) ([]byte, error)
func (s *Storage) SetWithContext(ctx context.Context, key string, val []byte, exp time.Duration) error
func (s *Storage) Set(key string, val []byte, exp time.Duration) error
func (s *Storage) DeleteWithContext(ctx context.Context, key string) error
func (s *Storage) Delete(key string) error
func (s *Storage) ResetWithContext(ctx context.Context) error
func (s *Storage) Reset() error
func (s *Storage) Close() error
func (s *Storage) Conn() driver.Client
Expand Down
39 changes: 28 additions & 11 deletions arangodb/arangodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,12 @@ func New(config ...Config) *Storage {
return store
}

// Get value by key
func (s *Storage) Get(key string) ([]byte, error) {
// GetWithContext value by key with given context
func (s *Storage) GetWithContext(ctx context.Context, key string) ([]byte, error) {
if len(key) <= 0 {
return nil, nil
}

ctx := context.Background()

// Check if the document exists
// to avoid errors later
exists, err := s.collection.DocumentExists(ctx, key)
Expand Down Expand Up @@ -151,8 +149,13 @@ func (s *Storage) Get(key string) ([]byte, error) {
return utils.UnsafeBytes(model.Val), nil
}

// Set key with value
func (s *Storage) Set(key string, val []byte, exp time.Duration) error {
// Get value by key
func (s *Storage) Get(key string) ([]byte, error) {
return s.GetWithContext(context.Background(), key)
}

// SetWithContext key with value with given context
func (s *Storage) SetWithContext(ctx context.Context, key string, val []byte, exp time.Duration) error {
// Ain't Nobody Got Time For That
if len(key) <= 0 || len(val) <= 0 {
return nil
Expand All @@ -169,7 +172,6 @@ func (s *Storage) Set(key string, val []byte, exp time.Duration) error {
Val: valStr,
Exp: expireAt,
}
ctx := context.Background()

// Arango does not support documents with the same key
// So we need to check if the document exists
Expand All @@ -188,20 +190,35 @@ func (s *Storage) Set(key string, val []byte, exp time.Duration) error {
return err
}

// Delete value by key
func (s *Storage) Delete(key string) error {
// Set key with value
func (s *Storage) Set(key string, val []byte, exp time.Duration) error {
return s.SetWithContext(context.Background(), key, val, exp)
}

// DeleteWithContext value by key with given context
func (s *Storage) DeleteWithContext(ctx context.Context, key string) error {
// Ain't Nobody Got Time For That
if len(key) <= 0 {
return nil
}
_, err := s.collection.RemoveDocument(context.Background(), key)
_, err := s.collection.RemoveDocument(ctx, key)
return err
}

// Delete value by key
func (s *Storage) Delete(key string) error {
return s.DeleteWithContext(context.Background(), key)
}

// ResetWithContext all keys with given context
func (s *Storage) ResetWithContext(ctx context.Context) error {
return s.collection.Truncate(ctx)
}

// Reset all keys
// truncate the collection
func (s *Storage) Reset() error {
return s.collection.Truncate(context.Background())
return s.ResetWithContext(context.Background())
}

// Close the database
Expand Down
75 changes: 75 additions & 0 deletions arangodb/arangodb_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package arangodb

import (
"context"
"testing"
"time"

Expand All @@ -21,6 +22,19 @@ func Test_ArangoDB_Set(t *testing.T) {
require.NoError(t, err)
}

func Test_ArangoDB_SetWithContext(t *testing.T) {
var (
key = "john"
val = []byte("doe")
)

ctx, cancel := context.WithCancel(context.Background())
cancel()

err := testStore.SetWithContext(ctx, key, val, 0)
require.ErrorIs(t, err, context.Canceled)
}

func Test_ArangoDB_Upsert(t *testing.T) {
var (
key = "john"
Expand Down Expand Up @@ -48,6 +62,23 @@ func Test_ArangoDB_Get(t *testing.T) {
require.Equal(t, val, result)
}

func Test_ArangoDB_GetWithContext(t *testing.T) {
var (
key = "john"
val = []byte("doe")
)

err := testStore.Set(key, val, 0)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
cancel()

result, err := testStore.GetWithContext(ctx, key)
require.ErrorIs(t, err, context.Canceled)
require.Zero(t, len(result))
}

func Test_ArangoDB_Set_Expiration(t *testing.T) {
var (
key = "john"
Expand Down Expand Up @@ -92,6 +123,26 @@ func Test_ArangoDB_Delete(t *testing.T) {
require.Zero(t, len(result))
}

func Test_ArangoDB_DeleteWithContext(t *testing.T) {
var (
key = "john"
val = []byte("doe")
)

err := testStore.Set(key, val, 0)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
cancel()

err = testStore.DeleteWithContext(ctx, key)
require.ErrorIs(t, err, context.Canceled)

result, err := testStore.Get(key)
require.NoError(t, err)
require.Equal(t, val, result)
}

func Test_ArangoDB_Reset(t *testing.T) {
val := []byte("doe")

Expand All @@ -113,6 +164,30 @@ func Test_ArangoDB_Reset(t *testing.T) {
require.Zero(t, len(result))
}

func Test_ArangoDB_ResetWithContext(t *testing.T) {
val := []byte("doe")

err := testStore.Set("john1", val, 0)
require.NoError(t, err)

err = testStore.Set("john2", val, 0)
require.Equal(t, err, nil)

ctx, cancel := context.WithCancel(context.Background())
cancel()

err = testStore.ResetWithContext(ctx)
require.ErrorIs(t, err, context.Canceled)

result, err := testStore.Get("john1")
require.NoError(t, err)
require.Equal(t, val, result)

result, err = testStore.Get("john2")
require.NoError(t, err)
require.Equal(t, val, result)
}

func Test_ArangoDB_Non_UTF8(t *testing.T) {
val := []byte("0xF5")

Expand Down
65 changes: 45 additions & 20 deletions azureblob/azureblob.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,12 @@ func New(config ...Config) *Storage {
return storage
}

// Get value by key
func (s *Storage) Get(key string) ([]byte, error) {
// GetWithContext gets value by key
func (s *Storage) GetWithContext(ctx context.Context, key string) ([]byte, error) {
if len(key) <= 0 {
return nil, nil
}
ctx, cancel := s.requestContext()
defer cancel()

resp, err := s.client.DownloadStream(ctx, s.container, key, nil)
if err != nil {
return []byte{}, err
Expand All @@ -63,55 +62,81 @@ func (s *Storage) Get(key string) ([]byte, error) {
return data, err
}

// Set key with value
func (s *Storage) Set(key string, val []byte, exp time.Duration) error {
// Get gets value by key
func (s *Storage) Get(key string) ([]byte, error) {
ctx, cancel := s.requestContext()
defer cancel()

return s.GetWithContext(ctx, key)
}

// SetWithContext sets key with value
func (s *Storage) SetWithContext(ctx context.Context, key string, val []byte, exp time.Duration) error {
if len(key) <= 0 {
return nil
}
ctx, cancel := s.requestContext()
defer cancel()

_, err := s.client.UploadBuffer(ctx, s.container, key, val, nil)
return err
}

// Delete entry by key
func (s *Storage) Delete(key string) error {
// Set sets key with value
func (s *Storage) Set(key string, val []byte, exp time.Duration) error {
ctx, cancel := s.requestContext()
defer cancel()

return s.SetWithContext(ctx, key, val, exp)
}

// DeleteWithContext deletes entry by key
func (s *Storage) DeleteWithContext(ctx context.Context, key string) error {
if len(key) <= 0 {
return nil
}

ctx, cancel := s.requestContext()
defer cancel()
_, err := s.client.DeleteBlob(ctx, s.container, key, nil)
return err
}

// Reset all entries
func (s *Storage) Reset() error {
// Delete deletes entry by key
func (s *Storage) Delete(key string) error {
ctx, cancel := s.requestContext()
defer cancel()

return s.DeleteWithContext(ctx, key)
}

// ResetWithContext resets all entries
func (s *Storage) ResetWithContext(ctx context.Context) error {
//_, err := s.client.DeleteContainer(ctx, s.container, nil)
//return err
pager := s.client.NewListBlobsFlatPager(s.container, nil)
errCounter := 0

for pager.More() {
resp, err := pager.NextPage(ctx)
if err != nil {
errCounter = errCounter + 1
return err
}

for _, v := range resp.Segment.BlobItems {
_, err = s.client.DeleteBlob(ctx, s.container, *v.Name, nil)
if err != nil {
errCounter = errCounter + 1
return err
}
}
}
if errCounter > 0 {
return fmt.Errorf("%d errors occured while resetting", errCounter)
}

return nil
}

// Reset resets all entries
func (s *Storage) Reset() error {
ctx, cancel := s.requestContext()
defer cancel()

return s.ResetWithContext(ctx)
}

// Conn returns storage client
func (s *Storage) Conn() *azblob.Client {
return s.client
Expand Down
Loading
Loading