Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change approach for request simulation and prediction #1

Merged
merged 3 commits into from
May 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 30 additions & 21 deletions app-loadtest/app-loadtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@ import (
)

type Task struct {
ID int
Num int
ID int
}


var (
totalRequestsProcessed = prometheus.NewCounter(prometheus.CounterOpts{
Name: "app_loadtest_requests_total",
Expand Down Expand Up @@ -96,7 +94,7 @@ func getSSMParam(parameterName string) (string, error){
}

var mutex sync.Mutex
func main() {
func main(){
taskQueue := make(chan Task, 1000)
var wg sync.WaitGroup
appLoadtestReplicasCount, err := getSSMParam("/khanh-thesis/app_loadtest_replicas")
Expand Down Expand Up @@ -137,44 +135,54 @@ func main() {
requestURL := fmt.Sprintf("http://app-simulate.app-simulate.svc.cluster.local:5000/bytes?%s", queryParams.Encode())
// requestURL := fmt.Sprintf("http://localhost:5000/bytes?%s", queryParams.Encode())
// requestURL := fmt.Sprintf("http://google.com")
duration := 20*time.Second
interval := duration/time.Duration(numRequests)
fmt.Println("DATA INPUT: ")
fmt.Println("Bytes response each request: ", bytesResponseEachRequest)
fmt.Println("Number of request per minutes: ", numRequests)
fmt.Println("Goroutine interval: ", interval)
fmt.Println("START SENDING REQUEST: ")

currentTime := time.Now()
nextMinute := currentTime.Truncate(time.Minute).Add(time.Minute)
fmt.Println("currentTime: ", currentTime)
fmt.Println("nextMinute :", nextMinute)
duration := nextMinute.Sub(currentTime)
fmt.Println("duration :", duration)
nanoseconds := duration.Nanoseconds()
time.Sleep(time.Duration(nanoseconds) * time.Nanosecond)

// Create pool for goroutines
poolSize := 100
poolSize := 1000
for i := 0; i < poolSize; i++ {
go worker(taskQueue, i, requestURL, &wg)
}
for {
startTime := time.Now()
startTimeRequest := time.Now()
// Generate tasks and send them to taskQueue
numRequestsEachReplica := numRequests/numReplicas
fmt.Println("START:===============================")
fmt.Println("numRequests each replica: ", numRequestsEachReplica)
time.Sleep(10000 * time.Millisecond)
for i := 0; i < numRequestsEachReplica; i ++ {
task := Task{ID: i, Num: i + 1}
task := Task{ID: i}
taskQueue <- task
wg.Add(1)
}

// Close the taskQueue to signal that no more tasks will be added
currentTime := time.Now()
nextMinute := currentTime.Truncate(time.Minute).Add(time.Minute)
fmt.Println("currentTime: ", currentTime)
fmt.Println("nextMinute :", nextMinute )
duration := nextMinute.Sub(currentTime)
fmt.Println("duration :", duration)
nanoseconds := duration.Nanoseconds()
time.Sleep(time.Duration(nanoseconds) * time.Nanosecond)
fmt.Println("nanoseconds :", nanoseconds)
fmt.Println("END:===============================")
// close(taskQueue)
wg.Wait()
endTime := time.Now()
loadtestDuration := endTime.Sub(startTime)
endTimeRequest := time.Now()
loadtestDuration := endTimeRequest.Sub(startTimeRequest)
loadtestSeconds := loadtestDuration.Seconds()
timeSleep := 0.0
if loadtestSeconds < 60.0 {
timeSleep = 60.0 - loadtestSeconds
}
appLoadtestResponseDurationAll.Set(loadtestSeconds)
fmt.Println("loadtestSeconds: ", loadtestSeconds)
fmt.Println("timeSleep: ",timeSleep)
time.Sleep(time.Duration(int(timeSleep)) * time.Second)
fmt.Println("=============================")
}
}

Expand All @@ -201,6 +209,7 @@ func processTask(task Task, pool int, requestURL string) {
responseSize.Observe(float64(numBytes))
successRequestsCount.Inc()
}
// time.Sleep(100 * time.Millisecond)
endTimeEachRequest := time.Now()
loadtestDurationEachRequest := endTimeEachRequest.Sub(startTimeEachRequest)
loadtestSecondsEachRequest := loadtestDurationEachRequest.Seconds()
Expand Down
223 changes: 223 additions & 0 deletions app-loadtest/app-test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
package main

import (
"fmt"
"log"
"strconv"
"io/ioutil"
"net/http"
"net/url"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ssm"
)

type Task struct {
ID int
}

var (
totalRequestsProcessed = prometheus.NewCounter(prometheus.CounterOpts{
Name: "app_loadtest_requests_total",
Help: "Total number of app load test requests made by the app",
})

successRequestsCount = prometheus.NewCounter(prometheus.CounterOpts{
Name: "app_loadtest_requests_successful_total",
Help: "Total number of app load test requests successful made by the app",
})

errorRequestsCount = prometheus.NewCounter(prometheus.CounterOpts{
Name: "app_loadtest_requests_failed_total",
Help: "Total number of app load test requests failed made by the app",
})

responseSize = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "app_loadtest_response_size_bytes",
Help: "Size of HTTP app load test responses",
Buckets: []float64{100, 200, 300, 400, 500, 1000, 2000},
})

responseDurationEachRequest = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "app_loadtest_response_duration_each_request",
Help: "Duration of HTTP app load test response each request",
})

appLoadtestResponseDurationAll = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "app_loadtest_response_duration_all",
Help: "Duration of HTTP app load test response all requests received",
})
)

