Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
df9db17
init - make success
citrusreticulata Oct 9, 2022
7021966
Init iotdb, part Query not completed
citrusreticulata Oct 22, 2022
425da8c
complete devops.go; fix typo
citrusreticulata Oct 22, 2022
2fbc8e2
devops query generate completed. test file added.
citrusreticulata Oct 22, 2022
401fe51
queries test complete
citrusreticulata Oct 24, 2022
48b77d1
serializer complete
citrusreticulata Oct 30, 2022
1444627
working on testcases loading
citrusreticulata Oct 30, 2022
b320cba
tsbs_load_iotdb complete (without unit tests)
citrusreticulata Nov 1, 2022
2419b00
update iotdb-client-go version
citrusreticulata Nov 3, 2022
c5869c9
complete scan test
citrusreticulata Nov 3, 2022
6a92520
update some user friendly tips
citrusreticulata Nov 4, 2022
68a0aaf
Users can now specify timeout
citrusreticulata Nov 4, 2022
7c17989
tsbs_run_queries_iotdb has been completed.
citrusreticulata Nov 9, 2022
c3850f4
CRTODO removed,using back quote(`) in IP address
citrusreticulata Nov 9, 2022
9cc69bc
using DATETIME-INPUT in data serializer
citrusreticulata Nov 9, 2022
1a83132
fix bug in GroupByTimeAndPrimaryTag
citrusreticulata Nov 9, 2022
c5a630c
added doc, readme for iotdb
citrusreticulata Nov 10, 2022
e1439b5
update main readme
citrusreticulata Nov 13, 2022
b009f19
[Important] using InsertRecords
citrusreticulata Nov 13, 2022
079fc73
using 4lines structure for serializer, not completed
citrusreticulata Nov 16, 2022
bcbf8b3
using 4lines, storage tags in attributes of node '_tags'
citrusreticulata Nov 16, 2022
eb26063
update workersNum warning in insertion test
citrusreticulata Nov 17, 2022
9c99cd9
optimize ProcessBatch function
citrusreticulata Nov 17, 2022
7094855
fix typo parseDateToInterface -> parseDataToInterface
citrusreticulata Nov 17, 2022
146f139
[important] support csv export
citrusreticulata Nov 17, 2022
a3b10a1
make sure csv export can be used offline
citrusreticulata Nov 17, 2022
bedfeba
updated docs and comments. added arguments check. init bug fixed
citrusreticulata Nov 17, 2022
b4df7a6
updated test cases for scan.go and serializer.go
citrusreticulata Nov 21, 2022
9d00fa0
correct SQL queries; support printing responses
citrusreticulata Nov 21, 2022
ffe4bab
[important] Printing is not counted into time; do NOT send close.
citrusreticulata Nov 22, 2022
eecb4cb
updated main readme
citrusreticulata Nov 22, 2022
61c84f4
fix bug: query dataset closing
citrusreticulata Nov 23, 2022
8f8bdf1
check dataset close error before return
citrusreticulata Nov 23, 2022
5963896
better error catch for dataset closing
citrusreticulata Nov 23, 2022
abcb39b
updated SQL query
citrusreticulata Nov 23, 2022
8234919
close sql dataset in insertion
citrusreticulata Nov 25, 2022
e5e2580
Traverse query results in insertion
citrusreticulata Nov 25, 2022
a16db5b
fix defer ds.Close() in insertion
citrusreticulata Nov 25, 2022
9b5389e
use ExecuteNonQueryStatement for inserting tags
citrusreticulata Nov 25, 2022
3a8975a
close dataSet in queries anyway
citrusreticulata Nov 25, 2022
08712e4
for GroupByTime use 1min
citrusreticulata Nov 27, 2022
2f14fa6
user can choose TS type and whether to store tags
citrusreticulata Nov 30, 2022
fa950bb
do NOT traverse query results
citrusreticulata Dec 15, 2022
82902f3
treat all integer as int32
citrusreticulata Dec 23, 2022
96ce144
do NOT treat all integer as int32
citrusreticulata Jan 6, 2023
559ebd6
Fixed Bug, supported using BasicPath for data generation
citrusreticulata Apr 3, 2023
38c9e1b
Fix test cases TestSerialize_normal
citrusreticulata Apr 3, 2023
074cefa
Merge pull request #1 from citrusreticulata/master
citrusreticulata Oct 31, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ bin

# High Dynamic Range (HDR) Histogram files
*.hdr

bin/
6 changes: 4 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ loaders: tsbs_load \
tsbs_load_siridb \
tsbs_load_timescaledb \
tsbs_load_victoriametrics \
tsbs_load_questdb
tsbs_load_questdb \
tsbs_load_iotdb

runners: tsbs_run_queries_akumuli \
tsbs_run_queries_cassandra \
Expand All @@ -38,7 +39,8 @@ runners: tsbs_run_queries_akumuli \
tsbs_run_queries_timescaledb \
tsbs_run_queries_timestream \
tsbs_run_queries_victoriametrics \
tsbs_run_queries_questdb
tsbs_run_queries_questdb \
tsbs_run_queries_iotdb

test:
$(GOTEST) -v ./...
Expand Down
10 changes: 6 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Current databases supported:
+ ClickHouse [(supplemental docs)](docs/clickhouse.md)
+ CrateDB [(supplemental docs)](docs/cratedb.md)
+ InfluxDB [(supplemental docs)](docs/influx.md)
+ IoTDB [(supplemental docs)](docs/iotdb.md)
+ MongoDB [(supplemental docs)](docs/mongo.md)
+ QuestDB [(supplemental docs)](docs/questdb.md)
+ SiriDB [(supplemental docs)](docs/siridb.md)
Expand Down Expand Up @@ -75,6 +76,7 @@ cases are implemented for each database:
|ClickHouse|X||
|CrateDB|X||
|InfluxDB|X|X|
|IoTDB|X||
|MongoDB|X|
|QuestDB|X|X
|SiriDB|X|
Expand All @@ -92,8 +94,8 @@ query execution performance. (It currently does not measure
concurrent insert and query performance, which is a future priority.)
To accomplish this in a fair way, the data to be inserted and the
queries to run are pre-generated and native Go clients are used
wherever possible to connect to each database (e.g., `mgo` for MongoDB,
`aws sdk` for Timestream).
wherever possible to connect to each database (e.g., `mgo` for MongoDB,
`aws sdk` for Timestream, `iotdb-client-go` for IoTDB).

Although the data is randomly generated, TSBS data and queries are
entirely deterministic. By supplying the same PRNG (pseudo-random number
Expand Down Expand Up @@ -134,8 +136,8 @@ Variables needed:
1. an end time. E.g., `2016-01-04T00:00:00Z`
1. how much time should be between each reading per device, in seconds. E.g., `10s`
1. and which database(s) you want to generate for. E.g., `timescaledb`
(choose from `cassandra`, `clickhouse`, `cratedb`, `influx`, `mongo`, `questdb`, `siridb`,
`timescaledb` or `victoriametrics`)
(choose from `cassandra`, `clickhouse`, `cratedb`, `influx`, `iotdb`, `mongo`,
`questdb`, `siridb`, `timescaledb` or `victoriametrics`)

Given the above steps you can now generate a dataset (or multiple
datasets, if you chose to generate for multiple databases) that can
Expand Down
46 changes: 46 additions & 0 deletions cmd/tsbs_generate_queries/databases/iotdb/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package iotdb

import (
"time"

"github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/devops"
"github.com/timescale/tsbs/cmd/tsbs_generate_queries/utils"
"github.com/timescale/tsbs/pkg/query"
)

// iotdbTimeFmt is the Go reference-time layout (date, a space, then
// hour:minute:second) used to render interval bounds inside the generated
// IoTDB statements.
const iotdbTimeFmt = "2006-01-02 15:04:05"

// BaseGenerator contains settings specific for IoTDB.
// The query builders read both fields when they assemble paths and
// GROUP BY ... LEVEL clauses.
type BaseGenerator struct {
	// BasicPath is the path prefix that precedes ".cpu.<host>" in generated
	// statements, e.g. "root.sg" is the basic path of "root.sg.device".
	// Default: "root".
	BasicPath string

	// BasicPathLevel is the level of the last node of BasicPath, counting
	// "root" as level 0 (e.g. 0 for "root", 1 for a two-node path).
	// NOTE(review): nothing here keeps it in sync with BasicPath; presumably
	// the caller that fills in the struct is responsible for that.
	BasicPathLevel int32
}

// GenerateEmptyQuery returns a new IoTDB query, as produced by
// query.NewIoTDB, typed as the generic query.Query interface.
func (g *BaseGenerator) GenerateEmptyQuery() query.Query {
	return query.NewIoTDB()
}

// fillInQuery stores the human-readable label, the description and the SQL
// text in qi, each converted to a byte slice.
//
// qi must hold a *query.IoTDB; any other dynamic type makes the type
// assertion panic.
func (g *BaseGenerator) fillInQuery(qi query.Query, humanLabel, humanDesc, sql string) {
	iotdbQuery := qi.(*query.IoTDB)

	iotdbQuery.SqlQuery = []byte(sql)
	iotdbQuery.HumanDescription = []byte(humanDesc)
	iotdbQuery.HumanLabel = []byte(humanLabel)
}

// NewDevops creates a new devops use case query generator.
//
// It builds a devops.Core for the given time range and scale and pairs it
// with this BaseGenerator. An error from devops.NewCore is returned as-is,
// together with a nil generator.
func (g *BaseGenerator) NewDevops(start, end time.Time, scale int) (utils.QueryGenerator, error) {
	devopsCore, err := devops.NewCore(start, end, scale)
	if err != nil {
		return nil, err
	}

	return &Devops{BaseGenerator: g, Core: devopsCore}, nil
}
215 changes: 215 additions & 0 deletions cmd/tsbs_generate_queries/databases/iotdb/devops.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
package iotdb

import (
"fmt"
"strings"
"time"

"github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/devops"
"github.com/timescale/tsbs/pkg/query"
)

// panicIfErr panics with err's message when err is non-nil and does nothing
// otherwise. The panic value is the string returned by err.Error(), not the
// error itself.
//
// TODO: Remove the need for this by continuing to bubble up errors
func panicIfErr(err error) {
	if err == nil {
		return
	}
	panic(err.Error())
}

// Devops produces IoTDB-specific queries for all the devops query types.
//
// It embeds *BaseGenerator, which supplies the BasicPath / BasicPathLevel
// settings and fillInQuery, and *devops.Core.
// NOTE(review): the Interval field and GetRandomHosts method used by the
// query builders are presumably promoted from devops.Core — confirm there.
type Devops struct {
	*BaseGenerator
	*devops.Core
}

// modifyHostnames makes sure hostnames that contain a "." (for example IP
// addresses) can appear as a single node of a path.
//
// Such a hostname is wrapped in back quotes (`) unless it already starts and
// ends with one; hostnames without a "." are left untouched. The slice is
// edited in place and the same slice is returned.
func (d *Devops) modifyHostnames(hostnames []string) []string {
	const backQuote = "`"

	for idx := range hostnames {
		name := hostnames[idx]
		if !strings.Contains(name, ".") {
			continue
		}
		if strings.HasPrefix(name, backQuote) && strings.HasSuffix(name, backQuote) {
			// Already quoted by an earlier call.
			continue
		}
		hostnames[idx] = backQuote + name + backQuote
	}

	return hostnames
}

