Skip to content

Commit

Permalink
test: Updated tests
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed Aug 11, 2021
1 parent cb038e8 commit 69230e4
Show file tree
Hide file tree
Showing 15 changed files with 78 additions and 55 deletions.
2 changes: 1 addition & 1 deletion test/db-e2e/db_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestDBSource(t *testing.T) {
InitSchema: true,
},
}},
Sinks: []Sink{{Log: &Log{}}},
Sinks: []Sink{DefaultLogSink},
},
},
},
Expand Down
4 changes: 2 additions & 2 deletions test/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,16 @@ func SkipIfCI(t *testing.T) {
}

func Setup(t *testing.T) (teardown func()) {
log.Println("set-up")
DeletePipelines()
WaitForPodsToBeDeleted()

stopTestAPIPortForward = StartPortForward("testapi-0", 8378)

ResetCount()

log.Printf("🌀 START: %s", t.Name())

return func() {
log.Println("tear-down")
stopTestAPIPortForward()
r := recover() // tests should panic on error, we recover so we can run other tests
if r != nil {
Expand Down
4 changes: 2 additions & 2 deletions test/http-stress/http_stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestHTTPSourceStress(t *testing.T) {

defer StartTPSReporter(t, "main", prefix, n)()

go PumpHTTP("https://http-main/sources/default", prefix, n)
go PumpHTTP("https://http-main/sources/default", prefix, n, Params.MessageSize)
WaitForStep(TotalSunkMessages(n), Params.Timeout)
}

Expand All @@ -55,7 +55,7 @@ func TestHTTPSinkStress(t *testing.T) {
Cat: &Cat{},
Replicas: Params.Replicas,
Sources: []Source{{HTTP: &HTTPSource{}}},
Sinks: []Sink{{HTTP: &HTTPSink{URL: "http://testapi/count/incr"}}, {Name: "log", Log: &Log{}}},
Sinks: []Sink{{HTTP: &HTTPSink{URL: "http://testapi/count/incr"}}, DefaultLogSink},
}},
},
})
Expand Down
20 changes: 11 additions & 9 deletions test/http-stress/test-results.json
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
{
"TestHTTPSinkStress/currentContext=docker-desktop,replicas=1,n=10000.tps": 850,
"TestHTTPSinkStress/currentContext=docker-desktop,replicas=2,n=10000.tps": 400,
"TestHTTPSinkStress/currentContext=gke_jesse-sb_us-west2_cluster-2,replicas=1,n=10000.tps": 950,
"TestHTTPSinkStress/currentContext=gke_jesse-sb_us-west2_cluster-2,replicas=2,n=10000.tps": 650,
"TestHTTPSourceStress/currentContext=docker-desktop,replicas=1,n=10000,messageSize=1000000.tps": 1500,
"TestHTTPSourceStress/currentContext=docker-desktop,replicas=1,n=10000.tps": 1500,
"TestHTTPSourceStress/currentContext=docker-desktop,replicas=2,n=10000.tps": 850,
"TestHTTPSourceStress/currentContext=gke_jesse-sb_us-west2_cluster-2,replicas=1,n=10000.tps": 1650,
"TestHTTPSourceStress/currentContext=gke_jesse-sb_us-west2_cluster-2,replicas=2,n=10000.tps": 2300
"TestHTTPSinkStress/.tps": 700,
"TestHTTPSinkStress/N=10,messageSize=100.tps": 0,
"TestHTTPSinkStress/N=10,messageSize=1000.tps": 500,
"TestHTTPSinkStress/messageSize=1000.tps": 450,
"TestHTTPSinkStress/replicas=2.tps": 650,
"TestHTTPSourceStress/.tps": 1000,
"TestHTTPSourceStress/N=10,messageSize=100.tps": 0,
"TestHTTPSourceStress/N=10,messageSize=1000.tps": 1050,
"TestHTTPSourceStress/messageSize=1000.tps": 550,
"TestHTTPSourceStress/messageSize=1000000.tps": 600,
"TestHTTPSourceStress/replicas=2.tps": 950
}
4 changes: 2 additions & 2 deletions test/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func SendMessageViaHTTP(msg string) {
}

