diff --git a/.gitignore b/.gitignore index 17c00d909..ea7da366b 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,5 @@ bin # High Dynamic Range (HDR) Histogram files *.hdr + +bin/ diff --git a/Makefile b/Makefile index f6a73c0c3..1f7388f65 100644 --- a/Makefile +++ b/Makefile @@ -26,7 +26,8 @@ loaders: tsbs_load \ tsbs_load_siridb \ tsbs_load_timescaledb \ tsbs_load_victoriametrics \ - tsbs_load_questdb + tsbs_load_questdb \ + tsbs_load_iotdb runners: tsbs_run_queries_akumuli \ tsbs_run_queries_cassandra \ @@ -38,7 +39,8 @@ runners: tsbs_run_queries_akumuli \ tsbs_run_queries_timescaledb \ tsbs_run_queries_timestream \ tsbs_run_queries_victoriametrics \ - tsbs_run_queries_questdb + tsbs_run_queries_questdb \ + tsbs_run_queries_iotdb test: $(GOTEST) -v ./... diff --git a/README.md b/README.md index bf78a77df..45adb954f 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,7 @@ Current databases supported: + ClickHouse [(supplemental docs)](docs/clickhouse.md) + CrateDB [(supplemental docs)](docs/cratedb.md) + InfluxDB [(supplemental docs)](docs/influx.md) ++ IoTDB [(supplemental docs)](docs/iotdb.md) + MongoDB [(supplemental docs)](docs/mongo.md) + QuestDB [(supplemental docs)](docs/questdb.md) + SiriDB [(supplemental docs)](docs/siridb.md) @@ -75,6 +76,7 @@ cases are implemented for each database: |ClickHouse|X|| |CrateDB|X|| |InfluxDB|X|X| +|IoTDB|X|| |MongoDB|X| |QuestDB|X|X |SiriDB|X| @@ -92,8 +94,8 @@ query execution performance. (It currently does not measure concurrent insert and query performance, which is a future priority.) To accomplish this in a fair way, the data to be inserted and the queries to run are pre-generated and native Go clients are used -wherever possible to connect to each database (e.g., `mgo` for MongoDB, -`aws sdk` for Timestream). +wherever possible to connect to each database (e.g., `mgo` for MongoDB, +`aws sdk` for Timestream, `iotdb-client-go` for IoTDB). 
Although the data is randomly generated, TSBS data and queries are entirely deterministic. By supplying the same PRNG (pseudo-random number @@ -134,8 +136,8 @@ Variables needed: 1. an end time. E.g., `2016-01-04T00:00:00Z` 1. how much time should be between each reading per device, in seconds. E.g., `10s` 1. and which database(s) you want to generate for. E.g., `timescaledb` - (choose from `cassandra`, `clickhouse`, `cratedb`, `influx`, `mongo`, `questdb`, `siridb`, - `timescaledb` or `victoriametrics`) + (choose from `cassandra`, `clickhouse`, `cratedb`, `influx`, `iotdb`, `mongo`, + `questdb`, `siridb`, `timescaledb` or `victoriametrics`) Given the above steps you can now generate a dataset (or multiple datasets, if you chose to generate for multiple databases) that can diff --git a/cmd/tsbs_generate_queries/databases/iotdb/common.go b/cmd/tsbs_generate_queries/databases/iotdb/common.go new file mode 100644 index 000000000..bf1a42536 --- /dev/null +++ b/cmd/tsbs_generate_queries/databases/iotdb/common.go @@ -0,0 +1,46 @@ +package iotdb + +import ( + "time" + + "github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/devops" + "github.com/timescale/tsbs/cmd/tsbs_generate_queries/utils" + "github.com/timescale/tsbs/pkg/query" +) + +const iotdbTimeFmt = "2006-01-02 15:04:05" + +// BaseGenerator contains settings specific for IoTDB +type BaseGenerator struct { + BasicPath string // e.g. "root.sg" is basic path of "root.sg.device". default : "root" + BasicPathLevel int32 // e.g. 0 for "root", 1 for "root.device" +} + +// GenerateEmptyQuery returns an empty query.IoTDB. +func (g *BaseGenerator) GenerateEmptyQuery() query.Query { + return query.NewIoTDB() +} + +// fillInQuery fills the query struct with data. 
+func (g *BaseGenerator) fillInQuery(qi query.Query, humanLabel, humanDesc, sql string) { + q := qi.(*query.IoTDB) + q.HumanLabel = []byte(humanLabel) + q.HumanDescription = []byte(humanDesc) + q.SqlQuery = []byte(sql) +} + +// NewDevops creates a new devops use case query generator. +func (g *BaseGenerator) NewDevops(start, end time.Time, scale int) (utils.QueryGenerator, error) { + core, err := devops.NewCore(start, end, scale) + + if err != nil { + return nil, err + } + + devops := &Devops{ + BaseGenerator: g, + Core: core, + } + + return devops, nil +} diff --git a/cmd/tsbs_generate_queries/databases/iotdb/devops.go b/cmd/tsbs_generate_queries/databases/iotdb/devops.go new file mode 100644 index 000000000..717bbabec --- /dev/null +++ b/cmd/tsbs_generate_queries/databases/iotdb/devops.go @@ -0,0 +1,215 @@ +package iotdb + +import ( + "fmt" + "strings" + "time" + + "github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/devops" + "github.com/timescale/tsbs/pkg/query" +) + +// TODO: Remove the need for this by continuing to bubble up errors +func panicIfErr(err error) { + if err != nil { + panic(err.Error()) + } +} + +// Devops produces IoTDB-specific queries for all the devops query types. +type Devops struct { + *BaseGenerator + *devops.Core +} + +// modifyHostnames makes sure IP address can appear in the path. +// Node names in path can NOT contain "." unless enclosing it within either single quote (') or double quote ("). +// In this case, quotes are recognized as part of the node name to avoid ambiguity. +func (d *Devops) modifyHostnames(hostnames []string) []string { + for i, hostname := range hostnames { + if strings.Contains(hostname, ".") { + if !(hostname[:1] == "`" && hostname[len(hostname)-1:] == "`") { + // not modified yet + hostnames[i] = "`" + hostnames[i] + "`" + } + + } + } + return hostnames +} + +// getHostFromWithHostnames creates FROM SQL statement for multiple hostnames. +// e.g. A storage group "root.cpu" has two devices. 
+// Two hostnames are "host1" and "host2" +// This function returns "root.cpu.host1, root.cpu.host2" (without "FROM") +func (d *Devops) getHostFromWithHostnames(hostnames []string) string { + hostnames = d.modifyHostnames(hostnames) + var hostnameClauses []string + for _, hostname := range hostnames { + hostnameClauses = append(hostnameClauses, fmt.Sprintf("%s.cpu.%s", d.BasicPath, hostname)) + } + return strings.Join(hostnameClauses, ", ") +} + +// getHostFromString gets multiple random hostnames and creates a FROM SQL statement for these hostnames. +// e.g. A storage group "root.cpu" has two devices, named "host1" and "host2" +// Two paths for them are "root.cpu.host1" and "root.cpu.host2" +// This function returns "root.cpu.host1, root.cpu.host2" (without "FROM") +func (d *Devops) getHostFromString(nHosts int) string { + hostnames, err := d.GetRandomHosts(nHosts) + panicIfErr(err) + fromClauses := d.getHostFromWithHostnames(hostnames) + return fromClauses +} + +// getSelectClausesAggMetrics gets clauses for aggregate functions. +func (d *Devops) getSelectClausesAggMetrics(agg string, metrics []string) []string { + selectClauses := make([]string, len(metrics)) + for i, m := range metrics { + selectClauses[i] = fmt.Sprintf("%s(%s)", agg, m) + } + + return selectClauses +} + +// getSelectClausesAggMetricsString gets a whole select clause for aggregate functions. +func (d *Devops) getSelectClausesAggMetricsString(agg string, metrics []string) string { + selectClauses := d.getSelectClausesAggMetrics(agg, metrics) + return strings.Join(selectClauses, ", ") +} + +// GroupByTime selects the MAX for numMetrics metrics under 'cpu', +// per minute for nhosts hosts, +// e.g. in pseudo-SQL: +// +// SELECT minute, max(metric1), ..., max(metricN) +// FROM cpu +// WHERE (hostname = '$HOSTNAME_1' OR ... 
OR hostname = '$HOSTNAME_N') +// AND time >= '$HOUR_START' AND time < '$HOUR_END' +// GROUP BY minute ORDER BY minute ASC +func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange time.Duration) { + interval := d.Interval.MustRandWindow(timeRange) + metrics, err := devops.GetCPUMetricsSlice(numMetrics) + panicIfErr(err) + selectClause := d.getSelectClausesAggMetricsString("MAX_VALUE", metrics) + fromHosts := d.getHostFromString(nHosts) + + humanLabel := fmt.Sprintf("IoTDB %d cpu metric(s), random %4d hosts, random %s by 1m", numMetrics, nHosts, timeRange) + humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString()) + sql := "" + sql = sql + fmt.Sprintf("SELECT %s", selectClause) + sql = sql + fmt.Sprintf(" FROM %s", fromHosts) + sql = sql + fmt.Sprintf(" GROUP BY ([%s, %s), 1m), LEVEL = %d", interval.Start().Format(iotdbTimeFmt), interval.End().Format(iotdbTimeFmt), d.BasicPathLevel+1) + + d.fillInQuery(qi, humanLabel, humanDesc, sql) +} + +// GroupByTimeAndPrimaryTag selects the AVG of numMetrics metrics under 'cpu' per device per hour for a day, +// e.g. 
in pseudo-SQL: +// +// SELECT AVG(metric1), ..., AVG(metricN) +// FROM cpu +// WHERE time >= '$HOUR_START' AND time < '$HOUR_END' +// GROUP BY hour, hostname ORDER BY hour +func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) { + metrics, err := devops.GetCPUMetricsSlice(numMetrics) + panicIfErr(err) + interval := d.Interval.MustRandWindow(devops.DoubleGroupByDuration) + selectClause := d.getSelectClausesAggMetricsString("AVG", metrics) + + humanLabel := devops.GetDoubleGroupByLabel("IoTDB", numMetrics) + humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString()) + sql := "" + sql = sql + fmt.Sprintf("SELECT %s", selectClause) + sql = sql + fmt.Sprintf(" FROM %s.cpu.*", d.BasicPath) + sql = sql + fmt.Sprintf(" GROUP BY ([%s, %s), 1h)", interval.Start().Format(iotdbTimeFmt), interval.End().Format(iotdbTimeFmt)) + + d.fillInQuery(qi, humanLabel, humanDesc, sql) +} + +// LastPointPerHost finds the last row for every host in the dataset +func (d *Devops) LastPointPerHost(qi query.Query) { + humanLabel := "IoTDB last row per host" + humanDesc := humanLabel + ": cpu" + + sql := fmt.Sprintf("SELECT LAST * FROM %s.cpu.*", d.BasicPath) + d.fillInQuery(qi, humanLabel, humanDesc, sql) +} + +// MaxAllCPU selects the MAX of all metrics under 'cpu' per hour for nhosts hosts, +// e.g. in pseudo-SQL: +// +// SELECT MAX(metric1), ..., MAX(metricN) +// FROM cpu WHERE (hostname = '$HOSTNAME_1' OR ... 
OR hostname = '$HOSTNAME_N') +// AND time >= '$HOUR_START' AND time < '$HOUR_END' +// GROUP BY hour ORDER BY hour +func (d *Devops) MaxAllCPU(qi query.Query, nHosts int, duration time.Duration) { + interval := d.Interval.MustRandWindow(duration) + fromHosts := d.getHostFromString(nHosts) + selectClause := d.getSelectClausesAggMetricsString("MAX_VALUE", devops.GetAllCPUMetrics()) + + humanLabel := devops.GetMaxAllLabel("IoTDB", nHosts) + humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString()) + sql := "" + sql = sql + fmt.Sprintf("SELECT %s", selectClause) + sql = sql + fmt.Sprintf(" FROM %s", fromHosts) + sql = sql + fmt.Sprintf(" GROUP BY ([%s, %s), 1h), LEVEL=%d", interval.Start().Format(iotdbTimeFmt), interval.End().Format(iotdbTimeFmt), d.BasicPathLevel+1) + + d.fillInQuery(qi, humanLabel, humanDesc, sql) +} + +// GroupByOrderByLimit benchmarks a query that has a time WHERE clause, that groups by a truncated date, orders by that date, and takes a limit: +// SELECT date_trunc('minute', time) AS t, MAX(cpu) FROM cpu +// WHERE time < '$TIME' +// GROUP BY t ORDER BY t DESC +// LIMIT $LIMIT +func (d *Devops) GroupByOrderByLimit(qi query.Query) { + interval := d.Interval.MustRandWindow(time.Hour) + selectClause := d.getSelectClausesAggMetricsString("MAX_VALUE", []string{"usage_user"}) + + sql := "" + sql = sql + fmt.Sprintf("SELECT %s", selectClause) + sql = sql + fmt.Sprintf(" FROM %s.cpu.*", d.BasicPath) + startTime := interval.Start() + endTime := interval.End() + optimizedStartTime := endTime.Add(time.Second * -300) // 5 mins ago + if optimizedStartTime.After(startTime) { + startTime = optimizedStartTime + } + sql = sql + fmt.Sprintf(" GROUP BY ([%s, %s), 1m), LEVEL = %d", startTime.Format(iotdbTimeFmt), endTime.Format(iotdbTimeFmt), d.BasicPathLevel+1) + sql = sql + " ORDER BY TIME DESC LIMIT 5" + + humanLabel := "IoTDB max cpu over last 5 min-intervals (random end)" + humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString()) + + 
d.fillInQuery(qi, humanLabel, humanDesc, sql) +} + +// HighCPUForHosts populates a query that gets CPU metrics when the CPU has high +// usage between a time period for a number of hosts (if 0, it will search all hosts), +// e.g. in pseudo-SQL: +// +// SELECT * FROM cpu +// WHERE usage_user > 90.0 +// AND time >= '$TIME_START' AND time < '$TIME_END' +// AND (hostname = '$HOST' OR hostname = '$HOST2'...) +func (d *Devops) HighCPUForHosts(qi query.Query, nHosts int) { + interval := d.Interval.MustRandWindow(devops.HighCPUDuration) + + var fromHosts string + if nHosts <= 0 { + fromHosts = fmt.Sprintf("%s.cpu.*", d.BasicPath) + } else { + fromHosts = d.getHostFromString(nHosts) + } + + humanLabel, err := devops.GetHighCPULabel("IoTDB", nHosts) + panicIfErr(err) + humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString()) + + sql := "SELECT *" + sql = sql + fmt.Sprintf(" FROM %s", fromHosts) + sql = sql + fmt.Sprintf(" WHERE usage_user > 90 AND time >= %s AND time < %s ALIGN BY DEVICE", interval.Start().Format(iotdbTimeFmt), interval.End().Format(iotdbTimeFmt)) + + d.fillInQuery(qi, humanLabel, humanDesc, sql) +} diff --git a/cmd/tsbs_generate_queries/databases/iotdb/devops_test.go b/cmd/tsbs_generate_queries/databases/iotdb/devops_test.go new file mode 100644 index 000000000..e405f1bd7 --- /dev/null +++ b/cmd/tsbs_generate_queries/databases/iotdb/devops_test.go @@ -0,0 +1,370 @@ +package iotdb + +import ( + "fmt" + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/devops" +) + +func TestModifyHostnames(t *testing.T) { + cases := []struct { + description string + hostnames []string + expected []string + }{ + { + description: "normal node name", + hostnames: []string{"hostname", "hello_world"}, + expected: []string{"hostname", "hello_world"}, + }, + { + description: "IP address or URL as hostnames", + hostnames: []string{"192.168.1.1", "8.8.8.8", "iotdb.apache.org"}, + 
expected: []string{"`192.168.1.1`", "`8.8.8.8`", "`iotdb.apache.org`"}, + }, + { + description: "already modified cases", + hostnames: []string{"`192.168.1.1`", "`8.8.8.8`", "`iotdb.apache.org`"}, + expected: []string{"`192.168.1.1`", "`8.8.8.8`", "`iotdb.apache.org`"}, + }, + { + description: "mixed host names", + hostnames: []string{"192.168.1.1", "hostname", "iotdb.apache.org", "`8.8.8.8`"}, + expected: []string{"`192.168.1.1`", "hostname", "`iotdb.apache.org`", "`8.8.8.8`"}, + }, + } + + for _, c := range cases { + t.Run(c.description, func(t *testing.T) { + b := BaseGenerator{BasicPath: "root", BasicPathLevel: 0} + queryGenerator, err := b.NewDevops(time.Now(), time.Now(), 10) + require.NoError(t, err, "Error while creating devops generator") + dp := queryGenerator.(*Devops) + + actual := dp.modifyHostnames(c.hostnames) + require.EqualValues(t, c.expected, actual) + }) + } +} + +func TestDevopsGetHostFromString(t *testing.T) { + cases := []struct { + description string + basicPath string + basicPathLevel int32 + hostnames []string + expected string + }{ + { + description: "single host", + basicPath: "root", + basicPathLevel: 0, + hostnames: []string{"host1"}, + expected: "root.cpu.host1", + }, + { + description: "multi host (2)", + basicPath: "root", + basicPathLevel: 0, + hostnames: []string{"host1", "host2"}, + expected: "root.cpu.host1, root.cpu.host2", + }, + { + description: "multi host (2) with storage group", + basicPath: "root.ln", + basicPathLevel: 1, + hostnames: []string{"host1", "host2"}, + expected: "root.ln.cpu.host1, root.ln.cpu.host2", + }, + { + description: "multi host (3) with special node names", + basicPath: "root", + basicPathLevel: 0, + hostnames: []string{"host1", "192.168.1.1", "`iotdb.apache.org`"}, + expected: "root.cpu.host1, root.cpu.`192.168.1.1`, root.cpu.`iotdb.apache.org`", + }, + } + + for _, c := range cases { + t.Run(c.description, func(t *testing.T) { + b := BaseGenerator{BasicPath: c.basicPath, BasicPathLevel: 
c.basicPathLevel} + queryGenerator, err := b.NewDevops(time.Now(), time.Now(), 10) + require.NoError(t, err, "Error while creating devops generator") + dp := queryGenerator.(*Devops) + + actual := dp.getHostFromWithHostnames(c.hostnames) + require.EqualValues(t, c.expected, actual) + }) + } +} + +func TestDevopsGetSelectClausesAggMetricsString(t *testing.T) { + cases := []struct { + description string + agg string + metrics []string + expected string + }{ + { + description: "single metric - max", + agg: "MAX_VALUE", + metrics: []string{"value"}, + expected: "MAX_VALUE(value)", + }, + { + description: "multiple metric - max", + agg: "MAX_VALUE", + metrics: []string{"temperature", "frequency"}, + expected: "MAX_VALUE(temperature), MAX_VALUE(frequency)", + }, + { + description: "multiple metric - avg", + agg: "AVG", + metrics: []string{"temperature", "frequency"}, + expected: "AVG(temperature), AVG(frequency)", + }, + } + + for _, c := range cases { + t.Run(c.description, func(t *testing.T) { + b := BaseGenerator{BasicPath: "root", BasicPathLevel: 0} + queryGenerator, err := b.NewDevops(time.Now(), time.Now(), 10) + require.NoError(t, err, "Error while creating devops generator") + d := queryGenerator.(*Devops) + + actual := d.getSelectClausesAggMetricsString(c.agg, c.metrics) + require.EqualValues(t, c.expected, actual) + }) + } +} + +func TestGroupByTime(t *testing.T) { + rand.Seed(123) // Setting seed for testing purposes. 
+ start := time.Unix(0, 0) + end := start.Add(time.Hour) + base := BaseGenerator{BasicPath: "root", BasicPathLevel: 0} + queryGenerator, err := base.NewDevops(start, end, 10) + require.NoError(t, err, "Error while creating devops generator") + dp := queryGenerator.(*Devops) + + metrics := 1 + nHosts := 1 + duration := time.Second + + actual := dp.GenerateEmptyQuery() + expected := dp.GenerateEmptyQuery() + dp.fillInQuery(expected, + "IoTDB 1 cpu metric(s), random 1 hosts, random 1s by 1m", + "IoTDB 1 cpu metric(s), random 1 hosts, random 1s by 1m: 1970-01-01T00:05:58Z", + "SELECT MAX_VALUE(usage_user) FROM root.cpu.host_9 GROUP BY ([1970-01-01 00:05:58, 1970-01-01 00:05:59), 1m), LEVEL = 1", + ) + dp.GroupByTime(actual, nHosts, metrics, duration) + + require.EqualValues(t, expected, actual) +} + +func TestGroupByTimeAndPrimaryTag(t *testing.T) { + cases := []struct { + description string + numMetrics int + baseGenerator BaseGenerator + expectedHumanLabel string + expectedHumanDesc string + expectedSQLQuery string + }{ + { + description: "1 metric with storage group 'root.sg'", + numMetrics: 1, + baseGenerator: BaseGenerator{BasicPath: "root.sg", BasicPathLevel: 1}, + expectedHumanLabel: "IoTDB mean of 1 metrics, all hosts, random 12h0m0s by 1h", + expectedHumanDesc: "IoTDB mean of 1 metrics, all hosts, random 12h0m0s by 1h: 1970-01-01T00:16:22Z", + expectedSQLQuery: "SELECT AVG(usage_user) FROM root.sg.cpu.* GROUP BY ([1970-01-01 00:16:22, 1970-01-01 12:16:22), 1h)", + }, + { + description: "5 metric with storage group 'root'", + numMetrics: 5, + baseGenerator: BaseGenerator{BasicPath: "root", BasicPathLevel: 0}, + expectedHumanLabel: "IoTDB mean of 5 metrics, all hosts, random 12h0m0s by 1h", + expectedHumanDesc: "IoTDB mean of 5 metrics, all hosts, random 12h0m0s by 1h: 1970-01-01T00:16:22Z", + expectedSQLQuery: "SELECT AVG(usage_user), AVG(usage_system), AVG(usage_idle), AVG(usage_nice), AVG(usage_iowait) FROM root.cpu.* GROUP BY ([1970-01-01 00:16:22, 
1970-01-01 12:16:22), 1h)", + }, + } + + start := time.Unix(0, 0) + end := start.Add(devops.DoubleGroupByDuration).Add(time.Hour) + + for _, c := range cases { + t.Run(c.description, func(t *testing.T) { + rand.Seed(123) // Setting seed for testing purposes. + b := c.baseGenerator + queryGenerator, err := b.NewDevops(start, end, 10) + require.NoError(t, err, "Error while creating devops generator") + dp := queryGenerator.(*Devops) + + actual := dp.GenerateEmptyQuery() + expected := dp.GenerateEmptyQuery() + + dp.fillInQuery(expected, c.expectedHumanLabel, c.expectedHumanDesc, c.expectedSQLQuery) + dp.GroupByTimeAndPrimaryTag(actual, c.numMetrics) + + require.EqualValues(t, expected, actual) + }) + } +} + +func TestLastPointPerHost(t *testing.T) { + rand.Seed(123) // Setting seed for testing purposes. + base := BaseGenerator{BasicPath: "root.sg", BasicPathLevel: 1} + queryGenerator, err := base.NewDevops(time.Now(), time.Now(), 10) + require.NoError(t, err, "Error while creating devops generator") + dp := queryGenerator.(*Devops) + + actual := dp.GenerateEmptyQuery() + expected := dp.GenerateEmptyQuery() + dp.fillInQuery(expected, + "IoTDB last row per host", + "IoTDB last row per host: cpu", + "SELECT LAST * FROM root.sg.cpu.*", + ) + dp.LastPointPerHost(actual) + + require.EqualValues(t, expected, actual) +} + +func TestMaxAllCPU(t *testing.T) { + cases := []struct { + description string + nHosts int + baseGenerator BaseGenerator + expectedHumanLabel string + expectedHumanDesc string + expectedSQLQuery string + }{ + { + description: "1 host with storage group 'root'", + nHosts: 1, + baseGenerator: BaseGenerator{BasicPath: "root", BasicPathLevel: 0}, + expectedHumanLabel: "IoTDB max of all CPU metrics, random 1 hosts, random 8h0m0s by 1h", + expectedHumanDesc: "IoTDB max of all CPU metrics, random 1 hosts, random 8h0m0s by 1h: 1970-01-01T02:16:22Z", + expectedSQLQuery: "SELECT MAX_VALUE(usage_user), MAX_VALUE(usage_system), MAX_VALUE(usage_idle), " + + 
"MAX_VALUE(usage_nice), MAX_VALUE(usage_iowait), MAX_VALUE(usage_irq), MAX_VALUE(usage_softirq), " + + "MAX_VALUE(usage_steal), MAX_VALUE(usage_guest), MAX_VALUE(usage_guest_nice) " + + "FROM root.cpu.host_9 GROUP BY ([1970-01-01 02:16:22, 1970-01-01 10:16:22), 1h), LEVEL=1", + }, + { + description: "3 hosts with storage group 'root'", + nHosts: 3, + baseGenerator: BaseGenerator{BasicPath: "root", BasicPathLevel: 0}, + expectedHumanLabel: "IoTDB max of all CPU metrics, random 3 hosts, random 8h0m0s by 1h", + expectedHumanDesc: "IoTDB max of all CPU metrics, random 3 hosts, random 8h0m0s by 1h: 1970-01-01T02:16:22Z", + expectedSQLQuery: "SELECT MAX_VALUE(usage_user), MAX_VALUE(usage_system), MAX_VALUE(usage_idle), " + + "MAX_VALUE(usage_nice), MAX_VALUE(usage_iowait), MAX_VALUE(usage_irq), MAX_VALUE(usage_softirq), " + + "MAX_VALUE(usage_steal), MAX_VALUE(usage_guest), MAX_VALUE(usage_guest_nice) " + + "FROM root.cpu.host_9, root.cpu.host_3, root.cpu.host_5 GROUP BY ([1970-01-01 02:16:22, 1970-01-01 10:16:22), 1h), LEVEL=1", + }, + } + + start := time.Unix(0, 0) + end := start.Add(devops.DoubleGroupByDuration).Add(time.Hour) + + for _, c := range cases { + t.Run(c.description, func(t *testing.T) { + rand.Seed(123) // Setting seed for testing purposes. + b := c.baseGenerator + queryGenerator, err := b.NewDevops(start, end, 10) + require.NoError(t, err, "Error while creating devops generator") + dp := queryGenerator.(*Devops) + + actual := dp.GenerateEmptyQuery() + expected := dp.GenerateEmptyQuery() + + dp.fillInQuery(expected, c.expectedHumanLabel, c.expectedHumanDesc, c.expectedSQLQuery) + dp.MaxAllCPU(actual, c.nHosts, devops.MaxAllDuration) + + require.EqualValues(t, expected, actual) + }) + } +} + +func TestGroupByOrderByLimit(t *testing.T) { + rand.Seed(123) // Setting seed for testing purposes. 
+ base := BaseGenerator{BasicPath: "root", BasicPathLevel: 0} + start := time.Unix(0, 0) + end := start.Add(2 * time.Hour) + queryGenerator, err := base.NewDevops(start, end, 10) + require.NoError(t, err, "Error while creating devops generator") + dp := queryGenerator.(*Devops) + + actual := dp.GenerateEmptyQuery() + expected := dp.GenerateEmptyQuery() + dp.fillInQuery(expected, + "IoTDB max cpu over last 5 min-intervals (random end)", + "IoTDB max cpu over last 5 min-intervals (random end): 1970-01-01T00:16:22Z", + "SELECT MAX_VALUE(usage_user) FROM root.cpu.* GROUP BY ([1970-01-01 01:11:22, 1970-01-01 01:16:22), 1m), LEVEL = 1 ORDER BY TIME DESC LIMIT 5", + ) + dp.GroupByOrderByLimit(actual) + + require.EqualValues(t, expected, actual) +} + +func TestHighCPUForHosts(t *testing.T) { + cases := []struct { + description string + nHosts int + baseGenerator BaseGenerator + expectedHumanLabel string + expectedHumanDesc string + expectedSQLQuery string + }{ + { + description: "ALL host with storage group 'root'", + nHosts: 0, + baseGenerator: BaseGenerator{BasicPath: "root", BasicPathLevel: 0}, + expectedHumanLabel: "IoTDB CPU over threshold, all hosts", + expectedHumanDesc: "IoTDB CPU over threshold, all hosts: 1970-01-01T00:16:22Z", + expectedSQLQuery: "SELECT * FROM root.cpu.* WHERE usage_user > 90 AND time >= 1970-01-01 00:16:22 AND time < 1970-01-01 12:16:22 ALIGN BY DEVICE", + }, + { + description: "1 host with storage group 'root.sg.abc'", + nHosts: 1, + baseGenerator: BaseGenerator{BasicPath: "root.sg.abc", BasicPathLevel: 2}, + expectedHumanLabel: "IoTDB CPU over threshold, 1 host(s)", + expectedHumanDesc: "IoTDB CPU over threshold, 1 host(s): 1970-01-01T00:16:22Z", + expectedSQLQuery: "SELECT * FROM root.sg.abc.cpu.host_9 WHERE usage_user > 90 AND time >= 1970-01-01 00:16:22 AND time < 1970-01-01 12:16:22 ALIGN BY DEVICE", + }, + { + description: "5 host2 with storage group 'root.ln'", + nHosts: 5, + baseGenerator: BaseGenerator{BasicPath: "root.ln", 
BasicPathLevel: 1}, + expectedHumanLabel: "IoTDB CPU over threshold, 5 host(s)", + expectedHumanDesc: "IoTDB CPU over threshold, 5 host(s): 1970-01-01T00:16:22Z", + expectedSQLQuery: "SELECT * FROM root.ln.cpu.host_9, root.ln.cpu.host_3, root.ln.cpu.host_5, root.ln.cpu.host_1, root.ln.cpu.host_7 WHERE usage_user > 90 AND time >= 1970-01-01 00:16:22 AND time < 1970-01-01 12:16:22 ALIGN BY DEVICE", + }, + } + + start := time.Unix(0, 0) + end := start.Add(devops.HighCPUDuration).Add(time.Hour) + + for _, c := range cases { + t.Run(c.description, func(t *testing.T) { + rand.Seed(123) // Setting seed for testing purposes. + b := c.baseGenerator + queryGenerator, err := b.NewDevops(start, end, 10) + require.NoError(t, err, "Error while creating devops generator") + dp := queryGenerator.(*Devops) + + actual := dp.GenerateEmptyQuery() + expected := dp.GenerateEmptyQuery() + + dp.fillInQuery(expected, c.expectedHumanLabel, c.expectedHumanDesc, c.expectedSQLQuery) + dp.HighCPUForHosts(actual, c.nHosts) + fmt.Println(actual) + + require.EqualValues(t, expected, actual) + }) + } +} diff --git a/cmd/tsbs_load_iotdb/benchmark.go b/cmd/tsbs_load_iotdb/benchmark.go new file mode 100644 index 000000000..c9b3ff645 --- /dev/null +++ b/cmd/tsbs_load_iotdb/benchmark.go @@ -0,0 +1,51 @@ +package main + +import ( + "bufio" + + "github.com/apache/iotdb-client-go/client" + "github.com/timescale/tsbs/load" + "github.com/timescale/tsbs/pkg/targets" +) + +func newBenchmark(clientConfig client.Config, loaderConfig load.BenchmarkRunnerConfig) targets.Benchmark { + return &iotdbBenchmark{ + cilentConfig: clientConfig, + loaderConfig: loaderConfig, + recordsMaxRows: recordsMaxRows, + } +} + +type iotdbBenchmark struct { + cilentConfig client.Config + loaderConfig load.BenchmarkRunnerConfig + recordsMaxRows int +} + +func (b *iotdbBenchmark) GetDataSource() targets.DataSource { + return &fileDataSource{scanner: bufio.NewScanner(load.GetBufferedReader(b.loaderConfig.FileName))} +} + +func (b 
*iotdbBenchmark) GetBatchFactory() targets.BatchFactory { + return &factory{} +} + +func (b *iotdbBenchmark) GetPointIndexer(maxPartitions uint) targets.PointIndexer { + return &targets.ConstantIndexer{} +} + +func (b *iotdbBenchmark) GetProcessor() targets.Processor { + return &processor{ + recordsMaxRows: b.recordsMaxRows, + loadToSCV: loadToSCV, + csvFilepathPrefix: csvFilepathPrefix, + useAlignedTimeseries: useAlignedTimeseries, + storeTags: storeTags, + } +} + +func (b *iotdbBenchmark) GetDBCreator() targets.DBCreator { + return &dbCreator{ + loadToSCV: loadToSCV, + } +} diff --git a/cmd/tsbs_load_iotdb/creator.go b/cmd/tsbs_load_iotdb/creator.go new file mode 100644 index 000000000..e3e09c1e2 --- /dev/null +++ b/cmd/tsbs_load_iotdb/creator.go @@ -0,0 +1,82 @@ +package main + +import ( + "fmt" + + "github.com/apache/iotdb-client-go/client" +) + +// DBCreator is an interface for a benchmark to do the initial setup of a database +// in preparation for running a benchmark against it. + +type dbCreator struct { + session client.Session + loadToSCV bool +} + +func (d *dbCreator) Init() { + if !d.loadToSCV { + // only connect to the database when not exporting to CSV files + d.session = client.NewSession(&clientConfig) + if err := d.session.Open(false, timeoutInMs); err != nil { + errMsg := fmt.Sprintf("IoTDB dbCreator init error, session is not open: %v\n", err) + errMsg = errMsg + fmt.Sprintf("timeout setting: %d ms", timeoutInMs) + fatal(errMsg) + } + } +} + +// get all Storage Group +func (d *dbCreator) getAllStorageGroup() ([]string, error) { + var sql = "show storage group" + sessionDataSet, err := d.session.ExecuteStatement(sql) + if err != nil { + return []string{}, err + } + + var sgList []string + + for next, err := sessionDataSet.Next(); err == nil && next; next, err = sessionDataSet.Next() { + for i := 0; i < sessionDataSet.GetColumnCount(); i++ { + columnName := sessionDataSet.GetColumnName(i) + switch sessionDataSet.GetColumnDataType(i) { 
case client.TEXT: + sgList = append(sgList, sessionDataSet.GetText(columnName)) + default: + } + } + } + return sgList, nil +} + +func (d *dbCreator) DBExists(dbName string) bool { + if !d.loadToSCV { + // only query the database when not exporting to CSV files + sgList, err := d.getAllStorageGroup() + if err != nil { + fatal("DBExists error: %v", err) + return false + } + sg := fmt.Sprintf("root.%s", dbName) + for _, thisSG := range sgList { + if thisSG == sg { + return true + } + } + } + return false +} + +func (d *dbCreator) CreateDB(dbName string) error { + return nil +} + +func (d *dbCreator) RemoveOldDB(dbName string) error { + if !d.loadToSCV { + // only touch the database when not exporting to CSV files + sg := fmt.Sprintf("root.%s", dbName) + _, err := d.session.DeleteStorageGroup(sg) + return err + } + return nil +} diff --git a/cmd/tsbs_load_iotdb/main.go b/cmd/tsbs_load_iotdb/main.go new file mode 100644 index 000000000..b3a7131b5 --- /dev/null +++ b/cmd/tsbs_load_iotdb/main.go @@ -0,0 +1,109 @@ +// tsbs_load_iotdb loads an IoTDB daemon with data from stdin. +// +// The caller is responsible for assuring that the database is empty before +// tsbs load. +package main + +import ( + "fmt" + "log" + + "github.com/blagojts/viper" + "github.com/spf13/pflag" + "github.com/timescale/tsbs/internal/utils" + "github.com/timescale/tsbs/load" + "github.com/timescale/tsbs/pkg/targets" + "github.com/timescale/tsbs/pkg/targets/constants" + "github.com/timescale/tsbs/pkg/targets/initializers" + + "github.com/apache/iotdb-client-go/client" +) + +// database option vars +var ( + clientConfig client.Config + timeoutInMs int // 0 for no timeout + recordsMaxRows int // max rows of records in 'InsertRecords' + loadToSCV bool // if true, do NOT insert into databases, but generate csv files instead. + csvFilepathPrefix string // Prefix of filepath for csv files. Specify a folder or a folder with filename prefix. 
+ useAlignedTimeseries bool // using aligned timeseries if set true. + storeTags bool // store tags if set true. Can NOT be used if useAlignedTimeseries is set true. +) + +// Global vars +var ( + target targets.ImplementedTarget + + loaderConfig load.BenchmarkRunnerConfig + loader load.BenchmarkRunner +) + +// allows for testing +var fatal = log.Fatalf + +// Parse args: +func init() { + target = initializers.GetTarget(constants.FormatIoTDB) + loaderConfig = load.BenchmarkRunnerConfig{} + loaderConfig.AddToFlagSet(pflag.CommandLine) + target.TargetSpecificFlags("", pflag.CommandLine) + pflag.Parse() + + err := utils.SetupConfigFile() + + if err != nil { + panic(fmt.Errorf("fatal error config file: %s", err)) + } + if err := viper.Unmarshal(&loaderConfig); err != nil { + panic(fmt.Errorf("unable to decode config: %s", err)) + } + + host := viper.GetString("host") + port := viper.GetString("port") + user := viper.GetString("user") + password := viper.GetString("password") + timeoutInMs = viper.GetInt("timeout") + recordsMaxRows = viper.GetInt("records-max-rows") + loadToSCV = viper.GetBool("to-csv") + csvFilepathPrefix = viper.GetString("csv-prefix") + useAlignedTimeseries = viper.GetBool("aligned-timeseries") + storeTags = viper.GetBool("store-tags") + + workers := viper.GetUint("workers") + + timeoutStr := fmt.Sprintf("timeout for session opening check: %d ms", timeoutInMs) + if timeoutInMs <= 0 { + timeoutInMs = 0 // 0 for no timeout. + timeoutStr = "no timeout for session opening check" + } + log.Printf("tsbs_load_iotdb target: %s:%s, %s. Loading with %d workers.\n", host, port, timeoutStr, workers) + if loadToSCV && workers != 1 { + err_msg := "Arguments conflicts! When using csv export method, `workers` should NOT be set more than 1. 
" + err_msg += fmt.Sprintf("Current setting: `to-csv`=%v, `workers`=%d.", loadToSCV, workers) + log.Println(err_msg) + panic(err_msg) + } + if useAlignedTimeseries && storeTags { + warn_msg := "[Waring] Can NOT store tags while using aligned timeseries!" + warn_msg += " Because IoTDB do NOT support 'attributes' and 'tags' for aligned timeseries yet." + log.Println(warn_msg) + warn_msg = "Automatic parameter correction: 'store-tags' is set to false." + log.Println(warn_msg) + storeTags = false + } + + clientConfig = client.Config{ + Host: host, + Port: port, + UserName: user, + Password: password, + } + + loader = load.GetBenchmarkRunner(loaderConfig) +} + +func main() { + benchmark := newBenchmark(clientConfig, loaderConfig) + + loader.RunBenchmark(benchmark) +} diff --git a/cmd/tsbs_load_iotdb/process.go b/cmd/tsbs_load_iotdb/process.go new file mode 100644 index 000000000..ac38b046a --- /dev/null +++ b/cmd/tsbs_load_iotdb/process.go @@ -0,0 +1,196 @@ +package main + +import ( + "fmt" + "os" + "strconv" + "strings" + + "github.com/apache/iotdb-client-go/client" + "github.com/timescale/tsbs/pkg/targets" + "github.com/timescale/tsbs/pkg/targets/iotdb" +) + +type processor struct { + numWorker int // the worker(like thread) ID of this processor + session client.Session + recordsMaxRows int // max rows of records in 'InsertRecords' + ProcessedTagsDeviceIDMap map[string]bool // already processed device ID + + loadToSCV bool // if true, do NOT insert into databases, but generate csv files instead. + csvFilepathPrefix string // Prefix of filepath for csv files. Specific a folder or a folder with filename prefix. + filePtrMap map[string]*os.File // file pointer for each deviceID + + useAlignedTimeseries bool // using aligned timeseries if set true. + storeTags bool // store tags if set true. Can NOT be used if useAlignedTimeseries is set true. 
+} + +func (p *processor) Init(numWorker int, doLoad, _ bool) { + p.numWorker = numWorker + if !doLoad { + return + } + if p.loadToSCV { + p.filePtrMap = make(map[string]*os.File) + } else { + p.ProcessedTagsDeviceIDMap = make(map[string]bool) + p.session = client.NewSession(&clientConfig) + if err := p.session.Open(false, timeoutInMs); err != nil { + errMsg := fmt.Sprintf("IoTDB processor init error, session is not open: %v, ", err) + errMsg = errMsg + fmt.Sprintf("timeout setting: %d ms\n", timeoutInMs) + fatal(errMsg) + } + } +} + +type records struct { + deviceId []string + measurements [][]string + dataTypes [][]client.TSDataType + values [][]interface{} + timestamps []int64 +} + +func (p *processor) pointsToRecords(points []*iotdbPoint) (records, []string) { + var rcds records + var sqlList []string + for _, row := range points { + rcds.deviceId = append(rcds.deviceId, row.deviceID) + rcds.measurements = append(rcds.measurements, row.measurements) + rcds.dataTypes = append(rcds.dataTypes, row.dataTypes) + rcds.values = append(rcds.values, row.values) + rcds.timestamps = append(rcds.timestamps, row.timestamp) + // append tags if "storeTags" is set true + if p.storeTags { + _, exist := p.ProcessedTagsDeviceIDMap[row.deviceID] + if !exist { + sqlList = append(sqlList, row.generateTagsAttributesSQL()) + p.ProcessedTagsDeviceIDMap[row.deviceID] = true + } + } + } + return rcds, sqlList +} + +func minInt(x int, y int) int { + if x < y { + return x + } + return y +} + +func getStringOfDatatype(datatype client.TSDataType) string { + switch datatype { + case client.BOOLEAN: + return "BOOLEAN" + case client.DOUBLE: + return "DOUBLE" + case client.FLOAT: + return "FLOAT" + case client.INT32: + return "INT32" + case client.INT64: + return "INT64" + case client.TEXT: + return "TEXT" + case client.UNKNOW: + return "UNKNOW" + default: + return "UNKNOW" + } +} + +func generateCSVHeader(point *iotdbPoint) string { + header := "Time" + for index, dataType := range 
point.dataTypes { + meta := fmt.Sprintf(",%s.%s(%s)", point.deviceID, point.measurements[index], + getStringOfDatatype(dataType)) + header = header + meta + } + header = header + "\n" + return header +} + +func generateCSVContent(point *iotdbPoint) string { + var valueList []string + valueList = append(valueList, strconv.FormatInt(point.timestamp, 10)) + for _, value := range point.values { + valueInStrByte, _ := iotdb.IotdbFormat(value) + valueList = append(valueList, string(valueInStrByte)) + } + content := strings.Join(valueList, ",") + content += "\n" + return content +} + +func (p *processor) ProcessBatch(b targets.Batch, doLoad bool) (metricCount, rowCount uint64) { + batch := b.(*iotdbBatch) + + // Write records + if doLoad { + if !p.loadToSCV { + // insert records into the database + var sqlList []string + for index := 0; index < len(batch.points); { + startIndex := index + var endIndex int + if p.recordsMaxRows > 0 { + endIndex = minInt(len(batch.points), index+p.recordsMaxRows) + } else { + endIndex = len(batch.points) + } + rcds, tempSqlList := p.pointsToRecords(batch.points[startIndex:endIndex]) + sqlList = append(sqlList, tempSqlList...) + // using relative API according to "aligned-timeseries" setting + var err error + if p.useAlignedTimeseries { + _, err = p.session.InsertAlignedRecords( + rcds.deviceId, rcds.measurements, rcds.dataTypes, rcds.values, rcds.timestamps, + ) + } else { + _, err = p.session.InsertRecords( + rcds.deviceId, rcds.measurements, rcds.dataTypes, rcds.values, rcds.timestamps, + ) + } + if err != nil { + fatal("ProcessBatch error:%v", err) + } + index = endIndex + } + // handle create timeseries SQL to insert tags + for _, sql := range sqlList { + _, err := p.session.ExecuteNonQueryStatement(sql) + if err != nil { + fatal("ProcessBatch SQL Execution error:%v", err) + } + } + } else { + // generate csv files. 
There is no requirement to connect to any database + for index := 0; index < len(batch.points); index++ { + point := batch.points[index] + _, exist := p.filePtrMap[point.deviceID] + if !exist { + // create file pointer + filepath := fmt.Sprintf("%s%s.csv", p.csvFilepathPrefix, point.deviceID) + filePtr, err := os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY, 0777) + if err != nil { + fatal(fmt.Sprintf("ERROR occurs while creating csv file for deviceID: %s, filepath: %s", point.deviceID, filepath)) + panic(err) + } + p.filePtrMap[point.deviceID] = filePtr + // write header of this csv file + header := generateCSVHeader(point) + filePtr.WriteString(header) + } + filePtr := p.filePtrMap[point.deviceID] + pointRowInCSV := generateCSVContent(point) + filePtr.WriteString(pointRowInCSV) + } + } + + } + + metricCount = batch.metrics + rowCount = uint64(batch.rows) + return metricCount, rowCount +} diff --git a/cmd/tsbs_load_iotdb/scan.go b/cmd/tsbs_load_iotdb/scan.go new file mode 100644 index 000000000..7bbb034ac --- /dev/null +++ b/cmd/tsbs_load_iotdb/scan.go @@ -0,0 +1,198 @@ +package main + +import ( + "bufio" + "fmt" + "strconv" + "strings" + "time" + + "github.com/apache/iotdb-client-go/client" + "github.com/timescale/tsbs/pkg/data" + "github.com/timescale/tsbs/pkg/data/usecases/common" + "github.com/timescale/tsbs/pkg/targets" +) + +// iotdbPoint is a single record(row) of data +type iotdbPoint struct { + deviceID string // the deviceID(path) of this record, e.g. "root.cpu.host_0" + timestamp int64 + measurements []string + values []interface{} + dataTypes []client.TSDataType + tagString string + + fieldsCnt uint64 +} + +func (p *iotdbPoint) generateTagsAttributesSQL() string { + if p.tagString == "" { + // no tags for this host. This is not a normal behavior in benchmark. 
+ return fmt.Sprintf("CREATE timeseries %s._tags with datatype=INT32, encoding=RLE, compression=SNAPPY", p.deviceID) + } + sql := "CREATE timeseries %s._tags with datatype=INT32, encoding=RLE, compression=SNAPPY attributes(%s)" + // sql2 := "ALTER timeseries %s._tags UPSERT attributes(%s)" + return fmt.Sprintf(sql, p.deviceID, p.tagString) +} + +// parse datatype and convert string into interface +func parseDataToInterface(datatype client.TSDataType, str string) (interface{}, error) { + switch client.TSDataType(datatype) { + case client.BOOLEAN: + value, err := strconv.ParseBool(str) + return interface{}(value), err + case client.INT32: + value, err := strconv.ParseInt(str, 10, 32) + return interface{}(int32(value)), err + case client.INT64: + value, err := strconv.ParseInt(str, 10, 64) + return interface{}(int64(value)), err + case client.FLOAT: + value, err := strconv.ParseFloat(str, 32) + return interface{}(float32(value)), err + case client.DOUBLE: + value, err := strconv.ParseFloat(str, 64) + return interface{}(float64(value)), err + case client.TEXT: + return interface{}(str), nil + case client.UNKNOW: + return interface{}(nil), fmt.Errorf("datatype client.UNKNOW, value:%s", str) + default: + return interface{}(nil), fmt.Errorf("unknown datatype, value:%s", str) + } +} + +type fileDataSource struct { + scanner *bufio.Scanner +} + +// read new four line, which store one data point +// e.g., +// deviceID,timestamp,,,,... +// ,,,,,... +// datatype,,,,... +// tags,=,=,... 
+// +// deviceID,timestamp,hostname,value +// root.cpu.host_1,1451606400000000000,'host_1',44.0 +// datatype,5,2 +// tags,region='eu-west-1',datacenter='eu-west-1c',rack='87', +// +// return : bool -> true means got one point, else reaches EOF or error happens +func (d *fileDataSource) nextFourLines() (bool, string, string, string, string, error) { + ok := d.scanner.Scan() + if !ok && d.scanner.Err() == nil { // nothing scanned & no error = EOF + return false, "", "", "", "", nil + } else if !ok { + return false, "", "", "", "", fmt.Errorf("scan error: %v", d.scanner.Err()) + } + line1 := d.scanner.Text() + line_ok := strings.HasPrefix(line1, "deviceID,timestamp,") + if !line_ok { + return false, line1, "", "", "", fmt.Errorf("scan error, illegal line: %s", line1) + } + ok = d.scanner.Scan() + if !ok && d.scanner.Err() == nil { // nothing scanned & no error = EOF + return false, "", "", "", "", nil + } else if !ok { + return false, "", "", "", "", fmt.Errorf("scan error: %v", d.scanner.Err()) + } + line2 := d.scanner.Text() + ok = d.scanner.Scan() + if !ok && d.scanner.Err() == nil { // nothing scanned & no error = EOF + return false, "", "", "", "", nil + } else if !ok { + return false, "", "", "", "", fmt.Errorf("scan error: %v", d.scanner.Err()) + } + line3 := d.scanner.Text() + ok = d.scanner.Scan() + if !ok && d.scanner.Err() == nil { // nothing scanned & no error = EOF + return false, "", "", "", "", nil + } else if !ok { + return false, "", "", "", "", fmt.Errorf("scan error: %v", d.scanner.Err()) + } + line4 := d.scanner.Text() + return true, line1, line2, line3, line4, nil +} + +func parseFourLines(line1 string, line2 string, line3 string, line4 string) data.LoadedPoint { + line1_parts := strings.Split(line1, ",") // 'deviceID' and rest keys of fields + line2_parts := strings.Split(line2, ",") // deviceID and rest values of fields + line3_parts := strings.Split(line3, ",") // deviceID and rest values of fields + line4_parts := strings.SplitN(line4, ",", 2) 
// 'tags' and string of tags + timestamp, err := strconv.ParseInt(line2_parts[1], 10, 64) + if err != nil { + fatal("timestamp convert err: %v", err) + } + timestamp = int64(timestamp / int64(time.Millisecond)) + var measurements []string + var values []interface{} + var dataTypes []client.TSDataType + // handle measurements, datatype and values + measurements = append(measurements, line1_parts[2:]...) + for type_index := 1; type_index < len(line3_parts); type_index++ { + value_index := type_index + 1 + datatype, _ := strconv.ParseInt(line3_parts[type_index], 10, 8) + dataTypes = append(dataTypes, client.TSDataType(datatype)) + value, err := parseDataToInterface(client.TSDataType(datatype), line2_parts[value_index]) + if err != nil { + panic(fmt.Errorf("iotdb fileDataSource NextItem Parse error:%v", err)) + } + values = append(values, value) + } + tagString := "" + if len(line4_parts) > 1 { + tagString = line4_parts[1] + } + return data.NewLoadedPoint( + &iotdbPoint{ + deviceID: line2_parts[0], + timestamp: timestamp, + measurements: measurements, + values: values, + dataTypes: dataTypes, + tagString: tagString, + fieldsCnt: uint64(len(line1_parts) - 2), + }) +} + +func (d *fileDataSource) NextItem() data.LoadedPoint { + scan_ok, line1, line2, line3, line4, err := d.nextFourLines() + if !scan_ok { + if err == nil { // End of file + return data.LoadedPoint{} + } else { // Some error occurred + fatal("IoTDB scan error: %v", err) + return data.LoadedPoint{} + } + } + return parseFourLines(line1, line2, line3, line4) +} + +func (d *fileDataSource) Headers() *common.GeneratedDataHeaders { return nil } + +// A struct that storages data points +type iotdbBatch struct { + points []*iotdbPoint + rows uint // count of records(rows) + metrics uint64 // total count of all metrics in this batch +} + +func (b *iotdbBatch) Len() uint { + return b.rows +} + +func (b *iotdbBatch) Append(item data.LoadedPoint) { + b.rows++ + b.points = append(b.points, item.Data.(*iotdbPoint)) + 
b.metrics += item.Data.(*iotdbPoint).fieldsCnt +} + +type factory struct{} + +func (f *factory) New() targets.Batch { + return &iotdbBatch{ + rows: 0, + metrics: 0, + } +} diff --git a/cmd/tsbs_load_iotdb/scan_test.go b/cmd/tsbs_load_iotdb/scan_test.go new file mode 100644 index 000000000..ac996a06f --- /dev/null +++ b/cmd/tsbs_load_iotdb/scan_test.go @@ -0,0 +1,126 @@ +package main + +import ( + "testing" + + "github.com/apache/iotdb-client-go/client" + "github.com/stretchr/testify/require" +) + +func TestGenerateTagsAttributesSQL(t *testing.T) { + cases := []struct { + description string + point iotdbPoint + expectedSQL string + }{ + { + description: "no tags", + point: iotdbPoint{ + deviceID: "root.cpu.host_9", + tagString: "", + }, + expectedSQL: "CREATE timeseries root.cpu.host_9._tags with datatype=INT32, encoding=RLE, compression=SNAPPY", + }, + { + description: "one tag", + point: iotdbPoint{ + deviceID: "root.cpu.host_0", + tagString: "key1='value'", + }, + expectedSQL: "CREATE timeseries root.cpu.host_0._tags with datatype=INT32, encoding=RLE, compression=SNAPPY attributes(key1='value')", + }, + { + description: "one tag that type is int", + point: iotdbPoint{ + deviceID: "root.cpu.host_2", + tagString: "key1=123", + }, + expectedSQL: "CREATE timeseries root.cpu.host_2._tags with datatype=INT32, encoding=RLE, compression=SNAPPY attributes(key1=123)", + }, + { + description: "two tags", + point: iotdbPoint{ + deviceID: "root.cpu.host_5", + tagString: "region='eu-west-1',datacenter='eu-west-1b'", + }, + expectedSQL: "CREATE timeseries root.cpu.host_5._tags with datatype=INT32, encoding=RLE, compression=SNAPPY attributes(region='eu-west-1',datacenter='eu-west-1b')", + }, + } + for _, c := range cases { + t.Run(c.description, func(t *testing.T) { + actual := c.point.generateTagsAttributesSQL() + require.EqualValues(t, c.expectedSQL, actual) + }) + } +} + +func TestGenerateInsertStatement(t *testing.T) { + cases := []struct { + description string + lines 
[]string + expected iotdbPoint + }{ + { + description: "one point", + lines: []string{ + "deviceID,timestamp,value", + "root.cpu.host_9,1451606400000000000,3.1415926", + "datatype,4", + "tags", + }, + expected: iotdbPoint{ + deviceID: "root.cpu.host_9", + timestamp: 1451606400000, + measurements: []string{"value"}, + values: []interface{}{float64(3.1415926)}, + dataTypes: []client.TSDataType{client.DOUBLE}, + tagString: "", + fieldsCnt: 1, + }, + }, + { + description: "one point with different dataTypes", + lines: []string{ + "deviceID,timestamp,floatV,strV,int64V,int32V,boolV", + "root.cpu.host_0,1451606400000000000,3.1415926,hello,123,123,true", + "datatype,4,5,2,1,0", + "tags", + }, + expected: iotdbPoint{ + deviceID: "root.cpu.host_0", + timestamp: 1451606400000, + measurements: []string{"floatV", "strV", "int64V", "int32V", "boolV"}, + values: []interface{}{float64(3.1415926), string("hello"), int64(123), int32(123), true}, + dataTypes: []client.TSDataType{client.DOUBLE, client.TEXT, client.INT64, client.INT32, client.BOOLEAN}, + tagString: "", + fieldsCnt: 5, + }, + }, + { + description: "one point with different dataTypes", + lines: []string{ + "deviceID,timestamp,floatV,strV,int64V,int32V,boolV", + "root.cpu.host_0,1451606400000000000,3.1415926,hello,123,123,true", + "datatype,4,5,2,1,0", + "tags,region='eu-west-1',datacenter='eu-west-1b'", + }, + expected: iotdbPoint{ + deviceID: "root.cpu.host_0", + timestamp: 1451606400000, + measurements: []string{"floatV", "strV", "int64V", "int32V", "boolV"}, + values: []interface{}{float64(3.1415926), string("hello"), int64(123), int32(123), true}, + dataTypes: []client.TSDataType{client.DOUBLE, client.TEXT, client.INT64, client.INT32, client.BOOLEAN}, + tagString: "region='eu-west-1',datacenter='eu-west-1b'", + fieldsCnt: 5, + }, + }, + } + + for _, c := range cases { + t.Run(c.description, func(t *testing.T) { + require.True(t, len(c.lines) == 4) + actual := parseFourLines(c.lines[0], c.lines[1], c.lines[2], 
c.lines[3]) + require.EqualValues(t, &c.expected, actual.Data.(*iotdbPoint)) + }) + } +} diff --git a/cmd/tsbs_run_queries_iotdb/main.go b/cmd/tsbs_run_queries_iotdb/main.go new file mode 100644 index 000000000..f9675eb55 --- /dev/null +++ b/cmd/tsbs_run_queries_iotdb/main.go @@ -0,0 +1,153 @@ +package main + +import ( + "fmt" + "log" + "time" + + "github.com/blagojts/viper" + "github.com/spf13/pflag" + "github.com/timescale/tsbs/internal/utils" + "github.com/timescale/tsbs/pkg/query" + + "github.com/apache/iotdb-client-go/client" +) + +// database option vars +var ( + clientConfig client.Config + timeoutInMs int64 // 0 for no timeout +) + +// Global vars: +var ( + runner *query.BenchmarkRunner +) + +// Parse args: +func init() { + var config query.BenchmarkRunnerConfig + config.AddToFlagSet(pflag.CommandLine) + + pflag.String("host", "localhost", "Hostname of IoTDB instance") + pflag.String("port", "6667", "Which port to connect to on the database host") + pflag.String("user", "root", "The user who connect to IoTDB") + pflag.String("password", "root", "The password for user connecting to IoTDB") + + pflag.Parse() + + err := utils.SetupConfigFile() + + if err != nil { + panic(fmt.Errorf("fatal error config file: %s", err)) + } + + if err := viper.Unmarshal(&config); err != nil { + panic(fmt.Errorf("unable to decode config: %s", err)) + } + + host := viper.GetString("host") + port := viper.GetString("port") + user := viper.GetString("user") + password := viper.GetString("password") + workers := viper.GetUint("workers") + timeoutInMs = 0 // 0 for no timeout + + log.Printf("tsbs_run_queries_iotdb target: %s:%s. Loading with %d workers.\n", host, port, workers) + if workers < 5 { + log.Println("Insertion throughput is strongly related to the number of threads. 
Use more workers for better performance.") + } + + clientConfig = client.Config{ + Host: host, + Port: port, + UserName: user, + Password: password, + } + + runner = query.NewBenchmarkRunner(config) +} + +func main() { + runner.Run(&query.IoTDBPool, newProcessor) +} + +type processor struct { + session client.Session + printResponses bool +} + +func newProcessor() query.Processor { return &processor{} } + +func (p *processor) Init(workerNumber int) { + p.session = client.NewSession(&clientConfig) + p.printResponses = runner.DoPrintResponses() + if err := p.session.Open(false, int(timeoutInMs)); err != nil { + errMsg := fmt.Sprintf("query processor init error, session is not open: %v\n", err) + errMsg = errMsg + fmt.Sprintf("timeout setting: %d ms", timeoutInMs) + log.Fatal(errMsg) + } +} + +func (p *processor) ProcessQuery(q query.Query, _ bool) ([]*query.Stat, error) { + iotdbQ := q.(*query.IoTDB) + sql := string(iotdbQ.SqlQuery) + + start := time.Now().UnixNano() + dataSet, err := p.session.ExecuteQueryStatement(sql, &timeoutInMs) // 0 for no timeout + if err == nil { + if p.printResponses { + printDataSet(sql, dataSet) + } else { + // var next bool + // for next, err = dataSet.Next(); err == nil && next; next, err = dataSet.Next() { + // // Traverse query results + // } + } + } + took := time.Now().UnixNano() - start + + defer dataSet.Close() + if err != nil { + log.Printf("An error occurred while executing query SQL: %s\n", sql) + return nil, err + } + + lag := float64(took) / float64(time.Millisecond) // in milliseconds + stat := query.GetStat() + stat.Init(q.HumanLabelName(), lag) + return []*query.Stat{stat}, err +} + +func printDataSet(sql string, sds *client.SessionDataSet) { + fmt.Printf("\nResponse for query '%s':\n", sql) + showTimestamp := !sds.IsIgnoreTimeStamp() + if showTimestamp { + fmt.Print("Time\t\t\t\t") + } + + for i := 0; i < sds.GetColumnCount(); i++ { + fmt.Printf("%s\t", sds.GetColumnName(i)) + } + fmt.Println() + + printedColsCount := 0 + 
for next, err := sds.Next(); err == nil && next; next, err = sds.Next() { + if showTimestamp { + fmt.Printf("%s\t", sds.GetText(client.TimestampColumnName)) + } + for i := 0; i < sds.GetColumnCount(); i++ { + columnName := sds.GetColumnName(i) + v := sds.GetValue(columnName) + if v == nil { + v = "null" + } + fmt.Printf("%v\t\t", v) + } + fmt.Println() + printedColsCount++ + } + if printedColsCount == 0 { + fmt.Println("Empty Set.") + } +} diff --git a/docs/iotdb.md b/docs/iotdb.md new file mode 100644 index 000000000..64ff0586f --- /dev/null +++ b/docs/iotdb.md @@ -0,0 +1,209 @@ +# TSBS Supplemental Guide: Apache IoTDB + +**This should be read *after* the main README.** + +Apache IoTDB (Database for Internet of Things) is an IoT native database with +high performance for data management and analysis, deployable on the edge and +the cloud. For more details about Apache IoTDB, please take a look at: +[https://iotdb.apache.org/](https://iotdb.apache.org/) + +This supplemental guide explains how the data generated for TSBS is stored, +additional flags available when using the data importer (`tsbs_load_iotdb`), +and additional flags available for the query runner (`tsbs_run_queries_iotdb`). + +## Data format + +Data generated by `tsbs_generate_data` for IoTDB is serialized in a "pseudo-CSV" +format, along with a custom header at the beginning: + +* one line composed of a comma-separated list of "deviceID" and field names +* one line composed of a comma-separated list of deviceID and field values +* one line composed of a comma-separated list of "datatype" and data types +* one line composed of a comma-separated list of tags + +For example: + +```text +deviceID,timestamp,,,,... +,,,,,... +datatype,,,,... +tags,=,=,... + +deviceID,timestamp,hostname,value +root.cpu.host_1,1451606400000000000,'host_1',44.0 +datatype,5,2 +tags,region='eu-west-1',datacenter='eu-west-1c',rack='87', +``` + +`deviceID` describes the storage path which is composed of several nodes. 
+IoTDB uses storage groups to manage data. For example, in test case `devops`,
+if `measurementName` is `cpu` while the `hostname` tag is `host_0`, then `deviceID`
+is `root.cpu.host_0`.
+`hostname` in the `devops` test cases is one of the primary keys, and each `hostname`
+specifies a unique device, so it's selected to be part of `deviceID`. The other
+tags are less important and their values do not change during the whole test,
+so they are designed to be stored in a node named `_tags` as attributes.
+For more detail about how IoTDB manages attributes of a node, please take a look
+at [Timeseries Management](https://iotdb.apache.org/UserGuide/Master/Operate-Metadata/Timeseries.html#create-timeseries).
+
+The unit of timestamps in the generated data is nanoseconds, but they are converted
+into milliseconds before being inserted into the database.
+
+An example for the `cpu-only` use case:
+
+```text
+deviceID,timestamp,usage_user,usage_system,usage_idle,usage_nice,usage_iowait,usage_irq,usage_softirq,usage_steal,usage_guest,usage_guest_nice
+root.cpu.host_0,1451606400000000000,58,2,24,61,22,63,6,44,80,38
+datatype,2,2,2,2,2,2,2,2,2,2
+tags,region='eu-west-1',datacenter='eu-west-1c',rack='87',os='Ubuntu16.04LTS',arch='x64',team='NYC',service='18',service_version='1',service_environment='production'
+
+```
+
+---
+
+## CSV Export
+
+`tsbs_load_iotdb` provides a CSV export function. Users can export any test case
+into CSV files. In this mode, `tsbs_load_iotdb` will NOT open any session,
+and will NOT connect to the database. It will create CSV files and store the data
+in them instead. The number of CSV files is equal to the number of storage
+groups. For example, if `scale==4000` in the `devops` test cases, there will be
+`4,000 * 9 = 36,000` storage groups and 36,000 CSV files.
+ +An example for a CSV file: + +```text +Time,root.cpu.host_0.usage_user(INT64),root.cpu.host_0.usage_system(INT64),root.cpu.host_0.usage_idle(INT64),root.cpu.host_0.usage_nice(INT64),root.cpu.host_0.usage_iowait(INT64),root.cpu.host_0.usage_irq(INT64),root.cpu.host_0.usage_softirq(INT64),root.cpu.host_0.usage_steal(INT64),root.cpu.host_0.usage_guest(INT64),root.cpu.host_0.usage_guest_nice(INT64) +1451606400000,58,2,24,61,22,63,6,44,80,38 +1451606460000,58,1,24,62,22,61,7,48,80,38 +1451606520000,59,0,24,63,21,61,6,48,79,38 +1451606580000,59,1,25,62,22,60,6,49,80,36 +1451606640000,58,1,24,62,22,60,8,49,80,35 +1451606700000,58,2,24,62,21,61,8,48,79,36 +``` + +In this CSV file, the data aligned by time, and headers with data type. For more +information about this format, please take a look at +[Sample CSV file to be imported](https://iotdb.apache.org/UserGuide/Master/Write-Data/CSV-Tool.html#sample-csv-file-to-be-imported). + +--- + +## `tsbs_load_iotdb` Additional Flags + +### IoTDB related + +#### `-host` (type: `string`, default: `localhost`) + +Hostname of IoTDB instance. + +#### `-port` (type: `string`, default: `6667`) + +Which port to connect to on the database host. + +#### `-user` (type: `string`, default: `root`) + +The username of user who connect to IoTDB. + +#### `-password` (type: `string`, default: `root`) + +The password for user connecting to IoTDB. + +#### `-timeout` (type: `int`, default: `0`) + +Session timeout check in millisecond. After session config initialization, +client will try to get response from database server to make sure the connection +is established. This argument is the session timeout in millisecond. Use 0 for +no timeout. + +#### `-records-max-rows` (type: `int`, default: `0`) + +Session use `insertRecords` to insert data into database. This argument is the +max size (max rows) of records. Use 0 for no limit. 
If this argument is 0,
+`tsbs_load_iotdb` will insert all the data of a batch at one time; that means the
+behavior is equal to setting this argument to `batch-size`.
+
+**Warning!** If this value is set too small, the insertion performance will be
+seriously reduced.
+
+#### `-to-csv` (type: `bool`, default: `false`)
+
+This argument relates to the CSV export function. If this argument is `false`,
+then `tsbs_load_iotdb` will insert data into the IoTDB database. Otherwise it
+will generate CSV files locally without any connection to the database.
+Therefore, if this argument is `true`, the six database-connection arguments
+above (`host`, `port`, `user`, `password`, `timeout` and `records-max-rows`)
+are meaningless.
+
+**Warning!** While this argument is `true`, do NOT set `workers` to more than 1,
+because file writing is a sequential operation.
+
+#### `-csv-prefix` (type: `string`, default: `./`)
+
+This is the filepath prefix for CSV files. Specify a folder, or a folder with a
+filename prefix.
+For example, if it's set to `/home/`, all exported CSV files will be stored in
+the folder `/home/`, like this:
+
+```text
+|-- home
+    |-- root.cpu.host_0.csv
+    |-- root.cpu.host_1.csv
+    |-- root.cpu.host_2.csv
+    |-- root.cpu.host_3.csv
+    ...
+    |-- root.disk.host_0.csv
+    |-- root.disk.host_1.csv
+    |-- root.disk.host_2.csv
+    ...
+```
+
+For another example, if it's set to `/home/iotdb-data-`, all exported CSV files
+will be stored in the folder `/home/` with the prefix `iotdb-data-`, like this:
+
+```text
+|-- home
+    |-- iotdb-data-root.cpu.host_0.csv
+    |-- iotdb-data-root.cpu.host_1.csv
+    |-- iotdb-data-root.cpu.host_2.csv
+    |-- iotdb-data-root.cpu.host_3.csv
+    ...
+    |-- iotdb-data-root.disk.host_0.csv
+    |-- iotdb-data-root.disk.host_1.csv
+    |-- iotdb-data-root.disk.host_2.csv
+    ...
+```
+
+**Warning!** `tsbs_load_iotdb` uses plain string concatenation to build these
+file paths, so please make sure the folders in those paths exist. That means the
+user should create those folders manually.
+
+#### `-aligned-timeseries` (type: `bool`, default: `true`)
+
+Using aligned timeseries for all metrics if set true.
That means using +InsertAlignedRecords, which is faster than InsertRecords. + +#### `-store-tags` (type: `bool`, default: `false`) + +Store tags if set true. Can NOT be used if `-aligned-timeseries` is set true. +That's because IoTDB do NOT support 'attributes' and 'tags' for aligned +timeseries yet. + +--- + +## `tsbs_run_queries_iotdb` Additional Flags + +### IoTDB related + +#### `-host` (type: `string`, default: `localhost`) + +Hostname of IoTDB instance. + +#### `-port` (type: `string`, default: `6667`) + +Which port to connect to on the database host. + +#### `-user` (type: `string`, default: `root`) + +The username of user who connect to IoTDB. + +#### `-password` (type: `string`, default: `root`) + +The password for user connecting to IoTDB. diff --git a/go.mod b/go.mod index 1106116ed..9f0b9d2fb 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/SiriDB/go-siridb-connector v0.0.0-20190110105621-86b34c44c921 github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 + github.com/apache/iotdb-client-go v0.14.0-preview1.0.20221125032227-e7dcc092f533 github.com/aws/aws-sdk-go v1.35.13 github.com/blagojts/viper v1.6.3-0.20200313094124-068f44cf5e69 github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8 @@ -25,6 +26,7 @@ require ( github.com/shirou/gopsutil v3.21.3+incompatible github.com/spf13/cobra v1.0.0 github.com/spf13/pflag v1.0.5 + github.com/stretchr/testify v1.8.1 github.com/timescale/promscale v0.0.0-20201006153045-6a66a36f5c84 github.com/tklauser/go-sysconf v0.3.5 // indirect github.com/transceptor-technology/go-qpack v0.0.0-20190116123619-49a14b216a45 diff --git a/go.sum b/go.sum index 6b3ac40ef..0451762a7 100644 --- a/go.sum +++ b/go.sum @@ -93,8 +93,12 @@ github.com/andybalholm/brotli v1.0.0 h1:7UCwP93aiSfvWpapti8g88vVVGp2qqtGyePsSuDa github.com/andybalholm/brotli v1.0.0/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y= 
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/apache/arrow/go/arrow v0.0.0-20191024131854-af6fa24be0db/go.mod h1:VTxUBvSJ3s3eHAg65PNgrsn5BtqCRPdmyXh6rAfdxN0= +github.com/apache/iotdb-client-go v0.14.0-preview1.0.20221125032227-e7dcc092f533 h1:BpGjGsy1s2xsnw0ML9E1Agqn2bmwdSMr9L+fG7WTsq8= +github.com/apache/iotdb-client-go v0.14.0-preview1.0.20221125032227-e7dcc092f533/go.mod h1:4Az8byJmeA6Nolvl0tYTvuFVP+6AJcF++SZ+gNSeaIA= github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= +github.com/apache/thrift v0.15.0 h1:aGvdaR0v1t9XLgjtBYwxcBvBOTMqClzwE26CHOgjW1Y= +github.com/apache/thrift v0.15.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= @@ -359,6 +363,7 @@ github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfb github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= github.com/golang/mock v1.4.0/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= +github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8= github.com/golang/protobuf v0.0.0-20161109072736-4bd1920723d7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -828,11 +833,8 @@ github.com/sergi/go-diff v1.0.0 
h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/shazow/go-diff v0.0.0-20160112020656-b6b7b6733b8c/go.mod h1:/PevMnwAxekIXwN8qQyfc5gl2NlkB3CQlkizAbOkeBs= github.com/shirou/gopsutil v0.0.0-20190901111213-e4ec7b275ada/go.mod h1:WWnYX4lzhCH5h/3YBfyVA3VbLYjlMZZAQcW9ojMexNc= -github.com/shirou/gopsutil v2.18.12+incompatible h1:1eaJvGomDnH74/5cF4CTmTbLHAriGFsTZppLXDX93OM= -github.com/shirou/gopsutil v2.18.12+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shirou/gopsutil v3.21.3+incompatible h1:uenXGGa8ESCQq+dbgtl916dmg6PSAz2cXov0uORQ9v8= github.com/shirou/gopsutil v3.21.3+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= -github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4 h1:udFKJ0aHUL60LboW/A+DfgoHVedieIzIXE8uylPue0U= github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4/go.mod h1:qsXQc7+bwAM3Q1u/4XEfrquwF8Lw7D7y5cD8CuHnfIc= github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4= github.com/shopspring/decimal v0.0.0-20200227202807-02e2044944cc h1:jUIKcSPO9MoMJBbEoyE/RJoE8vz7Mb8AjvifMMwSyvY= @@ -883,13 +885,18 @@ github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5J github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.0/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod 
h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/tdakkota/asciicheck v0.0.0-20200416190851-d7f85be797a2/go.mod h1:yHp0ai0Z9gUljN3o0xMhYJnH/IcvkdTBOX2fmJ93JEM= @@ -902,6 +909,7 @@ github.com/timescale/promscale v0.0.0-20201006153045-6a66a36f5c84/go.mod h1:rkhy github.com/tinylib/msgp v1.0.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= github.com/tklauser/go-sysconf v0.3.5 h1:uu3Xl4nkLzQfXNsWn15rPc/HQCJKObbt1dKJeWp3vU4= github.com/tklauser/go-sysconf v0.3.5/go.mod h1:MkWzOF4RMCshBAMXuhXJs64Rte09mITnppBXY/rYEFI= +github.com/tklauser/numcpus v0.2.2 h1:oyhllyrScuYI6g+h/zUvNXNp1wy7x8qQy3t/piefldA= github.com/tklauser/numcpus v0.2.2/go.mod h1:x3qojaO3uyYt0i56EW/VUYs7uBvdl2fkfZFu0T9wgjM= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= @@ -1136,7 +1144,6 @@ golang.org/x/sys v0.0.0-20200602225109-6fdc65e7d980/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys 
v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200821140526-fda516888d29/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200908134130-d2e65c121b96 h1:gJciq3lOg0eS9fSZJcoHfv7q1BfC6cJfnmSSKL1yu3Q= golang.org/x/sys v0.0.0-20200908134130-d2e65c121b96/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210316164454-77fc1eacc6aa h1:ZYxPR6aca/uhfRJyaOAtflSHjJYiktO7QnJC5ut7iY4= golang.org/x/sys v0.0.0-20210316164454-77fc1eacc6aa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -1348,8 +1355,9 @@ gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 h1:tQIYjPdBoyREyB9XMu+nnTclpTYkz2zFM+lzLJFO4gQ= gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gotest.tools v0.0.0-20181223230014-1083505acf35/go.mod h1:R//lfYlUuTOTfblYI3lGoAAAebUdzjvbmQsuB7Ykd90= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/internal/utils/time_interval.go b/internal/utils/time_interval.go index e2b271541..276b16824 100644 --- a/internal/utils/time_interval.go +++ b/internal/utils/time_interval.go @@ -118,12 +118,12 @@ func (ti *TimeInterval) StartUnixMillis() int64 { 
return ti.start.UTC().UnixNano() / int64(time.Millisecond) } -// StartString formats the start of the TimeInterval according to RFC3339. +// StartString formats the start of the TimeInterval according to RFC3339. func (ti *TimeInterval) StartString() string { return ti.start.Format(time.RFC3339) } -// End returns the starting time in UTC. +// End returns the end time in UTC. func (ti *TimeInterval) End() time.Time { return ti.end } diff --git a/pkg/query/factories/init_factories.go b/pkg/query/factories/init_factories.go index ff3faf47d..27292914f 100644 --- a/pkg/query/factories/init_factories.go +++ b/pkg/query/factories/init_factories.go @@ -6,6 +6,7 @@ import ( "github.com/timescale/tsbs/cmd/tsbs_generate_queries/databases/clickhouse" "github.com/timescale/tsbs/cmd/tsbs_generate_queries/databases/cratedb" "github.com/timescale/tsbs/cmd/tsbs_generate_queries/databases/influx" + "github.com/timescale/tsbs/cmd/tsbs_generate_queries/databases/iotdb" "github.com/timescale/tsbs/cmd/tsbs_generate_queries/databases/mongo" "github.com/timescale/tsbs/cmd/tsbs_generate_queries/databases/questdb" "github.com/timescale/tsbs/cmd/tsbs_generate_queries/databases/siridb" @@ -39,5 +40,9 @@ func InitQueryFactories(config *config.QueryGeneratorConfig) map[string]interfac DBName: config.DbName, } factories[constants.FormatQuestDB] = &questdb.BaseGenerator{} + factories[constants.FormatIoTDB] = &iotdb.BaseGenerator{ + BasicPath: "root", + BasicPathLevel: 0, + } return factories } diff --git a/pkg/query/iotdb.go b/pkg/query/iotdb.go new file mode 100644 index 000000000..e7adf03b8 --- /dev/null +++ b/pkg/query/iotdb.go @@ -0,0 +1,71 @@ +package query + +import ( + "fmt" + "sync" +) + +// IoTDB encodes a IoTDB request. This will be serialized for use +// by the tsbs_run_queries_iotdb program. 
+type IoTDB struct { + HumanLabel []byte + HumanDescription []byte + + SqlQuery []byte + id uint64 +} + +// IoTDBPool is a sync.Pool of IoTDB Query types +var IoTDBPool = sync.Pool{ + New: func() interface{} { + return &IoTDB{ + HumanLabel: []byte{}, + HumanDescription: []byte{}, + + SqlQuery: []byte{}, + } + }, +} + +// NewIoTDB returns a new IoTDB Query instance +func NewIoTDB() *IoTDB { + return IoTDBPool.Get().(*IoTDB) +} + +// GetID returns the ID of this Query +func (q *IoTDB) GetID() uint64 { + return q.id +} + +// SetID sets the ID for this Query +func (q *IoTDB) SetID(id uint64) { + q.id = id +} + +// String produces a debug-ready description of a Query. +func (q *IoTDB) String() string { + return fmt.Sprintf( + "HumanLabel: %s, HumanDescription: %s, Query: %s", + q.HumanLabel, q.HumanDescription, q.SqlQuery, + ) +} + +// HumanLabelName returns the human readable name of this Query +func (q *IoTDB) HumanLabelName() []byte { + return q.HumanLabel +} + +// HumanDescriptionName returns the human readable description of this Query +func (q *IoTDB) HumanDescriptionName() []byte { + return q.HumanDescription +} + +// Release resets and returns this Query to its pool +func (q *IoTDB) Release() { + q.HumanLabel = q.HumanLabel[:0] + q.HumanDescription = q.HumanDescription[:0] + q.id = 0 + q.SqlQuery = q.SqlQuery[:0] + + IoTDBPool.Put(q) +} diff --git a/pkg/targets/constants/constants.go b/pkg/targets/constants/constants.go index 516093d09..7bfe4423b 100644 --- a/pkg/targets/constants/constants.go +++ b/pkg/targets/constants/constants.go @@ -14,6 +14,7 @@ const ( FormatVictoriaMetrics = "victoriametrics" FormatTimestream = "timestream" FormatQuestDB = "questdb" + FormatIoTDB = "iotdb" ) func SupportedFormats() []string { @@ -30,5 +31,6 @@ func SupportedFormats() []string { FormatVictoriaMetrics, FormatTimestream, FormatQuestDB, + FormatIoTDB, } } diff --git a/pkg/targets/initializers/target_initializers.go b/pkg/targets/initializers/target_initializers.go index 
a7a31677e..9afaf0843 100644 --- a/pkg/targets/initializers/target_initializers.go +++ b/pkg/targets/initializers/target_initializers.go @@ -2,6 +2,8 @@ package initializers import ( "fmt" + "strings" + "github.com/timescale/tsbs/pkg/targets" "github.com/timescale/tsbs/pkg/targets/akumuli" "github.com/timescale/tsbs/pkg/targets/cassandra" @@ -9,6 +11,7 @@ import ( "github.com/timescale/tsbs/pkg/targets/constants" "github.com/timescale/tsbs/pkg/targets/crate" "github.com/timescale/tsbs/pkg/targets/influx" + "github.com/timescale/tsbs/pkg/targets/iotdb" "github.com/timescale/tsbs/pkg/targets/mongo" "github.com/timescale/tsbs/pkg/targets/prometheus" "github.com/timescale/tsbs/pkg/targets/questdb" @@ -16,7 +19,6 @@ import ( "github.com/timescale/tsbs/pkg/targets/timescaledb" "github.com/timescale/tsbs/pkg/targets/timestream" "github.com/timescale/tsbs/pkg/targets/victoriametrics" - "strings" ) func GetTarget(format string) targets.ImplementedTarget { @@ -45,6 +47,8 @@ func GetTarget(format string) targets.ImplementedTarget { return timestream.NewTarget() case constants.FormatQuestDB: return questdb.NewTarget() + case constants.FormatIoTDB: + return iotdb.NewTarget() } supportedFormatsStr := strings.Join(constants.SupportedFormats(), ",") diff --git a/pkg/targets/iotdb/implemented_target.go b/pkg/targets/iotdb/implemented_target.go new file mode 100644 index 000000000..25ca5a0b4 --- /dev/null +++ b/pkg/targets/iotdb/implemented_target.go @@ -0,0 +1,51 @@ +package iotdb + +import ( + "github.com/blagojts/viper" + "github.com/spf13/pflag" + "github.com/timescale/tsbs/pkg/data/serialize" + "github.com/timescale/tsbs/pkg/data/source" + "github.com/timescale/tsbs/pkg/targets" + "github.com/timescale/tsbs/pkg/targets/constants" +) + +func NewTarget() targets.ImplementedTarget { + return &iotdbTarget{ + BasicPath: "root", + BasicPathLevel: 0, + } +} + +type iotdbTarget struct { + BasicPath string // e.g. "root.sg" is basic path of "root.sg.device". 
default : "root" + BasicPathLevel int32 // e.g. 0 for "root", 1 for "root.device" +} + +func (t *iotdbTarget) TargetSpecificFlags(flagPrefix string, flagSet *pflag.FlagSet) { + flagSet.String(flagPrefix+"host", "localhost", "Hostname of IoTDB instance") + flagSet.String(flagPrefix+"port", "6667", "Which port to connect to on the database host") + flagSet.String(flagPrefix+"user", "root", "The user who connect to IoTDB") + flagSet.String(flagPrefix+"password", "root", "The password for user connecting to IoTDB") + flagSet.Int(flagPrefix+"timeout", 0, "Session timeout check in millisecond. Use 0 for no timeout.") + flagSet.Int(flagPrefix+"records-max-rows", 0, "Max rows of 'InsertRecords'. Use 0 for no limit.") + flagSet.Bool(flagPrefix+"to-csv", false, "Do not insert into database, but to some CSV files.") + flagSet.String(flagPrefix+"csv-prefix", "./", "Prefix of filepath for CSV files. Specific a folder or a folder with filename prefix.") + flagSet.Bool(flagPrefix+"aligned-timeseries", true, "Using aligned timeseries for all metrics if set true.") + flagSet.Bool(flagPrefix+"store-tags", false, "Store tags if set true. Can NOT be used if aligned-timeseries is set true.") +} + +func (t *iotdbTarget) TargetName() string { + return constants.FormatIoTDB +} + +func (t *iotdbTarget) Serializer() serialize.PointSerializer { + return &Serializer{ + BasicPath: t.BasicPath, + BasicPathLevel: t.BasicPathLevel, + } +} + +func (t *iotdbTarget) Benchmark(string, *source.DataSourceConfig, *viper.Viper) (targets.Benchmark, error) { + // benchmark for tsbs_load_iotdb is implemented in "cmd/tsbs_load_iotdb/main.go/main()" + panic("Benchmark() not implemented! 
Benchmark for tsbs_load_iotdb is implemented in \"cmd/tsbs_load_iotdb/main.go/main()\"") +} diff --git a/pkg/targets/iotdb/serializer.go b/pkg/targets/iotdb/serializer.go new file mode 100644 index 000000000..143aa5471 --- /dev/null +++ b/pkg/targets/iotdb/serializer.go @@ -0,0 +1,167 @@ +package iotdb + +import ( + "fmt" + "io" + "strconv" + "strings" + + "github.com/apache/iotdb-client-go/client" + "github.com/timescale/tsbs/pkg/data" +) + +// Serializer writes a Point in a serialized form for IoTDB +type Serializer struct { + BasicPath string // e.g. "root.sg" is basic path of "root.sg.device". default : "root" + BasicPathLevel int32 // e.g. 0 for "root", 1 for "root.device" +} + +// const iotdbTimeFmt = "2006-01-02 15:04:05" + +const defaultBufSize = 4096 + +// Serialize writes Point p to the given Writer w, so it can be +// loaded by the IoTDB loader. The format is CSV with two lines per Point, +// with the first row being the names of fields and the second row being the +// field values. +// +// e.g., +// deviceID,timestamp,,,,... +// ,,,,,... +// datatype,,,,... +// tags,=,=,... +// +// deviceID,timestamp,hostname,value +// root.cpu.host_1,1451606400000000000,'host_1',44.0 +// datatype,5,2 +// tags,region='eu-west-1',datacenter='eu-west-1c',rack='87' +// +func (s *Serializer) Serialize(p *data.Point, w io.Writer) error { + // Tag row first, prefixed with 'time,path' + buf1 := make([]byte, 0, defaultBufSize) + buf1 = append(buf1, []byte("deviceID,timestamp")...) + datatype_buf := make([]byte, 0, defaultBufSize) + datatype_buf = append(datatype_buf, []byte("datatype")...) + tags_buf := make([]byte, 0, defaultBufSize) + tags_buf = append(tags_buf, []byte("tags")...) 
+ tempBuf := make([]byte, 0, defaultBufSize) + var hostname string + foundHostname := false + tagKeys := p.TagKeys() + tagValues := p.TagValues() + for i, v := range tagValues { + if keyStr := string(tagKeys[i]); keyStr == "hostname" { + foundHostname = true + hostname = v.(string) + } else { + // handle other tags + + // buf1 = append(buf1, ',') + // buf1 = append(buf1, tagKeys[i]...) + // valueInStrByte, datatype := iotdbFormat(v) + // tempBuf = append(tempBuf, ',') + // tempBuf = append(tempBuf, valueInStrByte...) + // datatype_buf = append(datatype_buf, ',') + // datatype_buf = append(datatype_buf, []byte(fmt.Sprintf("%d", datatype))...) + valueInStrByte, datatype := IotdbFormat(v) + if datatype == client.TEXT { + tagStr := fmt.Sprintf(",%s='%s'", keyStr, string(valueInStrByte)) + tags_buf = append(tags_buf, []byte(tagStr)...) + } else { + tagStr := fmt.Sprintf(",%s=", keyStr) + tags_buf = append(tags_buf, []byte(tagStr)...) + tags_buf = append(tags_buf, valueInStrByte...) + } + } + } + if !foundHostname { + // Unable to find hostname as part of device id + hostname = "unknown" + } + buf2 := make([]byte, 0, defaultBufSize) + buf2 = append(buf2, []byte(fmt.Sprintf("%s.%s.%s,", s.BasicPath, modifyHostname(string(p.MeasurementName())), hostname))...) + buf2 = append(buf2, []byte(fmt.Sprintf("%d", p.Timestamp().UTC().UnixNano()))...) + buf2 = append(buf2, tempBuf...) + // Fields + fieldKeys := p.FieldKeys() + fieldValues := p.FieldValues() + for i, v := range fieldValues { + buf1 = append(buf1, ',') + buf1 = append(buf1, fieldKeys[i]...) + valueInStrByte, datatype := IotdbFormat(v) + buf2 = append(buf2, ',') + buf2 = append(buf2, valueInStrByte...) + datatype_buf = append(datatype_buf, ',') + datatype_buf = append(datatype_buf, []byte(fmt.Sprintf("%d", datatype))...) 
+ } + buf1 = append(buf1, '\n') + buf2 = append(buf2, '\n') + datatype_buf = append(datatype_buf, '\n') + tags_buf = append(tags_buf, '\n') + _, err := w.Write(buf1) + if err == nil { + _, err = w.Write(buf2) + } + if err == nil { + _, err = w.Write(datatype_buf) + } + if err == nil { + _, err = w.Write(tags_buf) + } + return err +} + +// modifyHostnames makes sure IP address can appear in the path. +// Node names in path can NOT contain "." unless enclosing it within either single quote (') or double quote ("). +// In this case, quotes are recognized as part of the node name to avoid ambiguity. +func modifyHostname(hostname string) string { + if strings.Contains(hostname, ".") { + if !(hostname[:1] == "`" && hostname[len(hostname)-1:] == "`") { + // not modified yet + hostname = "`" + hostname + "`" + } + + } + return hostname +} + +// Utility function for appending various data types to a byte string +func IotdbFormat(v interface{}) ([]byte, client.TSDataType) { + // treat all integer as int32 + switch v.(type) { + case uint: + // return []byte(strconv.FormatInt(int64(v.(uint)), 10)), client.INT32 + return []byte(strconv.FormatInt(int64(v.(uint)), 10)), client.INT64 + case uint32: + // return []byte(strconv.FormatInt(int64(v.(uint32)), 10)), client.INT32 + return []byte(strconv.FormatInt(int64(v.(uint32)), 10)), client.INT64 + case uint64: + // return []byte(strconv.FormatInt(int64(v.(uint64)), 10)), client.INT32 + return []byte(strconv.FormatInt(int64(v.(uint64)), 10)), client.INT64 + case int: + // return []byte(strconv.FormatInt(int64(v.(int)), 10)), client.INT32 + return []byte(strconv.FormatInt(int64(v.(int)), 10)), client.INT64 + case int32: + // return []byte(strconv.FormatInt(int64(v.(int32)), 10)), client.INT32 + return []byte(strconv.FormatInt(int64(v.(int32)), 10)), client.INT32 + case int64: + // return []byte(strconv.FormatInt(int64(v.(int64)), 10)), client.INT32 + return []byte(strconv.FormatInt(int64(v.(int64)), 10)), client.INT64 + case float64: + 
// Why -1 ? + // From Golang source on genericFtoa (called by AppendFloat): 'Negative precision means "only as much as needed to be exact."' + // Using this instead of an exact number for precision ensures we preserve the precision passed in to the function, allowing us + // to use different precision for different use cases. + return []byte(strconv.FormatFloat(float64(v.(float64)), 'f', -1, 64)), client.DOUBLE + case float32: + return []byte(strconv.FormatFloat(float64(v.(float32)), 'f', -1, 32)), client.FLOAT + case bool: + return []byte(strconv.FormatBool(v.(bool))), client.BOOLEAN + case string: + return []byte(v.(string)), client.TEXT + case nil: + return []byte(v.(string)), client.UNKNOW + default: + panic(fmt.Sprintf("unknown field type for %#v", v)) + } +} diff --git a/pkg/targets/iotdb/serializer_test.go b/pkg/targets/iotdb/serializer_test.go new file mode 100644 index 000000000..e438c2bac --- /dev/null +++ b/pkg/targets/iotdb/serializer_test.go @@ -0,0 +1,169 @@ +package iotdb + +import ( + "bytes" + "testing" + + "github.com/apache/iotdb-client-go/client" + "github.com/stretchr/testify/require" + "github.com/timescale/tsbs/pkg/data" + "github.com/timescale/tsbs/pkg/data/serialize" +) + +func TestIotdbFormat(t *testing.T) { + cases := []struct { + description string + input interface{} + expectedByte []byte + expectedType client.TSDataType + }{ + { + description: "boolean true", + input: interface{}(true), + expectedByte: []byte("true"), + expectedType: client.BOOLEAN, + }, + { + description: "boolean false", + input: interface{}(false), + expectedByte: []byte("false"), + expectedType: client.BOOLEAN, + }, + { + description: "int32 -1", + input: interface{}(int32(-1)), + expectedByte: []byte("-1"), + expectedType: client.INT32, + }, + { + description: "int64 2147483648", + input: interface{}(int64(2147483648)), + expectedByte: []byte("2147483648"), + expectedType: client.INT64, + }, + { + description: "int64 9223372036854775801", + input: 
interface{}(int64(9223372036854775801)), + expectedByte: []byte("9223372036854775801"), + expectedType: client.INT64, + }, + { + description: "float32 0.1", + input: interface{}(float32(0.1)), + expectedByte: []byte("0.1"), + expectedType: client.FLOAT, + }, + { + description: "float64 0.12345678901234567890123456", + input: interface{}(float64(0.12345678901234567890123456)), + expectedByte: []byte("0.12345678901234568"), + expectedType: client.DOUBLE, + }, + } + for _, c := range cases { + t.Run(c.description, func(t *testing.T) { + actualByte, actualType := IotdbFormat(c.input) + require.EqualValues(t, c.expectedByte, actualByte) + require.EqualValues(t, c.expectedType, actualType) + }) + } + +} + +func TestSerialize_normal(t *testing.T) { + cases := []struct { + description string + inputPoint *data.Point + expected string + }{ + { + description: "a regular point ", + inputPoint: serialize.TestPointDefault(), + expected: "deviceID,timestamp,usage_guest_nice\nroot.cpu.host_0,1451606400000000000,38.24311829\ndatatype,4\ntags,region='eu-west-1',datacenter='eu-west-1b'\n", + }, + { + description: "a regular Point using int as value", + inputPoint: serialize.TestPointInt(), + expected: "deviceID,timestamp,usage_guest\nroot.cpu.host_0,1451606400000000000,38\ndatatype,2\ntags,region='eu-west-1',datacenter='eu-west-1b'\n", + }, + { + description: "a regular Point with multiple fields", + inputPoint: serialize.TestPointMultiField(), + expected: "deviceID,timestamp,big_usage_guest,usage_guest,usage_guest_nice\nroot.cpu.host_0,1451606400000000000,5000000000,38,38.24311829\ndatatype,2,2,4\ntags,region='eu-west-1',datacenter='eu-west-1b'\n", + }, + { + description: "a Point with no tags", + inputPoint: serialize.TestPointNoTags(), + expected: "deviceID,timestamp,usage_guest_nice\nroot.cpu.unknown,1451606400000000000,38.24311829\ndatatype,4\ntags\n", + }, + } + for _, c := range cases { + t.Run(c.description, func(t *testing.T) { + ps := &Serializer{ + BasicPath: "root", + 
BasicPathLevel: 0, + } + b := new(bytes.Buffer) + err := ps.Serialize(c.inputPoint, b) + require.NoError(t, err) + actual := b.String() + + require.EqualValues(t, c.expected, actual) + }) + } + +} + +func TestSerialize_nonDefaultBasicPath(t *testing.T) { + cases := []struct { + description string + inputPoint *data.Point + BasicPath string + BasicPathLevel int32 + expected string + }{ + { + description: "a regular point ", + inputPoint: serialize.TestPointDefault(), + BasicPath: "root.sg", + BasicPathLevel: 1, + expected: "deviceID,timestamp,usage_guest_nice\nroot.sg.cpu.host_0,1451606400000000000,38.24311829\ndatatype,4\ntags,region='eu-west-1',datacenter='eu-west-1b'\n", + }, + { + description: "a regular Point using int as value", + inputPoint: serialize.TestPointInt(), + BasicPath: "root.ln", + BasicPathLevel: 1, + expected: "deviceID,timestamp,usage_guest\nroot.ln.cpu.host_0,1451606400000000000,38\ndatatype,2\ntags,region='eu-west-1',datacenter='eu-west-1b'\n", + }, + { + description: "a regular Point with multiple fields", + inputPoint: serialize.TestPointMultiField(), + BasicPath: "root.db.abc", + BasicPathLevel: 2, + expected: "deviceID,timestamp,big_usage_guest,usage_guest,usage_guest_nice\nroot.db.abc.cpu.host_0,1451606400000000000,5000000000,38,38.24311829\ndatatype,2,2,4\ntags,region='eu-west-1',datacenter='eu-west-1b'\n", + }, + { + description: "a Point with no tags", + inputPoint: serialize.TestPointNoTags(), + BasicPath: "root", + BasicPathLevel: 0, + expected: "deviceID,timestamp,usage_guest_nice\nroot.cpu.unknown,1451606400000000000,38.24311829\ndatatype,4\ntags\n", + }, + } + for _, c := range cases { + t.Run(c.description, func(t *testing.T) { + ps := &Serializer{ + BasicPath: c.BasicPath, + BasicPathLevel: c.BasicPathLevel, + } + b := new(bytes.Buffer) + err := ps.Serialize(c.inputPoint, b) + require.NoError(t, err) + actual := b.String() + + require.EqualValues(t, c.expected, actual) + }) + } + +}