diff --git a/zboxcore/sdk/allocation.go b/zboxcore/sdk/allocation.go index bbbb13277..ca3ff84a8 100644 --- a/zboxcore/sdk/allocation.go +++ b/zboxcore/sdk/allocation.go @@ -1122,7 +1122,9 @@ func (a *Allocation) DoMultiOperation(operations []OperationRequest, opts ...Mul mo.operations = append(mo.operations, operation) } + logger.Logger.Info("Multioperation: processing batch", zap.Int("batch_size", len(mo.operations))) if len(mo.operations) > 0 { + logger.Logger.Info("Multioperation: processing batch", zap.Int("batch_size", len(mo.operations))) err := mo.Process() if err != nil { return err diff --git a/zboxcore/sdk/chunked_upload.go b/zboxcore/sdk/chunked_upload.go index 29d31a8f3..d37be65c2 100644 --- a/zboxcore/sdk/chunked_upload.go +++ b/zboxcore/sdk/chunked_upload.go @@ -652,26 +652,36 @@ func getShardSize(dataSize int64, dataShards int, isEncrypted bool) int64 { } func (su *ChunkedUpload) uploadProcessor() { + logger.Logger.Info("************************************Starting uploadProcessor") for { + logger.Logger.Info("************************************Waiting for upload data") select { case <-su.ctx.Done(): + logger.Logger.Info("************************************Context done, cause: ", context.Cause(su.ctx)) return case uploadData, ok := <-su.uploadChan: + logger.Logger.Info("************************************Received upload data: ", uploadData) if !ok { + logger.Logger.Info("************************************Upload channel closed") return } + logger.Logger.Info("************************************Calling uploadToBlobbers with uploadData: ", uploadData) su.uploadToBlobbers(uploadData) //nolint:errcheck + logger.Logger.Info("************************************uploadToBlobbers completed") su.uploadWG.Done() } } } func (su *ChunkedUpload) uploadToBlobbers(uploadData UploadData) error { + logger.Logger.Info("************************************Starting uploadToBlobbers with uploadData: ", uploadData) select { case <-su.ctx.Done(): + logger.Logger.Error("************************************Context done, cause: ", context.Cause(su.ctx)) return context.Cause(su.ctx) default: } + logger.Logger.Info("************************************Creating Consensus object with consensusThresh: ", su.consensus.consensusThresh, " and fullconsensus: ", su.consensus.fullconsensus) consensus := Consensus{ RWMutex: &sync.RWMutex{}, consensusThresh: su.consensus.consensusThresh, @@ -684,17 +694,23 @@ func (su *ChunkedUpload) uploadToBlobbers(uploadData UploadData) error { var pos uint64 var errCount int32 var wg sync.WaitGroup + logger.Logger.Info("******************************Starting loop over uploadMask: ", su.uploadMask) for i := su.uploadMask; !i.Equals64(0); i = i.And(zboxutil.NewUint128(1).Lsh(pos).Not()) { - pos = uint64(i.TrailingZeros()) + pos = uint64(i.TrailingZeros()) // !(15 & (1<<3)) + + logger.Logger.Info("***********Chunk number: ", uploadData.chunkStartIndex, " Blobber number: ", pos, "***********") wg.Add(1) go func(pos uint64) { defer wg.Done() + logger.Logger.Info("****************Starting goroutine for blobber at position: ", pos) err := su.blobbers[pos].sendUploadRequest(ctx, su, uploadData.isFinal, su.encryptedKey, uploadData.uploadBody[pos].dataBuffers, uploadData.uploadBody[pos].formData, uploadData.uploadBody[pos].contentSlice, pos, &consensus) if err != nil { if strings.Contains(err.Error(), "duplicate") { + logger.Logger.Fatal("**************************Duplicate upload detected for blobber at position: ", pos) su.consensus.Done() errC := atomic.AddInt32(&su.addConsensus, 1) + logger.Logger.Debug("****************************Consensus count after duplicate detection: ", errC) if errC >= int32(su.consensus.consensusThresh) { wgErrors <- err } @@ -702,16 +718,23 @@ func (su *ChunkedUpload) uploadToBlobbers(uploadData UploadData) error { } logger.Logger.Error("error during sendUploadRequest", err, " connectionID: ", su.progress.ConnectionID) errC := atomic.AddInt32(&errCount, 1) + logger.Logger.Info("***********************************Error count after failure: ", errC) if errC > int32(su.allocationObj.ParityShards-1) { // If atleast data shards + 1 number of blobbers can process the upload, it can be repaired later + logger.Logger.Error("************************************Error count exceeded for blobber at position: ", pos) wgErrors <- err } } + logger.Logger.Info("****************************************Goroutine for blobber at position: ", pos, " completed") }(pos) } wg.Wait() + logger.Logger.Info("***************************Loop over uploadMask completed") close(wgErrors) + logger.Logger.Info("***************************Closed wgErrors") for err := range wgErrors { + logger.Logger.Error("Upload failed with error: ", err) su.ctxCncl(thrown.New("upload_failed", fmt.Sprintf("Upload failed. %s", err))) + logger.Logger.Info("***************************Returning error") return err } if !consensus.isConsensusOk() { @@ -723,11 +746,13 @@ func (su *ChunkedUpload) uploadToBlobbers(uploadData UploadData) error { if uploadData.uploadLength > 0 { index := uploadData.chunkEndIndex uploadLength := uploadData.uploadLength + logger.Logger.Info("**********************************Updating progress for chunk index: ", index, " upload length: ", uploadLength) go su.updateProgress(index, su.uploadMask) if su.statusCallback != nil { su.statusCallback.InProgress(su.allocationObj.ID, su.fileMeta.RemotePath, su.opCode, int(atomic.AddInt64(&su.progress.UploadLength, uploadLength)), nil) } } uploadData = UploadData{} // release memory + logger.Logger.Info("Upload to blobbers completed successfully") return nil } diff --git a/zboxcore/sdk/chunked_upload_process.go b/zboxcore/sdk/chunked_upload_process.go index da58f19de..f988aa458 100644 --- a/zboxcore/sdk/chunked_upload_process.go +++ b/zboxcore/sdk/chunked_upload_process.go @@ -6,6 +6,8 @@ package sdk import ( "context" "fmt" + "github.com/0chain/gosdk/zboxcore/logger" + "go.uber.org/zap" "sync" "sync/atomic" @@ -163,7 +165,9 @@ func (su *ChunkedUpload) processUpload(chunkStartIndex, chunkEndIndex int, } func (su *ChunkedUpload) startProcessor() { + logger.Logger.Debug("Starting upload processor", zap.Int("workers", su.uploadWorkers)) for i := 0; i < su.uploadWorkers; i++ { + logger.Logger.Debug("Starting upload processor", zap.Int("worker", i)) go su.uploadProcessor() } } diff --git a/zboxcore/sdk/multi_operation_worker.go b/zboxcore/sdk/multi_operation_worker.go index 7361508c6..41b18795d 100644 --- a/zboxcore/sdk/multi_operation_worker.go +++ b/zboxcore/sdk/multi_operation_worker.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "go.uber.org/zap" "io/ioutil" "mime/multipart" "net/http" @@ -177,17 +178,20 @@ func (mo *MultiOperation) Process() error { // Check for other goroutines signal select { case <-ctx.Done(): + l.Logger.Debug("Context done, exiting goroutine") return default: } + l.Logger.Debug("Processing operation", zap.Int("index", idx)) refs, mask, err := op.Process(mo.allocationObj, mo.connectionID) // Process with each blobber if err != nil { - l.Logger.Error(err) + l.Logger.Error("Operation process failed", zap.Int("index", idx), zap.Error(err)) errsSlice[idx] = errors.New("", err.Error()) ctxCncl(err) return } + l.Logger.Debug("Operation processed successfully", zap.Int("index", idx)) mo.maskMU.Lock() mo.operationMask = mo.operationMask.Or(mask) mo.maskMU.Unlock() @@ -199,6 +203,7 @@ func (mo *MultiOperation) Process() error { if ctx.Err() != nil { err := context.Cause(ctx) + l.Logger.Error("Context error", zap.Error(err)) return err } @@ -206,6 +211,7 @@ func (mo *MultiOperation) Process() error { if mo.operationMask.CountOnes() < mo.consensusThresh { majorErr := zboxutil.MajorError(errsSlice) if majorErr != nil { + l.Logger.Error("Consensus not met", zap.Int("required", mo.consensusThresh), zap.Int("got", mo.operationMask.CountOnes()), zap.Error(majorErr)) return errors.New("consensus_not_met", fmt.Sprintf("Multioperation failed. Required consensus %d got %d. Major error: %s", mo.consensusThresh, mo.operationMask.CountOnes(), majorErr.Error())) @@ -219,19 +225,22 @@ func (mo *MultiOperation) Process() error { // blobber 2 and so on. start := time.Now() mo.changes = zboxutil.Transpose(mo.changes) + l.Logger.Debug("Transposed changes", zap.Duration("duration", time.Since(start))) writeMarkerMutex, err := CreateWriteMarkerMutex(client.GetClient(), mo.allocationObj) if err != nil { + l.Logger.Error("Failed to create write marker mutex", zap.Error(err)) return fmt.Errorf("Operation failed: %s", err.Error()) } - l.Logger.Debug("Trying to lock write marker.....") + l.Logger.Debug("Trying to lock write marker") if singleClientMode { mo.allocationObj.commitMutex.Lock() } else { err = writeMarkerMutex.Lock(mo.ctx, &mo.operationMask, mo.maskMU, mo.allocationObj.Blobbers, &mo.Consensus, 0, time.Minute, mo.connectionID) if err != nil { + l.Logger.Error("Failed to lock write marker", zap.Error(err)) return fmt.Errorf("Operation failed: %s", err.Error()) } } @@ -241,7 +250,7 @@ func (mo *MultiOperation) Process() error { if !mo.isRepair && !mo.allocationObj.checkStatus { status, _, err = mo.allocationObj.CheckAllocStatus() if err != nil { - logger.Logger.Error("Error checking allocation status", err) + logger.Logger.Error("Error checking allocation status", zap.Error(err)) if singleClientMode { mo.allocationObj.commitMutex.Unlock() } else { @@ -297,7 +306,7 @@ func (mo *MultiOperation) Process() error { commitReq.changes = append(commitReq.changes, mo.changes[pos]...) commitReqs[counter] = commitReq - l.Logger.Debug("Commit request sending to blobber ", commitReq.blobber.Baseurl) + l.Logger.Debug("Commit request sending to blobber", zap.String("blobber", commitReq.blobber.Baseurl)) go AddCommitRequest(commitReq) counter++ } @@ -308,17 +317,17 @@ func (mo *MultiOperation) Process() error { for idx, commitReq := range commitReqs { if commitReq.result != nil { if commitReq.result.Success { - l.Logger.Debug("Commit success", commitReq.blobber.Baseurl) + l.Logger.Debug("Commit success", zap.String("blobber", commitReq.blobber.Baseurl)) if !mo.isRepair { rollbackMask = rollbackMask.Or(zboxutil.NewUint128(1).Lsh(commitReq.blobberInd)) } mo.consensus++ } else { errSlice[idx] = errors.New("commit_failed", commitReq.result.ErrorMessage) - l.Logger.Error("Commit failed", commitReq.blobber.Baseurl, commitReq.result.ErrorMessage) + l.Logger.Error("Commit failed", zap.String("blobber", commitReq.blobber.Baseurl), zap.String("error", commitReq.result.ErrorMessage)) } } else { - l.Logger.Debug("Commit result not set", commitReq.blobber.Baseurl) + l.Logger.Debug("Commit result not set", zap.String("blobber", commitReq.blobber.Baseurl)) } } @@ -339,5 +348,4 @@ func (mo *MultiOperation) Process() error { } return nil - } diff --git a/zboxcore/sdk/upload_worker.go b/zboxcore/sdk/upload_worker.go index 76021f904..2f532c6f4 100644 --- a/zboxcore/sdk/upload_worker.go +++ b/zboxcore/sdk/upload_worker.go @@ -40,23 +40,30 @@ func (uo *UploadOperation) Process(allocObj *Allocation, connectionID string) ([ } } err := uo.chunkedUpload.process() + l.Logger.Info("Started processing chunked upload") if err != nil { l.Logger.Error("UploadOperation Failed", zap.String("name", uo.chunkedUpload.fileMeta.RemoteName), zap.Error(err)) return nil, uo.chunkedUpload.uploadMask, err } + l.Logger.Info("Chunked upload processed successfully") + var pos uint64 numList := len(uo.chunkedUpload.blobbers) uo.refs = make([]*fileref.FileRef, numList) + l.Logger.Info("Processing blobbers", zap.Int("numList", numList)) for i := uo.chunkedUpload.uploadMask; !i.Equals64(0); i = i.And(zboxutil.NewUint128(1).Lsh(pos).Not()) { pos = uint64(i.TrailingZeros()) + l.Logger.Info("Processing blobber", zap.Uint64("position", pos)) uo.refs[pos] = uo.chunkedUpload.blobbers[pos].fileRef uo.refs[pos].ChunkSize = uo.chunkedUpload.chunkSize remotePath := uo.chunkedUpload.fileMeta.RemotePath allocationID := allocObj.ID + l.Logger.Info("Blobber details", zap.String("remotePath", remotePath), zap.String("allocationID", allocationID)) if singleClientMode { lookuphash := fileref.GetReferenceLookup(allocationID, remotePath) cacheKey := fileref.GetCacheKey(lookuphash, uo.chunkedUpload.blobbers[pos].blobber.ID) fileref.DeleteFileRef(cacheKey) + l.Logger.Info("Deleted file reference from cache", zap.String("cacheKey", cacheKey)) } } l.Logger.Info("UploadOperation Success", zap.String("name", uo.chunkedUpload.fileMeta.RemoteName)) @@ -122,6 +129,7 @@ func (uo *UploadOperation) Error(allocObj *Allocation, consensus int, err error) func NewUploadOperation(ctx context.Context, workdir string, allocObj *Allocation, connectionID string, fileMeta FileMeta, fileReader io.Reader, isUpdate, isWebstreaming, isRepair, isMemoryDownload, isStreamUpload bool, opts ...ChunkedUploadOption) (*UploadOperation, string, error) { uo := &UploadOperation{} + l.Logger.Info("Creating chunked upload", zap.String("remotePath", fileMeta.RemotePath)) if fileMeta.ActualSize == 0 && !isStreamUpload { byteReader := bytes.NewReader([]byte( emptyFileDataHash)) @@ -132,6 +140,7 @@ func NewUploadOperation(ctx context.Context, workdir string, allocObj *Allocatio cu, err := CreateChunkedUpload(ctx, workdir, allocObj, fileMeta, fileReader, isUpdate, isRepair, isWebstreaming, connectionID, opts...) if err != nil { + l.Logger.Error("CreateChunkedUpload Failed", zap.Error(err)) return nil, "", err }