Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sprint-july-4 #85

Merged
merged 27 commits into from
Aug 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
bf1023e
Add download to memory function
Roshan-Mehta Jul 7, 2023
e1e8ae6
Add chunk memory download
Roshan-Mehta Jul 11, 2023
6f9e5cb
Updated help message
Jayashsatolia403 Jun 30, 2023
3a098dc
clean working dir if it's the default one
Kishan-Dhakan Jul 1, 2023
f172a82
fix max alloc size test
Hitenjain14 Jul 4, 2023
1b6e668
integrate with repo snapshots
stewartie4 Jul 9, 2023
3ccedcc
Merge branch 'staging' into feature/download-in-memory
Roshan-Mehta Jul 11, 2023
ca83f57
Increase chunk size
Roshan-Mehta Jul 11, 2023
8b4b356
Modify chunk size
Roshan-Mehta Jul 11, 2023
1d51e95
todo changes
Hitenjain14 Jul 26, 2023
6373a6f
merge staging
Hitenjain14 Jul 27, 2023
2c512c5
check errChan
Hitenjain14 Jul 27, 2023
a0ea3af
add cancel ctx after multi op
Hitenjain14 Jul 27, 2023
2847c53
fox accepted chunk size
Hitenjain14 Jul 29, 2023
9349b37
update gosdk
Hitenjain14 Jul 29, 2023
db38cbd
increase bacthsize and chunk size
Hitenjain14 Jul 29, 2023
260fb85
reset current size
Hitenjain14 Jul 29, 2023
985d65a
update gosdk
Kishan-Dhakan Aug 3, 2023
012b427
Merge branch 'sprint-july-4' of https://github.com/0chain/s3-migratio…
Kishan-Dhakan Aug 3, 2023
5f27aae
update gosdk
Hitenjain14 Aug 8, 2023
9e43742
move vars to config
Hitenjain14 Aug 8, 2023
6ce9beb
Merge branch 'staging' into sprint-july-4
Kishan-Dhakan Aug 11, 2023
c2d0c42
update gosdk
Kishan-Dhakan Aug 11, 2023
17dae08
change default chunks to 500
Hitenjain14 Aug 13, 2023
aaff333
change ref exist
Hitenjain14 Aug 14, 2023
7babe3b
update gosdk
Hitenjain14 Aug 14, 2023
6b320df
Merge pull request #80 from 0chain/feature/download-in-memory
Kishan-Dhakan Aug 14, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions cmd/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ var (
retryCount int
deleteSource bool
workDir string
chunkSize int64
chunkNumber int
batchSize int
)

// migrateCmd is the migrateFromS3 sub command to migrate whole objects from some buckets.
Expand Down Expand Up @@ -65,6 +68,9 @@ func init() {
migrateCmd.Flags().IntVar(&retryCount, "retry", 3, "retry count for upload to dstorage")
migrateCmd.Flags().StringVar(&newerThanStr, "newer-than", "", "eg; 7d10h --> migrate objects that is newer than 7 days 10 hours")
migrateCmd.Flags().StringVar(&olderThanStr, "older-than", "", "eg; 7d10h --> migrate objects that is older than 7 days 10 hours")
migrateCmd.Flags().Int64Var(&chunkSize, "chunk-size", 50*1024*1024, "chunk size in bytes")
migrateCmd.Flags().IntVar(&chunkNumber, "chunk-number", 1000, "number of chunks to upload")
migrateCmd.Flags().IntVar(&batchSize, "batch-size", 30, "number of files to upload in a batch")

}

Expand Down Expand Up @@ -196,6 +202,16 @@ var migrateCmd = &cobra.Command{
return err
}

if chunkNumber == 0 {
chunkNumber = 500
}
if chunkSize == 0 {
chunkSize = 50 * 1024 * 1024
}
if batchSize == 0 {
batchSize = 30
}

