Skip to content

Commit 55d48bf

Browse files
authored
feat(mono/zip): add 'ZipWith' operator and reimplement 'Zip' (#95)
1 parent d693310 commit 55d48bf

File tree

11 files changed

+245
-82
lines changed

11 files changed

+245
-82
lines changed

go.mod

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@ module github.com/rsocket/rsocket-go
33
go 1.11
44

55
require (
6-
github.com/golang/mock v1.4.3
6+
github.com/golang/mock v1.4.4
77
github.com/google/uuid v1.1.2
88
github.com/gorilla/websocket v1.4.2
9-
github.com/jjeffcaii/reactor-go v0.4.5
9+
github.com/jjeffcaii/reactor-go v0.5.0
1010
github.com/pkg/errors v0.9.1
1111
github.com/pkg/profile v1.5.0
1212
github.com/stretchr/testify v1.6.1
13-
github.com/urfave/cli/v2 v2.1.1
13+
github.com/urfave/cli/v2 v2.3.0
1414
go.uber.org/atomic v1.7.0
1515
)

go.sum

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:ma
44
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
55
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
66
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
7-
github.com/golang/mock v1.4.3 h1:GV+pQPG/EUUbkh47niozDcADz6go/dUwhVzdUQHIVRw=
8-
github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
7+
github.com/golang/mock v1.4.4 h1:l75CXGRSwbaYNpl/Z2X1XIIAMSCquvXgpVZDhwEIJsc=
8+
github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4=
99
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
1010
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
1111
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
1212
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
13-
github.com/jjeffcaii/reactor-go v0.4.5 h1:FOc3ICOIxPHzJBLYmA6Hf1jpPduX0Czzh4i62Wz17A4=
14-
github.com/jjeffcaii/reactor-go v0.4.5/go.mod h1:2ZzeNFnQ2c55NHRh0KJ4k5yMvmrcpx1APzh7BKkRNQE=
13+
github.com/jjeffcaii/reactor-go v0.5.0 h1:Y8VVp31JGij/ilMh8bXyNEo8k2SVSmc4Cc+mzPGL07w=
14+
github.com/jjeffcaii/reactor-go v0.5.0/go.mod h1:qYN34C2UANFOtDeUGvhxlExLpFMDbvSrphX3Gb3H6S8=
1515
github.com/panjf2000/ants/v2 v2.4.3 h1:wHghL17YKFanB62QjPQ9o+DuM4q7WrQ7zAhoX8+eBXU=
1616
github.com/panjf2000/ants/v2 v2.4.3/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A=
1717
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@@ -30,23 +30,21 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy
3030
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
3131
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
3232
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
33-
github.com/urfave/cli/v2 v2.1.1 h1:Qt8FeAtxE/vfdrLmR3rxR6JRE0RoVmbXu8+6kZtYU4k=
34-
github.com/urfave/cli/v2 v2.1.1/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ=
33+
github.com/urfave/cli/v2 v2.3.0 h1:qph92Y649prgesehzOrQjdWyxFOp/QVM+6imKHad91M=
34+
github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI=
3535
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
3636
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
3737
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
3838
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
3939
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
4040
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
41-
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
4241
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
4342
golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
4443
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
4544
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
4645
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
46+
gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
4747
gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo=
4848
gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
4949
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
5050
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
51-
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
52-
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=

rx/mono/mono.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import (
1212

1313
// ReleaseFunc can be used to release resources.
1414
type ReleaseFunc func()
15+
type Item = rx.Item
16+
type Combinator2 = func(first, second Item) (payload.Payload, error)
1517

1618
// Mono is a Reactive Streams Publisher with basic rx operators that completes successfully by emitting an element, or with an error.
1719
type Mono interface {
@@ -49,6 +51,8 @@ type Mono interface {
4951
SwitchIfError(alternative func(error) Mono) Mono
5052
// SwitchValueIfError switch to an alternative Payload if this Mono is end with an error.
5153
SwitchValueIfError(alternative payload.Payload) Mono
54+
// ZipWith combines the result from this mono and another into a new Payload.
55+
ZipWith(alternative Mono, cmb Combinator2) Mono
5256
// Raw returns low-level reactor.Mono which defined in reactor-go library.
5357
Raw() mono.Mono
5458
// ToChan subscribe Mono and puts items into a chan.
@@ -65,9 +69,3 @@ type Sink interface {
6569
// Error emits an error then complete current Sink.
6670
Error(error)
6771
}
68-
69-
// Processor combine Sink and Mono.
70-
type Processor interface {
71-
Sink
72-
Mono
73-
}

rx/mono/proxy_default.go

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,6 @@ func (p proxy) Raw() mono.Mono {
2727
return p.Mono
2828
}
2929

30-
func (p proxy) Success(v payload.Payload) {
31-
mustProcessor(p.Mono).Success(v)
32-
}
33-
34-
func (p proxy) Error(e error) {
35-
mustProcessor(p.Mono).Error(e)
36-
}
37-
3830
func (p proxy) ToChan(ctx context.Context) (<-chan payload.Payload, <-chan error) {
3931
return toChan(ctx, p.Mono)
4032
}
@@ -120,12 +112,12 @@ func (p proxy) DoOnCancel(fn rx.FnOnCancel) Mono {
120112
}
121113

122114
func (p proxy) SwitchIfEmpty(alternative Mono) Mono {
123-
return newProxy(p.Mono.SwitchIfEmpty(alternative.Raw()))
115+
return newProxy(p.Mono.SwitchIfEmpty(unpackRawPublisher(alternative)))
124116
}
125117

126118
func (p proxy) SwitchIfError(alternative func(error) Mono) Mono {
127119
return newProxy(p.Mono.SwitchIfError(func(err error) mono.Mono {
128-
return alternative(err).Raw()
120+
return unpackRawPublisher(alternative(err))
129121
}))
130122
}
131123

@@ -137,6 +129,20 @@ func (p proxy) Timeout(timeout time.Duration) Mono {
137129
return newProxy(p.Mono.Timeout(timeout))
138130
}
139131

132+
func (p proxy) ZipWith(alternative Mono, cmb Combinator2) Mono {
133+
return Zip(p, alternative).ToMono(func(item rx.Tuple) (payload.Payload, error) {
134+
first, err := convertItem(item[0])
135+
if err != nil {
136+
return nil, err
137+
}
138+
second, err := convertItem(item[1])
139+
if err != nil {
140+
return nil, err
141+
}
142+
return cmb(first, second)
143+
})
144+
}
145+
140146
func (p proxy) Subscribe(ctx context.Context, options ...rx.SubscriberOption) {
141147
p.SubscribeWith(ctx, rx.NewSubscriber(options...))
142148
}

rx/mono/proxy_oneshot.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,6 @@ func returnOneshotProxy(o *oneshotProxy) (raw mono.Mono) {
3535
return
3636
}
3737

38-
func (o *oneshotProxy) Success(p payload.Payload) {
39-
mustProcessor(o.Mono).Success(p)
40-
}
41-
42-
func (o *oneshotProxy) Error(err error) {
43-
mustProcessor(o.Mono).Error(err)
44-
}
45-
4638
func (o *oneshotProxy) SubscribeWith(ctx context.Context, s rx.Subscriber) {
4739
var sub reactor.Subscriber
4840
if s == rx.EmptySubscriber {
@@ -143,13 +135,13 @@ func (o *oneshotProxy) Block(ctx context.Context) (payload.Payload, error) {
143135
}
144136

145137
func (o *oneshotProxy) SwitchIfEmpty(alternative Mono) Mono {
146-
o.Mono = o.Mono.SwitchIfEmpty(alternative.Raw())
138+
o.Mono = o.Mono.SwitchIfEmpty(unpackRawPublisher(alternative))
147139
return o
148140
}
149141

150142
func (o *oneshotProxy) SwitchIfError(alternative func(error) Mono) Mono {
151143
o.Mono = o.Mono.SwitchIfError(func(err error) mono.Mono {
152-
return alternative(err).Raw()
144+
return unpackRawPublisher(alternative(err))
153145
})
154146
return o
155147
}
@@ -158,6 +150,19 @@ func (o *oneshotProxy) SwitchValueIfError(alternative payload.Payload) Mono {
158150
o.Mono = o.Mono.SwitchValueIfError(alternative)
159151
return o
160152
}
153+
func (o *oneshotProxy) ZipWith(alternative Mono, cmb Combinator2) Mono {
154+
return Zip(o, alternative).ToMonoOneshot(func(item rx.Tuple) (payload.Payload, error) {
155+
first, err := convertItem(item[0])
156+
if err != nil {
157+
return nil, err
158+
}
159+
second, err := convertItem(item[1])
160+
if err != nil {
161+
return nil, err
162+
}
163+
return cmb(first, second)
164+
})
165+
}
161166

162167
func (o *oneshotProxy) Raw() mono.Mono {
163168
return o.Mono

rx/mono/utils.go

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/jjeffcaii/reactor-go/scheduler"
99
"github.com/pkg/errors"
1010
"github.com/rsocket/rsocket-go/payload"
11+
"github.com/rsocket/rsocket-go/rx"
1112
)
1213

1314
var empty = newProxy(mono.Empty())
@@ -32,6 +33,10 @@ func Raw(input mono.Mono) Mono {
3233
return newProxy(input)
3334
}
3435

36+
func RawOneshot(origin mono.Mono) Mono {
37+
return borrowOneshotProxy(origin)
38+
}
39+
3540
// Just wrap an exist Payload to a Mono.
3641
func Just(input payload.Payload) Mono {
3742
return newProxy(mono.Just(input))
@@ -156,14 +161,6 @@ func toChan(ctx context.Context, publisher mono.Mono) (<-chan payload.Payload, <
156161
return value, err
157162
}
158163

159-
func mustProcessor(origin mono.Mono) mono.Processor {
160-
m, ok := origin.(mono.Processor)
161-
if !ok {
162-
panic(errors.Errorf("require processor but %v", origin))
163-
}
164-
return m
165-
}
166-
167164
func toBlock(ctx context.Context, m mono.Mono) (payload.Payload, error) {
168165
done := make(chan struct{})
169166
vchan := make(chan payload.Payload, 1)
@@ -184,3 +181,35 @@ func toBlock(ctx context.Context, m mono.Mono) (payload.Payload, error) {
184181
return nil, nil
185182
}
186183
}
184+
185+
func unpackRawPublisher(source Mono) mono.Mono {
186+
if source == nil {
187+
return nil
188+
}
189+
switch t := source.(type) {
190+
case *oneshotProxy:
191+
return returnOneshotProxy(t)
192+
default:
193+
return t.Raw()
194+
}
195+
}
196+
197+
func convertItem(item *reactor.Item) (result rx.Item, err error) {
198+
if item == nil {
199+
return
200+
}
201+
if item.E != nil {
202+
result.E = item.E
203+
return
204+
}
205+
if item.V == nil {
206+
return
207+
}
208+
p, ok := item.V.(payload.Payload)
209+
if !ok {
210+
err = errors.Errorf("require Payload value type instead of %t", item.V)
211+
return
212+
}
213+
result.V = p
214+
return
215+
}

rx/mono/zip.go

Lines changed: 62 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,34 +3,82 @@ package mono
33
import (
44
"github.com/jjeffcaii/reactor-go"
55
"github.com/jjeffcaii/reactor-go/mono"
6-
"github.com/jjeffcaii/reactor-go/tuple"
6+
"github.com/rsocket/rsocket-go/internal/common"
77
"github.com/rsocket/rsocket-go/payload"
88
"github.com/rsocket/rsocket-go/rx"
99
)
1010

11+
// Zip merges given Monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple.
1112
func Zip(first Mono, second Mono, others ...Mono) ZipBuilder {
12-
var all []Mono
13-
all = append(all, first, second)
14-
all = append(all, others...)
15-
return ZipAll(all...)
13+
if len(others) < 1 {
14+
return []mono.Mono{
15+
unpackRawPublisher(first),
16+
unpackRawPublisher(second),
17+
}
18+
}
19+
sources := make([]mono.Mono, 2+len(others))
20+
sources[0] = unpackRawPublisher(first)
21+
sources[1] = unpackRawPublisher(second)
22+
for i := 0; i < len(others); i++ {
23+
sources[i+2] = unpackRawPublisher(others[i])
24+
}
25+
return sources
1626
}
1727

28+
// ZipAll merges given Monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple.
1829
func ZipAll(sources ...Mono) ZipBuilder {
19-
if len(sources) < 1 {
20-
panic("at least one Mono for zip operation")
21-
}
2230
all := make([]mono.Mono, len(sources))
2331
for i := 0; i < len(all); i++ {
24-
all[i] = sources[i].Raw()
32+
all[i] = unpackRawPublisher(sources[i])
2533
}
2634
return all
2735
}
2836

37+
// ZipBuilder can be used to build a zipped Mono.
2938
type ZipBuilder []mono.Mono
3039

31-
func (z ZipBuilder) ToMono(transform func(rx.Tuple) (payload.Payload, error)) Mono {
32-
return Raw(mono.ZipAll(z...).Map(func(any reactor.Any) (reactor.Any, error) {
33-
tup := rx.NewTuple(any.(tuple.Tuple))
34-
return transform(tup)
35-
}))
40+
// ToMonoOneshot builds as a oneshot Mono.
41+
func (z ZipBuilder) ToMonoOneshot(transform func(rx.Tuple) (payload.Payload, error)) Mono {
42+
return RawOneshot(mono.ZipCombineOneshot(cmb(transform), pinItem, z...))
43+
}
44+
45+
// ToMono builds a Mono.
46+
func (z ZipBuilder) ToMono(transform func(item rx.Tuple) (payload.Payload, error)) Mono {
47+
return Raw(mono.ZipCombine(cmb(transform), pinItem, z...))
48+
}
49+
50+
func unpinItem(item *reactor.Item) {
51+
if item == nil {
52+
return
53+
}
54+
if r, _ := item.V.(common.Releasable); r != nil {
55+
r.Release()
56+
}
57+
if r, _ := item.E.(common.Releasable); r != nil {
58+
r.Release()
59+
}
60+
}
61+
62+
func pinItem(item *reactor.Item) {
63+
if item == nil {
64+
return
65+
}
66+
if r, _ := item.V.(common.Releasable); r != nil {
67+
r.IncRef()
68+
}
69+
if r, _ := item.E.(common.Releasable); r != nil {
70+
r.IncRef()
71+
}
72+
}
73+
74+
func cmb(transform func(rx.Tuple) (payload.Payload, error)) func(...*reactor.Item) (reactor.Any, error) {
75+
return func(values ...*reactor.Item) (reactor.Any, error) {
76+
defer func() {
77+
for i := 0; i < len(values); i++ {
78+
unpinItem(values[i])
79+
}
80+
}()
81+
t := rx.NewTuple(values...)
82+
return transform(t)
83+
}
3684
}

rx/mono/zip_test.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@ import (
1212

1313
func TestZipBuilder_ToMono(t *testing.T) {
1414
v, err := mono.Zip(mono.Just(_fakePayload), mono.Just(_fakePayload)).
15-
ToMono(func(tuple rx.Tuple) (payload.Payload, error) {
16-
assert.Equal(t, 2, tuple.Len())
17-
for i := 0; i < tuple.Len(); i++ {
18-
v, err := tuple.Get(i)
15+
ToMono(func(items rx.Tuple) (payload.Payload, error) {
16+
assert.Equal(t, 2, items.Len())
17+
for i := 0; i < len(items); i++ {
18+
v, err := items.Get(i)
1919
assert.NoError(t, err)
2020
assert.Equal(t, _fakePayload, v)
2121
}
@@ -33,3 +33,16 @@ func TestZip_Empty(t *testing.T) {
3333
}).Block(context.Background())
3434
})
3535
}
36+
37+
func TestZipWith(t *testing.T) {
38+
p1 := payload.NewString("hello", "")
39+
p2 := payload.NewString("world", "")
40+
res, err := mono.Just(p1).
41+
ZipWith(mono.Just(p2), func(first, second mono.Item) (payload.Payload, error) {
42+
data := first.V.DataUTF8() + " " + second.V.DataUTF8() + "!"
43+
return payload.NewString(data, ""), nil
44+
}).
45+
Block(context.Background())
46+
assert.NoError(t, err, "should not return error")
47+
assert.Equal(t, "hello world!", res.DataUTF8(), "bad result")
48+
}

0 commit comments

Comments
 (0)