Skip to content

Commit

Permalink
chore: add serverinfo to sideinput on client side (numaproj#1488)
Browse files Browse the repository at this point in the history
Signed-off-by: a3hadi <[email protected]>
  • Loading branch information
ayildirim21 authored Feb 5, 2024
1 parent 20cf66d commit 6444cb5
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 13 deletions.
21 changes: 11 additions & 10 deletions pkg/sdkclient/sideinput/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import (

sideinputpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sideinput/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/numaproj/numaflow-go/pkg/info"

"github.com/numaproj/numaflow/pkg/sdkclient"
"github.com/numaproj/numaflow/pkg/shared/util"
)

// client contains the grpc connection and the grpc client.
Expand All @@ -21,22 +23,21 @@ type client struct {

var _ Client = (*client)(nil)

// New creates a new client object. SideInput doesn't require server info to start ATM.
func New(inputOptions ...sdkclient.Option) (*client, error) {
// New creates a new client object.
func New(serverInfo *info.ServerInfo, inputOptions ...sdkclient.Option) (*client, error) {
var opts = sdkclient.DefaultOptions(sdkclient.SideInputAddr)

for _, inputOption := range inputOptions {
inputOption(opts)
}
_, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()
c := new(client)
sockAddr := fmt.Sprintf("%s:%s", sdkclient.UDS, opts.UdsSockAddr())
conn, err := grpc.Dial(sockAddr, grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(opts.MaxMessageSize()), grpc.MaxCallSendMsgSize(opts.MaxMessageSize())))

// Connect to the server
conn, err := util.ConnectToServer(opts.UdsSockAddr(), serverInfo, opts.MaxMessageSize())
if err != nil {
return nil, fmt.Errorf("failed to execute grpc.Dial(%q): %w", sockAddr, err)
return nil, err
}

c := new(client)
c.conn = conn
c.grpcClt = sideinputpb.NewSideInputClient(conn)
return c, nil
Expand Down
10 changes: 9 additions & 1 deletion pkg/sideinputs/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"fmt"
"time"

"github.com/numaproj/numaflow/pkg/sdkserverinfo"

cronlib "github.com/robfig/cron/v3"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/emptypb"
Expand Down Expand Up @@ -80,8 +82,14 @@ func (sim *sideInputsManager) Start(ctx context.Context) error {
return fmt.Errorf("unrecognized isbsvc type %q", sim.isbSvcType)
}

// Wait for server info to be ready
serverInfo, err := sdkserverinfo.SDKServerInfo()
if err != nil {
return err
}

// Create a new gRPC client for Side Input
sideInputClient, err := sideinput.New()
sideInputClient, err := sideinput.New(serverInfo)
if err != nil {
return fmt.Errorf("failed to create a new gRPC client: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion test/sideinputs-e2e/testdata/map-sideinput-pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ spec:
- name: myticker
container:
# A map side input example , see https://github.com/numaproj-contrib/e2e-tests-go/tree/main/map-side-input
image: "quay.io/numaio/numaproj-contrib/e2e-map-sideinput-example:v0.0.2"
image: "quay.io/numaio/numaproj-contrib/e2e-map-sideinput-example:v0.0.3"
imagePullPolicy: Always
trigger:
schedule: "@every 5s"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ spec:
- name: myticker
container:
# A reduce side input, see https://github.com/numaproj-contrib/e2e-tests-go/tree/main/reduce-side-input
image: "quay.io/numaio/numaproj-contrib/e2e-reduce-sideinput-example:v0.0.2"
image: "quay.io/numaio/numaproj-contrib/e2e-reduce-sideinput-example:v0.0.3"
imagePullPolicy: Always
trigger:
schedule: "@every 5s"
Expand Down
2 changes: 2 additions & 0 deletions test/sideinputs-e2e/testdata/sideinput_sink.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ spec:
sideInputs:
- name: myticker
container:
# see https://github.com/numaproj-contrib/e2e-tests-go/tree/main/sideInput-function
image: "quay.io/numaio/numaflow-sideinput/sideinput-sink-e2e-test:latest"
imagePullPolicy: Always
trigger:
Expand All @@ -24,6 +25,7 @@ spec:
sink:
udsink:
container:
# see https://github.com/numaproj-contrib/e2e-tests-go/tree/main/sink-side-input
image: "quay.io/numaio/numaflow-sink/redis-e2e-test-sink-with-sideinput:latest"
sideInputs:
- myticker
Expand Down
3 changes: 3 additions & 0 deletions test/sideinputs-e2e/testdata/sideinput_source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ spec:
sideInputs:
- name: myticker
container:
# see https://github.com/numaproj-contrib/e2e-tests-go/tree/main/sideInput-function
image: "quay.io/numaio/numaflow-sideinput/sideinput-sink-e2e-test:latest"
imagePullPolicy: Always
trigger:
Expand All @@ -18,13 +19,15 @@ spec:
source:
udsource:
container:
# see https://github.com/numaproj-contrib/e2e-tests-go/tree/main/simple-source-with-sideinput
image: "quay.io/numaio/numaflow-source/simple_source_with_sideinput:latest"
sideInputs:
- myticker
- name: redis-uds
sink:
udsink:
container:
# see https://github.com/numaproj/numaflow-sinks/tree/78693ec48a5b8bc7ca7be2898dd8dd722392b53a/redis-e2e-test-sink
image: "quay.io/numaio/numaflow-sink/redis-e2e-test-sink:v0.5.0"
sideInputs:
- myticker
Expand Down

0 comments on commit 6444cb5

Please sign in to comment.