var (
sess *session.Session
ssmClient *ssm.SSM
)

func init() {
region := "ap-southeast-1"
// Initialize the AWS session and SSM client
sess, err := session.NewSession(&aws.Config{
Region: aws.String(region),
})
if err != nil {
log.Fatal(err)
}
ssmClient = ssm.New(sess)
// Add go runtime metrics and process collectors.
prometheus.MustRegister(
totalRequestsProcessed,
successRequestsCount,
errorRequestsCount,
responseSize,
responseDurationEachRequest,
appLoadtestResponseDurationAll,
)
}

func getSSMParam(parameterName string) (string, error){
// Create an input object for the GetParameter API
input := &ssm.GetParameterInput{
Name: aws.String(parameterName),
WithDecryption: aws.Bool(true),
}
result, err := ssmClient.GetParameter(input)
if err != nil {
return "", fmt.Errorf("failed to retrieve parameter: %v", err)
}
return *result.Parameter.Value, nil
}

var mutex sync.Mutex
func main(){
taskQueue := make(chan Task, 1000)
var wg sync.WaitGroup
appLoadtestReplicasCount, err := getSSMParam("/khanh-thesis/app_loadtest_replicas")
if err != nil {
fmt.Println("Error retrieving replicas number param:", err)
return
}
appLoadtestRequestParamName, err := getSSMParam("/khanh-thesis/app_loadtest_request")
if err != nil {
fmt.Println("Error retrieving request number param:", err)
return
}
appLoadtestBytesParamName, err := getSSMParam("/khanh-thesis/app_loadtest_bytes")
if err != nil {
fmt.Println("Error retrieving bytes:", err)
return
}
numReplicas, err := strconv.Atoi(appLoadtestReplicasCount)
if err != nil {
log.Fatal(err)
}
numBytes, err := strconv.Atoi(appLoadtestBytesParamName)
if err != nil {
log.Fatal(err)
}
// Start the HTTP server to expose metrics
go func() {
http.Handle("/metrics", promhttp.Handler())
log.Fatal(http.ListenAndServe(":8000", nil))
}()
numRequests, err := strconv.Atoi(appLoadtestRequestParamName)
if err != nil {
log.Fatal(err)
}
bytesResponseEachRequest := strconv.Itoa(numBytes/numRequests)
queryParams := url.Values{}
queryParams.Set("num_bytes", bytesResponseEachRequest)
// requestURL := fmt.Sprintf("http://app-simulate.app-simulate.svc.cluster.local:5000/bytes?%s", queryParams.Encode())
// requestURL := fmt.Sprintf("http://localhost:5000/bytes?%s", queryParams.Encode())
requestURL := fmt.Sprintf("http://google.com")
fmt.Println("DATA INPUT: ")
fmt.Println("Bytes response each request: ", bytesResponseEachRequest)
fmt.Println("Number of request per minutes: ", numRequests)
fmt.Println("START SENDING REQUEST: ")

currentTime := time.Now()
nextMinute := currentTime.Truncate(time.Minute).Add(time.Minute)
fmt.Println("currentTime: ", currentTime)
fmt.Println("nextMinute :", nextMinute)
duration := nextMinute.Sub(currentTime)
fmt.Println("duration :", duration)
nanoseconds := duration.Nanoseconds()
time.Sleep(time.Duration(nanoseconds) * time.Nanosecond)

// Create pool for goroutines
poolSize := 100
for i := 0; i < poolSize; i++ {
go worker(taskQueue, i, requestURL, &wg)
}
for {
startTimeRequest := time.Now()
// Generate tasks and send them to taskQueue
numRequestsEachReplica := numRequests/numReplicas
fmt.Println("numRequests each replica: ", numRequestsEachReplica)
fmt.Println("START:===============================")
for i := 0; i < numRequestsEachReplica; i ++ {
task := Task{ID: i}
taskQueue <- task
wg.Add(1)
}
currentTime := time.Now()
nextMinute := currentTime.Truncate(time.Minute).Add(time.Minute)
fmt.Println("currentTime: ", currentTime)
fmt.Println("nextMinute :", nextMinute )
duration := nextMinute.Sub(currentTime)
fmt.Println("duration :", duration)
nanoseconds := duration.Nanoseconds()
time.Sleep(time.Duration(nanoseconds) * time.Nanosecond)
fmt.Println("nanoseconds :", nanoseconds)
fmt.Println("END:===============================")
// close(taskQueue)
wg.Wait()
endTimeRequest := time.Now()
loadtestDuration := endTimeRequest.Sub(startTimeRequest)
loadtestSeconds := loadtestDuration.Seconds()
appLoadtestResponseDurationAll.Set(loadtestSeconds)
fmt.Println("loadtestSeconds: ", loadtestSeconds)
}
}