mConfig := migration.MigrationConfig{
AllocationID: allocationId,
Region: region,
Expand All @@ -213,6 +229,9 @@ var migrateCmd = &cobra.Command{
StartAfter: startAfter,
StateFilePath: stateFilePath,
WorkDir: workDir,
ChunkSize: chunkSize,
ChunkNumber: chunkNumber,
BatchSize: batchSize,
}

if err := migration.InitMigration(&mConfig); err != nil {
Expand Down
12 changes: 9 additions & 3 deletions dstorage/dstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type DStoreI interface {
GetAvailableSpace() int64
GetTotalSpace() int64
UpdateAllocationDetails() error
GetChunkWriteSize() int64
}

type DStorageService struct {
Expand All @@ -53,12 +54,12 @@ type DStorageService struct {
//then duplicate path should be /path/to/remote/file{duplicateSuffix}.txt
duplicateSuffix string
workDir string
chunkNumer int
}

const (
GetRefRetryWaitTime = 500 * time.Millisecond
GetRefRetryCount = 2
ChunkNumber = 1000
)

func (d *DStorageService) GetFileMetaData(ctx context.Context, remotePath string) (*sdk.ORef, error) {
Expand Down Expand Up @@ -110,7 +111,7 @@ func (d *DStorageService) Upload(ctx context.Context, remotePath string, r io.Re

options := []sdk.ChunkedUploadOption{
sdk.WithEncrypt(d.encrypt),
sdk.WithChunkNumber(ChunkNumber),
sdk.WithChunkNumber(d.chunkNumer),
}

op := sdk.OperationRequest{
Expand Down Expand Up @@ -171,7 +172,7 @@ func (d *DStorageService) GetTotalSpace() int64 {
return d.allocation.Size
}

func GetDStorageService(allocationID, migrateTo, duplicateSuffix, workDir string, encrypt bool) (*DStorageService, error) {
func GetDStorageService(allocationID, migrateTo, duplicateSuffix, workDir string, encrypt bool, chunkNumber int) (*DStorageService, error) {
allocation, err := sdk.GetAllocation(allocationID)

if err != nil {
Expand All @@ -196,5 +197,10 @@ func GetDStorageService(allocationID, migrateTo, duplicateSuffix, workDir string
migrateTo: migrateTo,
duplicateSuffix: duplicateSuffix,
workDir: workDir,
chunkNumer: chunkNumber,
}, nil
}

func (d *DStorageService) GetChunkWriteSize() int64 {
return d.allocation.GetChunkReadSize(d.encrypt)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.20

require (
github.com/0chain/errors v1.0.3
github.com/0chain/gosdk v1.8.17-0.20230726135820-9629181f245e
github.com/0chain/gosdk v1.8.17-0.20230809212922-e71a28baf114
github.com/aws/aws-sdk-go-v2 v1.17.1
github.com/aws/aws-sdk-go-v2/config v1.17.10
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.37
Expand Down
6 changes: 3 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ github.com/0chain/common v0.0.6-0.20230127095721-8df4d1d72565 h1:z+DtCR8mBsjPnEs
github.com/0chain/common v0.0.6-0.20230127095721-8df4d1d72565/go.mod h1:UyDC8Qyl5z9lGkCnf9RHJPMektnFX8XtCJZHXCCVj8E=
github.com/0chain/errors v1.0.3 h1:QQZPFxTfnMcRdt32DXbzRQIfGWmBsKoEdszKQDb0rRM=
github.com/0chain/errors v1.0.3/go.mod h1:xymD6nVgrbgttWwkpSCfLLEJbFO6iHGQwk/yeSuYkIc=
github.com/0chain/gosdk v1.8.17-0.20230726135820-9629181f245e h1:6c8JM+0uQhnYcVgxHB+PCOgDyIbk4WY7BXsxmvL8cQM=
github.com/0chain/gosdk v1.8.17-0.20230726135820-9629181f245e/go.mod h1:3NKNYzmnMIYqZwwwOgZwMmTW1DT1ZUAmKyVPmYQOiT4=
github.com/0chain/gosdk v1.8.17-0.20230809212922-e71a28baf114 h1:fgaUQSUpAqhjhD3ONmiY+3yWn56qHADEd0TCoRcDSZ0=
github.com/0chain/gosdk v1.8.17-0.20230809212922-e71a28baf114/go.mod h1:3NKNYzmnMIYqZwwwOgZwMmTW1DT1ZUAmKyVPmYQOiT4=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/Luzifer/go-openssl/v3 v3.1.0 h1:QqKqo6kYXGGUsvtUoCpRZm8lHw+jDfhbzr36gVj+/gw=
Expand Down Expand Up @@ -728,4 +728,4 @@ honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
129 changes: 113 additions & 16 deletions migration/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ const (
)

const (
batchSize = 50
maxBatchSize = 1024 * 1024 * 1024 // 1GB
)

Expand Down Expand Up @@ -75,6 +74,8 @@ type Migration struct {
workDir string
deleteSource bool
bucket string
chunkSize int64
batchSize int
}

type MigrationOperation struct {
Expand Down Expand Up @@ -120,6 +121,7 @@ func InitMigration(mConfig *MigrationConfig) error {
mConfig.DuplicateSuffix,
mConfig.WorkDir,
mConfig.Encrypt,
mConfig.ChunkNumber,
)
if err != nil {
zlogger.Logger.Error(err)
Expand Down Expand Up @@ -160,6 +162,8 @@ func InitMigration(mConfig *MigrationConfig) error {
workDir: mConfig.WorkDir,
bucket: mConfig.Bucket,
fs: util.Fs,
chunkSize: mConfig.ChunkSize,
batchSize: mConfig.BatchSize,
}

rootContext, rootContextCancel = context.WithCancel(context.Background())
Expand Down Expand Up @@ -232,7 +236,7 @@ func StartMigration() error {

migrationWorker := NewMigrationWorker(migration.workDir)
go migration.DownloadWorker(rootContext, migrationWorker)
go migration.UploadWorker(rootContext, migrationWorker)
// go migration.UploadWorker(rootContext, migrationWorker)
migration.UpdateStateFile(migrationWorker)
err := migrationWorker.GetMigrationError()
if err != nil {
Expand All @@ -247,21 +251,36 @@ func (m *Migration) DownloadWorker(ctx context.Context, migrator *MigrationWorke
defer migrator.CloseDownloadQueue()
objCh, errCh := migration.awsStore.ListFilesInBucket(rootContext)
wg := &sync.WaitGroup{}
ops := make([]MigrationOperation, 0, m.batchSize)
var opLock sync.Mutex
currentSize := 0
opCtx, opCtxCancel := context.WithCancel(ctx)
for obj := range objCh {
zlogger.Logger.Info("Downloading object: ", obj.Key)
migrator.PauseDownload()
if migrator.IsMigrationError() {
opCtxCancel()
return
}
wg.Add(1)

if currentSize >= m.batchSize {
processOps := ops
// Here scope of improvement
wg.Wait()
m.processMultiOperation(opCtx, processOps, migrator)
opCtxCancel()
opCtx, opCtxCancel = context.WithCancel(ctx)
ops = nil
currentSize = 0
}
currentSize++
downloadObjMeta := &DownloadObjectMeta{
ObjectKey: obj.Key,
Size: obj.Size,
DoneChan: make(chan struct{}, 1),
ErrChan: make(chan error, 1),
mimeType: obj.ContentType,
}

wg.Add(1)
go func() {
defer wg.Done()
err := checkIsFileExist(ctx, downloadObjMeta)
Expand All @@ -276,22 +295,31 @@ func (m *Migration) DownloadWorker(ctx context.Context, migrator *MigrationWorke
migrator.DownloadDone(downloadObjMeta, "", nil)
return
}
migrator.DownloadStart(downloadObjMeta)
zlogger.Logger.Info("download start", downloadObjMeta.ObjectKey, downloadObjMeta.Size)
downloadPath, err := m.awsStore.DownloadToFile(ctx, downloadObjMeta.ObjectKey)
migrator.DownloadDone(downloadObjMeta, downloadPath, err)
migrator.SetMigrationError(err)
zlogger.Logger.Info("download done", downloadObjMeta.ObjectKey, downloadObjMeta.Size, err)
dataChan := make(chan *util.DataChan, 200)
streamWriter := util.NewStreamWriter(dataChan)
go m.processChunkDownload(opCtx, streamWriter, migrator, downloadObjMeta)
// Always return nil as error
op, _ := processOperationForMemory(ctx, downloadObjMeta, streamWriter)
opLock.Lock()
ops = append(ops, op)
opLock.Unlock()
}()
time.Sleep(1 * time.Second)
}
if currentSize > 0 {
wg.Wait()
processOps := ops
// Here scope of improvement
m.processMultiOperation(ctx, processOps, migrator)
ops = nil
}
opCtxCancel()
wg.Wait()
err := <-errCh
if err != nil {
zlogger.Logger.Error(err)
migrator.SetMigrationError(err)
}

migrator.CloseUploadQueue()
}

func (m *Migration) UploadWorker(ctx context.Context, migrator *MigrationWorker) {
Expand All @@ -301,7 +329,7 @@ func (m *Migration) UploadWorker(ctx context.Context, migrator *MigrationWorker)

downloadQueue := migrator.GetDownloadQueue()
wg := &sync.WaitGroup{}
ops := make([]MigrationOperation, 0, batchSize)
ops := make([]MigrationOperation, 0, m.batchSize)
totalSize := int64(0)
for d := range downloadQueue {
zlogger.Logger.Info("Uploading object: ", d.ObjectKey)
Expand Down Expand Up @@ -337,7 +365,7 @@ func (m *Migration) UploadWorker(ctx context.Context, migrator *MigrationWorker)
op.uploadObj = uploadObj
ops = append(ops, op)
totalSize += downloadObj.Size
if len(ops) >= batchSize || totalSize >= maxBatchSize {
if len(ops) >= m.batchSize || totalSize >= maxBatchSize {
processOps := ops
ops = nil
wg.Add(1)
Expand Down Expand Up @@ -427,6 +455,35 @@ func processOperation(ctx context.Context, downloadObj *DownloadObjectMeta) (Mig
return op, nil
}

func processOperationForMemory(ctx context.Context, downloadObj *DownloadObjectMeta, r io.Reader) (MigrationOperation, error) {
remotePath := getRemotePath(downloadObj.ObjectKey)
var op MigrationOperation
mimeType := downloadObj.mimeType
var fileOperation sdk.OperationRequest
if downloadObj.IsFileAlreadyExist {
switch migration.skip {
case Replace:
zlogger.Logger.Info("Replacing object" + downloadObj.ObjectKey + " size " + strconv.FormatInt(downloadObj.Size, 10))
fileOperation = migration.zStore.Replace(ctx, remotePath, r, downloadObj.Size, mimeType)
case Duplicate:
zlogger.Logger.Info("Duplicating object" + downloadObj.ObjectKey + " size " + strconv.FormatInt(downloadObj.Size, 10))
fileOperation = migration.zStore.Duplicate(ctx, remotePath, r, downloadObj.Size, mimeType)
}
} else {
zlogger.Logger.Info("Uploading object: " + downloadObj.ObjectKey + " size " + strconv.FormatInt(downloadObj.Size, 10))
fileOperation = migration.zStore.Upload(ctx, remotePath, r, downloadObj.Size, mimeType, false)
}
op.Operation = fileOperation
op.uploadObj = &UploadObjectMeta{
ObjectKey: downloadObj.ObjectKey,
DoneChan: make(chan struct{}, 1),
ErrChan: make(chan error, 1),
Size: downloadObj.Size,
LocalPath: downloadObj.LocalPath,
}
return op, nil
}

func processUpload(ctx context.Context, ops []sdk.OperationRequest) error {

err := migration.zStore.MultiUpload(ctx, ops)
Expand Down Expand Up @@ -467,7 +524,7 @@ func (m *Migration) UpdateStateFile(migrateHandler *MigrationWorker) {
}
}

func (m *Migration) processMultiOperation(ctx context.Context, ops []MigrationOperation, migrator *MigrationWorker) {
func (m *Migration) processMultiOperation(ctx context.Context, ops []MigrationOperation, migrator *MigrationWorker) error {
var err error
defer func() {
for _, op := range ops {
Expand Down Expand Up @@ -509,4 +566,44 @@ func (m *Migration) processMultiOperation(ctx context.Context, ops []MigrationOp
zlogger.Logger.Info("upload done: ", op.uploadObj.ObjectKey, "size ", op.uploadObj.Size, err)
}
migrator.SetMigrationError(err)
return err
}

func (m *Migration) processChunkDownload(ctx context.Context, sw *util.StreamWriter, migrator *MigrationWorker, downloadObjMeta *DownloadObjectMeta) {
// chunk download and pipe data

migrator.DownloadStart(downloadObjMeta)
offset := 0
chunkSize := int(m.chunkSize)
acceptedChunkSize := int(m.zStore.GetChunkWriteSize())
defer close(sw.DataChan)
for {
select {
case <-ctx.Done():
return
default:
}
data, err := m.awsStore.DownloadToMemory(ctx, downloadObjMeta.ObjectKey, int64(offset), int64(chunkSize))
if err != nil {
migrator.DownloadDone(downloadObjMeta, "", err)
ctx.Err()
return
}
if len(data) > 0 {
current := 0
for ; current < len(data); current += acceptedChunkSize {
high := current + acceptedChunkSize
if high > len(data) {
high = len(data)
}
sw.Write(data[current:high])
}
}
offset += chunkSize
// End of file
if len(data) < chunkSize {
break
}
}
migrator.DownloadDone(downloadObjMeta, "", nil)
}
3 changes: 3 additions & 0 deletions migration/migrateConfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,7 @@ type MigrationConfig struct {
StartAfter string
StateFilePath string
WorkDir string
ChunkSize int64
ChunkNumber int
BatchSize int
}
1 change: 1 addition & 0 deletions migration/migration_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type DownloadObjectMeta struct {
DoneChan chan struct{}
ErrChan chan error
IsFileAlreadyExist bool
mimeType string
}

type UploadObjectMeta struct {
Expand Down
Loading
Loading