func PumpHTTP(_url, prefix string, n int, opts ...interface{}) {
size := 16
size := 0
for _, opt := range opts {
switch v := opt.(type) {
case int:
Expand All @@ -39,7 +39,7 @@ func PumpHTTP(_url, prefix string, n int, opts ...interface{}) {
panic(fmt.Errorf("unknown option type %T", opt))
}
}
log.Printf("sending %d messages %q via HTTP to %q\n", n, prefix, _url)
log.Printf("sending %d messages sized %d prefixed %q via HTTP to %q\n", n, size, prefix, _url)
InvokeTestAPI("/http/pump?url=%s&prefix=%s&n=%d&sleep=0&size=%d", url.QueryEscape(_url), prefix, n, size)
}

Expand Down
2 changes: 1 addition & 1 deletion test/kafka-fmea/kafka_fmea_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestKafkaFMEA_PipelineDeletedDisruption(t *testing.T) {
Cat: &Cat{},
Sources: []Source{{Kafka: &KafkaSource{Kafka: Kafka{Topic: topic}}}},
Sinks: []Sink{
{Name: "log", Log: &Log{}},
DefaultLogSink,
{HTTP: &HTTPSink{URL: "http://testapi/count/incr"}},
},
}},
Expand Down
6 changes: 3 additions & 3 deletions test/kafka-stress/kafka_stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestKafkaSourceStress(t *testing.T) {

defer StartTPSReporter(t, "main", prefix, n)()

go PumpKafkaTopic(topic, n, prefix)
go PumpKafkaTopic(topic, n, prefix, Params.MessageSize)
WaitForStep(TotalSunkMessages(n), Params.Timeout)
}

