Skip to content

Commit

Permalink
Merge pull request #8709 from dolthub/db/export-stream-scanner
Browse files Browse the repository at this point in the history
[no-release-notes] /go/cmd/dolt/commands: export stream scanner
  • Loading branch information
coffeegoddd authored Jan 2, 2025
2 parents 1fad4fc + 95ec611 commit 8103219
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 17 deletions.
2 changes: 1 addition & 1 deletion go/cmd/dolt/commands/filter-branch.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func processFilterQuery(ctx context.Context, dEnv *env.DoltEnv, root doltdb.Root
return nil, err
}

scanner := newStreamScanner(strings.NewReader(query))
scanner := NewStreamScanner(strings.NewReader(query))
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion go/cmd/dolt/commands/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ func saveQuery(ctx *sql.Context, root doltdb.RootValue, query string, name strin

// execBatchMode runs all the queries in the input reader
func execBatchMode(ctx *sql.Context, qryist cli.Queryist, input io.Reader, continueOnErr bool, format engine.PrintResultFormat) error {
scanner := newStreamScanner(input)
scanner := NewStreamScanner(input)
var query string
for scanner.Scan() {
if fileReadProg != nil {
Expand Down
29 changes: 15 additions & 14 deletions go/cmd/dolt/commands/sql_statement_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ const delimPrefixLen = 10

var delimPrefix = []byte("delimiter ")

// streamScanner is an iterator that reads bytes from |inp| until either
// StreamScanner is an iterator that reads bytes from |inp| until either
// (1) we match a DELIMITER statement, (2) we match the |delimiter| token,
// or (3) we EOF the file. After each Scan() call, the valid token will
// span from the buffer beginning to |state.end|.
type streamScanner struct {
type StreamScanner struct {
inp io.Reader
buf []byte
maxSize int
Expand All @@ -61,8 +61,9 @@ type streamScanner struct {
state *qState
}

func newStreamScanner(r io.Reader) *streamScanner {
return &streamScanner{inp: r, buf: make([]byte, pageSize), maxSize: maxStatementBufferBytes, delimiter: []byte(";"), state: new(qState)}
// NewStreamScanner returns a new StreamScanner
func NewStreamScanner(r io.Reader) *StreamScanner {
return &StreamScanner{inp: r, buf: make([]byte, pageSize), maxSize: maxStatementBufferBytes, delimiter: []byte(";"), state: new(qState)}
}

type qState struct {
Expand All @@ -77,7 +78,7 @@ type qState struct {
statementStartLine int
}

func (s *streamScanner) Scan() bool {
func (s *StreamScanner) Scan() bool {
// truncate last query
s.truncate()
s.resetState()
Expand Down Expand Up @@ -131,7 +132,7 @@ func (s *streamScanner) Scan() bool {
}
}

func (s *streamScanner) skipWhitespace() bool {
func (s *StreamScanner) skipWhitespace() bool {
for {
if s.i >= s.fill {
if err := s.read(); err != nil {
Expand All @@ -153,17 +154,17 @@ func (s *streamScanner) skipWhitespace() bool {
return true
}

func (s *streamScanner) truncate() {
func (s *StreamScanner) truncate() {
// copy size should be 4k or less
s.state.start = s.i
s.state.end = s.i
}

func (s *streamScanner) resetState() {
func (s *StreamScanner) resetState() {
s.state = &qState{}
}

func (s *streamScanner) read() error {
func (s *StreamScanner) read() error {
if s.fill >= s.maxSize {
// if script exceeds buffer that's OK, if
// a single query exceeds buffer that's not OK
Expand Down Expand Up @@ -195,21 +196,21 @@ func (s *streamScanner) read() error {
return nil
}

func (s *streamScanner) Err() error {
func (s *StreamScanner) Err() error {
return s.err
}

func (s *streamScanner) Bytes() []byte {
func (s *StreamScanner) Bytes() []byte {
return s.buf[s.state.start:s.state.end]
}

// Text returns the most recent token generated by a call to [Scanner.Scan]
// as a newly allocated string holding its bytes.
func (s *streamScanner) Text() string {
func (s *StreamScanner) Text() string {
return string(s.Bytes())
}

func (s *streamScanner) isDelimiterExpr() (error, bool) {
func (s *StreamScanner) isDelimiterExpr() (error, bool) {
if s.i == 0 && s.fill-s.i < delimPrefixLen {
// need to see first |delimPrefixLen| characters
if err := s.read(); err != nil {
Expand Down Expand Up @@ -251,7 +252,7 @@ func (s *streamScanner) isDelimiterExpr() (error, bool) {
return nil, false
}

func (s *streamScanner) seekDelimiter() (error, bool) {
func (s *StreamScanner) seekDelimiter() (error, bool) {
if s.i >= s.fill {
return nil, false
}
Expand Down
2 changes: 1 addition & 1 deletion go/cmd/dolt/commands/sql_statement_scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ insert into foo values (1,2,3)|`,
for _, tt := range testcases {
t.Run(tt.input, func(t *testing.T) {
reader := strings.NewReader(tt.input)
scanner := newStreamScanner(reader)
scanner := NewStreamScanner(reader)
var i int
for scanner.Scan() {
require.True(t, i < len(tt.statements))
Expand Down

0 comments on commit 8103219

Please sign in to comment.