
Commit 938f015

LazinRobAtticus authored and committed
Add Akumuli database support
This PR adds support for the Akumuli database for the devops use case. Akumuli can handle all devops queries except GroupByOrderByLimit.
1 parent: f38a8a8 · commit: 938f015

File tree: 17 files changed, +1,128 −2 lines

README.md (+1)

```diff
@@ -13,6 +13,7 @@ Current databases supported:
 + ClickHouse [(supplemental docs)](docs/clickhouse.md)
 + CrateDB [(supplemental docs)](docs/cratedb.md)
 + SiriDB [(supplemental docs)](docs/siridb.md)
++ Akumuli [(supplemental docs)](docs/akumuli.md)
 
 ## Overview
```

New file (+141 lines), package serialize — the Akumuli RESP point serializer:
```go
package serialize

import (
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

const (
	// placeholderText reserves the 8-byte cue at the start of every frame;
	// it is overwritten with binary metadata before the frame is emitted.
	placeholderText = "AAAAFFEE"
)

// AkumuliSerializer writes a series of Point elements into a RESP-encoded
// buffer.
type AkumuliSerializer struct {
	book       map[string]uint32 // series name -> series id dictionary
	bookClosed bool              // set once the first repeated series is seen
	deferred   []byte            // value frames buffered until the book closes
	index      uint32            // next series id to assign
}

// NewAkumuliSerializer initializes an AkumuliSerializer instance.
func NewAkumuliSerializer() *AkumuliSerializer {
	s := &AkumuliSerializer{}
	s.book = make(map[string]uint32)
	s.deferred = make([]byte, 0, 4096)
	s.bookClosed = false
	return s
}

// Serialize writes Point data to the given writer, conforming to the
// Akumuli RESP protocol. The serializer adds extra cue data to guide the
// data loader, so the output mixes binary and text data in RESP format.
func (s *AkumuliSerializer) Serialize(p *Point, w io.Writer) (err error) {
	deferPoint := false

	buf := make([]byte, 0, 1024)
	// Add cue: bytes [0:4] hold the series id (uint32 LE), [4:6] the total
	// frame length (uint16 LE), and [6:8] the field count (uint16 LE).
	const HeaderLength = 8
	buf = append(buf, placeholderText...)
	buf = append(buf, "+"...)

	// Series name: measurement.field1|measurement.field2|... plus tags.
	for i := 0; i < len(p.fieldKeys); i++ {
		buf = append(buf, p.measurementName...)
		buf = append(buf, '.')
		buf = append(buf, p.fieldKeys[i]...)
		if i+1 < len(p.fieldKeys) {
			buf = append(buf, '|')
		} else {
			buf = append(buf, ' ')
		}
	}

	for i := 0; i < len(p.tagKeys); i++ {
		buf = append(buf, ' ')
		buf = append(buf, p.tagKeys[i]...)
		buf = append(buf, '=')
		buf = append(buf, p.tagValues[i].(string)...)
	}

	series := string(buf[HeaderLength:])
	if !s.bookClosed {
		if id, ok := s.book[series]; ok {
			// First repeated series: the dictionary is complete, so flush
			// all deferred value frames and switch to id-only references.
			s.bookClosed = true
			_, err = w.Write(s.deferred)
			if err != nil {
				return err
			}
			buf = buf[:HeaderLength]
			buf = append(buf, fmt.Sprintf(":%d", id)...)
			binary.LittleEndian.PutUint32(buf[:4], id)
		} else {
			// New series: register it in the book, emit a "*2" dictionary
			// entry immediately, and defer the value frame for later.
			s.index++
			tmp := make([]byte, 0, 1024)
			tmp = append(tmp, placeholderText...)
			tmp = append(tmp, "*2\n"...)
			tmp = append(tmp, buf[HeaderLength:]...)
			tmp = append(tmp, '\n')
			tmp = append(tmp, fmt.Sprintf(":%d\n", s.index)...)
			s.book[series] = s.index
			// Update cue: dictionary entries carry a zero field count.
			binary.LittleEndian.PutUint16(tmp[4:6], uint16(len(tmp)))
			binary.LittleEndian.PutUint16(tmp[6:HeaderLength], uint16(0))
			binary.LittleEndian.PutUint32(tmp[:4], s.index)
			binary.LittleEndian.PutUint32(buf[:4], s.index)
			_, err = w.Write(tmp)
			if err != nil {
				return err
			}
			deferPoint = true
			buf = buf[:HeaderLength]
			buf = append(buf, fmt.Sprintf(":%d", s.index)...)
		}
	} else {
		// Replace the series name with the id from the book
		if id, ok := s.book[series]; ok {
			buf = buf[:HeaderLength]
			buf = append(buf, fmt.Sprintf(":%d", id)...)
			binary.LittleEndian.PutUint16(buf[4:6], uint16(len(buf)))
			binary.LittleEndian.PutUint16(buf[6:HeaderLength], uint16(0))
			binary.LittleEndian.PutUint32(buf[:4], id)
		} else {
			return errors.New("unexpected series name")
		}
	}

	buf = append(buf, '\n')

	// Timestamp
	buf = append(buf, ':')
	buf = fastFormatAppend(p.timestamp.UTC().UnixNano(), buf)
	buf = append(buf, '\n')

	// Values: integers get the RESP ":" prefix, floats the "+" prefix.
	buf = append(buf, fmt.Sprintf("*%d\n", len(p.fieldValues))...)
	for i := 0; i < len(p.fieldValues); i++ {
		v := p.fieldValues[i]
		switch v.(type) {
		case int, int64:
			buf = append(buf, ':')
		case float64:
			buf = append(buf, '+')
		}
		buf = fastFormatAppend(v, buf)
		buf = append(buf, '\n')
	}

	// Update cue with the final frame length and field count.
	binary.LittleEndian.PutUint16(buf[4:6], uint16(len(buf)))
	binary.LittleEndian.PutUint16(buf[6:HeaderLength], uint16(len(p.fieldValues)))
	if deferPoint {
		s.deferred = append(s.deferred, buf...)
		return nil
	}
	_, err = w.Write(buf)
	return err
}
```
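The serializer builds a dictionary ("book") on the fly: the first time a series name appears it is assigned the next integer id and emitted as a `*2` dictionary entry, while the point's value frame is deferred. Once any series repeats, the book is considered complete, the deferred frames are flushed, and every later frame refers to its series by `:id` alone. Each frame starts with the 8-byte binary cue. A minimal sketch of how a loader might split this stream, assuming only the cue layout visible in `Serialize` above (the `loader` package and `readFrame` helper are hypothetical, not part of this commit):

```go
package loader

import (
	"bufio"
	"encoding/binary"
	"io"
)

// frame is one cued unit produced by AkumuliSerializer.
type frame struct {
	seriesID  uint32
	numFields uint16 // 0 for dictionary entries
	payload   []byte // the RESP text after the cue
}

// readFrame reads the 8-byte cue, then the rest of the frame. The cue
// layout mirrors the PutUint32/PutUint16 calls in Serialize: [0:4] series
// id, [4:6] total frame length including the cue, [6:8] field count.
func readFrame(r *bufio.Reader) (*frame, error) {
	cue := make([]byte, 8)
	if _, err := io.ReadFull(r, cue); err != nil {
		return nil, err
	}
	total := binary.LittleEndian.Uint16(cue[4:6])
	payload := make([]byte, int(total)-len(cue))
	if _, err := io.ReadFull(r, payload); err != nil {
		return nil, err
	}
	return &frame{
		seriesID:  binary.LittleEndian.Uint32(cue[:4]),
		numFields: binary.LittleEndian.Uint16(cue[6:8]),
		payload:   payload,
	}, nil
}
```

A loader built this way would presumably forward `payload` to Akumuli unchanged and use the cue only for routing and accounting.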
New file (+78 lines), package serialize — unit test for the serializer:
```go
package serialize

import (
	"bytes"
	"strings"
	"testing"
)

func TestAkumuliSerializerSerialize(t *testing.T) {
	serializer := NewAkumuliSerializer()

	points := []*Point{
		testPointDefault,
		testPointInt,
		testPointMultiField,
		testPointDefault,
		testPointInt,
		testPointMultiField,
	}

	type testCase struct {
		expCount int
		expValue string
		name     string
	}

	cases := []testCase{
		{
			expCount: 1,
			expValue: "+cpu.usage_guest_nice hostname=host_0 region=eu-west-1 datacenter=eu-west-1b",
			name:     "series name default",
		},
		{
			expCount: 1,
			expValue: "+cpu.usage_guest hostname=host_0 region=eu-west-1 datacenter=eu-west-1b",
			name:     "series name int",
		},
		{
			expCount: 1,
			expValue: "+cpu.big_usage_guest|cpu.usage_guest|cpu.usage_guest_nice hostname=host_0 region=eu-west-1 datacenter=eu-west-1b",
			name:     "series name multi-field",
		},
		{
			expCount: 2,
			expValue: "*1\n+38.24311829",
			name:     "value default",
		},
		{
			expCount: 2,
			expValue: "*1\n:38",
			name:     "value int",
		},
		{
			expCount: 2,
			expValue: "*3\n:5000000000\n:38\n+38.24311829",
			name:     "value multi-field",
		},
		{
			expCount: 6,
			expValue: ":1451606400000000000",
			name:     "timestamp",
		},
	}
	buf := new(bytes.Buffer)
	for _, point := range points {
		if err := serializer.Serialize(point, buf); err != nil {
			t.Fatalf("Serialize failed: %v", err)
		}
	}

	got := buf.String()

	for _, c := range cases {
		actualCnt := strings.Count(got, c.expValue)
		if actualCnt != c.expCount {
			t.Errorf("Output incorrect: %s expected %d times got %d times", c.name, c.expCount, actualCnt)
		}
	}
}
```
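Tracing `Serialize` by hand for `testPointInt` shows why the test expects each series string once but each value twice. With the point order above, `cpu.usage_guest` is the second distinct series, so (8-byte cues omitted) the stream contains one dictionary entry:

```
*2
+cpu.usage_guest hostname=host_0 region=eu-west-1 datacenter=eu-west-1b
:2
```

followed by one value frame per serialized point, each referring to the series by id:

```
:2
:1451606400000000000
*1
:38
```

The series name therefore appears once, while the value `*1\n:38` appears once per point, and the shared timestamp appears in all six frames.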
New file (+45 lines), package akumuli — devops query generator scaffolding:
```go
package akumuli

import (
	"time"

	"github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/devops"
	"github.com/timescale/tsbs/cmd/tsbs_generate_queries/utils"
	"github.com/timescale/tsbs/query"
)

// BaseGenerator contains settings specific to the Akumuli database.
type BaseGenerator struct {
}

// GenerateEmptyQuery returns an empty query.HTTP.
func (d *Devops) GenerateEmptyQuery() query.Query {
	return query.NewHTTP()
}

// fillInQuery fills the query struct with data.
func (g *BaseGenerator) fillInQuery(qi query.Query, humanLabel, humanDesc, body string, begin, end int64) {
	q := qi.(*query.HTTP)
	q.HumanLabel = []byte(humanLabel)
	q.HumanDescription = []byte(humanDesc)
	q.Method = []byte("POST")
	q.Path = []byte("/api/query")
	q.Body = []byte(body)
	q.StartTimestamp = begin
	q.EndTimestamp = end
}

// NewDevops makes a Devops object ready to generate queries. The Devops
// type itself is defined in another file of this commit; it embeds
// BaseGenerator and the shared devops.Core.
func (g *BaseGenerator) NewDevops(start, end time.Time, scale int) (utils.QueryGenerator, error) {
	core, err := devops.NewCore(start, end, scale)
	if err != nil {
		return nil, err
	}

	devops := &Devops{
		BaseGenerator: g,
		Core:          core,
	}

	return devops, nil
}
```
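The concrete query methods live in other files of this commit, but the pattern they follow can be sketched from `fillInQuery`'s signature: pick a time window, build a request body, and fill a `query.HTTP` aimed at `POST /api/query`. A hypothetical example, assuming the `Interval`/`MustRandWindow` helpers of the embedded `devops.Core` and a JSON body shape modeled loosely on Akumuli's HTTP query API (neither appears in this hunk, and `exampleMaxCPU` is an invented name):

```go
package akumuli

import (
	"fmt"
	"time"

	"github.com/timescale/tsbs/query"
)

// exampleMaxCPU is a hypothetical query method, not part of this commit.
// It fills qi with a request for the max of cpu.usage_user over a random
// one-hour window.
func (d *Devops) exampleMaxCPU(qi query.Query) {
	// Interval and MustRandWindow come from the embedded devops.Core.
	interval := d.Interval.MustRandWindow(time.Hour)
	// Assumed body shape; the real bodies live elsewhere in this commit.
	body := fmt.Sprintf(
		`{"group-aggregate":{"metric":"cpu.usage_user","step":"1m","func":["max"]},"range":{"from":%d,"to":%d}}`,
		interval.StartUnixNano(), interval.EndUnixNano())
	humanLabel := "Akumuli max cpu, rand 1h window"
	d.fillInQuery(qi, humanLabel, humanLabel, body,
		interval.StartUnixNano(), interval.EndUnixNano())
}
```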
