Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 20 additions & 20 deletions pubsub/v1samples/subscriptions/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1078,15 +1078,18 @@ func publishMsgs(ctx context.Context, t *pubsub.Topic, numMsgs int) error {

// getOrCreateTopic gets a topic or creates it if it doesn't exist.
func getOrCreateTopic(ctx context.Context, client *pubsub.Client, topicID string) (*pubsub.Topic, error) {
topic := client.Topic(topicID)
ok, err := topic.Exists(ctx)
// avoid async race conditions by attempting to create first and checking error
// rather than checking for existence, then later creating
topic, err := client.CreateTopic(ctx, topicID)
if err != nil {
return nil, fmt.Errorf("failed to check if topic exists: %w", err)
}
if !ok {
topic, err = client.CreateTopic(ctx, topicID)
if err != nil {
return nil, fmt.Errorf("failed to create topic (%q): %w", topicID, err)
st, ok := status.FromError(err)
if !ok {
return nil, fmt.Errorf("CreateTopic failed with unknown err: %v", err)
}
if st.Code() == codes.AlreadyExists {
topic = client.Topic(topicID)
} else {
return nil, fmt.Errorf("CreateTopic: %v", err)
}
}
return topic, nil
Expand Down Expand Up @@ -1174,23 +1177,20 @@ func TestCreateSubscriptionWithSMT(t *testing.T) {
client := setup(t)

smtSubID := subID + "-smt"
smtTopicID := topicID + "-smt"
var topic *pubsub.Topic
var err error

testutil.Retry(t, 10, time.Second, func(r *testutil.R) {
topic, err = client.CreateTopic(ctx, topicID)
var err error
// use = to set topic in outer scope
topic, err = getOrCreateTopic(ctx, client, smtTopicID)
if err != nil {
st, ok := status.FromError(err)
if !ok {
r.Errorf("CreateTopic failed with unknown err: %v", err)
}
// Don't return error if topic already exists, just use that for the test.
if st.Code() != codes.AlreadyExists {
r.Errorf("CreateTopic: %v", err)
}
r.Errorf("getOrCreateTopic: %v", err)
return
}
})
defer topic.Delete(ctx)
defer topic.Stop()

testutil.Retry(t, 10, time.Second, func(r *testutil.R) {
buf := new(bytes.Buffer)
if err := createSubscriptionWithSMT(buf, tc.ProjectID, smtSubID, topic); err != nil {
r.Errorf("failed to create subscription with SMT: %v", err)
Expand Down
Loading