Skip to content

Commit

Permalink
Merge branch 'sprint-1.14' of https://github.com/0chain/gosdk into fi…
Browse files Browse the repository at this point in the history
…x/chunked-break
  • Loading branch information
Hitenjain14 committed May 6, 2024
2 parents c579e48 + f83a9d8 commit f8f3928
Show file tree
Hide file tree
Showing 11 changed files with 209 additions and 63 deletions.
9 changes: 5 additions & 4 deletions core/transaction/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ const (
STORAGESC_KILL_VALIDATOR = "kill_validator"
STORAGESC_SHUTDOWN_BLOBBER = "shutdown_blobber"
STORAGESC_SHUTDOWN_VALIDATOR = "shutdown_validator"
STORAGESC_RESET_BLOBBER_STATS = "reset_blobber_stats"
STORAGESC_RESET_BLOBBER_STATS = "reset_blobber_stats"
STORAGESC_RESET_ALLOCATION_STATS = "reset_allocation_stats"

MINERSC_LOCK = "addToDelegatePool"
MINERSC_UNLOCK = "deleteFromDelegatePool"
Expand All @@ -168,9 +169,9 @@ const (
ZCNSC_ADD_AUTHORIZER = "add-authorizer"
ZCNSC_AUTHORIZER_HEALTH_CHECK = "authorizer-health-check"
ZCNSC_DELETE_AUTHORIZER = "delete-authorizer"
ZCNSC_COLLECT_REWARD = "collect-rewards"
ZCNSC_LOCK = "add-to-delegate-pool"
ZCNSC_UNLOCK = "delete-from-delegate-pool"
ZCNSC_COLLECT_REWARD = "collect-rewards"
ZCNSC_LOCK = "add-to-delegate-pool"
ZCNSC_UNLOCK = "delete-from-delegate-pool"

ESTIMATE_TRANSACTION_COST = `/v1/estimate_txn_fee`
FEES_TABLE = `/v1/fees_table`
Expand Down
17 changes: 16 additions & 1 deletion mobilesdk/zbox/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ func CancelDownload(allocationID, remotepath string) error {
//
// ## Inputs
// - allocationID
// - localPath
// - remotePath
func CancelUpload(allocationID, remotePath string) error {
a, err := getAllocation(allocationID)
if err != nil {
Expand All @@ -719,6 +719,21 @@ func CancelUpload(allocationID, remotePath string) error {
return a.CancelUpload(remotePath)
}

// PauseUpload - pause file upload

// ## Inputs
// - allocationID
// - remotePath

func PauseUpload(allocationID, remotePath string) error {
a, err := getAllocation(allocationID)
if err != nil {
return err
}
return a.PauseUpload(remotePath)

}

// StartRepair - start repair files from path
//
// ## Inputs
Expand Down
24 changes: 20 additions & 4 deletions wasmsdk/blobber.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func listObjectsFromAuthTicket(allocationID, authTicket, lookupHash string, offs
return alloc.ListDirFromAuthTicket(authTicket, lookupHash, sdk.WithListRequestOffset(offset), sdk.WithListRequestPageLimit(pageLimit))
}

func cancelUpload(allocationID string, remotePath string) error {
func cancelUpload(allocationID, remotePath string) error {
allocationObj, err := getAllocation(allocationID)
if err != nil {
PrintError("Error fetching the allocation", err)
Expand All @@ -55,6 +55,15 @@ func cancelUpload(allocationID string, remotePath string) error {
return allocationObj.CancelUpload(remotePath)
}

func pauseUpload(allocationID, remotePath string) error {
allocationObj, err := getAllocation(allocationID)
if err != nil {
PrintError("Error fetching the allocation", err)
return err
}
return allocationObj.PauseUpload(remotePath)
}

func createDir(allocationID, remotePath string) error {
if len(allocationID) == 0 {
return RequiredArg("allocationID")
Expand Down Expand Up @@ -631,8 +640,12 @@ func multiUpload(jsonBulkUploadOptions string) (MultiUploadResult, error) {
wg.Add(1)
encrypt := option.Encrypt
remotePath := option.RemotePath

fileReader := jsbridge.NewFileReader(option.ReadChunkFuncName, option.FileSize)
fileReader, err := jsbridge.NewFileReader(option.ReadChunkFuncName, option.FileSize, allocationObj.GetChunkReadSize(encrypt))
if err != nil {
result.Error = "Error in file operation"
result.Success = false
return result, err
}
mimeType := option.MimeType
localPath := remotePath
remotePath = zboxutil.RemoteClean(remotePath)
Expand Down Expand Up @@ -725,7 +738,10 @@ func uploadWithJsFuncs(allocationID, remotePath string, readChunkFuncName string
}
wg.Add(1)

fileReader := jsbridge.NewFileReader(readChunkFuncName, fileSize)
fileReader, err := jsbridge.NewFileReader(readChunkFuncName, fileSize, allocationObj.GetChunkReadSize(encrypt))
if err != nil {
return false, err
}

localPath := remotePath

Expand Down
2 changes: 1 addition & 1 deletion wasmsdk/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func getAllocation(allocationId string) (*sdk.Allocation, error) {
if err != nil {
return nil, err
}

sdk.SetShouldVerifyHash(false)
it = &cachedAllocation{
Allocation: a,
Expiration: time.Now().Add(5 * time.Minute),
Expand Down
62 changes: 45 additions & 17 deletions wasmsdk/jsbridge/file_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,46 +6,74 @@ package jsbridge
import (
"errors"
"io"
"sync"
"syscall/js"
)

var jsFileReaderMutex sync.Mutex

type FileReader struct {
size int64
offset int64
readChunk js.Value
buf []byte
bufOffset int
endOfFile bool
}

func NewFileReader(readChunkFuncName string, fileSize int64) *FileReader {
readChunk := js.Global().Get(readChunkFuncName)
const (
bufferSize = 16 * 1024 * 1024 //16MB
)

func NewFileReader(readChunkFuncName string, fileSize, chunkReadSize int64) (*FileReader, error) {
readChunk := js.Global().Get(readChunkFuncName)
var buf []byte
if bufferSize > fileSize {
buf = make([]byte, fileSize)
} else {
bufSize := (chunkReadSize * (bufferSize / chunkReadSize))
buf = make([]byte, bufSize)
}
result, err := Await(readChunk.Invoke(0, len(buf)))
if len(err) > 0 && !err[0].IsNull() {
return nil, errors.New("file_reader: " + err[0].String())
}
chunk := result[0]
n := js.CopyBytesToGo(buf, chunk)
if n < len(buf) {
return nil, errors.New("file_reader: failed to read first chunk")
}
return &FileReader{
size: fileSize,
offset: 0,
offset: int64(n),
readChunk: readChunk,
}
buf: buf,
endOfFile: n == int(fileSize),
}, nil
}

func (r *FileReader) Read(p []byte) (int, error) {
//js.Value doesn't work in parallel invoke
jsFileReaderMutex.Lock()
defer jsFileReaderMutex.Unlock()
size := len(p)

result, err := Await(r.readChunk.Invoke(r.offset, size))
if len(r.buf)-r.bufOffset < size && !r.endOfFile {
r.bufOffset = 0 //reset buffer offset
result, err := Await(r.readChunk.Invoke(r.offset, len(r.buf)))

if len(err) > 0 && !err[0].IsNull() {
return 0, errors.New("file_reader: " + err[0].String())
}
if len(err) > 0 && !err[0].IsNull() {
return 0, errors.New("file_reader: " + err[0].String())
}

chunk := result[0]
chunk := result[0]

n := js.CopyBytesToGo(p, chunk)
r.offset += int64(n)
n := js.CopyBytesToGo(r.buf, chunk)
r.offset += int64(n)
if n < len(r.buf) {
r.buf = r.buf[:n]
r.endOfFile = true
}
}

if n < size {
n := copy(p, r.buf[r.bufOffset:])
r.bufOffset += n
if r.endOfFile && r.bufOffset == len(r.buf) {
return n, io.EOF
}

Expand Down
78 changes: 50 additions & 28 deletions wasmsdk/jsbridge/file_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,53 +5,74 @@ package jsbridge

import (
"errors"
"io"
"io/fs"
"sync"
"syscall/js"
)

var jsFileWriterMutex sync.Mutex

type FileWriter struct {
writableStream js.Value
uint8Array js.Value
fileHandle js.Value
bufLen int
buf []byte
bufWriteOffset int
}

func (w *FileWriter) Write(p []byte) (int, error) {
//js.Value doesn't work in parallel invoke
jsFileWriterMutex.Lock()
defer jsFileWriterMutex.Unlock()
const writeBufferSize = 4 * 1024 * 1024 //4MB

if w.bufLen != len(p) {
w.bufLen = len(p)
w.uint8Array = js.Global().Get("Uint8Array").New(w.bufLen)
// len(p) will always be <= 64KB
func (w *FileWriter) Write(p []byte) (int, error) {
//copy bytes to buf
if w.bufWriteOffset+len(p) > len(w.buf) {
return 0, io.ErrShortWrite
}
js.CopyBytesToJS(w.uint8Array, p)
_, err := Await(w.writableStream.Call("write", w.uint8Array))
if len(err) > 0 && !err[0].IsNull() {
return 0, errors.New("file_writer: " + err[0].String())
n := copy(w.buf[w.bufWriteOffset:], p)
w.bufWriteOffset += n
if w.bufWriteOffset == len(w.buf) {
//write to file
if w.bufLen != len(w.buf) {
w.bufLen = len(w.buf)
w.uint8Array = js.Global().Get("Uint8Array").New(w.bufLen)
}
js.CopyBytesToJS(w.uint8Array, w.buf)
_, err := Await(w.writableStream.Call("write", w.uint8Array))
if len(err) > 0 && !err[0].IsNull() {
return 0, errors.New("file_writer: " + err[0].String())
}
//reset buffer
w.bufWriteOffset = 0
}
return len(p), nil
}

func (w *FileWriter) WriteAt(p []byte, offset int64) (int, error) {
uint8Array := js.Global().Get("Uint8Array").New(len(p))
js.CopyBytesToJS(uint8Array, p)
options := js.Global().Get("Object").New()
options.Set("type", "write")
options.Set("position", offset)
options.Set("data", uint8Array)
options.Set("size", len(p))
_, err := Await(w.writableStream.Call("write", options))
if len(err) > 0 && !err[0].IsNull() {
return 0, errors.New("file_writer: " + err[0].String())
}
return len(p), nil
}
// func (w *FileWriter) WriteAt(p []byte, offset int64) (int, error) {
// uint8Array := js.Global().Get("Uint8Array").New(len(p))
// js.CopyBytesToJS(uint8Array, p)
// options := js.Global().Get("Object").New()
// options.Set("type", "write")
// options.Set("position", offset)
// options.Set("data", uint8Array)
// options.Set("size", len(p))
// _, err := Await(w.writableStream.Call("write", options))
// if len(err) > 0 && !err[0].IsNull() {
// return 0, errors.New("file_writer: " + err[0].String())
// }
// return len(p), nil
// }

func (w *FileWriter) Close() error {

if w.bufWriteOffset > 0 {
w.buf = w.buf[:w.bufWriteOffset]
uint8Array := js.Global().Get("Uint8Array").New(len(w.buf))
js.CopyBytesToJS(uint8Array, w.buf)
_, err := Await(w.writableStream.Call("write", uint8Array))
if len(err) > 0 && !err[0].IsNull() {
return errors.New("file_writer: " + err[0].String())
}
}

_, err := Await(w.writableStream.Call("close"))
if len(err) > 0 && !err[0].IsNull() {
return errors.New("file_writer: " + err[0].String())
Expand Down Expand Up @@ -99,5 +120,6 @@ func NewFileWriter(filename string) (*FileWriter, error) {
return &FileWriter{
writableStream: writableStream[0],
fileHandle: fileHandle[0],
buf: make([]byte, writeBufferSize),
}, nil
}
1 change: 1 addition & 0 deletions wasmsdk/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ func main() {
"updateForbidAllocation": UpdateForbidAllocation,
"send": send,
"cancelUpload": cancelUpload,
"pauseUpload": pauseUpload,

// player
"play": play,
Expand Down
42 changes: 42 additions & 0 deletions winsdk/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -740,3 +740,45 @@ func DeleteAuthTicket(allocationID, remotePath, refereeClientID *C.char) *C.char
return WithJSON(true, nil)

}

func CancelUpload(allocationID, remotePath *C.char) *C.char {
defer func() {
if r := recover(); r != nil {
log.Error("win: crash ", r)
}
}()
alloc, err := getAllocation(C.GoString(allocationID))
if err != nil {
log.Error("win: ", err)
return WithJSON(false, err)
}

rPath := C.GoString(remotePath)
err = alloc.CancelUpload(rPath)
if err != nil {
log.Error("win: ", err)
return WithJSON(false, err)
}
return WithJSON(true, nil)
}

func PauseUpload(allocationID, remotePath *C.char) *C.char {
defer func() {
if r := recover(); r != nil {
log.Error("win: crash ", r)
}
}()
alloc, err := getAllocation(C.GoString(allocationID))
if err != nil {
log.Error("win: ", err)
return WithJSON(false, err)
}

rPath := C.GoString(remotePath)
err = alloc.PauseUpload(rPath)
if err != nil {
log.Error("win: ", err)
return WithJSON(false, err)
}
return WithJSON(true, nil)
}
Loading

0 comments on commit f8f3928

Please sign in to comment.