From 525c4227803724fa96f9baf8691feda38d6d8902 Mon Sep 17 00:00:00 2001 From: Hitenjain14 Date: Thu, 25 Apr 2024 20:30:58 +0530 Subject: [PATCH 1/6] init read free --- zboxcore/sdk/allocation.go | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/zboxcore/sdk/allocation.go b/zboxcore/sdk/allocation.go index e01b45a89..5e2856d2e 100644 --- a/zboxcore/sdk/allocation.go +++ b/zboxcore/sdk/allocation.go @@ -213,6 +213,7 @@ type Allocation struct { repairRequestInProgress *RepairRequest initialized bool checkStatus bool + readFree bool // conseususes consensusThreshold int fullconsensus int @@ -315,6 +316,15 @@ func (a *Allocation) InitAllocation() { for _, blobber := range a.Blobbers { zboxutil.SetHostClient(blobber.ID, blobber.Baseurl) } + a.readFree = true + if a.ReadPriceRange.Max > 0 { + for _, blobberDetail := range a.BlobberDetails { + if blobberDetail.Terms.ReadPrice > 0 { + a.readFree = false + break + } + } + } a.startWorker(a.ctx) InitCommitWorker(a.Blobbers) InitBlockDownloader(a.Blobbers, downloadWorkerCount) @@ -1169,17 +1179,13 @@ func (a *Allocation) processReadMarker(drs []*DownloadRequest) { blobberMap := make(map[uint64]int64) mpLock := sync.Mutex{} wg := sync.WaitGroup{} - var isReadFree bool - if a.ReadPriceRange.Max == 0 && a.ReadPriceRange.Min == 0 { - isReadFree = true - } now := time.Now() for _, dr := range drs { wg.Add(1) go func(dr *DownloadRequest) { defer wg.Done() - if isReadFree { + if a.readFree { dr.freeRead = true } dr.processDownloadRequest() @@ -1198,7 +1204,7 @@ func (a *Allocation) processReadMarker(drs []*DownloadRequest) { elapsedProcessDownloadRequest := time.Since(now) // Do not send readmarkers for free reads - if isReadFree { + if a.readFree { for _, dr := range drs { if dr.skip { continue From bcb2031fed0b0055910cebcf5db6cb592840ed99 Mon Sep 17 00:00:00 2001 From: Jayash Satolia Date: Wed, 1 May 2024 23:50:53 +0530 Subject: [PATCH 2/6] Reset allocation stats --- core/transaction/entity.go | 9 +++++---- zboxcore/sdk/sdk.go | 13 +++++++++++++ 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/core/transaction/entity.go b/core/transaction/entity.go index 002493fb0..1841e95bc 100644 --- a/core/transaction/entity.go +++ b/core/transaction/entity.go @@ -144,7 +144,8 @@ const ( STORAGESC_KILL_VALIDATOR = "kill_validator" STORAGESC_SHUTDOWN_BLOBBER = "shutdown_blobber" STORAGESC_SHUTDOWN_VALIDATOR = "shutdown_validator" - STORAGESC_RESET_BLOBBER_STATS = "reset_blobber_stats" + STORAGESC_RESET_BLOBBER_STATS = "reset_blobber_stats" + STORAGESC_RESET_ALLOCATION_STATS = "reset_allocation_stats" MINERSC_LOCK = "addToDelegatePool" MINERSC_UNLOCK = "deleteFromDelegatePool" @@ -168,9 +169,9 @@ const ( ZCNSC_ADD_AUTHORIZER = "add-authorizer" ZCNSC_AUTHORIZER_HEALTH_CHECK = "authorizer-health-check" ZCNSC_DELETE_AUTHORIZER = "delete-authorizer" - ZCNSC_COLLECT_REWARD = "collect-rewards" - ZCNSC_LOCK = "add-to-delegate-pool" - ZCNSC_UNLOCK = "delete-from-delegate-pool" + ZCNSC_COLLECT_REWARD = "collect-rewards" + ZCNSC_LOCK = "add-to-delegate-pool" + ZCNSC_UNLOCK = "delete-from-delegate-pool" ESTIMATE_TRANSACTION_COST = `/v1/estimate_txn_fee` FEES_TABLE = `/v1/fees_table` diff --git a/zboxcore/sdk/sdk.go b/zboxcore/sdk/sdk.go index 8c4c86e6b..c9ae54051 100644 --- a/zboxcore/sdk/sdk.go +++ b/zboxcore/sdk/sdk.go @@ -1493,6 +1493,19 @@ func ResetBlobberStats(rbs *ResetBlobberStatsDto) (string, int64, error) { return hash, n, err } +func ResetAllocationStats(allocationId string) (string, int64, error) { + if !sdkInitialized { + return "", 0, sdkNotInitialized + } + + var sn = transaction.SmartContractTxnData{ + Name: transaction.STORAGESC_RESET_ALLOCATION_STATS, + InputArgs: map[string]interface{}{"allocation_id": allocationId}, + } + hash, _, n, _, err := storageSmartContractTxn(sn) + return hash, n, err +} + func smartContractTxn(scAddress string, sn transaction.SmartContractTxnData) ( hash, out string, nonce int64, txn *transaction.Transaction, err error) { return smartContractTxnValue(scAddress, sn, 0) From 6817fb4f089dca7d06e82fc9b6af269958c215ec Mon Sep 17 00:00:00 2001 From: Jayash Satolia Date: Thu, 2 May 2024 00:20:29 +0530 Subject: [PATCH 3/6] Fix --- zboxcore/sdk/sdk.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zboxcore/sdk/sdk.go b/zboxcore/sdk/sdk.go index c9ae54051..1741eadb6 100644 --- a/zboxcore/sdk/sdk.go +++ b/zboxcore/sdk/sdk.go @@ -1500,7 +1500,7 @@ func ResetAllocationStats(allocationId string) (string, int64, error) { var sn = transaction.SmartContractTxnData{ Name: transaction.STORAGESC_RESET_ALLOCATION_STATS, - InputArgs: map[string]interface{}{"allocation_id": allocationId}, + InputArgs: allocationId, } hash, _, n, _, err := storageSmartContractTxn(sn) return hash, n, err From 8dff99b8ff12a1eed01002c8fab4d5aced24ff37 Mon Sep 17 00:00:00 2001 From: Hitenjain14 Date: Thu, 2 May 2024 01:15:36 +0530 Subject: [PATCH 4/6] buffer read and writes to js --- wasmsdk/blobber.go | 13 ++++-- wasmsdk/cache.go | 2 +- wasmsdk/jsbridge/file_reader.go | 62 +++++++++++++++++++------- wasmsdk/jsbridge/file_writer.go | 78 +++++++++++++++++++++------------ 4 files changed, 106 insertions(+), 49 deletions(-) diff --git a/wasmsdk/blobber.go b/wasmsdk/blobber.go index 031715717..0e7e52e02 100644 --- a/wasmsdk/blobber.go +++ b/wasmsdk/blobber.go @@ -631,8 +631,12 @@ func multiUpload(jsonBulkUploadOptions string) (MultiUploadResult, error) { wg.Add(1) encrypt := option.Encrypt remotePath := option.RemotePath - - fileReader := jsbridge.NewFileReader(option.ReadChunkFuncName, option.FileSize) + fileReader, err := jsbridge.NewFileReader(option.ReadChunkFuncName, option.FileSize, allocationObj.GetChunkReadSize(encrypt)) + if err != nil { + result.Error = "Error in file operation" + result.Success = false + return result, err + } mimeType := option.MimeType localPath := remotePath remotePath = zboxutil.RemoteClean(remotePath) @@ -725,7 +729,10 @@ func uploadWithJsFuncs(allocationID, remotePath string, readChunkFuncName string } wg.Add(1) - fileReader := jsbridge.NewFileReader(readChunkFuncName, fileSize) + fileReader, err := jsbridge.NewFileReader(readChunkFuncName, fileSize, allocationObj.GetChunkReadSize(encrypt)) + if err != nil { + return false, err + } localPath := remotePath diff --git a/wasmsdk/cache.go b/wasmsdk/cache.go index 57a324fb9..c90835254 100644 --- a/wasmsdk/cache.go +++ b/wasmsdk/cache.go @@ -33,7 +33,7 @@ func getAllocation(allocationId string) (*sdk.Allocation, error) { if err != nil { return nil, err } - + sdk.SetShouldVerifyHash(false) it = &cachedAllocation{ Allocation: a, Expiration: time.Now().Add(5 * time.Minute), diff --git a/wasmsdk/jsbridge/file_reader.go b/wasmsdk/jsbridge/file_reader.go index 33e8fc33f..22a88ad64 100644 --- a/wasmsdk/jsbridge/file_reader.go +++ b/wasmsdk/jsbridge/file_reader.go @@ -6,46 +6,74 @@ package jsbridge import ( "errors" "io" - "sync" "syscall/js" ) -var jsFileReaderMutex sync.Mutex - type FileReader struct { size int64 offset int64 readChunk js.Value + buf []byte + bufOffset int + endOfFile bool } -func NewFileReader(readChunkFuncName string, fileSize int64) *FileReader { - readChunk := js.Global().Get(readChunkFuncName) +const ( + bufferSize = 16 * 1024 * 1024 //16MB +) +func NewFileReader(readChunkFuncName string, fileSize, chunkReadSize int64) (*FileReader, error) { + readChunk := js.Global().Get(readChunkFuncName) + var buf []byte + if bufferSize > fileSize { + buf = make([]byte, fileSize) + } else { + bufSize := (chunkReadSize * (bufferSize / chunkReadSize)) + buf = make([]byte, bufSize) + } + result, err := Await(readChunk.Invoke(0, len(buf))) + if len(err) > 0 && !err[0].IsNull() { + return nil, errors.New("file_reader: " + err[0].String()) + } + chunk := result[0] + n := js.CopyBytesToGo(buf, chunk) + if n < len(buf) { + return nil, errors.New("file_reader: failed to read first chunk") + } return &FileReader{ size: fileSize, - offset: 0, + offset: int64(n), readChunk: readChunk, - } + buf: buf, + endOfFile: n == int(fileSize), + }, nil } func (r *FileReader) Read(p []byte) (int, error) { //js.Value doesn't work in parallel invoke - jsFileReaderMutex.Lock() - defer jsFileReaderMutex.Unlock() size := len(p) - result, err := Await(r.readChunk.Invoke(r.offset, size)) + if len(r.buf)-r.bufOffset < size && !r.endOfFile { + r.bufOffset = 0 //reset buffer offset + result, err := Await(r.readChunk.Invoke(r.offset, len(r.buf))) - if len(err) > 0 && !err[0].IsNull() { - return 0, errors.New("file_reader: " + err[0].String()) - } + if len(err) > 0 && !err[0].IsNull() { + return 0, errors.New("file_reader: " + err[0].String()) + } - chunk := result[0] + chunk := result[0] - n := js.CopyBytesToGo(p, chunk) - r.offset += int64(n) + n := js.CopyBytesToGo(r.buf, chunk) + r.offset += int64(n) + if n < len(r.buf) { + r.buf = r.buf[:n] + r.endOfFile = true + } + } - if n < size { + n := copy(p, r.buf[r.bufOffset:]) + r.bufOffset += n + if r.endOfFile && r.bufOffset == len(r.buf) { return n, io.EOF } diff --git a/wasmsdk/jsbridge/file_writer.go b/wasmsdk/jsbridge/file_writer.go index 9f0d2d55d..50e6721af 100644 --- a/wasmsdk/jsbridge/file_writer.go +++ b/wasmsdk/jsbridge/file_writer.go @@ -5,53 +5,74 @@ package jsbridge import ( "errors" + "io" "io/fs" - "sync" "syscall/js" ) -var jsFileWriterMutex sync.Mutex - type FileWriter struct { writableStream js.Value uint8Array js.Value fileHandle js.Value bufLen int + buf []byte + bufWriteOffset int } -func (w *FileWriter) Write(p []byte) (int, error) { - //js.Value doesn't work in parallel invoke - jsFileWriterMutex.Lock() - defer jsFileWriterMutex.Unlock() +const writeBufferSize = 4 * 1024 * 1024 //4MB - if w.bufLen != len(p) { - w.bufLen = len(p) - w.uint8Array = js.Global().Get("Uint8Array").New(w.bufLen) +// len(p) will always be <= 64KB +func (w *FileWriter) Write(p []byte) (int, error) { + //copy bytes to buf + if w.bufWriteOffset+len(p) > len(w.buf) { + return 0, io.ErrShortWrite } - js.CopyBytesToJS(w.uint8Array, p) - _, err := Await(w.writableStream.Call("write", w.uint8Array)) - if len(err) > 0 && !err[0].IsNull() { - return 0, errors.New("file_writer: " + err[0].String()) + n := copy(w.buf[w.bufWriteOffset:], p) + w.bufWriteOffset += n + if w.bufWriteOffset == len(w.buf) { + //write to file + if w.bufLen != len(w.buf) { + w.bufLen = len(w.buf) + w.uint8Array = js.Global().Get("Uint8Array").New(w.bufLen) + } + js.CopyBytesToJS(w.uint8Array, w.buf) + _, err := Await(w.writableStream.Call("write", w.uint8Array)) + if len(err) > 0 && !err[0].IsNull() { + return 0, errors.New("file_writer: " + err[0].String()) + } + //reset buffer + w.bufWriteOffset = 0 } return len(p), nil } -func (w *FileWriter) WriteAt(p []byte, offset int64) (int, error) { - uint8Array := js.Global().Get("Uint8Array").New(len(p)) - js.CopyBytesToJS(uint8Array, p) - options := js.Global().Get("Object").New() - options.Set("type", "write") - options.Set("position", offset) - options.Set("data", uint8Array) - options.Set("size", len(p)) - _, err := Await(w.writableStream.Call("write", options)) - if len(err) > 0 && !err[0].IsNull() { - return 0, errors.New("file_writer: " + err[0].String()) - } - return len(p), nil -} +// func (w *FileWriter) WriteAt(p []byte, offset int64) (int, error) { +// uint8Array := js.Global().Get("Uint8Array").New(len(p)) +// js.CopyBytesToJS(uint8Array, p) +// options := js.Global().Get("Object").New() +// options.Set("type", "write") +// options.Set("position", offset) +// options.Set("data", uint8Array) +// options.Set("size", len(p)) +// _, err := Await(w.writableStream.Call("write", options)) +// if len(err) > 0 && !err[0].IsNull() { +// return 0, errors.New("file_writer: " + err[0].String()) +// } +// return len(p), nil +// } func (w *FileWriter) Close() error { + + if w.bufWriteOffset > 0 { + w.buf = w.buf[:w.bufWriteOffset] + uint8Array := js.Global().Get("Uint8Array").New(len(w.buf)) + js.CopyBytesToJS(uint8Array, w.buf) + _, err := Await(w.writableStream.Call("write", uint8Array)) + if len(err) > 0 && !err[0].IsNull() { + return errors.New("file_writer: " + err[0].String()) + } + } + _, err := Await(w.writableStream.Call("close")) if len(err) > 0 && !err[0].IsNull() { return errors.New("file_writer: " + err[0].String()) @@ -99,5 +120,6 @@ func NewFileWriter(filename string) (*FileWriter, error) { return &FileWriter{ writableStream: writableStream[0], fileHandle: fileHandle[0], + buf: make([]byte, writeBufferSize), }, nil } From b7108b42abbc4669074d18252121b4d658c13e71 Mon Sep 17 00:00:00 2001 From: Hitenjain14 Date: Thu, 2 May 2024 14:52:00 +0530 Subject: [PATCH 5/6] expose pause upload to wasm,mobile and winsdk --- mobilesdk/zbox/storage.go | 17 +++++++++++++++- wasmsdk/blobber.go | 11 +++++++++- wasmsdk/proxy.go | 1 + winsdk/storage.go | 42 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 69 insertions(+), 2 deletions(-) diff --git a/mobilesdk/zbox/storage.go b/mobilesdk/zbox/storage.go index 77dfb549d..cd464ba11 100644 --- a/mobilesdk/zbox/storage.go +++ b/mobilesdk/zbox/storage.go @@ -710,7 +710,7 @@ func CancelDownload(allocationID, remotepath string) error { // // ## Inputs // - allocationID -// - localPath +// - remotePath func CancelUpload(allocationID, remotePath string) error { a, err := getAllocation(allocationID) if err != nil { @@ -719,6 +719,21 @@ func CancelUpload(allocationID, remotePath string) error { return a.CancelUpload(remotePath) } +// PauseUpload - pause file upload + +// ## Inputs +// - allocationID +// - remotePath + +func PauseUpload(allocationID, remotePath string) error { + a, err := getAllocation(allocationID) + if err != nil { + return err + } + return a.PauseUpload(remotePath) + +} + // StartRepair - start repair files from path // // ## Inputs diff --git a/wasmsdk/blobber.go b/wasmsdk/blobber.go index 031715717..73731f6de 100644 --- a/wasmsdk/blobber.go +++ b/wasmsdk/blobber.go @@ -46,7 +46,7 @@ func listObjectsFromAuthTicket(allocationID, authTicket, lookupHash string, offs return alloc.ListDirFromAuthTicket(authTicket, lookupHash, sdk.WithListRequestOffset(offset), sdk.WithListRequestPageLimit(pageLimit)) } -func cancelUpload(allocationID string, remotePath string) error { +func cancelUpload(allocationID, remotePath string) error { allocationObj, err := getAllocation(allocationID) if err != nil { PrintError("Error fetching the allocation", err) @@ -55,6 +55,15 @@ func cancelUpload(allocationID string, remotePath string) error { return allocationObj.CancelUpload(remotePath) } +func pauseUpload(allocationID, remotePath string) error { + allocationObj, err := getAllocation(allocationID) + if err != nil { + PrintError("Error fetching the allocation", err) + return err + } + return allocationObj.PauseUpload(remotePath) +} + func createDir(allocationID, remotePath string) error { if len(allocationID) == 0 { return RequiredArg("allocationID") diff --git a/wasmsdk/proxy.go b/wasmsdk/proxy.go index fca2681f3..fa22d2a67 100644 --- a/wasmsdk/proxy.go +++ b/wasmsdk/proxy.go @@ -187,6 +187,7 @@ func main() { "updateForbidAllocation": UpdateForbidAllocation, "send": send, "cancelUpload": cancelUpload, + "pauseUpload": pauseUpload, // player "play": play, diff --git a/winsdk/storage.go b/winsdk/storage.go index fc5bcd5f2..85c2f3f17 100644 --- a/winsdk/storage.go +++ b/winsdk/storage.go @@ -740,3 +740,45 @@ func DeleteAuthTicket(allocationID, remotePath, refereeClientID *C.char) *C.char return WithJSON(true, nil) } + +func CancelUpload(allocationID, remotePath *C.char) *C.char { + defer func() { + if r := recover(); r != nil { + log.Error("win: crash ", r) + } + }() + alloc, err := getAllocation(C.GoString(allocationID)) + if err != nil { + log.Error("win: ", err) + return WithJSON(false, err) + } + + rPath := C.GoString(remotePath) + err = alloc.CancelUpload(rPath) + if err != nil { + log.Error("win: ", err) + return WithJSON(false, err) + } + return WithJSON(true, nil) +} + +func PauseUpload(allocationID, remotePath *C.char) *C.char { + defer func() { + if r := recover(); r != nil { + log.Error("win: crash ", r) + } + }() + alloc, err := getAllocation(C.GoString(allocationID)) + if err != nil { + log.Error("win: ", err) + return WithJSON(false, err) + } + + rPath := C.GoString(remotePath) + err = alloc.PauseUpload(rPath) + if err != nil { + log.Error("win: ", err) + return WithJSON(false, err) + } + return WithJSON(true, nil) +} From 63f09676190da4cd2fa6ed2d11d350a4ede3d86e Mon Sep 17 00:00:00 2001 From: Hitenjain14 Date: Sat, 4 May 2024 00:43:36 +0530 Subject: [PATCH 6/6] set max keep alive conn time and max resp body size --- zboxcore/zboxutil/http.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/zboxcore/zboxutil/http.go b/zboxcore/zboxutil/http.go index fa7cfd032..e52eb9156 100644 --- a/zboxcore/zboxutil/http.go +++ b/zboxcore/zboxutil/http.go @@ -198,8 +198,10 @@ func init() { Concurrency: 4096, DNSCacheDuration: time.Hour, }).Dial, - ReadTimeout: 120 * time.Second, - WriteTimeout: 120 * time.Second, + ReadTimeout: 120 * time.Second, + WriteTimeout: 120 * time.Second, + MaxConnDuration: 45 * time.Second, + MaxResponseBodySize: 1024 * 1024 * 64, //64MB } envProxy.initialize() log.Init(logger.DEBUG, "0box-sdk")