diff --git a/client_test.go b/client_test.go new file mode 100644 index 0000000..d02fde4 --- /dev/null +++ b/client_test.go @@ -0,0 +1,26 @@ +package main + +import ( + "context" + "fmt" + "testing" + + "github.com/liftbridge-io/go-liftbridge" + "github.com/stretchr/testify/assert" +) + +func (mlbc MockLiftBridgeClient) CreateStream(ctx context.Context, stream liftbridge.StreamInfo) error { + fmt.Println("fake CreateStream() called") + return nil +} + +func Test_Client_CreateStream(t *testing.T) { + mlbc := MockLiftBridgeClient{ + streamInfo: liftbridge.StreamInfo{ + Subject: "foo", + Name: "foo-stream", + ReplicationFactor: 3, + }, + } + assert.NoError(t, createStream(mlbc, context.Background(), mlbc.streamInfo)) +} diff --git a/main.go b/main.go index e528f2d..8b3cf93 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "sync" liftbridge "github.com/liftbridge-io/go-liftbridge" @@ -9,12 +10,14 @@ import ( type EventStreamClient interface { Pub() Sub() + CreateStream(context.Context, liftbridge.StreamInfo) error } type LiftBridgeClient struct { wg *sync.WaitGroup servers []string streamInfo liftbridge.StreamInfo + client liftbridge.Client } func main() { diff --git a/main_test.go b/main_test.go index 690752e..a77f61f 100644 --- a/main_test.go +++ b/main_test.go @@ -5,11 +5,13 @@ import ( "sync" "testing" + liftbridge "github.com/liftbridge-io/go-liftbridge" "github.com/stretchr/testify/assert" ) type MockLiftBridgeClient struct { - wg *sync.WaitGroup + wg *sync.WaitGroup + streamInfo liftbridge.StreamInfo } func (mlbc MockLiftBridgeClient) Pub() { diff --git a/sub.go b/sub.go index 959d980..3ab128c 100644 --- a/sub.go +++ b/sub.go @@ -8,6 +8,10 @@ import ( lift "github.com/liftbridge-io/go-liftbridge/liftbridge-grpc" ) +func sub(lbcIface EventStreamClient) { + lbcIface.Sub() +} + func (lbc LiftBridgeClient) Sub() { defer lbc.wg.Done() fmt.Println("real Sub() called") @@ -17,9 +21,10 @@ func (lbc LiftBridgeClient) Sub() { if err != nil { panic(err) } - defer client.Close() + lbc.client = client + defer lbc.client.Close() - if err := client.CreateStream(context.Background(), lbc.streamInfo); err != nil { + if err := lbc.client.CreateStream(context.Background(), lbc.streamInfo); err != nil { if err != liftbridge.ErrStreamExists { panic(err) } @@ -38,6 +43,11 @@ func (lbc LiftBridgeClient) Sub() { <-ctx.Done() } -func sub(lbcIface EventStreamClient) { - lbcIface.Sub() +func createStream(lbcIface EventStreamClient, ctx context.Context, stream liftbridge.StreamInfo) error { + return lbcIface.CreateStream(ctx, stream) +} + +func (lbc LiftBridgeClient) CreateStream(ctx context.Context, stream liftbridge.StreamInfo) error { + fmt.Println("real createStream() called") + return nil }