1
+ package io.sniffy.influxdb.parser
2
+
3
+ import io.sniffy.influxdb.lineprotocol.Point
4
+ import io.sniffy.influxdb.parser.LineProtocolParser.State.*
5
+ import java.io.InputStream
6
+ import java.io.InputStreamReader
7
+ import java.io.Reader
8
+ import java.io.StringReader
9
+ import java.lang.Long.parseLong
10
+
11
+ class LineProtocolParser (private val reader : Reader , private val failFast : Boolean = false ) : Iterator<Point> {
12
+
13
+ private val sb = StringBuilder ()
14
+ private val sb2 = StringBuilder ()
15
+
16
+ @Volatile private var nextPoint: Point ? = null
17
+
18
+ constructor (string: String , failFast: Boolean = false ) :
19
+ this (StringReader (string), failFast)
20
+
21
+ constructor (inputStream: InputStream , failFast: Boolean = false ) :
22
+ this (InputStreamReader (inputStream, Charsets .UTF_8 ), failFast)
23
+
24
+ override fun hasNext (): Boolean {
25
+ nextPoint = parseNext()
26
+ return nextPoint != null
27
+ }
28
+
29
+ override fun next (): Point {
30
+ return nextPoint ? : throw NoSuchElementException ()
31
+ }
32
+
33
+ private enum class State {
34
+ Beginning ,
35
+
36
+ Comment ,
37
+
38
+ Measurement ,
39
+
40
+ TagKey ,
41
+ TagValue ,
42
+
43
+ FieldKey ,
44
+ FieldValue ,
45
+ StringFieldValue ,
46
+ NonStringFieldValue ,
47
+
48
+ Timestamp ,
49
+
50
+ End {
51
+ override fun finite () = true
52
+ },
53
+ Eos {
54
+ override fun finite () = true
55
+ },
56
+ Error {
57
+ override fun finite () = true
58
+ };
59
+
60
+ open fun finite () = false
61
+
62
+ }
63
+
64
+ internal @Synchronized
65
+ fun parseNext (): Point ? {
66
+
67
+ val builder = Point .Builder ()
68
+
69
+ var state = State .Beginning
70
+
71
+ sb.setLength(0 )
72
+ sb2.setLength(0 )
73
+
74
+ // TODO: what about \r ??
75
+
76
+ while (! state.finite()) {
77
+ when (state) {
78
+ Beginning -> {
79
+ val i1 = reader.read()
80
+ if (i1 < 0 ) state = Eos
81
+ else {
82
+ val c1 = i1.toChar()
83
+ when (c1) {
84
+ ' #' -> state = Comment
85
+ else -> {
86
+ sb.append(c1)
87
+ state = Measurement
88
+ }
89
+ }
90
+ }
91
+ }
92
+ Comment -> {
93
+ val i1 = reader.read()
94
+ if (i1 < 0 ) state = Eos
95
+ else {
96
+ val c1 = i1.toChar()
97
+ when (c1) {
98
+ ' \n ' -> state = Beginning
99
+ }
100
+ }
101
+
102
+ }
103
+ Measurement -> {
104
+ val i1 = reader.read()
105
+ if (i1 < 0 ) state = Error
106
+ else {
107
+ val c1 = i1.toChar()
108
+ when (c1) {
109
+ ' \\ ' -> {
110
+ val i2 = reader.read()
111
+ if (i2 < 0 ) state = Error
112
+ else {
113
+ val c2 = i2.toChar()
114
+ when (c2) {
115
+ ' \n ' -> state = Error
116
+ ' ,' -> sb.append(' ,' )
117
+ ' ' -> sb.append(' ' )
118
+ else -> sb.append(' \\ ' ).append(c2)
119
+ }
120
+ }
121
+ }
122
+ ' \n ' -> state = Error
123
+ ' ,' , ' ' -> {
124
+ val measurement = sb.toString()
125
+ if (measurement.isEmpty()) {
126
+ state = Error // TODO: read till end of line
127
+ } else {
128
+ builder.measurement = measurement
129
+ state = if (' ,' == c1) TagKey else FieldKey
130
+ }
131
+ sb.setLength(0 )
132
+ }
133
+ else -> sb.append(c1)
134
+ }
135
+ }
136
+ }
137
+ TagKey -> {
138
+ val i1 = reader.read()
139
+ if (i1 < 0 ) state = Error
140
+ else {
141
+ val c1 = i1.toChar()
142
+ when (c1) {
143
+ ' \\ ' -> {
144
+ val i2 = reader.read()
145
+ if (i2 < 0 ) state = Error
146
+ else {
147
+ val c2 = i2.toChar()
148
+ when (c2) {
149
+ ' \n ' -> state = Error
150
+ ' ,' -> sb.append(' ,' )
151
+ ' ' -> sb.append(' ' )
152
+ ' =' -> sb.append(' =' )
153
+ else -> sb.append(' \\ ' ).append(c2)
154
+ }
155
+ }
156
+ }
157
+ ' \n ' -> state = Error
158
+ ' ,' , ' ' -> {
159
+ state = Error
160
+ // TODO: read till new line
161
+ }
162
+ ' =' -> {
163
+ state = TagValue
164
+ }
165
+ else -> sb.append(c1)
166
+ }
167
+ }
168
+ }
169
+ TagValue -> {
170
+ val i1 = reader.read()
171
+ if (i1 < 0 ) state = Error
172
+ else {
173
+ val c1 = i1.toChar()
174
+ when (c1) {
175
+ ' \\ ' -> {
176
+ val i2 = reader.read()
177
+ if (i2 < 0 ) state = Error
178
+ else {
179
+ val c2 = i2.toChar()
180
+ when (c2) {
181
+ ' \n ' -> state = Error
182
+ ' ,' -> sb2.append(' ,' )
183
+ ' ' -> sb2.append(' ' )
184
+ ' =' -> sb2.append(' =' )
185
+ else -> sb2.append(' \\ ' ).append(c2)
186
+ }
187
+ }
188
+ }
189
+ ' ,' , ' ' -> {
190
+ val tagKey = sb.toString()
191
+ val tagValue = sb2.toString()
192
+ if (tagKey.isEmpty() || tagValue.isEmpty()) {
193
+ state = Error // TODO: skip if fail fast or read till new Line
194
+ } else {
195
+ builder.addTag(tagKey, tagValue)
196
+ state = if (' ,' == c1) TagKey else FieldKey
197
+ }
198
+ sb.setLength(0 )
199
+ sb2.setLength(0 )
200
+ }
201
+ ' \n ' -> state = Error
202
+ ' =' -> {
203
+ state = Error // TODO: read till new Line
204
+ }
205
+ else -> sb2.append(c1)
206
+ }
207
+ }
208
+ }
209
+
210
+ FieldKey -> {
211
+ val i1 = reader.read()
212
+ if (i1 < 0 ) state = Error
213
+ else {
214
+ val c1 = i1.toChar()
215
+ when (c1) {
216
+ ' \\ ' -> {
217
+ val i2 = reader.read()
218
+ if (i2 < 0 ) state = Error
219
+ else {
220
+ val c2 = i2.toChar()
221
+ when (c2) {
222
+ ' \n ' -> state = Error
223
+ ' ,' -> sb.append(' ,' )
224
+ ' ' -> sb.append(' ' )
225
+ ' =' -> sb.append(' =' )
226
+ else -> sb.append(' \\ ' ).append(c2)
227
+ }
228
+ }
229
+ }
230
+ ' \n ' -> state = Error
231
+ ' ,' , ' ' -> {
232
+ state = Error
233
+ }
234
+ ' =' -> {
235
+ state = FieldValue
236
+ }
237
+ else -> sb.append(c1)
238
+ }
239
+ }
240
+ }
241
+ FieldValue -> {
242
+ val i1 = reader.read()
243
+ if (i1 < 0 ) state = Error
244
+ else {
245
+ val c1 = i1.toChar()
246
+ when (c1) {
247
+ ' \n ' -> state = Error
248
+ ' "' -> state = StringFieldValue
249
+ else -> {
250
+ sb2.append(c1)
251
+ state = NonStringFieldValue
252
+ }
253
+ }
254
+ }
255
+ }
256
+ StringFieldValue -> {
257
+ val i1 = reader.read()
258
+ if (i1 < 0 ) state = Error
259
+ else {
260
+ val c1 = i1.toChar()
261
+ when (c1) {
262
+ ' \\ ' -> {
263
+ val i2 = reader.read()
264
+ if (i2 < 0 ) state = Error
265
+ else {
266
+ val c2 = i2.toChar()
267
+ when (c2) {
268
+ ' "' -> sb2.append(' "' )
269
+ else -> sb2.append(' \\ ' ).append(c2)
270
+ }
271
+ }
272
+ }
273
+ ' "' -> {
274
+ builder.addValue(sb.toString(), sb2.toString())
275
+ sb.setLength(0 )
276
+ sb2.setLength(0 )
277
+ val i2 = reader.read()
278
+ if (i2 < 0 ) state = End
279
+ else when (i2.toChar()) {
280
+ ' \n ' -> state = End
281
+ ' ,' -> state = FieldKey
282
+ ' ' -> state = Timestamp
283
+ else -> state = Error
284
+ }
285
+ }
286
+ else -> sb2.append(c1)
287
+ }
288
+ }
289
+ }
290
+ NonStringFieldValue -> {
291
+ val i1 = reader.read()
292
+ if (i1 < 0 ) {
293
+
294
+ val value = sb2.toString()
295
+
296
+ when (value) {
297
+ " t" , " T" , " true" , " True" , " TRUE" -> {
298
+ builder.addValue(sb.toString(), true )
299
+ }
300
+ " f" , " F" , " false" , " False" , " FALSE" -> {
301
+ builder.addValue(sb.toString(), false )
302
+ }
303
+ else -> {
304
+ if (value.endsWith(' i' )) {
305
+ builder.addValue(
306
+ sb.toString(),
307
+ parseLong(value.substring(0 , value.length - 1 ))
308
+ )
309
+ } else {
310
+ builder.addValue(
311
+ sb.toString(),
312
+ java.lang.Double .parseDouble(value)
313
+ )
314
+ }
315
+ }
316
+ }
317
+
318
+ sb.setLength(0 )
319
+ sb2.setLength(0 )
320
+
321
+ state = End
322
+ } else {
323
+ val c1 = i1.toChar()
324
+ when (c1) {
325
+ ' ,' , ' ' , ' \n ' -> {
326
+
327
+ val value = sb2.toString()
328
+
329
+ when (value) {
330
+ " t" , " T" , " true" , " True" , " TRUE" -> {
331
+ builder.addValue(sb.toString(), true )
332
+ }
333
+ " f" , " F" , " false" , " False" , " FALSE" -> {
334
+ builder.addValue(sb.toString(), false )
335
+ }
336
+ else -> {
337
+ if (value.endsWith(' i' )) {
338
+ builder.addValue(
339
+ sb.toString(),
340
+ parseLong(value.substring(0 , value.length - 1 ))
341
+ )
342
+ } else {
343
+ builder.addValue(
344
+ sb.toString(),
345
+ java.lang.Double .parseDouble(value)
346
+ )
347
+ }
348
+ }
349
+ }
350
+
351
+ sb.setLength(0 )
352
+ sb2.setLength(0 )
353
+ state = if (' ,' == c1) FieldKey else Timestamp
354
+ }
355
+ else -> sb2.append(c1)
356
+ }
357
+ }
358
+ }
359
+
360
+ Timestamp -> {
361
+ val i1 = reader.read()
362
+ if (i1 < 0 ) {
363
+ builder.timestamp = parseLong(sb.toString())
364
+ state = End
365
+ } else {
366
+ val c1 = i1.toChar()
367
+ when (c1) {
368
+ ' \n ' -> {
369
+ builder.timestamp = parseLong(sb.toString())
370
+ state = End
371
+ }
372
+ else -> {
373
+ sb.append(c1)
374
+ }
375
+ }
376
+ }
377
+
378
+ }
379
+ }
380
+ }
381
+
382
+ return builder.build()
383
+
384
+ }
385
+
386
+ }
0 commit comments