func worker(taskQueue chan Task, pool int, requestURL string, wg *sync.WaitGroup) {
for task := range taskQueue {
processTask(task, pool, requestURL)
wg.Done()
}
}

func processTask(task Task, pool int, requestURL string) {
startTimeEachRequest := time.Now()
rs, err := http.Get(requestURL)
if err != nil {
errorRequestsCount.Inc()
} else {
defer rs.Body.Close()
// Read the response body to get the bytes
responseBody, err := ioutil.ReadAll(rs.Body)
if err != nil {
errorRequestsCount.Inc()
}
numBytes := len(responseBody)
responseSize.Observe(float64(numBytes))
successRequestsCount.Inc()
}
endTimeEachRequest := time.Now()
loadtestDurationEachRequest := endTimeEachRequest.Sub(startTimeEachRequest)
loadtestSecondsEachRequest := loadtestDurationEachRequest.Seconds()
// if loadtestSecondsEachRequest < 0.002 {
// fmt.Println("loadtestSecondsEachRequest ",loadtestSecondsEachRequest)
// }
responseDurationEachRequest.Set(loadtestSecondsEachRequest)

// Increment the requests counter
mutex.Lock()
totalRequestsProcessed.Inc()
mutex.Unlock()
}
9 changes: 8 additions & 1 deletion app-loadtest/deploy/02-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,16 @@ spec:
values:
- app-loadtest
topologyKey: "kubernetes.io/hostname"
tolerations:
- key: "node-group"
operator: "Equal"
value: "request"
effect: "NoExecute"
nodeSelector:
eks.amazonaws.com/nodegroup: app-request
containers:
- name: app-loadtest
image: 832438989008.dkr.ecr.ap-southeast-1.amazonaws.com/app-loadtest@sha256:7ae3c0d1436c21fabbea6af8a2615ba1cd80fcbfc724ae2cf72bc8bc41fcf6a5
image: 832438989008.dkr.ecr.ap-southeast-1.amazonaws.com/app-loadtest@sha256:05f059352185538396bd6f4056dca78070ca17c646d5069d04e9cb9e45098918
ports:
- name: prometheus
containerPort: 8000
Expand Down
35 changes: 26 additions & 9 deletions app-simulate/deploy/02-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ metadata:
prometheus.io/port: "15020"
prometheus.io/scrape: "true"
spec:
replicas: 4
replicas: 1
selector:
matchLabels:
app: app-simulate
Expand All @@ -25,14 +25,31 @@ spec:
spec:
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: app
operator: In
values:
- app-simulate
topologyKey: "kubernetes.io/hostname"
# requiredDuringSchedulingIgnoredDuringExecution:
# - labelSelector:
# matchExpressions:
# - key: app
# operator: In
# values:
# - app-simulate
# topologyKey: "kubernetes.io/hostname"
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
podAffinityTerm:
labelSelector:
matchExpressions:
- key: app
operator: In
values:
- app-simulate
topologyKey: "kubernetes.io/hostname"
tolerations:
- key: "node-group"
operator: "Equal"
value: "app"
effect: "NoExecute"
nodeSelector:
eks.amazonaws.com/nodegroup: app-app
containers:
- name: app-simulate
image: 832438989008.dkr.ecr.ap-southeast-1.amazonaws.com/app-simulate@sha256:593a9d2443ac0b2bd8111202baab0c66b6acef0116d167ae1b6777173a5e27bd
Expand Down
Loading
Loading