// getHostFromWithHostnames builds the path list of a FROM clause (without the
// "FROM" keyword) for the given hostnames.
//
// Every hostname is first passed through modifyHostnames and then expanded to
// "<BasicPath>.cpu.<hostname>"; the paths are joined with ", ".
// e.g. with BasicPath "root" and hostnames "host1" and "host2" the result is
// "root.cpu.host1, root.cpu.host2". An empty list yields "".
func (d *Devops) getHostFromWithHostnames(hostnames []string) string {
	quoted := d.modifyHostnames(hostnames)

	paths := make([]string, len(quoted))
	for i := range quoted {
		paths[i] = fmt.Sprintf("%s.cpu.%s", d.BasicPath, quoted[i])
	}

	return strings.Join(paths, ", ")
}

// getHostFromString asks GetRandomHosts for nHosts hostnames and returns the
// path list of a FROM clause for them (without the "FROM" keyword).
// e.g. for hosts "host1" and "host2" under "root" the result is
// "root.cpu.host1, root.cpu.host2".
//
// It panics if GetRandomHosts reports an error.
func (d *Devops) getHostFromString(nHosts int) string {
	randomHosts, err := d.GetRandomHosts(nHosts)
	panicIfErr(err)

	return d.getHostFromWithHostnames(randomHosts)
}

