Skip to content

Commit

Permalink
optimize PUT memory usage
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-ext-simba-jl committed Dec 5, 2024
1 parent 951ae61 commit 673e693
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 40 deletions.
32 changes: 23 additions & 9 deletions connection_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (sc *snowflakeConn) processFileTransfer(
if sfa.options.MultiPartThreshold == 0 {
sfa.options.MultiPartThreshold = dataSizeThreshold
}
if err := sfa.execute(); err != nil {
if err = sfa.execute(); err != nil {
return nil, err
}
data, err = sfa.result()
Expand All @@ -134,18 +134,32 @@ func (sc *snowflakeConn) processFileTransfer(
return data, nil
}

func getFileStream(ctx context.Context) (*bytes.Buffer, error) {
func getReaderFromContext(ctx context.Context) io.Reader {
s := ctx.Value(fileStreamFile)
if s == nil {
return nil, nil
}
r, ok := s.(io.Reader)
if !ok {
return nil, errors.New("incorrect io.Reader")
return nil
}
return r
}

func getFileStream(ctx context.Context) (*bytes.Buffer, error) {
r := getReaderFromContext(ctx)
if r == nil {
return nil, nil
}

// read a small amount of data to check if file stream will be used
buf := make([]byte, defaultStringBufferSize)
for {
_, err := r.Read(buf)
if err != nil {
return nil, err
} else {

Check failure on line 158 in connection_util.go

View workflow job for this annotation

GitHub Actions / Check linter

if block ends with a return statement, so drop this else and outdent its block
break
}
}
buf := new(bytes.Buffer)
_, err := buf.ReadFrom(r)
return buf, err
return bytes.NewBuffer(buf), nil
}

func getFileTransferOptions(ctx context.Context) *SnowflakeFileTransferOptions {
Expand Down
25 changes: 5 additions & 20 deletions file_transfer_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,20 +463,9 @@ func (sfa *snowflakeFileTransferAgent) processFileCompressionType() error {
if currentFileCompressionType == nil {
var mtype *mimetype.MIME
var err error
if meta.srcStream != nil {
r := getReaderFromBuffer(&meta.srcStream)
mtype, err = mimetype.DetectReader(r)
if err != nil {
return err
}
if _, err = io.ReadAll(r); err != nil { // flush out tee buffer
return err
}
} else {
mtype, err = mimetype.DetectFile(fileName)
if err != nil {
return err
}
mtype, err = mimetype.DetectFile(fileName)
if err != nil {
return err
}
currentFileCompressionType = lookupByExtension(mtype.Extension())
}
Expand Down Expand Up @@ -858,7 +847,7 @@ func (sfa *snowflakeFileTransferAgent) uploadOneFile(meta *fileMetadata) (*fileM
fileUtil := new(snowflakeFileUtil)
if meta.requireCompress {
if meta.srcStream != nil {
meta.realSrcStream, _, err = fileUtil.compressFileWithGzipFromStream(&meta.srcStream)
meta.realSrcStream, _, err = fileUtil.compressFileWithGzipFromStream(sfa.ctx)
} else {
meta.realSrcFileName, _, err = fileUtil.compressFileWithGzip(meta.srcFileName, tmpDir)
}
Expand All @@ -868,11 +857,7 @@ func (sfa *snowflakeFileTransferAgent) uploadOneFile(meta *fileMetadata) (*fileM
}

if meta.srcStream != nil {
if meta.realSrcStream != nil {
meta.sha256Digest, meta.uploadSize, err = fileUtil.getDigestAndSizeForStream(&meta.realSrcStream)
} else {
meta.sha256Digest, meta.uploadSize, err = fileUtil.getDigestAndSizeForStream(&meta.srcStream)
}
meta.sha256Digest, meta.uploadSize, err = fileUtil.getDigestAndSizeForStream(&meta.realSrcStream, &meta.srcStream, sfa.ctx)
} else {
meta.sha256Digest, meta.uploadSize, err = fileUtil.getDigestAndSizeForFile(meta.realSrcFileName)
}
Expand Down
46 changes: 35 additions & 11 deletions file_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package gosnowflake
import (
"bytes"
"compress/gzip"
"context"
"crypto/sha256"
"encoding/base64"
"io"
Expand All @@ -23,16 +24,28 @@ const (
readWriteFileMode os.FileMode = 0666
)

func (util *snowflakeFileUtil) compressFileWithGzipFromStream(srcStream **bytes.Buffer) (*bytes.Buffer, int, error) {
r := getReaderFromBuffer(srcStream)
buf, err := io.ReadAll(r)
if err != nil {
return nil, -1, err
}
func (util *snowflakeFileUtil) compressFileWithGzipFromStream(ctx context.Context) (*bytes.Buffer, int, error) {
var c bytes.Buffer
w := gzip.NewWriter(&c)
if _, err := w.Write(buf); err != nil { // write buf to gzip writer
return nil, -1, err
buf := make([]byte, fileChunkSize)
r := getReaderFromContext(ctx)
if r == nil {
return nil, -1, nil
}

// read the whole file in chunks
for {
n, err := r.Read(buf)
if err == io.EOF {
break
}
if err != nil {
return nil, -1, err
}
// write buf to gzip writer
if _, err = w.Write(buf[:n]); err != nil {
return nil, -1, err
}
}
if err := w.Close(); err != nil {
return nil, -1, err
Expand Down Expand Up @@ -75,11 +88,22 @@ func (util *snowflakeFileUtil) compressFileWithGzip(fileName string, tmpDir stri
return gzipFileName, stat.Size(), err
}

func (util *snowflakeFileUtil) getDigestAndSizeForStream(stream **bytes.Buffer) (string, int64, error) {
func (util *snowflakeFileUtil) getDigestAndSizeForStream(realSrcStream **bytes.Buffer, srcStream **bytes.Buffer, ctx context.Context) (string, int64, error) {

Check failure on line 91 in file_util.go

View workflow job for this annotation

GitHub Actions / Check linter

context.Context should be the first parameter of a function
var r io.Reader
var stream **bytes.Buffer
if realSrcStream != nil {
r = getReaderFromBuffer(srcStream)
stream = realSrcStream
} else {
r = getReaderFromContext(ctx)
stream = srcStream
}
if r == nil {
return "", 0, nil
}

m := sha256.New()
r := getReaderFromBuffer(stream)
chunk := make([]byte, fileChunkSize)

for {
n, err := r.Read(chunk)
if err == io.EOF {
Expand Down

0 comments on commit 673e693

Please sign in to comment.