Skip to content

Commit

Permalink
support the streaming RPC
Browse files Browse the repository at this point in the history
  • Loading branch information
hslam committed Dec 17, 2021
1 parent aa785e4 commit 7485e8c
Show file tree
Hide file tree
Showing 26 changed files with 678 additions and 618 deletions.
4 changes: 1 addition & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ Package rpc implements a remote procedure call over TCP, UNIX, HTTP and WS. The
* **[Codec](https://github.com/hslam/codec "codec")** json/[code](https://github.com/hslam/code "code")/pb
* Multiplexing/Pipelining
* [Auto batching](https://github.com/hslam/writer "writer")
* Call/Go/RoundTrip/Ping/Watch/CallWithContext
* Server push
* Call/Go/RoundTrip/Ping/Stream/CallWithContext
* Conn/Transport/Client
* TLS

Expand All @@ -28,7 +27,6 @@ Package rpc implements a remote procedure call over TCP, UNIX, HTTP and WS. The
|Pipelining|No|No|Yes|No|No|
|Auto Batching|No|No|Yes|No|No|
|Transport|No|No|Yes|No|No|
|Server Push|No|No|Yes|Yes|Yes|

## [Benchmark](http://github.com/hslam/rpc-benchmark "rpc-benchmark")

Expand Down
12 changes: 6 additions & 6 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,19 +168,19 @@ func (c *Client) CallWithContext(ctx context.Context, serviceMethod string, args
return err
}

// Watch returns the Watcher.
func (c *Client) Watch(key string) (Watcher, error) {
// NewStream creates a new Stream for the client side.
func (c *Client) NewStream(serviceMethod string) (Stream, error) {
address, target, err := c.director()
if err != nil {
return c.transport().Watch("", key)
return c.transport().NewStream("", serviceMethod)
}
if len(address) > 0 {
return c.transport().Watch(address, key)
return c.transport().NewStream(address, serviceMethod)
}
start := time.Now()
watcher, err := c.transport().Watch(target.address, key)
stream, err := c.transport().NewStream(target.address, serviceMethod)
target.Update(c.Alpha, int64(time.Now().Sub(start)), err)
return watcher, err
return stream, err
}

// Go invokes the function asynchronously. It returns the Call structure representing
Expand Down
49 changes: 1 addition & 48 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ func TestClient(t *testing.T) {
network := "tcp"
addr := ":9999"
codec := "json"
var k = "foo"
var str = "bar"
opts := DefaultOptions()
opts.Network = network
opts.Codec = codec
Expand All @@ -32,31 +30,12 @@ func TestClient(t *testing.T) {
server.ListenWithOptions(addr, opts)
}()
time.Sleep(time.Millisecond * 10)
server.PushFunc(func(key string) (value []byte, ok bool) {
if key == k {
return []byte(str), true
}
return nil, false
})

client := NewClient(opts, addr)
err = client.Ping()
if err != nil {
t.Error(err)
}
watch, err := client.Watch(k)
if err != nil {
t.Error(err)
}
v, err := watch.Wait()
if err != nil {
t.Error(err)
} else if string(v) != str {
t.Error(string(v))
}
watch.Stop()
if _, err := watch.Wait(); err == nil {
t.Error()
}
A := int32(4)
B := int32(8)
req := &service.ArithRequest{A: A, B: B}
Expand Down Expand Up @@ -113,10 +92,6 @@ func TestClient(t *testing.T) {
if err == nil {
t.Error()
}
_, err := client.Watch(k)
if err == nil {
t.Error()
}
A := int32(4)
B := int32(8)
req := &service.ArithRequest{A: A, B: B}
Expand Down Expand Up @@ -210,8 +185,6 @@ func TestClientLeastTime(t *testing.T) {
network := "tcp"
addrs := []string{":9997", ":9998", ":9999"}
codec := "json"
var k = "foo"
var str = "bar"
opts := DefaultOptions()
opts.Network = network
opts.Codec = codec
Expand All @@ -230,33 +203,13 @@ func TestClientLeastTime(t *testing.T) {
}()
}
time.Sleep(time.Millisecond * 10)
server.PushFunc(func(key string) (value []byte, ok bool) {
if key == k {
return []byte(str), true
}
return nil, false
})
client := NewClient(opts, addrs...)
client.Scheduling = LeastTimeScheduling
for i := 0; i < 64; i++ {
err = client.Ping()
if err != nil {
t.Error(err)
}
watch, err := client.Watch(k)
if err != nil {
t.Error(err)
}
v, err := watch.Wait()
if err != nil {
t.Error(err)
} else if string(v) != str {
t.Error(string(v))
}
watch.Stop()
if _, err := watch.Wait(); err == nil {
t.Error()
}
A := int32(4)
B := int32(8)
req := &service.ArithRequest{A: A, B: B}
Expand Down
2 changes: 2 additions & 0 deletions codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ type Context struct {
sending *sync.Mutex
codec ServerCodec
value []byte
stream *stream
ctx *Context
}

// Reset resets the Context.
Expand Down
2 changes: 1 addition & 1 deletion codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestServerCodecAndClientCodec(t *testing.T) {
server.Close()
var ctx *Context
ctx = &Context{Error: "error", codec: &mockServerCodec{}, upgrade: getUpgrade(), buffer: server.bufferPool.GetBuffer(0)}
if err := server.ServeRequest(ctx, nil, nil, nil); err == nil {
if err := server.ServeRequest(ctx, nil, nil, nil, nil); err == nil {
t.Error()
}
ctx = &Context{Error: "error", codec: &mockServerCodec{}, upgrade: getUpgrade(), buffer: server.bufferPool.GetBuffer(0)}
Expand Down
Loading

0 comments on commit 7485e8c

Please sign in to comment.