// getSelectClausesAggMetrics wraps every metric in the aggregate function
// agg, producing one "agg(metric)" clause per metric in input order.
// The result is always non-nil, even when metrics is empty.
func (d *Devops) getSelectClausesAggMetrics(agg string, metrics []string) []string {
	clauses := make([]string, 0, len(metrics))
	for _, metric := range metrics {
		clauses = append(clauses, fmt.Sprintf("%s(%s)", agg, metric))
	}
	return clauses
}

// getSelectClausesAggMetricsString returns the comma-separated list of
// "agg(metric)" clauses for the given metrics, ready to follow SELECT.
func (d *Devops) getSelectClausesAggMetricsString(agg string, metrics []string) string {
	return strings.Join(d.getSelectClausesAggMetrics(agg, metrics), ", ")
}

// GroupByTime selects the MAX for numMetrics metrics under 'cpu',
// per minute for nHosts hosts,
// e.g. in pseudo-SQL:
//
//	SELECT minute, max(metric1), ..., max(metricN)
//	FROM cpu
//	WHERE (hostname = '$HOSTNAME_1' OR ... OR hostname = '$HOSTNAME_N')
//	AND time >= '$HOUR_START' AND time < '$HOUR_END'
//	GROUP BY minute ORDER BY minute ASC
//
// The generated IoTDB statement uses MAX_VALUE, lists one path per host in
// FROM, and groups by 1m buckets at level BasicPathLevel+1. The label,
// description and statement are stored in qi. It panics if the metric list
// or the hosts cannot be obtained.
func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange time.Duration) {
	// The window is picked before the hosts; presumably both draw from a
	// random source, so the call order is kept as is.
	window := d.Interval.MustRandWindow(timeRange)
	cpuMetrics, err := devops.GetCPUMetricsSlice(numMetrics)
	panicIfErr(err)
	maxClauses := d.getSelectClausesAggMetricsString("MAX_VALUE", cpuMetrics)
	hostPaths := d.getHostFromString(nHosts)

	humanLabel := fmt.Sprintf("IoTDB %d cpu metric(s), random %4d hosts, random %s by 1m", numMetrics, nHosts, timeRange)
	humanDesc := fmt.Sprintf("%s: %s", humanLabel, window.StartString())

	sql := fmt.Sprintf("SELECT %s", maxClauses) +
		fmt.Sprintf(" FROM %s", hostPaths) +
		fmt.Sprintf(" GROUP BY ([%s, %s), 1m), LEVEL = %d", window.Start().Format(iotdbTimeFmt), window.End().Format(iotdbTimeFmt), d.BasicPathLevel+1)

	d.fillInQuery(qi, humanLabel, humanDesc, sql)
}

