Skip to content

Commit

Permalink
Merge pull request #120 from 0chain/fix/total-count
Browse files Browse the repository at this point in the history
fix total count
  • Loading branch information
dabasov authored Dec 29, 2023
2 parents 2f9273d + 220055b commit aba2629
Showing 1 changed file with 7 additions and 22 deletions.
29 changes: 7 additions & 22 deletions migration/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,29 +85,16 @@ type MigrationOperation struct {
uploadObj *UploadObjectMeta
}

func updateTotalObjects(awsStorageService *s3.AwsClient, wd string) error {
func updateTotalObjects(totalObjChan chan struct{}, wd string) error {
f, err := os.Create(filepath.Join(wd, "files.total"))
if err != nil {
return err
}
defer f.Close()

var totalFiles int
objCh, errCh := awsStorageService.ListFilesInBucket(context.Background())

L1:
for {
select {
case _, ok := <-objCh:
if !ok {
break L1
}
totalFiles++
case err = <-errCh:
if err != nil {
return err
}
}
for range totalObjChan {
totalFiles++
}

_, err = f.WriteString(strconv.Itoa(totalFiles))
Expand Down Expand Up @@ -147,12 +134,6 @@ func InitMigration(mConfig *MigrationConfig) error {
return err
}

err = updateTotalObjects(awsStorageService, mConfig.WorkDir)
if err != nil {
zlogger.Logger.Error(err)
return err
}

migration = Migration{
zStore: dStorageService,
awsStore: awsStorageService,
Expand Down Expand Up @@ -262,6 +243,9 @@ func StartMigration() error {

func (m *Migration) DownloadWorker(ctx context.Context, migrator *MigrationWorker) {
defer migrator.CloseDownloadQueue()
totalObjChan := make(chan struct{}, 100)
defer close(totalObjChan)
go updateTotalObjects(totalObjChan, m.workDir)
objCh, errCh := migration.awsStore.ListFilesInBucket(rootContext)
wg := &sync.WaitGroup{}
ops := make([]MigrationOperation, 0, m.batchSize)
Expand Down Expand Up @@ -315,6 +299,7 @@ func (m *Migration) DownloadWorker(ctx context.Context, migrator *MigrationWorke
streamWriter := util.NewStreamWriter(dataChan)
go m.processChunkDownload(opCtx, streamWriter, migrator, downloadObjMeta)
// Always return nil as error
totalObjChan <- struct{}{}
op, _ := processOperationForMemory(ctx, downloadObjMeta, streamWriter)
opLock.Lock()
ops = append(ops, op)
Expand Down

0 comments on commit aba2629

Please sign in to comment.