diff --git a/pubsub/v1samples/subscriptions/subscription_test.go b/pubsub/v1samples/subscriptions/subscription_test.go index 6cf430452b..5aaaaa90eb 100644 --- a/pubsub/v1samples/subscriptions/subscription_test.go +++ b/pubsub/v1samples/subscriptions/subscription_test.go @@ -1078,15 +1078,18 @@ func publishMsgs(ctx context.Context, t *pubsub.Topic, numMsgs int) error { // getOrCreateTopic gets a topic or creates it if it doesn't exist. func getOrCreateTopic(ctx context.Context, client *pubsub.Client, topicID string) (*pubsub.Topic, error) { - topic := client.Topic(topicID) - ok, err := topic.Exists(ctx) + // avoid async race conditions by attempting to create first and checking error + // rather than checking for existence, then later creating + topic, err := client.CreateTopic(ctx, topicID) if err != nil { - return nil, fmt.Errorf("failed to check if topic exists: %w", err) - } - if !ok { - topic, err = client.CreateTopic(ctx, topicID) - if err != nil { - return nil, fmt.Errorf("failed to create topic (%q): %w", topicID, err) + st, ok := status.FromError(err) + if !ok { + return nil, fmt.Errorf("CreateTopic failed with unknown err: %v", err) + } + if st.Code() == codes.AlreadyExists { + topic = client.Topic(topicID) + } else { + return nil, fmt.Errorf("CreateTopic: %v", err) } } return topic, nil @@ -1174,23 +1177,20 @@ func TestCreateSubscriptionWithSMT(t *testing.T) { client := setup(t) smtSubID := subID + "-smt" + smtTopicID := topicID + "-smt" var topic *pubsub.Topic - var err error + testutil.Retry(t, 10, time.Second, func(r *testutil.R) { - topic, err = client.CreateTopic(ctx, topicID) + var err error + // use = to set topic in outer scope + topic, err = getOrCreateTopic(ctx, client, smtTopicID) if err != nil { - st, ok := status.FromError(err) - if !ok { - r.Errorf("CreateTopic failed with unknown err: %v", err) - } - // Don't return error if topic already exists, just use that for the test. - if st.Code() != codes.AlreadyExists { - r.Errorf("CreateTopic: %v", err) - } + r.Errorf("getOrCreateTopic: %v", err) + return } - }) + defer topic.Delete(ctx) + defer topic.Stop() - testutil.Retry(t, 10, time.Second, func(r *testutil.R) { buf := new(bytes.Buffer) if err := createSubscriptionWithSMT(buf, tc.ProjectID, smtSubID, topic); err != nil { r.Errorf("failed to create subscription with SMT: %v", err)