// GroupByTimeAndPrimaryTag selects the AVG of numMetrics metrics under 'cpu' per device per hour for a day,
// e.g. in pseudo-SQL:
//
//	SELECT AVG(metric1), ..., AVG(metricN)
//	FROM cpu
//	WHERE time >= '$HOUR_START' AND time < '$HOUR_END'
//	GROUP BY hour, hostname ORDER BY hour
//
// The generated IoTDB statement reads from "<BasicPath>.cpu.*" and groups by
// 1h buckets over a window of devops.DoubleGroupByDuration. The label,
// description and statement are stored in qi. It panics if the metric list
// cannot be obtained.
func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) {
	cpuMetrics, err := devops.GetCPUMetricsSlice(numMetrics)
	panicIfErr(err)
	window := d.Interval.MustRandWindow(devops.DoubleGroupByDuration)
	avgClauses := d.getSelectClausesAggMetricsString("AVG", cpuMetrics)

	humanLabel := devops.GetDoubleGroupByLabel("IoTDB", numMetrics)
	humanDesc := fmt.Sprintf("%s: %s", humanLabel, window.StartString())

	sql := fmt.Sprintf("SELECT %s", avgClauses) +
		fmt.Sprintf(" FROM %s.cpu.*", d.BasicPath) +
		fmt.Sprintf(" GROUP BY ([%s, %s), 1h)", window.Start().Format(iotdbTimeFmt), window.End().Format(iotdbTimeFmt))

	d.fillInQuery(qi, humanLabel, humanDesc, sql)
}

// LastPointPerHost finds the last row for every host in the dataset by
// issuing a SELECT LAST over "<BasicPath>.cpu.*". The label, description and
// statement are stored in qi.
func (d *Devops) LastPointPerHost(qi query.Query) {
	const humanLabel = "IoTDB last row per host"

	d.fillInQuery(
		qi,
		humanLabel,
		humanLabel+": cpu",
		fmt.Sprintf("SELECT LAST * FROM %s.cpu.*", d.BasicPath),
	)
}

