Skip to content

Commit

Permalink
add benchmark for stream
Browse files Browse the repository at this point in the history
Signed-off-by: hslam <[email protected]>
  • Loading branch information
hslam committed Feb 22, 2023
1 parent 83c0c18 commit 725baaa
Show file tree
Hide file tree
Showing 4 changed files with 309 additions and 0 deletions.
80 changes: 80 additions & 0 deletions benchmarks/stream/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package main

import (
"flag"
"fmt"
"github.com/hslam/rpc"
"github.com/hslam/rpc/benchmarks/stream/service"
"github.com/hslam/stats"
"log"
"math/rand"
)

var network string
var addr string
var codec string
var clients int
var total int
var parallel int
var bar bool

func init() {
flag.StringVar(&network, "network", "tcp", "-network=tcp")
flag.StringVar(&addr, "addr", ":9999", "-addr=:9999")
flag.StringVar(&codec, "codec", "pb", "-codec=code")
flag.IntVar(&total, "total", 100000, "-total=100000")
flag.IntVar(&parallel, "parallel", 1, "-parallel=1")
flag.IntVar(&clients, "clients", 1, "-clients=1")
flag.BoolVar(&bar, "bar", true, "-bar=true")
flag.Parse()
stats.SetBar(bar)
fmt.Printf("./client -network=%s -addr=%s -codec=%s -total=%d -parallel=%d -clients=%d\n", network, addr, codec, total, parallel, clients)
}

func main() {
if clients < 1 || parallel < 1 || total < 1 {
return
}
var wrkClients []stats.Client
for i := 0; i < clients; i++ {
conn, err := rpc.Dial(network, addr, codec)
if err != nil {
log.Fatalln("dailing error: ", err)
}
conn.SetNoCopy(true)
for j := 0; j < parallel; j++ {
stream, err := conn.NewStream("Arith.StreamMultiply")
if err != nil {
panic(err)
}
wrkClients = append(wrkClients, &WrkClient{Stream: stream, buf: make([]byte, 8)})
}
}
parallel = 1
stats.StartPrint(parallel, total, wrkClients)
}

type WrkClient struct {
rpc.Stream
buf []byte
}

func (c *WrkClient) Call() (int64, int64, bool) {
A := rand.Int31n(100)
B := rand.Int31n(100)
req := &service.ArithRequest{A: A, B: B}
var res service.ArithResponse
if err := c.Stream.WriteMessage(req); err != nil {
fmt.Println(err)
return 0, 0, false
}
if err := c.Stream.ReadMessage(c.buf, &res); err != nil {
fmt.Println(err)
return 0, 0, false
}
if res.Pro == A*B {
return 0, 0, true
}
fmt.Printf("err %d * %d = %d\n", A, B, res.Pro)
return 0, 0, false
}
12 changes: 12 additions & 0 deletions benchmarks/stream/server/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package main

import (
"github.com/hslam/rpc"
"github.com/hslam/rpc/benchmarks/stream/service"
)

func main() {
rpc.Register(new(service.Arith))
rpc.SetNoCopy(true)
rpc.Listen("tcp", ":9999", "pb")
}
46 changes: 46 additions & 0 deletions benchmarks/stream/service/arith.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package service

import (
"github.com/hslam/rpc"
)

//Arith defines the struct of arith.
type Arith struct{}

//StreamMultiply operation
func (a *Arith) StreamMultiply(stream *Stream) error {
for {
buf := make([]byte, 64)
var req = &ArithRequest{}
if err := stream.Read(buf, req); err != nil {
return err
}
res := ArithResponse{}
res.Pro = req.A * req.B
if err := stream.Write(&res); err != nil {
println(err.Error())
return err
}
}
}

// Stream is used to connect rpc Stream.
type Stream struct {
stream rpc.Stream
}

// Connect connects rpc Stream.
func (s *Stream) Connect(stream rpc.Stream) error {
s.stream = stream
return nil
}

//Read reads a message from the rpc stream.
func (s *Stream) Read(buf []byte, req *ArithRequest) error {
return s.stream.ReadMessage(buf, req)
}

//Write writes a message to the rpc stream.
func (s *Stream) Write(res *ArithResponse) error {
return s.stream.WriteMessage(res)
}
171 changes: 171 additions & 0 deletions benchmarks/stream/service/arith.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 725baaa

Please sign in to comment.