Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update:github.com/mongodb/mongo-go-driver v2.0 Migration #4687

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
103 changes: 103 additions & 0 deletions core/stores/mon/v2/bulkinserter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
//go:generate mockgen -package mon -destination collection_inserter_mock.go -source bulkinserter.go collectionInserter
package mon

import (
"context"
"time"

"github.com/zeromicro/go-zero/core/executors"
"github.com/zeromicro/go-zero/core/logx"
"go.mongodb.org/mongo-driver/v2/mongo"
"go.mongodb.org/mongo-driver/v2/mongo/options"
)

const (
flushInterval = time.Second
maxBulkRows = 1000
)

type (
// ResultHandler is a handler that used to handle results.
ResultHandler func(*mongo.InsertManyResult, error)

// A BulkInserter is used to insert bulk of mongo records.
BulkInserter struct {
executor *executors.PeriodicalExecutor
inserter *dbInserter
}
)

// NewBulkInserter returns a BulkInserter.
func NewBulkInserter(coll Collection, interval ...time.Duration) (*BulkInserter, error) {
cloneColl := coll.Clone()

inserter := &dbInserter{
collection: cloneColl,
}

duration := flushInterval
if len(interval) > 0 {
duration = interval[0]
}

return &BulkInserter{
executor: executors.NewPeriodicalExecutor(duration, inserter),
inserter: inserter,
}, nil
}

// Flush flushes the inserter, writes all pending records.
func (bi *BulkInserter) Flush() {
bi.executor.Flush()
}

// Insert inserts doc.
func (bi *BulkInserter) Insert(doc any) {
bi.executor.Add(doc)
}

// SetResultHandler sets the result handler.
func (bi *BulkInserter) SetResultHandler(handler ResultHandler) {
bi.executor.Sync(func() {
bi.inserter.resultHandler = handler
})
}

type collectionInserter interface {
InsertMany(
ctx context.Context,
documents interface{},
opts ...options.Lister[options.InsertManyOptions],
) (*mongo.InsertManyResult, error)
}

type dbInserter struct {
collection collectionInserter
documents []any
resultHandler ResultHandler
}

func (in *dbInserter) AddTask(doc any) bool {
in.documents = append(in.documents, doc)
return len(in.documents) >= maxBulkRows
}

func (in *dbInserter) Execute(objs any) {
docs := objs.([]any)
if len(docs) == 0 {
return
}

result, err := in.collection.InsertMany(context.Background(), docs)
if in.resultHandler != nil {
in.resultHandler(result, err)
} else if err != nil {
logx.Error(err)
}
}

func (in *dbInserter) RemoveAll() any {
documents := in.documents
in.documents = nil
return documents
}
131 changes: 131 additions & 0 deletions core/stores/mon/v2/bulkinserter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package mon

import (
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.mongodb.org/mongo-driver/v2/mongo"
"go.uber.org/mock/gomock"
)

func TestBulkInserter_InsertAndFlush(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockCollection := NewMockCollection(ctrl)
mockCollection.EXPECT().Clone().Return(&mongo.Collection{})
bulkInserter, err := NewBulkInserter(mockCollection, time.Second)
assert.NoError(t, err)
bulkInserter.SetResultHandler(func(result *mongo.InsertManyResult, err error) {
assert.Nil(t, err)
assert.Equal(t, 2, len(result.InsertedIDs))
})
doc := map[string]interface{}{"name": "test"}
bulkInserter.Insert(doc)
bulkInserter.Flush()
}

func TestBulkInserter_SetResultHandler(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockCollection := NewMockCollection(ctrl)
mockCollection.EXPECT().Clone().Return(nil)
bulkInserter, err := NewBulkInserter(mockCollection)
assert.NoError(t, err)
mockHandler := func(result *mongo.InsertManyResult, err error) {}
bulkInserter.SetResultHandler(mockHandler)
}

func TestDbInserter_RemoveAll(t *testing.T) {
inserter := &dbInserter{}
inserter.documents = []interface{}{}
docs := inserter.RemoveAll()
assert.NotNil(t, docs)
assert.Empty(t, inserter.documents)
}

