diff --git a/cmd/migrate.go b/cmd/migrate.go
index b5f761b..7701544 100644
--- a/cmd/migrate.go
+++ b/cmd/migrate.go
@@ -37,6 +37,9 @@ var (
 	retryCount   int
 	deleteSource bool
 	workDir      string
+	chunkSize    int64
+	chunkNumber  int
+	batchSize    int
 )
 
 // migrateCmd is the migrateFromS3 sub command to migrate whole objects from some buckets.
@@ -65,6 +68,9 @@ func init() {
 	migrateCmd.Flags().IntVar(&retryCount, "retry", 3, "retry count for upload to dstorage")
 	migrateCmd.Flags().StringVar(&newerThanStr, "newer-than", "", "eg; 7d10h --> migrate objects that is newer than 7 days 10 hours")
 	migrateCmd.Flags().StringVar(&olderThanStr, "older-than", "", "eg; 7d10h --> migrate objects that is older than 7 days 10 hours")
+	migrateCmd.Flags().Int64Var(&chunkSize, "chunk-size", 50*1024*1024, "chunk size in bytes")
+	migrateCmd.Flags().IntVar(&chunkNumber, "chunk-number", 1000, "number of chunks to upload")
+	migrateCmd.Flags().IntVar(&batchSize, "batch-size", 30, "number of files to upload in a batch")
 }
 
@@ -196,6 +202,16 @@ var migrateCmd = &cobra.Command{
 			return err
 		}
 
+		if chunkNumber == 0 {
+			chunkNumber = 500
+		}
+		if chunkSize == 0 {
+			chunkSize = 50 * 1024 * 1024
+		}
+		if batchSize == 0 {
+			batchSize = 30
+		}
+
 		mConfig := migration.MigrationConfig{
 			AllocationID:  allocationId,
 			Region:        region,
@@ -213,6 +229,9 @@ var migrateCmd = &cobra.Command{
 			StartAfter:    startAfter,
 			StateFilePath: stateFilePath,
 			WorkDir:       workDir,
+			ChunkSize:     chunkSize,
+			ChunkNumber:   chunkNumber,
+			BatchSize:     batchSize,
 		}
 
 		if err := migration.InitMigration(&mConfig); err != nil {
diff --git a/dstorage/dstorage.go b/dstorage/dstorage.go
index 02049fa..b26d6f4 100644
--- a/dstorage/dstorage.go
+++ b/dstorage/dstorage.go
@@ -42,6 +42,7 @@ type DStoreI interface {
 	GetAvailableSpace() int64
 	GetTotalSpace() int64
 	UpdateAllocationDetails() error
+	GetChunkWriteSize() int64
 }
 
 type DStorageService struct {
@@ -53,12 +54,12 @@ type DStorageService struct {
 	//then duplicate path should be /path/to/remote/file{duplicateSuffix}.txt
 	duplicateSuffix string
 	workDir         string
+	chunkNumber     int
 }
 
 const (
 	GetRefRetryWaitTime = 500 * time.Millisecond
 	GetRefRetryCount    = 2
-	ChunkNumber         = 1000
 )
 
 func (d *DStorageService) GetFileMetaData(ctx context.Context, remotePath string) (*sdk.ORef, error) {
@@ -110,7 +111,7 @@ func (d *DStorageService) Upload(ctx context.Context, remotePath string, r io.Re
 
 	options := []sdk.ChunkedUploadOption{
 		sdk.WithEncrypt(d.encrypt),
-		sdk.WithChunkNumber(ChunkNumber),
+		sdk.WithChunkNumber(d.chunkNumber),
 	}
 
 	op := sdk.OperationRequest{
@@ -171,7 +172,7 @@ func (d *DStorageService) GetTotalSpace() int64 {
 	return d.allocation.Size
 }
 
-func GetDStorageService(allocationID, migrateTo, duplicateSuffix, workDir string, encrypt bool) (*DStorageService, error) {
+func GetDStorageService(allocationID, migrateTo, duplicateSuffix, workDir string, encrypt bool, chunkNumber int) (*DStorageService, error) {
 	allocation, err := sdk.GetAllocation(allocationID)
 
 	if err != nil {
@@ -196,5 +197,10 @@ func GetDStorageService(allocationID, migrateTo, duplicateSuffix, workDir string
 		migrateTo:       migrateTo,
 		duplicateSuffix: duplicateSuffix,
 		workDir:         workDir,
+		chunkNumber:     chunkNumber,
 	}, nil
 }
+
+func (d *DStorageService) GetChunkWriteSize() int64 {
+	return d.allocation.GetChunkReadSize(d.encrypt)
+}
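The new `GetChunkWriteSize` hook exposes the allocation's effective chunk payload size (presumably smaller when encryption is enabled, since the encrypt flag is passed through to `GetChunkReadSize`) so that callers can align their writes to it. A minimal sketch of how a caller might slice a buffer against this hook; `alignedSlices` and the import path are illustrative assumptions, not part of this PR:

```go
package main

import "github.com/0chain/s3migration/dstorage" // assumed module path; adjust as needed

// alignedSlices is a hypothetical helper that splits a buffer into pieces no
// larger than the store's accepted chunk write size, mirroring the inner loop
// of processChunkDownload further down in this diff.
func alignedSlices(data []byte, store dstorage.DStoreI) [][]byte {
	size := int(store.GetChunkWriteSize()) // assumed > 0
	pieces := make([][]byte, 0, len(data)/size+1)
	for low := 0; low < len(data); low += size {
		high := low + size
		if high > len(data) {
			high = len(data)
		}
		pieces = append(pieces, data[low:high])
	}
	return pieces
}
```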
diff --git a/go.mod b/go.mod
index eaf066e..031158f 100644
--- a/go.mod
+++ b/go.mod
@@ -4,7 +4,7 @@ go 1.20
 
 require (
 	github.com/0chain/errors v1.0.3
-	github.com/0chain/gosdk v1.8.17-0.20230726135820-9629181f245e
+	github.com/0chain/gosdk v1.8.17-0.20230809212922-e71a28baf114
 	github.com/aws/aws-sdk-go-v2 v1.17.1
 	github.com/aws/aws-sdk-go-v2/config v1.17.10
 	github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.37
diff --git a/go.sum b/go.sum
index 8df88b0..59422d4 100644
--- a/go.sum
+++ b/go.sum
@@ -40,8 +40,8 @@ github.com/0chain/common v0.0.6-0.20230127095721-8df4d1d72565 h1:z+DtCR8mBsjPnEs
 github.com/0chain/common v0.0.6-0.20230127095721-8df4d1d72565/go.mod h1:UyDC8Qyl5z9lGkCnf9RHJPMektnFX8XtCJZHXCCVj8E=
 github.com/0chain/errors v1.0.3 h1:QQZPFxTfnMcRdt32DXbzRQIfGWmBsKoEdszKQDb0rRM=
 github.com/0chain/errors v1.0.3/go.mod h1:xymD6nVgrbgttWwkpSCfLLEJbFO6iHGQwk/yeSuYkIc=
-github.com/0chain/gosdk v1.8.17-0.20230726135820-9629181f245e h1:6c8JM+0uQhnYcVgxHB+PCOgDyIbk4WY7BXsxmvL8cQM=
-github.com/0chain/gosdk v1.8.17-0.20230726135820-9629181f245e/go.mod h1:3NKNYzmnMIYqZwwwOgZwMmTW1DT1ZUAmKyVPmYQOiT4=
+github.com/0chain/gosdk v1.8.17-0.20230809212922-e71a28baf114 h1:fgaUQSUpAqhjhD3ONmiY+3yWn56qHADEd0TCoRcDSZ0=
+github.com/0chain/gosdk v1.8.17-0.20230809212922-e71a28baf114/go.mod h1:3NKNYzmnMIYqZwwwOgZwMmTW1DT1ZUAmKyVPmYQOiT4=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
 github.com/Luzifer/go-openssl/v3 v3.1.0 h1:QqKqo6kYXGGUsvtUoCpRZm8lHw+jDfhbzr36gVj+/gw=
@@ -728,4 +728,4 @@ honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9
 honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
 rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
 rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
-rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
+rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
\ No newline at end of file
diff --git a/migration/migrate.go b/migration/migrate.go
index b5f27ac..3c73cf1 100644
--- a/migration/migrate.go
+++ b/migration/migrate.go
@@ -33,7 +33,6 @@ const (
 )
 
 const (
-	batchSize    = 50
 	maxBatchSize = 1024 * 1024 * 1024 // 1GB
 )
 
@@ -75,6 +74,8 @@ type Migration struct {
 	workDir      string
 	deleteSource bool
 	bucket       string
+	chunkSize    int64
+	batchSize    int
 }
 
 type MigrationOperation struct {
@@ -120,6 +121,7 @@ func InitMigration(mConfig *MigrationConfig) error {
 		mConfig.DuplicateSuffix,
 		mConfig.WorkDir,
 		mConfig.Encrypt,
+		mConfig.ChunkNumber,
 	)
 	if err != nil {
 		zlogger.Logger.Error(err)
@@ -160,6 +162,8 @@ func InitMigration(mConfig *MigrationConfig) error {
 		workDir:   mConfig.WorkDir,
 		bucket:    mConfig.Bucket,
 		fs:        util.Fs,
+		chunkSize: mConfig.ChunkSize,
+		batchSize: mConfig.BatchSize,
 	}
 
 	rootContext, rootContextCancel = context.WithCancel(context.Background())
@@ -232,7 +236,7 @@ func StartMigration() error {
 	migrationWorker := NewMigrationWorker(migration.workDir)
 	go migration.DownloadWorker(rootContext, migrationWorker)
-	go migration.UploadWorker(rootContext, migrationWorker)
+	// go migration.UploadWorker(rootContext, migrationWorker)
 	migration.UpdateStateFile(migrationWorker)
 	err := migrationWorker.GetMigrationError()
 	if err != nil {
@@ -247,21 +251,36 @@ func (m *Migration) DownloadWorker(ctx context.Context, migrator *MigrationWorke
 	defer migrator.CloseDownloadQueue()
 	objCh, errCh := migration.awsStore.ListFilesInBucket(rootContext)
 	wg := &sync.WaitGroup{}
+	ops := make([]MigrationOperation, 0, m.batchSize)
+	var opLock sync.Mutex
+	currentSize := 0
+	opCtx, opCtxCancel := context.WithCancel(ctx)
 	for obj := range objCh {
 		zlogger.Logger.Info("Downloading object: ", obj.Key)
 		migrator.PauseDownload()
 		if migrator.IsMigrationError() {
+			opCtxCancel()
 			return
 		}
-		wg.Add(1)
-
+		if currentSize >= m.batchSize {
+			processOps := ops
+			// Note: room for improvement; waiting here serializes batches.
+			wg.Wait()
+			m.processMultiOperation(opCtx, processOps, migrator)
+			opCtxCancel()
+			opCtx, opCtxCancel = context.WithCancel(ctx)
+			ops = nil
+			currentSize = 0
+		}
+		currentSize++
 		downloadObjMeta := &DownloadObjectMeta{
 			ObjectKey: obj.Key,
 			Size:      obj.Size,
 			DoneChan:  make(chan struct{}, 1),
 			ErrChan:   make(chan error, 1),
+			mimeType:  obj.ContentType,
 		}
-
+		wg.Add(1)
 		go func() {
 			defer wg.Done()
 			err := checkIsFileExist(ctx, downloadObjMeta)
@@ -276,22 +295,31 @@ func (m *Migration) DownloadWorker(ctx context.Context, migrator *MigrationWorke
 				migrator.DownloadDone(downloadObjMeta, "", nil)
 				return
 			}
-			migrator.DownloadStart(downloadObjMeta)
-			zlogger.Logger.Info("download start", downloadObjMeta.ObjectKey, downloadObjMeta.Size)
-			downloadPath, err := m.awsStore.DownloadToFile(ctx, downloadObjMeta.ObjectKey)
-			migrator.DownloadDone(downloadObjMeta, downloadPath, err)
-			migrator.SetMigrationError(err)
-			zlogger.Logger.Info("download done", downloadObjMeta.ObjectKey, downloadObjMeta.Size, err)
+			dataChan := make(chan *util.DataChan, 200)
+			streamWriter := util.NewStreamWriter(dataChan)
+			go m.processChunkDownload(opCtx, streamWriter, migrator, downloadObjMeta)
+			// processOperationForMemory always returns a nil error.
+			op, _ := processOperationForMemory(ctx, downloadObjMeta, streamWriter)
+			opLock.Lock()
+			ops = append(ops, op)
+			opLock.Unlock()
 		}()
-		time.Sleep(1 * time.Second)
 	}
+	if currentSize > 0 {
+		wg.Wait()
+		processOps := ops
+		// Note: room for improvement; the final partial batch is processed inline.
+		m.processMultiOperation(ctx, processOps, migrator)
+		ops = nil
+	}
+	opCtxCancel()
 	wg.Wait()
 	err := <-errCh
 	if err != nil {
 		zlogger.Logger.Error(err)
 		migrator.SetMigrationError(err)
 	}
-
+	migrator.CloseUploadQueue()
 }
 
 func (m *Migration) UploadWorker(ctx context.Context, migrator *MigrationWorker) {
@@ -301,7 +329,7 @@ func (m *Migration) UploadWorker(ctx context.Context, migrator *MigrationWorker)
 	downloadQueue := migrator.GetDownloadQueue()
 	wg := &sync.WaitGroup{}
-	ops := make([]MigrationOperation, 0, batchSize)
+	ops := make([]MigrationOperation, 0, m.batchSize)
 	totalSize := int64(0)
 	for d := range downloadQueue {
 		zlogger.Logger.Info("Uploading object: ", d.ObjectKey)
@@ -337,7 +365,7 @@ func (m *Migration) UploadWorker(ctx context.Context, migrator *MigrationWorker)
 		op.uploadObj = uploadObj
 		ops = append(ops, op)
 		totalSize += downloadObj.Size
-		if len(ops) >= batchSize || totalSize >= maxBatchSize {
+		if len(ops) >= m.batchSize || totalSize >= maxBatchSize {
 			processOps := ops
 			ops = nil
 			wg.Add(1)
@@ -427,6 +455,35 @@ func processOperation(ctx context.Context, downloadObj *DownloadObjectMeta) (Mig
 	return op, nil
 }
 
+func processOperationForMemory(ctx context.Context, downloadObj *DownloadObjectMeta, r io.Reader) (MigrationOperation, error) {
+	remotePath := getRemotePath(downloadObj.ObjectKey)
+	var op MigrationOperation
+	mimeType := downloadObj.mimeType
+	var fileOperation sdk.OperationRequest
+	if downloadObj.IsFileAlreadyExist {
+		switch migration.skip {
+		case Replace:
+			zlogger.Logger.Info("Replacing object " + downloadObj.ObjectKey + " size " + strconv.FormatInt(downloadObj.Size, 10))
+			fileOperation = migration.zStore.Replace(ctx, remotePath, r, downloadObj.Size, mimeType)
+		case Duplicate:
+			zlogger.Logger.Info("Duplicating object " + downloadObj.ObjectKey + " size " + strconv.FormatInt(downloadObj.Size, 10))
+			fileOperation = migration.zStore.Duplicate(ctx, remotePath, r, downloadObj.Size, mimeType)
+		}
+	} else {
+		zlogger.Logger.Info("Uploading object: " + downloadObj.ObjectKey + " size " + strconv.FormatInt(downloadObj.Size, 10))
+		fileOperation = migration.zStore.Upload(ctx, remotePath, r, downloadObj.Size, mimeType, false)
+	}
+	op.Operation = fileOperation
+	op.uploadObj = &UploadObjectMeta{
+		ObjectKey: downloadObj.ObjectKey,
+		DoneChan:  make(chan struct{}, 1),
+		ErrChan:   make(chan error, 1),
+		Size:      downloadObj.Size,
+		LocalPath: downloadObj.LocalPath,
+	}
+	return op, nil
+}
+
 func processUpload(ctx context.Context, ops []sdk.OperationRequest) error {
 	err := migration.zStore.MultiUpload(ctx, ops)
@@ -467,7 +524,7 @@ func (m *Migration) UpdateStateFile(migrateHandler *MigrationWorker) {
 	}
 }
 
-func (m *Migration) processMultiOperation(ctx context.Context, ops []MigrationOperation, migrator *MigrationWorker) {
+func (m *Migration) processMultiOperation(ctx context.Context, ops []MigrationOperation, migrator *MigrationWorker) error {
 	var err error
 	defer func() {
 		for _, op := range ops {
@@ -509,4 +566,43 @@ func (m *Migration) processMultiOperation(ctx context.Context, ops []MigrationOp
 		zlogger.Logger.Info("upload done: ", op.uploadObj.ObjectKey, "size ", op.uploadObj.Size, err)
 	}
 	migrator.SetMigrationError(err)
+	return err
+}
+
+func (m *Migration) processChunkDownload(ctx context.Context, sw *util.StreamWriter, migrator *MigrationWorker, downloadObjMeta *DownloadObjectMeta) {
+	// Download the object in ranged chunks and pipe the bytes into the stream writer.
+	migrator.DownloadStart(downloadObjMeta)
+	offset := 0
+	chunkSize := int(m.chunkSize)
+	acceptedChunkSize := int(m.zStore.GetChunkWriteSize())
+	defer close(sw.DataChan)
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+		}
+		data, err := m.awsStore.DownloadToMemory(ctx, downloadObjMeta.ObjectKey, int64(offset), int64(chunkSize))
+		if err != nil {
+			migrator.DownloadDone(downloadObjMeta, "", err)
+			return
+		}
+		if len(data) > 0 {
+			current := 0
+			for ; current < len(data); current += acceptedChunkSize {
+				high := current + acceptedChunkSize
+				if high > len(data) {
+					high = len(data)
+				}
+				sw.Write(data[current:high])
+			}
+		}
+		offset += chunkSize
+		// End of file: a short read means there is no more data to fetch.
+		if len(data) < chunkSize {
+			break
+		}
+	}
+	migrator.DownloadDone(downloadObjMeta, "", nil)
+}
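The reworked `DownloadWorker` above now folds uploads into the download loop: each object gets a goroutine that streams ranged chunks into a gosdk operation, and completed operations are committed in batches of `batchSize` via `processMultiOperation`. A stripped-down sketch of that batching pattern; all names here are illustrative, and the per-object goroutines, locks, and contexts of the real loop are omitted:

```go
package main

import "fmt"

// batchKeys mirrors the flush-before-append ordering used in DownloadWorker:
// flush whenever the batch is full, then flush the final partial batch.
func batchKeys(keys []string, batchSize int, flush func([]string)) {
	batch := make([]string, 0, batchSize)
	for _, key := range keys {
		if len(batch) >= batchSize {
			flush(batch)
			batch = nil // hand the old slice to flush; start a fresh batch
		}
		batch = append(batch, key)
	}
	if len(batch) > 0 {
		flush(batch) // final partial batch
	}
}

func main() {
	batchKeys([]string{"a", "b", "c", "d", "e"}, 2, func(b []string) {
		fmt.Println("flush:", b)
	})
}
```

Setting `batch = nil` rather than reusing `batch[:0]` matters in the real code: the flushed slice is still referenced by the multi-operation, so the loop must not overwrite its backing array.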
diff --git a/migration/migrateConfig.go b/migration/migrateConfig.go
index f030295..c7d6b50 100644
--- a/migration/migrateConfig.go
+++ b/migration/migrateConfig.go
@@ -19,4 +19,7 @@ type MigrationConfig struct {
 	StartAfter    string
 	StateFilePath string
 	WorkDir       string
+	ChunkSize     int64
+	ChunkNumber   int
+	BatchSize     int
 }
diff --git a/migration/migration_worker.go b/migration/migration_worker.go
index 12969db..3c6e66e 100644
--- a/migration/migration_worker.go
+++ b/migration/migration_worker.go
@@ -37,6 +37,7 @@ type DownloadObjectMeta struct {
 	DoneChan           chan struct{}
 	ErrChan            chan error
 	IsFileAlreadyExist bool
+	mimeType           string
 }
 
 type UploadObjectMeta struct {
diff --git a/s3/aws.go b/s3/aws.go
index b51ef6b..19fc7ca 100644
--- a/s3/aws.go
+++ b/s3/aws.go
@@ -23,6 +23,7 @@ type AwsI interface {
 	GetFileContent(ctx context.Context, objectKey string) (*Object, error)
 	DeleteFile(ctx context.Context, objectKey string) error
 	DownloadToFile(ctx context.Context, objectKey string) (string, error)
+	DownloadToMemory(ctx context.Context, objectKey string, offset int64, chunkSize int64) ([]byte, error)
 }
 
 type Object struct {
@@ -33,8 +34,9 @@ type Object struct {
 
 // ObjectMeta key: object key, size: size of object in bytes
 type ObjectMeta struct {
-	Key  string
-	Size int64
+	Key         string
+	Size        int64
+	ContentType string
 }
 
 type AwsClient struct {
@@ -185,8 +187,12 @@ func (a *AwsClient) ListFilesInBucket(ctx context.Context) (<-chan *ObjectMeta,
 				if a.olderThan != nil && creationTime.After(*a.olderThan) {
 					continue
 				}
-
-				objectMetaChan <- &ObjectMeta{Key: aws.ToString(obj.Key), Size: obj.Size}
+				contentType, err := a.GetContentType(ctx, aws.ToString(obj.Key))
+				if err != nil {
+					errChan <- err
+					return
+				}
+				objectMetaChan <- &ObjectMeta{Key: aws.ToString(obj.Key), Size: obj.Size, ContentType: contentType}
 			}
 		}
 	}()
@@ -206,6 +212,14 @@ func (a *AwsClient) GetFileContent(ctx context.Context, objectKey string) (*Obje
 	}, nil
 }
 
+func (a *AwsClient) GetContentType(ctx context.Context, objectKey string) (string, error) {
+	out, err := a.client.HeadObject(ctx, &awsS3.HeadObjectInput{Bucket: aws.String(a.bucket), Key: aws.String(objectKey)})
+	if err != nil {
+		return "", err
+	}
+	return aws.ToString(out.ContentType), nil
+}
+
 func (a *AwsClient) DeleteFile(ctx context.Context, objectKey string) error {
 	if !a.deleteSource {
 		return nil
@@ -222,7 +236,6 @@ func (a *AwsClient) DownloadToFile(ctx context.Context, objectKey string) (strin
 		Bucket: aws.String(a.bucket),
 		Key:    aws.String(objectKey),
 	}
-
 	fileName := encryption.Hash(objectKey)
 	downloadPath := filepath.Join(a.workDir, fileName)
 	f, err := os.Create(downloadPath)
@@ -234,3 +247,18 @@ func (a *AwsClient) DownloadToFile(ctx context.Context, objectKey string) (strin
 	_, err = a.downloader.Download(ctx, f, params)
 	return downloadPath, err
 }
+
+func (a *AwsClient) DownloadToMemory(ctx context.Context, objectKey string, offset int64, chunkSize int64) ([]byte, error) {
+	ran := fmt.Sprintf("bytes=%d-%d", offset, offset+chunkSize-1)
+	params := &awsS3.GetObjectInput{
+		Bucket: aws.String(a.bucket),
+		Key:    aws.String(objectKey),
+		Range:  &ran,
+	}
+	maxSize := chunkSize
+	bytearray := make([]byte, 0, maxSize)
+	buffer := manager.NewWriteAtBuffer(bytearray)
+
+	n, err := a.downloader.Download(ctx, buffer, params)
+	return buffer.Bytes()[:n], err
+}
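`DownloadToMemory` fetches one HTTP byte range per call (note that the listing path now also issues one `HeadObject` per key to learn the content type). A hypothetical driver loop showing the ranged-read protocol that `processChunkDownload` relies on: keep fetching fixed-size ranges until a short read signals the end of the object. `readAll` and the import path are assumptions for illustration:

```go
package main

import (
	"context"

	"github.com/0chain/s3migration/s3" // assumed module path; adjust as needed
)

// readAll drains an object through the AwsI interface extended above.
func readAll(ctx context.Context, client s3.AwsI, key string, chunkSize int64) ([]byte, error) {
	var out []byte
	for offset := int64(0); ; offset += chunkSize {
		data, err := client.DownloadToMemory(ctx, key, offset, chunkSize)
		if err != nil {
			return nil, err
		}
		out = append(out, data...)
		if int64(len(data)) < chunkSize {
			return out, nil // short read: the last range has been consumed
		}
	}
}
```

One caveat worth noting: when an object's length is an exact multiple of `chunkSize`, the loop issues a final request for a range entirely past the end of the object, which S3 typically rejects with 416 Requested Range Not Satisfiable. Callers that already know the object size (as the migration does from `ObjectMeta.Size`) can stop at that size instead.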
diff --git a/util/streamwriter.go b/util/streamwriter.go
new file mode 100644
index 0000000..9ed5206
--- /dev/null
+++ b/util/streamwriter.go
@@ -0,0 +1,46 @@
+package util
+
+import (
+	"io"
+)
+
+// DataChan carries one write's payload (or a terminal error) from the
+// producing goroutine to the consuming reader.
+type DataChan struct {
+	data []byte
+	err  error
+}
+
+func NewStreamWriter(dataChan chan *DataChan) *StreamWriter {
+	return &StreamWriter{dataChan}
+}
+
+// StreamWriter adapts a channel of byte slices into an io.Reader/io.Writer
+// pair so a downloading goroutine can stream bytes into an upload operation.
+type StreamWriter struct {
+	DataChan chan *DataChan
+}
+
+func (sw *StreamWriter) Read(p []byte) (int, error) {
+	receivedData, ok := <-sw.DataChan
+	if !ok {
+		return 0, io.EOF
+	}
+	if receivedData.err != nil {
+		return 0, receivedData.err
+	}
+	if len(receivedData.data) > len(p) {
+		return 0, io.ErrShortBuffer
+	}
+	copy(p, receivedData.data)
+	return len(receivedData.data), nil
+}
+
+func (sw *StreamWriter) Write(p []byte) (int, error) {
+	// Copy p before handing it to the channel: io.Writer implementations
+	// must not retain p, and this slice crosses a goroutine boundary.
+	buf := make([]byte, len(p))
+	copy(buf, p)
+	sw.DataChan <- &DataChan{data: buf, err: nil}
+	return len(p), nil
+}
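A minimal usage sketch for the new `StreamWriter`, not part of this PR: a producer goroutine writes chunks while a consumer drains the same value as an `io.Reader`. Closing the channel is what terminates the reader with `io.EOF`, which matches how `processChunkDownload` defers `close(sw.DataChan)`. The import path is an assumption:

```go
package main

import (
	"fmt"
	"io"

	"github.com/0chain/s3migration/util" // assumed module path; adjust as needed
)

func main() {
	dataChan := make(chan *util.DataChan, 10)
	sw := util.NewStreamWriter(dataChan)

	// Producer: stream a few chunks, then close the channel so the
	// reader side observes io.EOF.
	go func() {
		defer close(sw.DataChan)
		sw.Write([]byte("hello "))
		sw.Write([]byte("stream"))
	}()

	// Consumer: any io.Reader-based API can drain the writer.
	b, err := io.ReadAll(sw)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // hello stream
}
```

One design constraint to keep in mind: `Read` returns `io.ErrShortBuffer` when a queued chunk is larger than the caller's buffer, so producers should keep individual writes no larger than the smallest read buffer expected on the consuming side (here, the allocation's chunk write size).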