add logs for debugging. #1617

Draft · wants to merge 3 commits into base: staging
2 changes: 2 additions & 0 deletions zboxcore/sdk/allocation.go
@@ -1122,7 +1122,9 @@ func (a *Allocation) DoMultiOperation(operations []OperationRequest, opts ...Mul
mo.operations = append(mo.operations, operation)
}

logger.Logger.Info("Multioperation: processing batch", zap.Int("batch_size", len(mo.operations)))
if len(mo.operations) > 0 {
logger.Logger.Info("Multioperation: processing batch", zap.Int("batch_size", len(mo.operations)))
err := mo.Process()
if err != nil {
return err
27 changes: 26 additions & 1 deletion zboxcore/sdk/chunked_upload.go
@@ -652,26 +652,36 @@ func getShardSize(dataSize int64, dataShards int, isEncrypted bool) int64 {
}

func (su *ChunkedUpload) uploadProcessor() {
logger.Logger.Info("************************************Starting uploadProcessor")
for {
logger.Logger.Info("************************************Waiting for upload data")
select {
case <-su.ctx.Done():
logger.Logger.Info("************************************Context done, cause: ", context.Cause(su.ctx))
return
case uploadData, ok := <-su.uploadChan:
logger.Logger.Info("************************************Received upload data: ", uploadData)
if !ok {
logger.Logger.Info("************************************Upload channel closed")
return
}
logger.Logger.Info("************************************Calling uploadToBlobbers with uploadData: ", uploadData)
su.uploadToBlobbers(uploadData) //nolint:errcheck
logger.Logger.Info("************************************uploadToBlobbers completed")
su.uploadWG.Done()
}
}
}

func (su *ChunkedUpload) uploadToBlobbers(uploadData UploadData) error {
logger.Logger.Info("************************************Starting uploadToBlobbers with uploadData: ", uploadData)
select {
case <-su.ctx.Done():
logger.Logger.Error("************************************Context done, cause: ", context.Cause(su.ctx))
return context.Cause(su.ctx)
default:
}
logger.Logger.Info("************************************Creating Consensus object with consensusThresh: ", su.consensus.consensusThresh, " and fullconsensus: ", su.consensus.fullconsensus)
consensus := Consensus{
RWMutex: &sync.RWMutex{},
consensusThresh: su.consensus.consensusThresh,
@@ -684,34 +694,47 @@ func (su *ChunkedUpload) uploadToBlobbers(uploadData UploadData) error {
var pos uint64
var errCount int32
var wg sync.WaitGroup
logger.Logger.Info("******************************Starting loop over uploadMask: ", su.uploadMask)
for i := su.uploadMask; !i.Equals64(0); i = i.And(zboxutil.NewUint128(1).Lsh(pos).Not()) {
pos = uint64(i.TrailingZeros())
pos = uint64(i.TrailingZeros()) // !(15 & (1<<3))

logger.Logger.Info("***********Chunk number: ", uploadData.chunkStartIndex, " Blobber number: ", pos, "***********")
wg.Add(1)
go func(pos uint64) {
defer wg.Done()
logger.Logger.Info("****************Starting goroutine for blobber at position: ", pos)
err := su.blobbers[pos].sendUploadRequest(ctx, su, uploadData.isFinal, su.encryptedKey, uploadData.uploadBody[pos].dataBuffers, uploadData.uploadBody[pos].formData, uploadData.uploadBody[pos].contentSlice, pos, &consensus)

if err != nil {
if strings.Contains(err.Error(), "duplicate") {
logger.Logger.Fatal("**************************Duplicate upload detected for blobber at position: ", pos)
su.consensus.Done()
errC := atomic.AddInt32(&su.addConsensus, 1)
logger.Logger.Debug("****************************Consensus count after duplicate detection: ", errC)
if errC >= int32(su.consensus.consensusThresh) {
wgErrors <- err
}
return
}
logger.Logger.Error("error during sendUploadRequest", err, " connectionID: ", su.progress.ConnectionID)
errC := atomic.AddInt32(&errCount, 1)
logger.Logger.Info("***********************************Error count after failure: ", errC)
if errC > int32(su.allocationObj.ParityShards-1) { // If at least data shards + 1 blobbers can process the upload, it can be repaired later
logger.Logger.Error("************************************Error count exceeded for blobber at position: ", pos)
wgErrors <- err
}
}
logger.Logger.Info("****************************************Goroutine for blobber at position: ", pos, " completed")
}(pos)
}
wg.Wait()
logger.Logger.Info("***************************Loop over uploadMask completed")
close(wgErrors)
logger.Logger.Info("***************************Closed wgErrors")
for err := range wgErrors {
logger.Logger.Error("Upload failed with error: ", err)
su.ctxCncl(thrown.New("upload_failed", fmt.Sprintf("Upload failed. %s", err)))
logger.Logger.Info("***************************Returning error")
return err
}
if !consensus.isConsensusOk() {
@@ -723,11 +746,13 @@ func (su *ChunkedUpload) uploadToBlobbers(uploadData UploadData) error {
if uploadData.uploadLength > 0 {
index := uploadData.chunkEndIndex
uploadLength := uploadData.uploadLength
logger.Logger.Info("**********************************Updating progress for chunk index: ", index, " upload length: ", uploadLength)
go su.updateProgress(index, su.uploadMask)
if su.statusCallback != nil {
su.statusCallback.InProgress(su.allocationObj.ID, su.fileMeta.RemotePath, su.opCode, int(atomic.AddInt64(&su.progress.UploadLength, uploadLength)), nil)
}
}
uploadData = UploadData{} // release memory
logger.Logger.Info("Upload to blobbers completed successfully")
return nil
}
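
Context for the new per-blobber logs above: uploadToBlobbers walks the upload mask one set bit at a time, where each set bit marks a blobber that should receive the current chunk. The sketch below shows the same iteration pattern with a plain uint64 standing in for zboxutil.Uint128 (an assumption made only to keep the example self-contained and runnable).

package main

import (
	"fmt"
	"math/bits"
)

func main() {
	// Assumption: a uint64 stands in for zboxutil.Uint128; bit k set means
	// the blobber at position k should receive the chunk.
	var uploadMask uint64 = 0b10110 // blobbers at positions 1, 2 and 4

	var pos uint64
	for i := uploadMask; i != 0; i &= ^(uint64(1) << pos) {
		pos = uint64(bits.TrailingZeros64(i)) // position of the lowest set bit
		fmt.Println("would send chunk to blobber at position", pos)
		// The post statement then clears that bit, mirroring
		// i = i.And(zboxutil.NewUint128(1).Lsh(pos).Not()) in the SDK.
	}
}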
4 changes: 4 additions & 0 deletions zboxcore/sdk/chunked_upload_process.go
@@ -6,6 +6,8 @@ package sdk
import (
"context"
"fmt"
"github.com/0chain/gosdk/zboxcore/logger"
"go.uber.org/zap"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -163,7 +165,9 @@ func (su *ChunkedUpload) processUpload(chunkStartIndex, chunkEndIndex int,
}

func (su *ChunkedUpload) startProcessor() {
logger.Logger.Debug("Starting upload processor", zap.Int("workers", su.uploadWorkers))
for i := 0; i < su.uploadWorkers; i++ {
logger.Logger.Debug("Starting upload processor", zap.Int("worker", i))
go su.uploadProcessor()
}
}
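
For readers tracing the new startProcessor/uploadProcessor logs: the two functions form a small worker pool in which su.uploadWorkers goroutines drain su.uploadChan until the channel is closed or the context is cancelled. A minimal sketch of that shape, with hypothetical names (jobs, workers) in place of the SDK's types:

package main

import (
	"context"
	"fmt"
	"sync"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	jobs := make(chan int) // stands in for su.uploadChan
	var wg sync.WaitGroup  // stands in for su.uploadWG

	const workers = 3 // stands in for su.uploadWorkers
	for w := 0; w < workers; w++ {
		go func(id int) {
			for {
				select {
				case <-ctx.Done():
					return // context cancelled, e.g. an upload failed
				case job, ok := <-jobs:
					if !ok {
						return // channel closed, no more chunks
					}
					fmt.Printf("worker %d handled chunk %d\n", id, job)
					wg.Done()
				}
			}
		}(w)
	}

	for chunk := 0; chunk < 5; chunk++ {
		wg.Add(1)
		jobs <- chunk
	}
	wg.Wait()
	close(jobs)
}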
24 changes: 16 additions & 8 deletions zboxcore/sdk/multi_operation_worker.go
@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"go.uber.org/zap"
"io/ioutil"
"mime/multipart"
"net/http"
@@ -177,17 +178,20 @@ func (mo *MultiOperation) Process() error {
// Check for other goroutines signal
select {
case <-ctx.Done():
l.Logger.Debug("Context done, exiting goroutine")
return
default:
}

l.Logger.Debug("Processing operation", zap.Int("index", idx))
refs, mask, err := op.Process(mo.allocationObj, mo.connectionID) // Process with each blobber
if err != nil {
l.Logger.Error(err)
l.Logger.Error("Operation process failed", zap.Int("index", idx), zap.Error(err))
errsSlice[idx] = errors.New("", err.Error())
ctxCncl(err)
return
}
l.Logger.Debug("Operation processed successfully", zap.Int("index", idx))
mo.maskMU.Lock()
mo.operationMask = mo.operationMask.Or(mask)
mo.maskMU.Unlock()
@@ -199,13 +203,15 @@

if ctx.Err() != nil {
err := context.Cause(ctx)
l.Logger.Error("Context error", zap.Error(err))
return err
}

// Check consensus
if mo.operationMask.CountOnes() < mo.consensusThresh {
majorErr := zboxutil.MajorError(errsSlice)
if majorErr != nil {
l.Logger.Error("Consensus not met", zap.Int("required", mo.consensusThresh), zap.Int("got", mo.operationMask.CountOnes()), zap.Error(majorErr))
return errors.New("consensus_not_met",
fmt.Sprintf("Multioperation failed. Required consensus %d got %d. Major error: %s",
mo.consensusThresh, mo.operationMask.CountOnes(), majorErr.Error()))
@@ -219,19 +225,22 @@
// blobber 2 and so on.
start := time.Now()
mo.changes = zboxutil.Transpose(mo.changes)
l.Logger.Debug("Transposed changes", zap.Duration("duration", time.Since(start)))

writeMarkerMutex, err := CreateWriteMarkerMutex(client.GetClient(), mo.allocationObj)
if err != nil {
l.Logger.Error("Failed to create write marker mutex", zap.Error(err))
return fmt.Errorf("Operation failed: %s", err.Error())
}

l.Logger.Debug("Trying to lock write marker.....")
l.Logger.Debug("Trying to lock write marker")
if singleClientMode {
mo.allocationObj.commitMutex.Lock()
} else {
err = writeMarkerMutex.Lock(mo.ctx, &mo.operationMask, mo.maskMU,
mo.allocationObj.Blobbers, &mo.Consensus, 0, time.Minute, mo.connectionID)
if err != nil {
l.Logger.Error("Failed to lock write marker", zap.Error(err))
return fmt.Errorf("Operation failed: %s", err.Error())
}
}
@@ -241,7 +250,7 @@
if !mo.isRepair && !mo.allocationObj.checkStatus {
status, _, err = mo.allocationObj.CheckAllocStatus()
if err != nil {
logger.Logger.Error("Error checking allocation status", err)
logger.Logger.Error("Error checking allocation status", zap.Error(err))
if singleClientMode {
mo.allocationObj.commitMutex.Unlock()
} else {
@@ -297,7 +306,7 @@ func (mo *MultiOperation) Process() error {

commitReq.changes = append(commitReq.changes, mo.changes[pos]...)
commitReqs[counter] = commitReq
l.Logger.Debug("Commit request sending to blobber ", commitReq.blobber.Baseurl)
l.Logger.Debug("Commit request sending to blobber", zap.String("blobber", commitReq.blobber.Baseurl))
go AddCommitRequest(commitReq)
counter++
}
@@ -308,17 +317,17 @@
for idx, commitReq := range commitReqs {
if commitReq.result != nil {
if commitReq.result.Success {
l.Logger.Debug("Commit success", commitReq.blobber.Baseurl)
l.Logger.Debug("Commit success", zap.String("blobber", commitReq.blobber.Baseurl))
if !mo.isRepair {
rollbackMask = rollbackMask.Or(zboxutil.NewUint128(1).Lsh(commitReq.blobberInd))
}
mo.consensus++
} else {
errSlice[idx] = errors.New("commit_failed", commitReq.result.ErrorMessage)
l.Logger.Error("Commit failed", commitReq.blobber.Baseurl, commitReq.result.ErrorMessage)
l.Logger.Error("Commit failed", zap.String("blobber", commitReq.blobber.Baseurl), zap.String("error", commitReq.result.ErrorMessage))
}
} else {
l.Logger.Debug("Commit result not set", commitReq.blobber.Baseurl)
l.Logger.Debug("Commit result not set", zap.String("blobber", commitReq.blobber.Baseurl))
}
}

@@ -339,5 +348,4 @@
}

return nil

}
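
On the new consensus logs above: each blobber that processes an operation successfully sets its bit in mo.operationMask, and the commit only proceeds when the popcount of that mask reaches mo.consensusThresh. A minimal sketch of that check, again assuming a plain uint64 mask rather than zboxutil.Uint128:

package main

import (
	"fmt"
	"math/bits"
)

func main() {
	const consensusThresh = 3 // stands in for mo.consensusThresh

	// Bits set by the blobbers that reported success.
	var operationMask uint64
	for _, blobberIdx := range []uint{0, 2, 5} {
		operationMask |= 1 << blobberIdx
	}

	got := bits.OnesCount64(operationMask) // mirrors mo.operationMask.CountOnes()
	if got < consensusThresh {
		fmt.Printf("consensus not met: required %d, got %d\n", consensusThresh, got)
		return
	}
	fmt.Printf("consensus met: required %d, got %d\n", consensusThresh, got)
}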
9 changes: 9 additions & 0 deletions zboxcore/sdk/upload_worker.go
@@ -40,23 +40,30 @@ func (uo *UploadOperation) Process(allocObj *Allocation, connectionID string) ([
}
}
err := uo.chunkedUpload.process()
l.Logger.Info("Started processing chunked upload")
if err != nil {
l.Logger.Error("UploadOperation Failed", zap.String("name", uo.chunkedUpload.fileMeta.RemoteName), zap.Error(err))
return nil, uo.chunkedUpload.uploadMask, err
}
l.Logger.Info("Chunked upload processed successfully")

var pos uint64
numList := len(uo.chunkedUpload.blobbers)
uo.refs = make([]*fileref.FileRef, numList)
l.Logger.Info("Processing blobbers", zap.Int("numList", numList))
for i := uo.chunkedUpload.uploadMask; !i.Equals64(0); i = i.And(zboxutil.NewUint128(1).Lsh(pos).Not()) {
pos = uint64(i.TrailingZeros())
l.Logger.Info("Processing blobber", zap.Uint64("position", pos))
uo.refs[pos] = uo.chunkedUpload.blobbers[pos].fileRef
uo.refs[pos].ChunkSize = uo.chunkedUpload.chunkSize
remotePath := uo.chunkedUpload.fileMeta.RemotePath
allocationID := allocObj.ID
l.Logger.Info("Blobber details", zap.String("remotePath", remotePath), zap.String("allocationID", allocationID))
if singleClientMode {
lookuphash := fileref.GetReferenceLookup(allocationID, remotePath)
cacheKey := fileref.GetCacheKey(lookuphash, uo.chunkedUpload.blobbers[pos].blobber.ID)
fileref.DeleteFileRef(cacheKey)
l.Logger.Info("Deleted file reference from cache", zap.String("cacheKey", cacheKey))
}
}
l.Logger.Info("UploadOperation Success", zap.String("name", uo.chunkedUpload.fileMeta.RemoteName))
@@ -122,6 +129,7 @@ func (uo *UploadOperation) Error(allocObj *Allocation, consensus int, err error)

func NewUploadOperation(ctx context.Context, workdir string, allocObj *Allocation, connectionID string, fileMeta FileMeta, fileReader io.Reader, isUpdate, isWebstreaming, isRepair, isMemoryDownload, isStreamUpload bool, opts ...ChunkedUploadOption) (*UploadOperation, string, error) {
uo := &UploadOperation{}
l.Logger.Info("Creating chunked upload", zap.String("remotePath", fileMeta.RemotePath))
if fileMeta.ActualSize == 0 && !isStreamUpload {
byteReader := bytes.NewReader([]byte(
emptyFileDataHash))
@@ -132,6 +140,7 @@ func NewUploadOperation(ctx context.Context, workdir string, allocObj *Allocatio

cu, err := CreateChunkedUpload(ctx, workdir, allocObj, fileMeta, fileReader, isUpdate, isRepair, isWebstreaming, connectionID, opts...)
if err != nil {
l.Logger.Error("CreateChunkedUpload Failed", zap.Error(err))
return nil, "", err
}