func Test_dbInserter_Execute(t *testing.T) {
type fields struct {
collection collectionInserter
documents []any
resultHandler ResultHandler
}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockCollection := NewMockcollectionInserter(ctrl)
type args struct {
objs any
}
tests := []struct {
name string
fields fields
args args
mock func()
}{
{
name: "empty doc",
fields: fields{
collection: nil,
documents: nil,
resultHandler: nil,
},
args: args{
objs: make([]any, 0),
},
mock: func() {},
},
{
name: "result handler",
fields: fields{
collection: mockCollection,
resultHandler: func(result *mongo.InsertManyResult, err error) {
assert.NotNil(t, err)
},
},
args: args{
objs: make([]any, 1),
},
mock: func() {
mockCollection.EXPECT().InsertMany(gomock.Any(), gomock.Any()).Return(&mongo.InsertManyResult{}, errors.New("error"))
},
},
{
name: "normal error handler",
fields: fields{
collection: mockCollection,
resultHandler: nil,
},
args: args{
objs: make([]any, 1),
},
mock: func() {
mockCollection.EXPECT().InsertMany(gomock.Any(), gomock.Any()).Return(&mongo.InsertManyResult{}, errors.New("error"))
},
},
{
name: "no error",
fields: fields{
collection: mockCollection,
resultHandler: nil,
},
args: args{
objs: make([]any, 1),
},
mock: func() {
mockCollection.EXPECT().InsertMany(gomock.Any(), gomock.Any()).Return(&mongo.InsertManyResult{}, nil)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.mock()
in := &dbInserter{
collection: tt.fields.collection,
documents: tt.fields.documents,
resultHandler: tt.fields.resultHandler,
}
in.Execute(tt.args.objs)
})
}
}
59 changes: 59 additions & 0 deletions core/stores/mon/v2/clientmanager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package mon

import (
"context"
"io"

"github.com/zeromicro/go-zero/core/syncx"
"go.mongodb.org/mongo-driver/v2/mongo"
"go.mongodb.org/mongo-driver/v2/mongo/options"
)

var clientManager = syncx.NewResourceManager()

// ClosableClient wraps *mongo.Client and provides a Close method.
type ClosableClient struct {
*mongo.Client
}

// Close disconnects the underlying *mongo.Client.
func (cs *ClosableClient) Close() error {
return cs.Client.Disconnect(context.Background())

Check warning on line 21 in core/stores/mon/v2/clientmanager.go

View check run for this annotation

Codecov / codecov/patch

core/stores/mon/v2/clientmanager.go#L20-L21

Added lines #L20 - L21 were not covered by tests
}

// Inject injects a *mongo.Client into the client manager.
// Typically, this is used to inject a *mongo.Client for test purpose.
func Inject(key string, client *mongo.Client) {
clientManager.Inject(key, &ClosableClient{client})
}

func getClient(url string, opts ...Option) (*mongo.Client, error) {
val, err := clientManager.GetResource(url, func() (io.Closer, error) {
o := options.Client().ApplyURI(url)
opts = append([]Option{defaultTimeoutOption()}, opts...)
for _, opt := range opts {
opt(o)
}

cli, err := mongo.Connect(o)
if err != nil {
return nil, err
}

err = cli.Ping(context.Background(), nil)
if err != nil {
return nil, err
}

Check warning on line 46 in core/stores/mon/v2/clientmanager.go

View check run for this annotation

Codecov / codecov/patch

core/stores/mon/v2/clientmanager.go#L43-L46

Added lines #L43 - L46 were not covered by tests

concurrentSess := &ClosableClient{
Client: cli,
}

return concurrentSess, nil

Check warning on line 52 in core/stores/mon/v2/clientmanager.go

View check run for this annotation

Codecov / codecov/patch

core/stores/mon/v2/clientmanager.go#L48-L52

Added lines #L48 - L52 were not covered by tests
})
if err != nil {
return nil, err
}

return val.(*ClosableClient).Client, nil
}
16 changes: 16 additions & 0 deletions core/stores/mon/v2/clientmanager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package mon

import (
"testing"

"github.com/stretchr/testify/assert"
"go.mongodb.org/mongo-driver/v2/mongo"
)

func TestClientManger_getClient(t *testing.T) {
c := &mongo.Client{}
Inject("foo", c)
cli, err := getClient("foo")
assert.Nil(t, err)
assert.Equal(t, c, cli)
}
Loading