// MaxAllCPU selects the MAX of all metrics under 'cpu' per hour for nHosts hosts,
// e.g. in pseudo-SQL:
//
//	SELECT MAX(metric1), ..., MAX(metricN)
//	FROM cpu WHERE (hostname = '$HOSTNAME_1' OR ... OR hostname = '$HOSTNAME_N')
//	AND time >= '$HOUR_START' AND time < '$HOUR_END'
//	GROUP BY hour ORDER BY hour
//
// The generated IoTDB statement applies MAX_VALUE to every metric returned by
// devops.GetAllCPUMetrics, lists one path per host in FROM, and groups by 1h
// buckets at level BasicPathLevel+1. The label, description and statement are
// stored in qi. It panics if the hosts cannot be obtained.
func (d *Devops) MaxAllCPU(qi query.Query, nHosts int, duration time.Duration) {
	// The window is picked before the hosts; presumably both draw from a
	// random source, so the call order is kept as is.
	window := d.Interval.MustRandWindow(duration)
	hostPaths := d.getHostFromString(nHosts)
	maxClauses := d.getSelectClausesAggMetricsString("MAX_VALUE", devops.GetAllCPUMetrics())

	humanLabel := devops.GetMaxAllLabel("IoTDB", nHosts)
	humanDesc := fmt.Sprintf("%s: %s", humanLabel, window.StartString())

	sql := fmt.Sprintf("SELECT %s", maxClauses) +
		fmt.Sprintf(" FROM %s", hostPaths) +
		fmt.Sprintf(" GROUP BY ([%s, %s), 1h), LEVEL=%d", window.Start().Format(iotdbTimeFmt), window.End().Format(iotdbTimeFmt), d.BasicPathLevel+1)

	d.fillInQuery(qi, humanLabel, humanDesc, sql)
}

// GroupByOrderByLimit benchmarks a query that has a time WHERE clause, that groups by a truncated date, orders by that date, and takes a limit,
// e.g. in pseudo-SQL:
//
//	SELECT date_trunc('minute', time) AS t, MAX(cpu) FROM cpu
//	WHERE time < '$TIME'
//	GROUP BY t ORDER BY t DESC
//	LIMIT $LIMIT
//
// The generated IoTDB statement takes MAX_VALUE(usage_user) over
// "<BasicPath>.cpu.*" in 1m buckets at level BasicPathLevel+1, ordered by
// time descending and limited to 5 rows. The label, description and statement
// are stored in qi.
func (d *Devops) GroupByOrderByLimit(qi query.Query) {
	window := d.Interval.MustRandWindow(time.Hour)
	maxUsageUser := d.getSelectClausesAggMetricsString("MAX_VALUE", []string{"usage_user"})

	// Start the range no earlier than five minutes before its end; with 1m
	// buckets, descending order and LIMIT 5, earlier buckets are presumably
	// never returned.
	rangeStart, rangeEnd := window.Start(), window.End()
	if fiveMinBeforeEnd := rangeEnd.Add(-5 * time.Minute); fiveMinBeforeEnd.After(rangeStart) {
		rangeStart = fiveMinBeforeEnd
	}

	sql := fmt.Sprintf("SELECT %s", maxUsageUser) +
		fmt.Sprintf(" FROM %s.cpu.*", d.BasicPath) +
		fmt.Sprintf(" GROUP BY ([%s, %s), 1m), LEVEL = %d", rangeStart.Format(iotdbTimeFmt), rangeEnd.Format(iotdbTimeFmt), d.BasicPathLevel+1) +
		" ORDER BY TIME DESC LIMIT 5"

	humanLabel := "IoTDB max cpu over last 5 min-intervals (random end)"
	// The description reports the start of the full window, not the clamped one.
	humanDesc := fmt.Sprintf("%s: %s", humanLabel, window.StartString())

	d.fillInQuery(qi, humanLabel, humanDesc, sql)
}

// HighCPUForHosts populates a query that gets CPU metrics when the CPU has high
// usage between a time period for a number of hosts (if nHosts is 0 or
// negative, it will search all hosts through "<BasicPath>.cpu.*"),
// e.g. in pseudo-SQL:
//
//	SELECT * FROM cpu
//	WHERE usage_user > 90.0
//	AND time >= '$TIME_START' AND time < '$TIME_END'
//	AND (hostname = '$HOST' OR hostname = '$HOST2'...)
//
// The generated IoTDB statement is aligned by device. The label, description
// and statement are stored in qi. It panics if the hosts or the label cannot
// be obtained.
func (d *Devops) HighCPUForHosts(qi query.Query, nHosts int) {
	window := d.Interval.MustRandWindow(devops.HighCPUDuration)

	// Default to every host; narrow down only when a positive count is given.
	hostPaths := fmt.Sprintf("%s.cpu.*", d.BasicPath)
	if nHosts > 0 {
		hostPaths = d.getHostFromString(nHosts)
	}

	humanLabel, err := devops.GetHighCPULabel("IoTDB", nHosts)
	panicIfErr(err)
	humanDesc := fmt.Sprintf("%s: %s", humanLabel, window.StartString())

	sql := "SELECT *" +
		fmt.Sprintf(" FROM %s", hostPaths) +
		fmt.Sprintf(" WHERE usage_user > 90 AND time >= %s AND time < %s ALIGN BY DEVICE", window.Start().Format(iotdbTimeFmt), window.End().Format(iotdbTimeFmt))

	d.fillInQuery(qi, humanLabel, humanDesc, sql)
}
Loading