Skip to content

Commit

Permalink
fix: handle line protocol wrap with --skipRowOnError (#536) (#539)
Browse files Browse the repository at this point in the history
closes #502

(cherry picked from commit b29425c)

closes #537
  • Loading branch information
davidby-influx authored Apr 5, 2024
1 parent 3ca7925 commit ec55d42
Show file tree
Hide file tree
Showing 2 changed files with 250 additions and 11 deletions.
63 changes: 53 additions & 10 deletions pkg/csv2lp/lp_reader.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package csv2lp

import (
"errors"
"io"
"log"

Expand All @@ -13,6 +14,9 @@ type LineProtocolFilterReader struct {
lineReader *LineReader
// LineNumber represents line number of csv.Reader, 1 is the first
LineNumber int
buffer []byte
// A slice into buffer that contains the current line
curLine []byte
}

// LineProtocolFilter creates a reader wrapper that parses points, skipping if invalid
Expand All @@ -21,24 +25,63 @@ func LineProtocolFilter(reader io.Reader) *LineProtocolFilterReader {
lineReader.LineNumber = 1 // start counting from 1
return &LineProtocolFilterReader{
lineReader: lineReader,
buffer: make([]byte, defaultBufSize),
}
}

func (state *LineProtocolFilterReader) Read(b []byte) (int, error) {
var err error
var bytesRead int

state.buffer = state.buffer[:0]
for {
bytesRead, err := state.lineReader.Read(b)
if err != nil {
return bytesRead, err
if len(state.curLine) > 0 {
n := copy(b, state.curLine)
// if we have more data than we can put in b, save it for next read
state.curLine = state.curLine[n:]
state.buffer = state.buffer[:0]
return n, err
}
for {
bytesRead, err = state.lineReader.Read(state.buffer[len(state.buffer):cap(state.buffer)])
if bytesRead == 0 {
// real error
if err != nil && !errors.Is(err, io.EOF) {
state.curLine = nil
state.buffer = state.buffer[:0]
return 0, err
}
// EOF
break
} else {
// bytesRead > 0
state.buffer = state.buffer[:len(state.buffer)+bytesRead]
if state.buffer[len(state.buffer)-1] != '\n' && !errors.Is(err, io.EOF) {
// We have run out of buffer, but we need to read more.
if cap(state.buffer) == len(state.buffer) {
state.buffer = append(state.buffer, 0)[:len(state.buffer)]
}
// read until we get a full line
continue
} else {
// Got a whole line or an EOF on this read
break
}
}
}
state.LineNumber = state.lineReader.LastLineNumber
buf := b[0:bytesRead]
pts, err := models.ParsePoints(buf) // any time precision because we won't actually use this point
if err != nil {
state.curLine = state.buffer
pts, parseErr := models.ParsePoints(state.curLine) // any time precision because we won't actually use this point
if parseErr != nil {
log.Printf("invalid point on line %d: %v\n", state.LineNumber, err)
continue
} else if len(pts) == 0 { // no points on this line
continue
state.buffer = state.buffer[:0]
state.curLine = nil
return 0, nil
} else if len(pts) == 0 {
state.curLine = nil
state.buffer = state.buffer[:0]
// err should only be nil or io.EOF here
return 0, err
}
return bytesRead, nil
}
}
198 changes: 197 additions & 1 deletion pkg/csv2lp/lp_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,202 @@ func TestLineProtocolFilter(t *testing.T) {
"weather,location=us-central temperature=31 1465839830100400205",
}, "\n"),
},
{
`# INFLUXDB EXPORT: 2024-03-19T10:00:00Z - 2024-03-19T15:59:59Z
# DDL
#CREATE DATABASE "rde-test" WITH NAME autogen
# DML
# CONTEXT-DATABASE:rde-test
# CONTEXT-RETENTION-POLICY:autogen
# writing tsm data
Data-12,brand=A,tag_one=GT value=1 1710842400000000000
Data-12,brand=A,tag_one=GT value=1 1710842580000000000
Data-12,brand=A,tag_one=GT value=1 1710842760000000000
Data-12,brand=A,tag_one=GT value=1 1710842940000000000
Data-12,brand=A,tag_one=GT value=1 1710843120000000000
Data-12,brand=A,tag_one=GT value=1 1710843300000000000
Data-12,brand=A,tag_one=GT value=1 1710843480000000000
Data-12,brand=A,tag_one=GT value=1 1710843660000000000
Data-12,brand=A,tag_one=GT value=1 1710843840000000000
Data-12,brand=A,tag_one=GT value=1 1710844020000000000
Data-12,brand=A,tag_one=GT value=1 1710844200000000000
Data-12,brand=A,tag_one=GT value=1 1710844380000000000
Data-12,brand=A,tag_one=GT value=1 1710844560000000000
Data-12,brand=A,tag_one=GT value=1 1710844740000000000
Data-12,brand=A,tag_one=GT value=1 1710844920000000000
Data-12,brand=A,tag_one=GT value=1 1710845100000000000
Data-12,brand=A,tag_one=GT value=1 1710845280000000000
Data-12,brand=A,tag_one=GT value=1 1710845460000000000
Data-12,brand=A,tag_one=GT value=1 1710845640000000000
Data-12,brand=A,tag_one=GT value=1 1710845820000000000
Data-12,brand=A,tag_one=GT value=1 1710846000000000000
Data-12,brand=A,tag_one=GT value=1 1710846180000000000
Data-12,brand=A,tag_one=GT value=1 1710846360000000000
Data-12,brand=A,tag_one=GT value=1 1710846540000000000
Data-12,brand=A,tag_one=GT value=1 1710846720000000000
Data-12,brand=A,tag_one=GT value=1 1710846900000000000
Data-12,brand=A,tag_one=GT value=1 1710847080000000000
Data-12,brand=A,tag_one=GT value=1 1710847260000000000
Data-12,brand=A,tag_one=GT value=1 1710847440000000000
Data-12,brand=A,tag_one=GT value=1 1710847620000000000
Data-12,brand=A,tag_one=GT value=1 1710847800000000000
Data-12,brand=A,tag_one=GT value=1 1710847980000000000
Data-12,brand=A,tag_one=GT value=1 1710848160000000000
Data-12,brand=A,tag_one=GT value=1 1710848340000000000
Data-12,brand=A,tag_one=GT value=1 1710848520000000000
Data-12,brand=A,tag_one=GT value=1 1710848700000000000
Data-12,brand=A,tag_one=GT value=1 1710848880000000000
Data-12,brand=A,tag_one=GT value=1 1710849060000000000
Data-12,brand=A,tag_one=GT value=1 1710849240000000000
Data-12,brand=A,tag_one=GT value=1 1710849420000000000
Data-12,brand=A,tag_one=GT value=1 1710849600000000000
Data-12,brand=A,tag_one=GT value=1 1710849780000000000
Data-12,brand=A,tag_one=GT value=1 1710849960000000000
Data-12,brand=A,tag_one=GT value=1 1710850140000000000
Data-12,brand=A,tag_one=GT value=1 1710850320000000000
Data-12,brand=A,tag_one=GT value=1 1710850500000000000
Data-12,brand=A,tag_one=GT value=1 1710850680000000000
Data-12,brand=A,tag_one=GT value=1 1710850860000000000
Data-12,brand=A,tag_one=GT value=1 1710851040000000000
Data-12,brand=A,tag_one=GT value=1 1710851220000000000
Data-12,brand=A,tag_one=GT value=1 1710851400000000000
Data-12,brand=A,tag_one=GT value=1 1710851580000000000
Data-12,brand=A,tag_one=GT value=1 1710851760000000000
Data-12,brand=A,tag_one=GT value=1 1710851940000000000
Data-12,brand=A,tag_one=GT value=1 1710852120000000000
Data-12,brand=A,tag_one=GT value=1 1710852300000000000
Data-12,brand=A,tag_one=GT value=1 1710852480000000000
Data-12,brand=A,tag_one=GT value=1 1710852660000000000
Data-12,brand=A,tag_one=GT value=1 1710852840000000000
Data-12,brand=A,tag_one=GT value=1 1710853020000000000
Data-12,brand=A,tag_one=GT value=1 1710853200000000000
Data-12,brand=A,tag_one=GT value=1 1710853380000000000
Data-12,brand=A,tag_one=GT value=1 1710853560000000000
Data-12,brand=A,tag_one=GT value=1 1710853740000000000
Data-12,brand=A,tag_one=GT value=1 1710853920000000000
Data-12,brand=A,tag_one=GT value=1 1710854100000000000
Data-12,brand=A,tag_one=GT value=1 1710854280000000000
Data-12,brand=A,tag_one=GT value=1 1710854460000000000
Data-12,brand=A,tag_one=GT value=1 1710854640000000000
Data-12,brand=A,tag_one=GT value=1 1710854820000000000
Data-12,brand=A,tag_one=GT value=1 1710855000000000000
Data-12,brand=A,tag_one=GT value=1 1710855180000000000
Data-12,brand=A,tag_one=GT value=1 1710855360000000000
Data-12,brand=A,tag_one=GT value=1 1710855540000000000
Data-12,brand=A,tag_one=GT value=1 1710855720000000000
Data-12,brand=A,tag_one=GT value=1 1710855900000000000
Data-12,brand=A,tag_one=GT value=1 1710856080000000000
Data-12,brand=A,tag_one=GT value=1 1710856260000000000
Data-12,brand=A,tag_one=GT value=1 1710856440000000000
Data-12,brand=A,tag_one=GT value=1 1710856620000000000
Data-12,brand=A,tag_one=GT value=1 1710856800000000000
Data-12,brand=A,tag_one=GT value=1 1710856980000000000
Data-12,brand=A,tag_one=GT value=1 1710857160000000000
Data-12,brand=A,tag_one=GT value=1 1710857340000000000
Data-12,brand=A,tag_one=GT value=1 1710857520000000000
Data-12,brand=A,tag_one=GT value=1 1710857700000000000
Data-12,brand=A,tag_one=GT value=1 1710857880000000000
Data-12,brand=A,ship=BL value=1 1710842400000000000
Data-12,brand=A,ship=BL value=1 1710842580000000000
Data-12,brand=A,ship=BL value=1 1710842760000000000
Data-12,brand=A,ship=BL value=1 1710842940000000000
Data-12,brand=A,ship=BL value=1 1710843120000000000
Data-12,brand=A,ship=BL value=1 1710843300000000000
`, `Data-12,brand=A,tag_one=GT value=1 1710842400000000000
Data-12,brand=A,tag_one=GT value=1 1710842580000000000
Data-12,brand=A,tag_one=GT value=1 1710842760000000000
Data-12,brand=A,tag_one=GT value=1 1710842940000000000
Data-12,brand=A,tag_one=GT value=1 1710843120000000000
Data-12,brand=A,tag_one=GT value=1 1710843300000000000
Data-12,brand=A,tag_one=GT value=1 1710843480000000000
Data-12,brand=A,tag_one=GT value=1 1710843660000000000
Data-12,brand=A,tag_one=GT value=1 1710843840000000000
Data-12,brand=A,tag_one=GT value=1 1710844020000000000
Data-12,brand=A,tag_one=GT value=1 1710844200000000000
Data-12,brand=A,tag_one=GT value=1 1710844380000000000
Data-12,brand=A,tag_one=GT value=1 1710844560000000000
Data-12,brand=A,tag_one=GT value=1 1710844740000000000
Data-12,brand=A,tag_one=GT value=1 1710844920000000000
Data-12,brand=A,tag_one=GT value=1 1710845100000000000
Data-12,brand=A,tag_one=GT value=1 1710845280000000000
Data-12,brand=A,tag_one=GT value=1 1710845460000000000
Data-12,brand=A,tag_one=GT value=1 1710845640000000000
Data-12,brand=A,tag_one=GT value=1 1710845820000000000
Data-12,brand=A,tag_one=GT value=1 1710846000000000000
Data-12,brand=A,tag_one=GT value=1 1710846180000000000
Data-12,brand=A,tag_one=GT value=1 1710846360000000000
Data-12,brand=A,tag_one=GT value=1 1710846540000000000
Data-12,brand=A,tag_one=GT value=1 1710846720000000000
Data-12,brand=A,tag_one=GT value=1 1710846900000000000
Data-12,brand=A,tag_one=GT value=1 1710847080000000000
Data-12,brand=A,tag_one=GT value=1 1710847260000000000
Data-12,brand=A,tag_one=GT value=1 1710847440000000000
Data-12,brand=A,tag_one=GT value=1 1710847620000000000
Data-12,brand=A,tag_one=GT value=1 1710847800000000000
Data-12,brand=A,tag_one=GT value=1 1710847980000000000
Data-12,brand=A,tag_one=GT value=1 1710848160000000000
Data-12,brand=A,tag_one=GT value=1 1710848340000000000
Data-12,brand=A,tag_one=GT value=1 1710848520000000000
Data-12,brand=A,tag_one=GT value=1 1710848700000000000
Data-12,brand=A,tag_one=GT value=1 1710848880000000000
Data-12,brand=A,tag_one=GT value=1 1710849060000000000
Data-12,brand=A,tag_one=GT value=1 1710849240000000000
Data-12,brand=A,tag_one=GT value=1 1710849420000000000
Data-12,brand=A,tag_one=GT value=1 1710849600000000000
Data-12,brand=A,tag_one=GT value=1 1710849780000000000
Data-12,brand=A,tag_one=GT value=1 1710849960000000000
Data-12,brand=A,tag_one=GT value=1 1710850140000000000
Data-12,brand=A,tag_one=GT value=1 1710850320000000000
Data-12,brand=A,tag_one=GT value=1 1710850500000000000
Data-12,brand=A,tag_one=GT value=1 1710850680000000000
Data-12,brand=A,tag_one=GT value=1 1710850860000000000
Data-12,brand=A,tag_one=GT value=1 1710851040000000000
Data-12,brand=A,tag_one=GT value=1 1710851220000000000
Data-12,brand=A,tag_one=GT value=1 1710851400000000000
Data-12,brand=A,tag_one=GT value=1 1710851580000000000
Data-12,brand=A,tag_one=GT value=1 1710851760000000000
Data-12,brand=A,tag_one=GT value=1 1710851940000000000
Data-12,brand=A,tag_one=GT value=1 1710852120000000000
Data-12,brand=A,tag_one=GT value=1 1710852300000000000
Data-12,brand=A,tag_one=GT value=1 1710852480000000000
Data-12,brand=A,tag_one=GT value=1 1710852660000000000
Data-12,brand=A,tag_one=GT value=1 1710852840000000000
Data-12,brand=A,tag_one=GT value=1 1710853020000000000
Data-12,brand=A,tag_one=GT value=1 1710853200000000000
Data-12,brand=A,tag_one=GT value=1 1710853380000000000
Data-12,brand=A,tag_one=GT value=1 1710853560000000000
Data-12,brand=A,tag_one=GT value=1 1710853740000000000
Data-12,brand=A,tag_one=GT value=1 1710853920000000000
Data-12,brand=A,tag_one=GT value=1 1710854100000000000
Data-12,brand=A,tag_one=GT value=1 1710854280000000000
Data-12,brand=A,tag_one=GT value=1 1710854460000000000
Data-12,brand=A,tag_one=GT value=1 1710854640000000000
Data-12,brand=A,tag_one=GT value=1 1710854820000000000
Data-12,brand=A,tag_one=GT value=1 1710855000000000000
Data-12,brand=A,tag_one=GT value=1 1710855180000000000
Data-12,brand=A,tag_one=GT value=1 1710855360000000000
Data-12,brand=A,tag_one=GT value=1 1710855540000000000
Data-12,brand=A,tag_one=GT value=1 1710855720000000000
Data-12,brand=A,tag_one=GT value=1 1710855900000000000
Data-12,brand=A,tag_one=GT value=1 1710856080000000000
Data-12,brand=A,tag_one=GT value=1 1710856260000000000
Data-12,brand=A,tag_one=GT value=1 1710856440000000000
Data-12,brand=A,tag_one=GT value=1 1710856620000000000
Data-12,brand=A,tag_one=GT value=1 1710856800000000000
Data-12,brand=A,tag_one=GT value=1 1710856980000000000
Data-12,brand=A,tag_one=GT value=1 1710857160000000000
Data-12,brand=A,tag_one=GT value=1 1710857340000000000
Data-12,brand=A,tag_one=GT value=1 1710857520000000000
Data-12,brand=A,tag_one=GT value=1 1710857700000000000
Data-12,brand=A,tag_one=GT value=1 1710857880000000000
Data-12,brand=A,ship=BL value=1 1710842400000000000
Data-12,brand=A,ship=BL value=1 1710842580000000000
Data-12,brand=A,ship=BL value=1 1710842760000000000
Data-12,brand=A,ship=BL value=1 1710842940000000000
Data-12,brand=A,ship=BL value=1 1710843120000000000
Data-12,brand=A,ship=BL value=1 1710843300000000000
`,
},
}
for _, tt := range tests {
reader := LineProtocolFilter(strings.NewReader(tt.input))
Expand All @@ -63,6 +259,6 @@ func TestLineProtocolFilter(t *testing.T) {
t.Errorf("failed reading: %v", err)
continue
}
require.Equal(t, strings.TrimSpace(string(b)), strings.TrimSpace(tt.expected))
require.Equal(t, strings.TrimSpace(tt.expected), strings.TrimSpace(string(b)))
}
}

0 comments on commit ec55d42

Please sign in to comment.