Skip to content

Commit

Permalink
Merge pull request #72 from 0chain/feature/mutli-upload
Browse files Browse the repository at this point in the history
Add multi upload
  • Loading branch information
dabasov authored Jun 27, 2023
2 parents 047662c + c50c009 commit c0d99e5
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 84 deletions.
61 changes: 33 additions & 28 deletions dstorage/dstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
"github.com/0chain/s3migration/util"
zerror "github.com/0chain/s3migration/zErrors"

"github.com/0chain/gosdk/constants"
"github.com/0chain/gosdk/zboxcore/sdk"
"github.com/0chain/gosdk/zboxcore/zboxutil"
)

//use rate limiter here.
Expand All @@ -34,9 +34,10 @@ import (
//go:generate mockgen -destination mocks/mock_dstorage.go -package mock_dstorage github.com/0chain/s3migration/dstorage DStoreI
type DStoreI interface {
GetFileMetaData(ctx context.Context, remotePath string) (*sdk.ORef, error)
Replace(ctx context.Context, remotePath string, r io.Reader, size int64, contentType string) error
Duplicate(ctx context.Context, remotePath string, r io.Reader, size int64, contentType string) error
Upload(ctx context.Context, remotePath string, r io.Reader, size int64, contentType string, isUpdate bool) error
Replace(ctx context.Context, remotePath string, r io.Reader, size int64, contentType string) sdk.OperationRequest
Duplicate(ctx context.Context, remotePath string, r io.Reader, size int64, contentType string) sdk.OperationRequest
Upload(ctx context.Context, remotePath string, r io.Reader, size int64, contentType string, isUpdate bool) sdk.OperationRequest
MultiUpload(ctx context.Context, ops []sdk.OperationRequest) error
IsFileExist(ctx context.Context, remotePath string) (bool, error)
GetAvailableSpace() int64
GetTotalSpace() int64
Expand All @@ -57,6 +58,7 @@ type DStorageService struct {
const (
// GetRefRetryWaitTime and GetRefRetryCount are retry tuning values.
// NOTE(review): their use is not visible in this view — presumably the wait
// between attempts and the number of attempts for file metadata lookups;
// confirm against GetFileMetaData.
GetRefRetryWaitTime = 500 * time.Millisecond
GetRefRetryCount = 2
// ChunkNumber is the value passed to sdk.WithChunkNumber for every upload
// operation built by Upload.
ChunkNumber = 1000
)

func (d *DStorageService) GetFileMetaData(ctx context.Context, remotePath string) (*sdk.ORef, error) {
Expand Down Expand Up @@ -84,46 +86,49 @@ func (d *DStorageService) GetFileMetaData(ctx context.Context, remotePath string
return &oResult.Refs[0], nil
}

func (d *DStorageService) Upload(ctx context.Context, remotePath string, r io.Reader, size int64, contentType string, isUpdate bool) (err error) {
cb := &statusCB{
doneCh: make(chan struct{}, 1),
errCh: make(chan error, 1),
// MultiUpload submits every request in ops to the allocation as one
// multi-operation and then releases the readers attached to those requests.
//
// ctx is accepted for interface compatibility but is not used:
// DoMultiOperation is called with the operations only.
//
// The readers are closed whether or not the operation succeeded.
// NOTE(review): processMultiOperation reaches this method through util.Retry
// with the same requests — confirm that a second attempt is meaningful once
// the readers have been closed here.
//
// It returns the error reported by DoMultiOperation. Errors from closing the
// readers are dropped deliberately so they cannot mask the upload result.
func (d *DStorageService) MultiUpload(ctx context.Context, ops []sdk.OperationRequest) (err error) {
	err = d.allocation.DoMultiOperation(ops)
	for _, op := range ops {
		// Use a checked assertion: an unchecked one panics as soon as a request
		// carries a reader of another type (including a nil reader). Anything
		// that can be closed is closed; everything else is left alone.
		if closer, ok := op.FileReader.(io.Closer); ok {
			_ = closer.Close()
		}
	}
	return err
}

// Replace builds the operation request that overwrites the object at
// remotePath with the content of r. It is Upload with the update flag set.
func (d *DStorageService) Replace(ctx context.Context, remotePath string, r io.Reader, size int64, contentType string) sdk.OperationRequest {
	const overwriteExisting = true
	request := d.Upload(ctx, remotePath, r, size, contentType, overwriteExisting)
	return request
}

func (d *DStorageService) Upload(ctx context.Context, remotePath string, r io.Reader, size int64, contentType string, isUpdate bool) sdk.OperationRequest {
fileMeta := sdk.FileMeta{
RemotePath: filepath.Clean(remotePath),
ActualSize: size,
MimeType: contentType,
RemoteName: filepath.Base(remotePath),
}

chunkUpload, err := sdk.CreateChunkedUpload(d.workDir, d.allocation, fileMeta, util.NewStreamReader(r), isUpdate, false, false, zboxutil.NewConnectionId(),
sdk.WithStatusCallback(cb),
sdk.WithEncrypt(d.encrypt),
)

if err != nil {
return
opType := constants.FileOperationInsert
if isUpdate {
opType = constants.FileOperationUpdate
}

err = chunkUpload.Start()
if err != nil {
return
options := []sdk.ChunkedUploadOption{
sdk.WithEncrypt(d.encrypt),
sdk.WithChunkNumber(ChunkNumber),
}

select {
case <-cb.doneCh:
case err = <-cb.errCh:
op := sdk.OperationRequest{
OperationType: opType,
FileMeta: fileMeta,
Workdir: d.workDir,
FileReader: util.NewStreamReader(r),
RemotePath: remotePath,
Opts: options,
}

return
}

func (d *DStorageService) Replace(ctx context.Context, remotePath string, r io.Reader, size int64, contentType string) error {
return d.Upload(ctx, remotePath, r, size, contentType, true)
return op
}

func (d *DStorageService) Duplicate(ctx context.Context, remotePath string, r io.Reader, size int64, contentType string) error {
func (d *DStorageService) Duplicate(ctx context.Context, remotePath string, r io.Reader, size int64, contentType string) sdk.OperationRequest {
li := strings.LastIndex(remotePath, ".")

var duplicateSuffix string
Expand Down
162 changes: 106 additions & 56 deletions migration/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"syscall"
"time"

"github.com/0chain/gosdk/zboxcore/sdk"
"github.com/0chain/gosdk/zboxcore/zboxutil"
dStorage "github.com/0chain/s3migration/dstorage"
zlogger "github.com/0chain/s3migration/logger"
Expand All @@ -31,6 +32,11 @@ const (
sourceDeleteFailed = "source_delete.failed"
)

// Batching limits used by UploadWorker: the pending batch is handed off for
// upload as soon as either limit is reached.
const (
// batchSize is the number of queued operations that triggers a flush; it is
// also the initial capacity of the pending-operations slice.
batchSize = 50
// maxBatchSize is the cumulative object size, in bytes, that triggers a flush.
maxBatchSize = 1024 * 1024 * 1024 // 1GB
)

var migration Migration

// Use context for all requests.
Expand Down Expand Up @@ -71,6 +77,11 @@ type Migration struct {
bucket string
}

// MigrationOperation pairs an SDK operation request with the bookkeeping
// record of the object that request transfers.
type MigrationOperation struct {
// Operation is the request handed to the multi-upload call.
Operation sdk.OperationRequest
// uploadObj carries the object's key, size and local path; it is used for
// upload start/done reporting, source deletion and local file cleanup.
uploadObj *UploadObjectMeta
}

func updateTotalObjects(awsStorageService *s3.AwsClient, wd string) error {
f, err := os.Create(filepath.Join(wd, "files.total"))
if err != nil {
Expand Down Expand Up @@ -290,6 +301,8 @@ func (m *Migration) UploadWorker(ctx context.Context, migrator *MigrationWorker)

downloadQueue := migrator.GetDownloadQueue()
wg := &sync.WaitGroup{}
ops := make([]MigrationOperation, 0, batchSize)
totalSize := int64(0)
for d := range downloadQueue {
zlogger.Logger.Info("Uploading object: ", d.ObjectKey)
migrator.PauseUpload()
Expand All @@ -299,40 +312,49 @@ func (m *Migration) UploadWorker(ctx context.Context, migrator *MigrationWorker)
DoneChan: make(chan struct{}, 1),
ErrChan: make(chan error, 1),
Size: downloadObj.Size,
LocalPath: downloadObj.LocalPath,
}
wg.Add(1)
go func() {
defer wg.Done()
err := checkDownloadStatus(downloadObj)
if err != nil {
zlogger.Logger.Error(err)
migrator.SetMigrationError(err)
return
}
if downloadObj.IsFileAlreadyExist {
switch migration.skip {
case Skip:
migrator.UploadStart(uploadObj)
migrator.UploadDone(uploadObj, nil)
return
}
err := checkDownloadStatus(downloadObj)
if err != nil {
zlogger.Logger.Error(err)
migrator.SetMigrationError(err)
continue
}
if downloadObj.IsFileAlreadyExist {
switch migration.skip {
case Skip:
migrator.UploadStart(uploadObj)
migrator.UploadDone(uploadObj, nil)
continue
}

defer func() {
_ = m.fs.Remove(downloadObj.LocalPath)
}()
migrator.UploadStart(uploadObj)
zlogger.Logger.Info("upload start: ", uploadObj.ObjectKey, "size: ", uploadObj.Size)
err = util.Retry(3, time.Second*5, func() error {
err := processUpload(ctx, downloadObj)
return err
})

migrator.UploadDone(uploadObj, err)
}
op, err := processOperation(ctx, downloadObj)
if err != nil {
zlogger.Logger.Error(err)
migrator.SetMigrationError(err)
zlogger.Logger.Info("upload done: ", uploadObj.ObjectKey, "size ", uploadObj.Size, err)
}()
time.Sleep(1 * time.Second)
continue
}
op.uploadObj = uploadObj
ops = append(ops, op)
totalSize += downloadObj.Size
if len(ops) >= batchSize || totalSize >= maxBatchSize {
processOps := ops
ops = nil
wg.Add(1)
go func(ops []MigrationOperation) {
m.processMultiOperation(ctx, ops, migrator)
wg.Done()
}(processOps)
totalSize = 0
time.Sleep(1 * time.Second)
}
}
if len(ops) > 0 {
wg.Add(1)
go func(ops []MigrationOperation) {
m.processMultiOperation(ctx, ops, migrator)
wg.Done()
}(ops)
}
wg.Wait()
}
Expand Down Expand Up @@ -369,58 +391,49 @@ func checkDownloadStatus(downloadObj *DownloadObjectMeta) error {
}
}

func processUpload(ctx context.Context, downloadObj *DownloadObjectMeta) error {
func processOperation(ctx context.Context, downloadObj *DownloadObjectMeta) (MigrationOperation, error) {
remotePath := getRemotePath(downloadObj.ObjectKey)

var op MigrationOperation
fileObj, err := migration.fs.Open(downloadObj.LocalPath)
if err != nil {
zlogger.Logger.Error(err)
return err
return op, err
}

defer fileObj.Close()

fileInfo, err := fileObj.Stat()
if err != nil {
zlogger.Logger.Error(err)
return err
return op, err
}
mimeType, err := zboxutil.GetFileContentType(fileObj)
if err != nil {
zlogger.Logger.Error("content type error: ", err, " file: ", fileInfo.Name(), " objKey:", downloadObj.ObjectKey)
return err
return op, err
}

var fileOperation sdk.OperationRequest
if downloadObj.IsFileAlreadyExist {
switch migration.skip {
case Replace:
zlogger.Logger.Info("Replacing object" + downloadObj.ObjectKey + " size " + strconv.FormatInt(downloadObj.Size, 10))
err = migration.zStore.Replace(ctx, remotePath, fileObj, fileInfo.Size(), mimeType)
fileOperation = migration.zStore.Replace(ctx, remotePath, fileObj, downloadObj.Size, mimeType)
case Duplicate:
zlogger.Logger.Info("Duplicating object" + downloadObj.ObjectKey + " size " + strconv.FormatInt(downloadObj.Size, 10))
err = migration.zStore.Duplicate(ctx, remotePath, fileObj, fileInfo.Size(), mimeType)
fileOperation = migration.zStore.Duplicate(ctx, remotePath, fileObj, downloadObj.Size, mimeType)
}
} else {
zlogger.Logger.Info("Uploading object: " + downloadObj.ObjectKey + " size " + strconv.FormatInt(downloadObj.Size, 10))
err = migration.zStore.Upload(ctx, remotePath, fileObj, fileInfo.Size(), mimeType, false)
fileOperation = migration.zStore.Upload(ctx, remotePath, fileObj, downloadObj.Size, mimeType, false)
}
op.Operation = fileOperation
return op, nil
}

func processUpload(ctx context.Context, ops []sdk.OperationRequest) error {

err := migration.zStore.MultiUpload(ctx, ops)
if err != nil {
zlogger.Logger.Error(err)
return err
} else {
if migration.deleteSource {
if err := migration.awsStore.DeleteFile(ctx, downloadObj.ObjectKey); err != nil {
zlogger.Logger.Error(err)
dsFileHandler.Write([]byte(downloadObj.ObjectKey + "\n"))
}
}
migration.szCtMu.Lock()
migration.migratedSize += uint64(downloadObj.Size)
migration.totalMigratedObjects++
migration.szCtMu.Unlock()
return nil
}
return err
}

func (m *Migration) UpdateStateFile(migrateHandler *MigrationWorker) {
Expand Down Expand Up @@ -453,3 +466,40 @@ func (m *Migration) UpdateStateFile(migrateHandler *MigrationWorker) {
}
}
}

// processMultiOperation uploads one batch of operations and performs the
// per-object bookkeeping for it.
//
// Steps:
//  1. Mark every object in the batch as started on migrator.
//  2. Submit the whole batch through processUpload, retried up to 3 times
//     with a 5 second wait between attempts.
//  3. Report the single batch result to migrator for every object and
//     record it as the migration error (nil on success).
//  4. On exit, for every object: if the batch succeeded, optionally delete
//     the source object and add it to the migrated size/object counters;
//     in all cases close its reader and remove its local file.
//
// The batch succeeds or fails as a unit, so err applies to every object in
// ops.
func (m *Migration) processMultiOperation(ctx context.Context, ops []MigrationOperation, migrator *MigrationWorker) {
	var err error
	defer func() {
		for _, op := range ops {
			if err == nil {
				if migration.deleteSource {
					if deleteErr := migration.awsStore.DeleteFile(ctx, op.uploadObj.ObjectKey); deleteErr != nil {
						zlogger.Logger.Error(deleteErr)
						// Record the key so the failed source deletion can be followed up.
						dsFileHandler.Write([]byte(op.uploadObj.ObjectKey + "\n"))
					}
				}
				// Count only objects that were actually uploaded; counting a
				// failed batch would overstate the migration progress.
				migration.szCtMu.Lock()
				migration.migratedSize += uint64(op.uploadObj.Size)
				migration.totalMigratedObjects++
				migration.szCtMu.Unlock()
			}
			// Local resources are released regardless of the upload outcome.
			if closer, ok := op.Operation.FileReader.(*util.StreamReader); ok {
				_ = closer.Close()
			}
			_ = m.fs.Remove(op.uploadObj.LocalPath)
		}
	}()

	fileOps := make([]sdk.OperationRequest, 0, len(ops))
	for _, op := range ops {
		migrator.UploadStart(op.uploadObj)
		zlogger.Logger.Info("upload start: ", op.uploadObj.ObjectKey, "size: ", op.uploadObj.Size)
		fileOps = append(fileOps, op.Operation)
	}

	err = util.Retry(3, time.Second*5, func() error {
		return processUpload(ctx, fileOps)
	})

	for _, op := range ops {
		migrator.UploadDone(op.uploadObj, err)
		zlogger.Logger.Info("upload done: ", op.uploadObj.ObjectKey, "size ", op.uploadObj.Size, err)
	}
	migrator.SetMigrationError(err)
}
1 change: 1 addition & 0 deletions migration/migration_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type UploadObjectMeta struct {
Size int64
DoneChan chan struct{}
ErrChan chan error
LocalPath string
}

func NewMigrationWorker(wd string) *MigrationWorker {
Expand Down
7 changes: 7 additions & 0 deletions util/streamreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,10 @@ func (r *StreamReader) Read(p []byte) (int, error) {
}
return bLen, nil
}

// Close closes the wrapped reader when it implements io.Closer and returns
// that reader's close error. A wrapped reader that cannot be closed is left
// untouched and nil is returned.
func (r *StreamReader) Close() error {
	underlying, closable := r.Reader.(io.Closer)
	if !closable {
		return nil
	}
	return underlying.Close()
}

0 comments on commit c0d99e5

Please sign in to comment.