From 716c74c8575efa8df253ffbaefb79396fb53e97f Mon Sep 17 00:00:00 2001
From: Hitenjain14
Date: Sun, 25 Jun 2023 02:12:07 +0530
Subject: [PATCH 1/5] add multi upload

---
 dstorage/dstorage.go          |  93 +++++++++++++-------
 migration/migrate.go          | 161 ++++++++++++++++++++++------------
 migration/migration_worker.go |   1 +
 3 files changed, 169 insertions(+), 86 deletions(-)

diff --git a/dstorage/dstorage.go b/dstorage/dstorage.go
index 84168e8..f0c4792 100644
--- a/dstorage/dstorage.go
+++ b/dstorage/dstorage.go
@@ -14,8 +14,8 @@ import (
 	"github.com/0chain/s3migration/util"
 	zerror "github.com/0chain/s3migration/zErrors"
 
+	"github.com/0chain/gosdk/constants"
 	"github.com/0chain/gosdk/zboxcore/sdk"
-	"github.com/0chain/gosdk/zboxcore/zboxutil"
 )
 
 //use rate limiter here.
@@ -34,9 +34,10 @@
 //go:generate mockgen -destination mocks/mock_dstorage.go -package mock_dstorage github.com/0chain/s3migration/dstorage DStoreI
 type DStoreI interface {
 	GetFileMetaData(ctx context.Context, remotePath string) (*sdk.ORef, error)
-	Replace(ctx context.Context, remotePath string, r io.Reader, size int64, contentType string) error
-	Duplicate(ctx context.Context, remotePath string, r io.Reader, size int64, contentType string) error
-	Upload(ctx context.Context, remotePath string, r io.Reader, size int64, contentType string, isUpdate bool) error
+	Replace(ctx context.Context, remotePath string, r io.Reader, size int64, contentType string) sdk.OperationRequest
+	Duplicate(ctx context.Context, remotePath string, r io.Reader, size int64, contentType string) sdk.OperationRequest
+	Upload(ctx context.Context, remotePath string, r io.Reader, size int64, contentType string, isUpdate bool) sdk.OperationRequest
+	MultiUpload(ctx context.Context, ops []sdk.OperationRequest) error
 	IsFileExist(ctx context.Context, remotePath string) (bool, error)
 	GetAvailableSpace() int64
 	GetTotalSpace() int64
@@ -84,12 +85,55 @@ func (d *DStorageService) GetFileMetaData(ctx context.Context, remotePath strin
 	return &oResult.Refs[0], nil
 }
 
-func (d *DStorageService) Upload(ctx context.Context, remotePath string, r io.Reader, size int64, contentType string, isUpdate bool) (err error) {
-	cb := &statusCB{
-		doneCh: make(chan struct{}, 1),
-		errCh:  make(chan error, 1),
+// func (d *DStorageService) Upload(ctx context.Context, remotePath string, r io.Reader, size int64, contentType string, isUpdate bool) (err error) {
+// 	cb := &statusCB{
+// 		doneCh: make(chan struct{}, 1),
+// 		errCh:  make(chan error, 1),
+// 	}
+
+// 	fileMeta := sdk.FileMeta{
+// 		RemotePath: filepath.Clean(remotePath),
+// 		ActualSize: size,
+// 		MimeType:   contentType,
+// 		RemoteName: filepath.Base(remotePath),
+// 	}
+
+// 	chunkUpload, err := sdk.CreateChunkedUpload(d.workDir, d.allocation, fileMeta, util.NewStreamReader(r), isUpdate, false, false, zboxutil.NewConnectionId(),
+// 		sdk.WithStatusCallback(cb),
+// 		sdk.WithEncrypt(d.encrypt),
+// 	)
+
+// 	if err != nil {
+// 		return
+// 	}
+
+// 	err = chunkUpload.Start()
+// 	if err != nil {
+// 		return
+// 	}
+
+// 	select {
+// 	case <-cb.doneCh:
+// 	case err = <-cb.errCh:
+// 	}
+
+// 	return
+// }
+
+func (d *DStorageService) MultiUpload(ctx context.Context, ops []sdk.OperationRequest) (err error) {
+	err = d.allocation.DoMultiOperation(ops)
+	for _, op := range ops {
+		f := op.FileReader.(util.File)
+		f.Close()
 	}
+	return err
+}
+
+func (d *DStorageService) Replace(ctx context.Context, remotePath string, r io.Reader, size int64, contentType string) sdk.OperationRequest {
+	return d.Upload(ctx, remotePath, r, size, contentType, true)
+}
 
+func (d *DStorageService) Upload(ctx context.Context, remotePath string, r io.Reader, size int64, contentType string, isUpdate bool) sdk.OperationRequest {
 	fileMeta := sdk.FileMeta{
 		RemotePath: filepath.Clean(remotePath),
 		ActualSize: size,
@@ -97,33 +141,22 @@ func (d *DStorageService) Upload(ctx context.Context, remotePath string, r io.Re
 		RemoteName: filepath.Base(remotePath),
 	}
 
-	chunkUpload, err := sdk.CreateChunkedUpload(d.workDir, d.allocation, fileMeta, util.NewStreamReader(r), isUpdate, false, false, zboxutil.NewConnectionId(),
-		sdk.WithStatusCallback(cb),
-		sdk.WithEncrypt(d.encrypt),
-	)
-
-	if err != nil {
-		return
+	opType := constants.FileOperationInsert
+	if isUpdate {
+		opType = constants.FileOperationUpdate
 	}
 
-	err = chunkUpload.Start()
-	if err != nil {
-		return
+	op := sdk.OperationRequest{
+		OperationType: opType,
+		FileMeta:      fileMeta,
+		Workdir:       d.workDir,
+		FileReader:    util.NewStreamReader(r),
+		RemotePath:    remotePath,
 	}
-
-	select {
-	case <-cb.doneCh:
-	case err = <-cb.errCh:
-	}
-
-	return
-}
-
-func (d *DStorageService) Replace(ctx context.Context, remotePath string, r io.Reader, size int64, contentType string) error {
-	return d.Upload(ctx, remotePath, r, size, contentType, true)
+	return op
 }
 
-func (d *DStorageService) Duplicate(ctx context.Context, remotePath string, r io.Reader, size int64, contentType string) error {
+func (d *DStorageService) Duplicate(ctx context.Context, remotePath string, r io.Reader, size int64, contentType string) sdk.OperationRequest {
 	li := strings.LastIndex(remotePath, ".")
 	var duplicateSuffix string
diff --git a/migration/migrate.go b/migration/migrate.go
index 25ab923..2c4b3e1 100644
--- a/migration/migrate.go
+++ b/migration/migrate.go
@@ -11,6 +11,7 @@ import (
 	"syscall"
 	"time"
 
+	"github.com/0chain/gosdk/zboxcore/sdk"
 	"github.com/0chain/gosdk/zboxcore/zboxutil"
 	dStorage "github.com/0chain/s3migration/dstorage"
 	zlogger "github.com/0chain/s3migration/logger"
@@ -31,6 +32,11 @@ const (
 	sourceDeleteFailed = "source_delete.failed"
 )
 
+const (
+	batchSize    = 10
+	maxBatchSize = 1024 * 1024 * 200 // 200 MB
+)
+
 var migration Migration
 
 // Use context for all requests.
@@ -71,6 +77,12 @@ type Migration struct {
 	bucket string
 }
 
+type MigrationOperation struct {
+	Operation sdk.OperationRequest
+	uploadObj *UploadObjectMeta
+	closeFile func() error
+}
+
 func updateTotalObjects(awsStorageService *s3.AwsClient, wd string) error {
 	f, err := os.Create(filepath.Join(wd, "files.total"))
 	if err != nil {
@@ -290,6 +302,8 @@ func (m *Migration) UploadWorker(ctx context.Context, migrator *MigrationWorker
 	downloadQueue := migrator.GetDownloadQueue()
 	wg := &sync.WaitGroup{}
 
+	ops := make([]MigrationOperation, 0, batchSize)
+	totalSize := int64(0)
 	for d := range downloadQueue {
 		zlogger.Logger.Info("Uploading object: ", d.ObjectKey)
 		migrator.PauseUpload()
@@ -299,40 +313,48 @@ func (m *Migration) UploadWorker(ctx context.Context, migrator *MigrationWorker
 			DoneChan: make(chan struct{}, 1),
 			ErrChan:  make(chan error, 1),
 			Size:     downloadObj.Size,
+			LocalPath: downloadObj.LocalPath,
 		}
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
-			err := checkDownloadStatus(downloadObj)
-			if err != nil {
-				zlogger.Logger.Error(err)
-				migrator.SetMigrationError(err)
-				return
-			}
-			if downloadObj.IsFileAlreadyExist {
-				switch migration.skip {
-				case Skip:
-					migrator.UploadStart(uploadObj)
-					migrator.UploadDone(uploadObj, nil)
-					return
-				}
+		err := checkDownloadStatus(downloadObj)
+		if err != nil {
+			zlogger.Logger.Error(err)
+			migrator.SetMigrationError(err)
+			continue
+		}
+		if downloadObj.IsFileAlreadyExist {
+			switch migration.skip {
+			case Skip:
+				migrator.UploadStart(uploadObj)
+				migrator.UploadDone(uploadObj, nil)
+				continue
 			}
-
-			defer func() {
-				_ = m.fs.Remove(downloadObj.LocalPath)
-			}()
-			migrator.UploadStart(uploadObj)
-			zlogger.Logger.Info("upload start: ", uploadObj.ObjectKey, "size: ", uploadObj.Size)
-			err = util.Retry(3, time.Second*5, func() error {
-				err := processUpload(ctx, downloadObj)
-				return err
-			})
-
-			migrator.UploadDone(uploadObj, err)
+		}
+		op, err := processOperation(ctx, downloadObj)
+		if err != nil {
+			zlogger.Logger.Error(err)
 			migrator.SetMigrationError(err)
-			zlogger.Logger.Info("upload done: ", uploadObj.ObjectKey, "size ", uploadObj.Size, err)
-		}()
-		time.Sleep(1 * time.Second)
+			continue
+		}
+		op.uploadObj = uploadObj
+		ops = append(ops, op)
+		totalSize += downloadObj.Size
+		if len(ops) >= batchSize || totalSize >= maxBatchSize {
+			processOps := ops
+			ops = nil
+			wg.Add(1)
+			go func(ops []MigrationOperation) {
+				m.processMultiOperation(ctx, ops, migrator)
+				wg.Done()
+			}(processOps)
+			time.Sleep(1 * time.Second)
+		}
+	}
+	if len(ops) > 0 {
+		wg.Add(1)
+		go func(ops []MigrationOperation) {
+			m.processMultiOperation(ctx, ops, migrator)
+			wg.Done()
+		}(ops)
 	}
 	wg.Wait()
 }
@@ -369,58 +391,50 @@ func checkDownloadStatus(downloadObj *DownloadObjectMeta) error {
 	}
 }
 
-func processUpload(ctx context.Context, downloadObj *DownloadObjectMeta) error {
+func processOperation(ctx context.Context, downloadObj *DownloadObjectMeta) (MigrationOperation, error) {
 	remotePath := getRemotePath(downloadObj.ObjectKey)
-
+	var op MigrationOperation
 	fileObj, err := migration.fs.Open(downloadObj.LocalPath)
 	if err != nil {
 		zlogger.Logger.Error(err)
-		return err
+		return op, err
 	}
-
-	defer fileObj.Close()
-
 	fileInfo, err := fileObj.Stat()
 	if err != nil {
 		zlogger.Logger.Error(err)
-		return err
+		return op, err
 	}
 	mimeType, err := zboxutil.GetFileContentType(fileObj)
 	if err != nil {
 		zlogger.Logger.Error("content type error: ", err, " file: ", fileInfo.Name(), " objKey:", downloadObj.ObjectKey)
-		return err
+		return op, err
 	}
-
+	var fileOperation sdk.OperationRequest
 	if downloadObj.IsFileAlreadyExist {
 		switch migration.skip {
 		case Replace:
 			zlogger.Logger.Info("Replacing object" + downloadObj.ObjectKey + " size " + strconv.FormatInt(downloadObj.Size, 10))
-			err = migration.zStore.Replace(ctx, remotePath, fileObj, fileInfo.Size(), mimeType)
+			fileOperation = migration.zStore.Replace(ctx, remotePath, fileObj, downloadObj.Size, mimeType)
 		case Duplicate:
 			zlogger.Logger.Info("Duplicating object" + downloadObj.ObjectKey + " size " + strconv.FormatInt(downloadObj.Size, 10))
-			err = migration.zStore.Duplicate(ctx, remotePath, fileObj, fileInfo.Size(), mimeType)
+			fileOperation = migration.zStore.Duplicate(ctx, remotePath, fileObj, downloadObj.Size, mimeType)
 		}
 	} else {
 		zlogger.Logger.Info("Uploading object: " + downloadObj.ObjectKey + " size " + strconv.FormatInt(downloadObj.Size, 10))
-		err = migration.zStore.Upload(ctx, remotePath, fileObj, fileInfo.Size(), mimeType, false)
+		fileOperation = migration.zStore.Upload(ctx, remotePath, fileObj, downloadObj.Size, mimeType, false)
 	}
+	op.closeFile = fileObj.Close
+	op.Operation = fileOperation
+	return op, nil
+}
+
+func processUpload(ctx context.Context, ops []sdk.OperationRequest) error {
+	err := migration.zStore.MultiUpload(ctx, ops)
 	if err != nil {
 		zlogger.Logger.Error(err)
-		return err
-	} else {
-		if migration.deleteSource {
-			if err := migration.awsStore.DeleteFile(ctx, downloadObj.ObjectKey); err != nil {
-				zlogger.Logger.Error(err)
-				dsFileHandler.Write([]byte(downloadObj.ObjectKey + "\n"))
-			}
-		}
-		migration.szCtMu.Lock()
-		migration.migratedSize += uint64(downloadObj.Size)
-		migration.totalMigratedObjects++
-		migration.szCtMu.Unlock()
-		return nil
 	}
+	return err
 }
 
 func (m *Migration) UpdateStateFile(migrateHandler *MigrationWorker) {
@@ -453,3 +467,38 @@
 	}
 }
+
+func (m *Migration) processMultiOperation(ctx context.Context, ops []MigrationOperation, migrator *MigrationWorker) {
+	var err error
+	defer func() {
+		for _, op := range ops {
+			if migration.deleteSource && err == nil {
+				if deleteErr := migration.awsStore.DeleteFile(ctx, op.uploadObj.ObjectKey); deleteErr != nil {
+					zlogger.Logger.Error(deleteErr)
+					dsFileHandler.Write([]byte(op.uploadObj.ObjectKey + "\n"))
+				}
+			}
+			migration.szCtMu.Lock()
+			migration.migratedSize += uint64(op.uploadObj.Size)
+			migration.totalMigratedObjects++
+			migration.szCtMu.Unlock()
+			_ = op.closeFile()
+			_ = m.fs.Remove(op.uploadObj.LocalPath)
+		}
+	}()
+	fileOps := make([]sdk.OperationRequest, 0, len(ops))
+	for _, op := range ops {
+		migrator.UploadStart(op.uploadObj)
+		zlogger.Logger.Info("upload start: ", op.uploadObj.ObjectKey, "size: ", op.uploadObj.Size)
+		fileOps = append(fileOps, op.Operation)
+	}
+	err = util.Retry(3, time.Second*5, func() error {
+		err := processUpload(ctx, fileOps)
+		return err
+	})
+	for _, op := range ops {
+		migrator.UploadDone(op.uploadObj, err)
+		zlogger.Logger.Info("upload done: ", op.uploadObj.ObjectKey, "size ", op.uploadObj.Size, err)
+	}
+	migrator.SetMigrationError(err)
+}
diff --git a/migration/migration_worker.go b/migration/migration_worker.go
index 7c100e9..12969db 100644
--- a/migration/migration_worker.go
+++ b/migration/migration_worker.go
@@ -44,6 +44,7 @@ type UploadObjectMeta struct {
 	Size     int64
 	DoneChan chan struct{}
 	ErrChan  chan error
+	LocalPath string
 }
 
 func NewMigrationWorker(wd string) *MigrationWorker {
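A minimal sketch (not part of the patch series) of how a caller might drive the batch API introduced in PATCH 1/5: Upload, Replace, and Duplicate now only build an sdk.OperationRequest each, and MultiUpload submits the whole batch through a single DoMultiOperation call. The helper name uploadBatch, the store and objects parameters, and the zero size and generic MIME type are illustrative stand-ins; migrate.go derives the real values by statting the downloaded file.

package sketch

import (
	"context"
	"io"

	"github.com/0chain/gosdk/zboxcore/sdk"
	dstorage "github.com/0chain/s3migration/dstorage"
)

// uploadBatch builds one OperationRequest per object, then submits them all
// in one batch. Nothing is sent to the allocation until MultiUpload runs.
func uploadBatch(ctx context.Context, store dstorage.DStoreI, objects map[string]io.Reader) error {
	ops := make([]sdk.OperationRequest, 0, len(objects))
	for remotePath, r := range objects {
		// Size 0 and a generic content type are placeholders for this sketch.
		ops = append(ops, store.Upload(ctx, remotePath, r, 0, "application/octet-stream", false))
	}
	return store.MultiUpload(ctx, ops) // one DoMultiOperation round trip
}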
From 9916e4950590eaf96f54170bc15d2a4dfc70ecb1 Mon Sep 17 00:00:00 2001
From: Hitenjain14
Date: Sun, 25 Jun 2023 03:54:10 +0530
Subject: [PATCH 2/5] reset size and add opts

---
 dstorage/dstorage.go | 7 +++++++
 migration/migrate.go | 1 +
 2 files changed, 8 insertions(+)

diff --git a/dstorage/dstorage.go b/dstorage/dstorage.go
index f0c4792..6f68ca1 100644
--- a/dstorage/dstorage.go
+++ b/dstorage/dstorage.go
@@ -58,6 +58,7 @@ type DStorageService struct {
 const (
 	GetRefRetryWaitTime = 500 * time.Millisecond
 	GetRefRetryCount    = 2
+	ChunkNumber         = 100
 )
 
 func (d *DStorageService) GetFileMetaData(ctx context.Context, remotePath string) (*sdk.ORef, error) {
@@ -146,12 +147,18 @@ func (d *DStorageService) Upload(ctx context.Context, remotePath string, r io.Re
 		opType = constants.FileOperationUpdate
 	}
 
+	options := []sdk.ChunkedUploadOption{
+		sdk.WithEncrypt(d.encrypt),
+		sdk.WithChunkNumber(ChunkNumber),
+	}
+
 	op := sdk.OperationRequest{
 		OperationType: opType,
 		FileMeta:      fileMeta,
 		Workdir:       d.workDir,
 		FileReader:    util.NewStreamReader(r),
 		RemotePath:    remotePath,
+		Opts:          options,
 	}
 	return op
 }
diff --git a/migration/migrate.go b/migration/migrate.go
index 2c4b3e1..2346a1a 100644
--- a/migration/migrate.go
+++ b/migration/migrate.go
@@ -346,6 +346,7 @@ func (m *Migration) UploadWorker(ctx context.Context, migrator *MigrationWorker
 				m.processMultiOperation(ctx, ops, migrator)
 				wg.Done()
 			}(processOps)
+			totalSize = 0
 			time.Sleep(1 * time.Second)
 		}
 	}

From 3d81945e262e538ff1313edeaedf7c3abaa7319d Mon Sep 17 00:00:00 2001
From: Hitenjain14
Date: Sun, 25 Jun 2023 03:58:55 +0530
Subject: [PATCH 3/5] update batchSize and chunk number

---
 dstorage/dstorage.go | 2 +-
 migration/migrate.go | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/dstorage/dstorage.go b/dstorage/dstorage.go
index 6f68ca1..b237fbd 100644
--- a/dstorage/dstorage.go
+++ b/dstorage/dstorage.go
@@ -58,7 +58,7 @@ type DStorageService struct {
 const (
 	GetRefRetryWaitTime = 500 * time.Millisecond
 	GetRefRetryCount    = 2
-	ChunkNumber         = 100
+	ChunkNumber         = 1000
 )
 
 func (d *DStorageService) GetFileMetaData(ctx context.Context, remotePath string) (*sdk.ORef, error) {
diff --git a/migration/migrate.go b/migration/migrate.go
index 2346a1a..2f9cf9f 100644
--- a/migration/migrate.go
+++ b/migration/migrate.go
@@ -33,8 +33,8 @@ const (
 )
 
 const (
-	batchSize    = 10
-	maxBatchSize = 1024 * 1024 * 200 // 200 MB
+	batchSize    = 50
+	maxBatchSize = 1024 * 1024 * 1024 // 1GB
 )
 
 var migration Migration
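For scale: PATCH 2/5 and 3/5 settle on flushing a batch once it holds 50 operations or 1 GiB of pending data, whichever is reached first, with totalSize reset after each dispatch. A standalone sketch of that flush rule, distilled from UploadWorker (constant names mirror migrate.go; shouldFlush is a hypothetical helper):

package main

import "fmt"

const (
	batchSize    = 50                 // max operations per batch (PATCH 3/5)
	maxBatchSize = 1024 * 1024 * 1024 // max bytes per batch, 1 GiB (PATCH 3/5)
)

// shouldFlush mirrors the dispatch condition in UploadWorker: a batch is
// handed to processMultiOperation when it is full by count or by size.
func shouldFlush(pendingOps int, pendingBytes int64) bool {
	return pendingOps >= batchSize || pendingBytes >= maxBatchSize
}

func main() {
	// 40 objects totalling ~1.2 GiB: the byte threshold fires first,
	// even though the 50-operation limit has not been reached.
	fmt.Println(shouldFlush(40, 1200*1024*1024)) // true
	fmt.Println(shouldFlush(10, 64*1024*1024))   // false: keep accumulating
}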
From 602a266602806c42eb14fdf1f434e58fa3d03008 Mon Sep 17 00:00:00 2001
From: Hitenjain14
Date: Tue, 27 Jun 2023 00:38:48 +0530
Subject: [PATCH 4/5] add close on utilstream reader

---
 dstorage/dstorage.go | 35 -----------------------------------
 migration/migrate.go |  6 +++---
 util/streamreader.go |  7 +++++++
 3 files changed, 10 insertions(+), 38 deletions(-)

diff --git a/dstorage/dstorage.go b/dstorage/dstorage.go
index b237fbd..5d19fe4 100644
--- a/dstorage/dstorage.go
+++ b/dstorage/dstorage.go
@@ -86,41 +86,6 @@ func (d *DStorageService) GetFileMetaData(ctx context.Context, remotePath strin
 	return &oResult.Refs[0], nil
 }
 
-// func (d *DStorageService) Upload(ctx context.Context, remotePath string, r io.Reader, size int64, contentType string, isUpdate bool) (err error) {
-// 	cb := &statusCB{
-// 		doneCh: make(chan struct{}, 1),
-// 		errCh:  make(chan error, 1),
-// 	}
-
-// 	fileMeta := sdk.FileMeta{
-// 		RemotePath: filepath.Clean(remotePath),
-// 		ActualSize: size,
-// 		MimeType:   contentType,
-// 		RemoteName: filepath.Base(remotePath),
-// 	}
-
-// 	chunkUpload, err := sdk.CreateChunkedUpload(d.workDir, d.allocation, fileMeta, util.NewStreamReader(r), isUpdate, false, false, zboxutil.NewConnectionId(),
-// 		sdk.WithStatusCallback(cb),
-// 		sdk.WithEncrypt(d.encrypt),
-// 	)
-
-// 	if err != nil {
-// 		return
-// 	}
-
-// 	err = chunkUpload.Start()
-// 	if err != nil {
-// 		return
-// 	}
-
-// 	select {
-// 	case <-cb.doneCh:
-// 	case err = <-cb.errCh:
-// 	}
-
-// 	return
-// }
-
 func (d *DStorageService) MultiUpload(ctx context.Context, ops []sdk.OperationRequest) (err error) {
 	err = d.allocation.DoMultiOperation(ops)
 	for _, op := range ops {
diff --git a/migration/migrate.go b/migration/migrate.go
index 2f9cf9f..0a967a1 100644
--- a/migration/migrate.go
+++ b/migration/migrate.go
@@ -80,7 +80,6 @@ type Migration struct {
 type MigrationOperation struct {
 	Operation sdk.OperationRequest
 	uploadObj *UploadObjectMeta
-	closeFile func() error
 }
 
 func updateTotalObjects(awsStorageService *s3.AwsClient, wd string) error {
@@ -424,7 +423,6 @@ func processOperation(ctx context.Context, downloadObj *DownloadObjectMeta) (Mig
 		zlogger.Logger.Info("Uploading object: " + downloadObj.ObjectKey + " size " + strconv.FormatInt(downloadObj.Size, 10))
 		fileOperation = migration.zStore.Upload(ctx, remotePath, fileObj, downloadObj.Size, mimeType, false)
 	}
-	op.closeFile = fileObj.Close
 	op.Operation = fileOperation
 	return op, nil
 }
@@ -483,7 +481,9 @@ func (m *Migration) processMultiOperation(ctx context.Context, ops []MigrationOp
 			migration.migratedSize += uint64(op.uploadObj.Size)
 			migration.totalMigratedObjects++
 			migration.szCtMu.Unlock()
-			_ = op.closeFile()
+			if closer, ok := op.Operation.FileReader.(io.Closer); ok {
+				_ = closer.Close()
+			}
 			_ = m.fs.Remove(op.uploadObj.LocalPath)
 		}
 	}()
diff --git a/util/streamreader.go b/util/streamreader.go
index 700f29a..ccc7bbc 100644
--- a/util/streamreader.go
+++ b/util/streamreader.go
@@ -23,3 +23,10 @@ func (r *StreamReader) Read(p []byte) (int, error) {
 	}
 	return bLen, nil
 }
+
+func (r *StreamReader) Close() error {
+	if closer, ok := r.Reader.(io.Closer); ok {
+		return closer.Close()
+	}
+	return nil
+}

From 574257147793d740ead3ea631f52ec34a341b4ec Mon Sep 17 00:00:00 2001
From: Hitenjain14
Date: Tue, 27 Jun 2023 19:12:36 +0530
Subject: [PATCH 5/5] convert to stream reader

---
 migration/migrate.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/migration/migrate.go b/migration/migrate.go
index 0a967a1..5b811a2 100644
--- a/migration/migrate.go
+++ b/migration/migrate.go
@@ -481,7 +481,7 @@ func (m *Migration) processMultiOperation(ctx context.Context, ops []MigrationOp
 			migration.migratedSize += uint64(op.uploadObj.Size)
 			migration.totalMigratedObjects++
 			migration.szCtMu.Unlock()
-			if closer, ok := op.Operation.FileReader.(io.Closer); ok {
+			if closer, ok := op.Operation.FileReader.(*util.StreamReader); ok {
 				_ = closer.Close()
 			}
 			_ = m.fs.Remove(op.uploadObj.LocalPath)
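Finally, PATCH 4/5 and 5/5 move reader cleanup from the stored closeFile callback onto the StreamReader itself. A self-contained sketch of that delegation pattern (the local StreamReader mirrors util/streamreader.go; the file path and demo wiring are hypothetical):

package main

import (
	"fmt"
	"io"
	"os"
)

// StreamReader mirrors util.StreamReader after PATCH 4/5: it wraps any
// io.Reader and forwards Close to the wrapped reader when it is closable.
type StreamReader struct {
	io.Reader
}

func (r *StreamReader) Close() error {
	if closer, ok := r.Reader.(io.Closer); ok {
		return closer.Close() // e.g. the downloaded temp file
	}
	return nil // non-closable readers need no cleanup
}

func main() {
	f, err := os.Open("/tmp/example-object") // stand-in for a downloaded object
	if err != nil {
		fmt.Println(err)
		return
	}
	// The reader is held behind an interface, as in sdk.OperationRequest.FileReader.
	var fileReader io.Reader = &StreamReader{Reader: f}
	// PATCH 5/5 asserts the concrete *StreamReader rather than io.Closer; that is
	// safe because Upload always wraps its input with util.NewStreamReader.
	if sr, ok := fileReader.(*StreamReader); ok {
		fmt.Println("close:", sr.Close())
	}
}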