Expand All @@ -63,7 +63,7 @@ func TestKafkaSinkStress(t *testing.T) {
Sources: []Source{{Kafka: &KafkaSource{Kafka: Kafka{Topic: topic}}}},
Sinks: []Sink{
{Kafka: &KafkaSink{Async: Params.Async, Kafka: Kafka{Topic: sinkTopic}}},
{Name: "log", Log: &Log{}},
DefaultLogSink,
},
}},
},
Expand All @@ -80,6 +80,6 @@ func TestKafkaSinkStress(t *testing.T) {

defer StartTPSReporter(t, "main", prefix, n)()

go PumpKafkaTopic(topic, n, prefix)
go PumpKafkaTopic(topic, n, prefix, Params.MessageSize)
WaitForStep(TotalSunkMessages(n*2), Params.Timeout) // 2 sinks
}
20 changes: 11 additions & 9 deletions test/kafka-stress/test-results.json
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
{
"TestKafkaSinkStress/currentContext=docker-desktop,replicas=1,n=10000,async=true.tps": 400,
"TestKafkaSinkStress/currentContext=docker-desktop,replicas=1,n=10000.tps": 650,
"TestKafkaSinkStress/replicas=1.tps": 100,
"TestKafkaSinkStress/replicas=2.tps": 250,
"TestKafkaSourceStress/currentContext=docker-desktop,replicas=1,n=10000.tps": 1200,
"TestKafkaSourceStress/currentContext=gke_jesse-sb_us-west2_cluster-2,replicas=1,n=10000.tps": 1700,
"TestKafkaSourceStress/currentContext=gke_jesse-sb_us-west2_cluster-2,replicas=2,n=10000.tps": 1550,
"TestKafkaSourceStress/replicas=1.tps": 100,
"TestKafkaSourceStress/replicas=2.tps": 250
"TestKafkaSinkStress/.tps": 350,
"TestKafkaSinkStress/N=10,messageSize=100.tps": 200,
"TestKafkaSinkStress/N=10,messageSize=1000.tps": 150,
"TestKafkaSinkStress/async=true.tps": 400,
"TestKafkaSinkStress/messageSize=1000.tps": 300,
"TestKafkaSinkStress/replicas=2.tps": 400,
"TestKafkaSourceStress/.tps": 1100,
"TestKafkaSourceStress/N=10,messageSize=100.tps": 450,
"TestKafkaSourceStress/N=10,messageSize=1000.tps": 650,
"TestKafkaSourceStress/messageSize=1000.tps": 850,
"TestKafkaSourceStress/replicas=2.tps": 950
}
9 changes: 7 additions & 2 deletions test/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,19 @@ func CreateKafkaTopic() string {
func PumpKafkaTopic(topic string, n int, opts ...interface{}) {
var sleep time.Duration
var prefix string
var size int
for _, opt := range opts {
switch v := opt.(type) {
case time.Duration:
sleep = v
case string:
prefix = v
case int:
size = v
default:
panic(fmt.Errorf("unexpected option type %T", opt))
}
}
log.Printf("puming Kafka topic %q sleeping %v with %d messages\n", topic, sleep, n)
InvokeTestAPI("/kafka/pump-topic?topic=%s&sleep=%v&n=%d&prefix=%s", topic, sleep, n, prefix)
log.Printf("puming Kafka topic %q sleeping %v with %d messages sized %d\n", topic, sleep, n, size)
InvokeTestAPI("/kafka/pump-topic?topic=%s&sleep=%v&n=%d&prefix=%s&size=%d", topic, sleep, n, prefix, size)
}
4 changes: 2 additions & 2 deletions test/log_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package test
import dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1"

var (
truncate = uint64(16)
truncate = uint64(32)
truncatePtr = &truncate
DefaultLogSink = dfv1.Sink{Log: &dfv1.Log{Truncate: truncatePtr}}
DefaultLogSink = dfv1.Sink{Name: "log", Log: &dfv1.Log{Truncate: truncatePtr}}
)
6 changes: 3 additions & 3 deletions test/stan-stress/stan_stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestStanSourceStress(t *testing.T) {

defer StartTPSReporter(t, "main", prefix, n)()

go PumpSTANSubject(longSubject, n, prefix)
go PumpSTANSubject(longSubject, n, prefix, Params.MessageSize)
WaitForStep(TotalSunkMessages(n), Params.Timeout)
}

Expand All @@ -61,7 +61,7 @@ func TestStanSinkStress(t *testing.T) {
Cat: &Cat{},
Replicas: Params.Replicas,
Sources: []Source{{STAN: &STAN{Subject: subject}}},
Sinks: []Sink{{STAN: &STAN{Subject: sinkSubject}}, {Name: "log", Log: &Log{}}},
Sinks: []Sink{{STAN: &STAN{Subject: sinkSubject}}, DefaultLogSink},
}},
},
})
Expand All @@ -76,6 +76,6 @@ func TestStanSinkStress(t *testing.T) {
prefix := "stan-sink-stress"
defer StartTPSReporter(t, "main", prefix, n)()

go PumpSTANSubject(longSubject, n, prefix)
go PumpSTANSubject(longSubject, n, prefix, Params.MessageSize)
WaitForStep(TotalSunkMessages(n*2), Params.Timeout) // 2 sinks
}
19 changes: 9 additions & 10 deletions test/stan-stress/test-results.json
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
{
"TestStanSinkStress/currentContext=docker-desktop,replicas=1,n=10000.tps": 300,
"TestStanSinkStress/currentContext=gke_jesse-sb_us-west2_cluster-2,replicas=1,n=10000.tps": 200,
"TestStanSinkStress/currentContext=gke_jesse-sb_us-west2_cluster-2,replicas=2,n=10000.tps": 300,
"TestStanSinkStress/replicas=1.tps": 100,
"TestStanSinkStress/replicas=2.tps": 200,
"TestStanSinkStress/.tps": 200,
"TestStanSinkStress/N=10,messageSize=100.tps": 150,
"TestStanSinkStress/replicas=2.tps": 300,
"TestStanSinkStress/replicas=2tps": 250,
"TestStanSinkStress/replicas=3.tps": 300,
"TestStanSinkStress/replicas=4.tps": 300,
"TestStanSinkStress/replicas=5.tps": 350,
"TestStanSourceStress/currentContext=docker-desktop,replicas=1,n=10000.tps": 1150,
"TestStanSourceStress/currentContext=gke_jesse-sb_us-west2_cluster-2,replicas=1,n=10000.tps": 1500,
"TestStanSourceStress/currentContext=gke_jesse-sb_us-west2_cluster-2,replicas=2,n=10000.tps": 2850,
"TestStanSourceStress/replicas=1.tps": 550,
"TestStanSourceStress/replicas=2.tps": 700,
"TestStanSourceStress/.tps": 1050,
"TestStanSourceStress/N=10,messageSize=100.tps": 850,
"TestStanSourceStress/N=10,messageSize=1000.tps": 400,
"TestStanSourceStress/messageSize=1000.tps": 950,
"TestStanSourceStress/replicas=2.tps": 1300,
"TestStanSourceStress/replicas=3.tps": 750,
"TestStanSourceStress/replicas=4.tps": 750,
"TestStanSourceStress/replicas=5.tps": 750,
Expand Down
9 changes: 7 additions & 2 deletions test/stan.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,19 @@ func RandomSTANSubject() (longSubject string, subject string) {
func PumpSTANSubject(subject string, n int, opts ...interface{}) {
var sleep time.Duration
var prefix string
var size int
for _, opt := range opts {
switch v := opt.(type) {
case time.Duration:
sleep = v
case string:
prefix = v
case int:
size = v
default:
panic(fmt.Errorf("unexpected option type %T", opt))
}
}
log.Printf("puming stan subject %q sleeping %v with %d messages\n", subject, sleep, n)
InvokeTestAPI("/stan/pump-subject?subject=%s&sleep=%v&n=%d&prefix=%s", subject, sleep, n, prefix)
log.Printf("puming stan subject %q sleeping %v with %d messages sized %d\n", subject, sleep, n, size)
InvokeTestAPI("/stan/pump-subject?subject=%s&sleep=%v&n=%d&prefix=%s&size=%d", subject, sleep, n, prefix, size)
}
6 changes: 3 additions & 3 deletions test/stress/results.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ func setTestResult(testName string, key string, value int) {
}
x := make(map[string]int)
if err := json.Unmarshal(data, &x); err != nil {
panic(err)
panic(fmt.Errorf("failed to unmarshall JSON results: %w", err))
}
x[fmt.Sprintf("%s.%s", testName, key)] = value
if data, err := json.MarshalIndent(x, "", " "); err != nil {
panic(err)
panic(fmt.Errorf("failed to marshall JSON results: %w", err))
} else {
if err := ioutil.WriteFile(filename, data, 0o600); err != nil {
panic(err)
panic(fmt.Errorf("failed to write results file: %w", err))
}
}
}
18 changes: 14 additions & 4 deletions test/stress/tps.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"log"
"math"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -68,14 +69,23 @@ func StartTPSReporter(t *testing.T, step, prefix string, n int) (stopTPSLogger f
if start == nil || end == nil {
panic("failed to calculate start time or end time")
}
textName := fmt.Sprintf("%s/currentContext=%s,replicas=%d,n=%d", t.Name(), currentContext, Params.Replicas, Params.N)
var params []string
if currentContext != "docker-desktop" {
params = append(params, "currentContext="+currentContext)
}
if Params.Replicas != 1 {
params = append(params, fmt.Sprintf("replicas=%d", Params.Replicas))
}
if Params.N != 10000 {
params = append(params, fmt.Sprintf("N=%d", Params.N))
}
if Params.Async {
textName += ",async=true"
params = append(params, "async=true")
}
if Params.MessageSize > 0 {
textName += fmt.Sprintf(",messageSize=%d", Params.MessageSize)
params = append(params, fmt.Sprintf("messageSize=%d", Params.MessageSize))
}
setTestResult(textName, "tps", roundToNearest50(value()))
setTestResult(fmt.Sprintf("%s/%s", t.Name(), strings.Join(params, ",")), "tps", roundToNearest50(value()))
}
}

Expand Down

0 comments on commit 69230e4

Please sign in to comment.