Skip to content

Commit c65aa22

Browse files
feat(pubsub/v2): add single message transform samples (#5352)
* feat(pubsub/v2): add single message transform samples * address review comments * lingering review comment --------- Co-authored-by: Brian Dorsey <[email protected]>
1 parent d3bdada commit c65aa22

File tree

4 files changed

+194
-0
lines changed

4 files changed

+194
-0
lines changed
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package subscriptions
16+
17+
// [START pubsub_create_subscription_with_smt]
18+
import (
19+
"context"
20+
"fmt"
21+
"io"
22+
23+
"cloud.google.com/go/pubsub/v2"
24+
"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
25+
)
26+
27+
// createSubscriptionWithSMT creates a subscription with a single message transform function applied.
28+
func createSubscriptionWithSMT(w io.Writer, projectID, topicID, subID string) error {
29+
// projectID := "my-project-id"
30+
// topicID := "my-topic"
31+
// subID := "my-sub"
32+
ctx := context.Background()
33+
client, err := pubsub.NewClient(ctx, projectID)
34+
if err != nil {
35+
return fmt.Errorf("pubsub.NewClient: %w", err)
36+
}
37+
defer client.Close()
38+
39+
code := `function redactSSN(message, metadata) {
40+
const data = JSON.parse(message.data);
41+
delete data['ssn'];
42+
message.data = JSON.stringify(data);
43+
return message;
44+
}`
45+
46+
transform := &pubsubpb.MessageTransform{
47+
Transform: &pubsubpb.MessageTransform_JavascriptUdf{
48+
JavascriptUdf: &pubsubpb.JavaScriptUDF{
49+
FunctionName: "redactSSN",
50+
Code: code,
51+
},
52+
},
53+
}
54+
55+
sub := &pubsubpb.Subscription{
56+
Name: fmt.Sprintf("projects/%s/subscriptions/%s", projectID, subID),
57+
Topic: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
58+
MessageTransforms: []*pubsubpb.MessageTransform{transform},
59+
}
60+
sub, err = client.SubscriptionAdminClient.CreateSubscription(ctx, sub)
61+
if err != nil {
62+
return fmt.Errorf("CreateSubscription: %w", err)
63+
}
64+
fmt.Fprintf(w, "Created subscription with message transform: %v\n", sub)
65+
return nil
66+
}
67+
68+
// [END pubsub_create_subscription_with_smt]

pubsub/subscriptions/subscription_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ import (
3636
trace "cloud.google.com/go/trace/apiv1"
3737
"cloud.google.com/go/trace/apiv1/tracepb"
3838
"google.golang.org/api/iterator"
39+
"google.golang.org/grpc/codes"
40+
"google.golang.org/grpc/status"
3941

4042
"github.com/GoogleCloudPlatform/golang-samples/internal/testutil"
4143
)
@@ -731,3 +733,34 @@ func createOrGetStorageBucket(projectID, bucketID string) error {
731733

732734
return nil
733735
}
736+
737+
func TestCreateSubscriptionWithSMT(t *testing.T) {
738+
ctx := context.Background()
739+
tc := testutil.SystemTest(t)
740+
client := setup(t)
741+
742+
smtSubID := subID + "-smt"
743+
smtTopicID := topicID + "-smt"
744+
testutil.Retry(t, 10, time.Second, func(r *testutil.R) {
745+
_, err := client.TopicAdminClient.CreateTopic(ctx, &pb.Topic{
746+
Name: fmt.Sprintf("projects/%s/topics/%s", tc.ProjectID, smtTopicID),
747+
})
748+
if err != nil {
749+
if st, ok := status.FromError(err); !ok || st.Code() != codes.AlreadyExists {
750+
r.Errorf("CreateTopic: %v", err)
751+
}
752+
}
753+
})
754+
755+
testutil.Retry(t, 10, time.Second, func(r *testutil.R) {
756+
buf := new(bytes.Buffer)
757+
if err := createSubscriptionWithSMT(buf, tc.ProjectID, smtTopicID, smtSubID); err != nil {
758+
r.Errorf("failed to create subscription with SMT: %v", err)
759+
}
760+
got := buf.String()
761+
want := "Created subscription with message transform"
762+
if !strings.Contains(got, want) {
763+
r.Errorf("got: %s, want: %v", got, want)
764+
}
765+
})
766+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package topics
16+
17+
// [START pubsub_create_topic_with_smt]
18+
import (
19+
"context"
20+
"fmt"
21+
"io"
22+
23+
"cloud.google.com/go/pubsub/v2"
24+
"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
25+
)
26+
27+
// createTopicWithSMT creates a topic with a single message transform function applied.
28+
func createTopicWithSMT(w io.Writer, projectID, topicID string) error {
29+
// projectID := "my-project-id"
30+
// topicID := "my-topic"
31+
ctx := context.Background()
32+
client, err := pubsub.NewClient(ctx, projectID)
33+
if err != nil {
34+
return fmt.Errorf("pubsub.NewClient: %w", err)
35+
}
36+
defer client.Close()
37+
38+
code := `function redactSSN(message, metadata) {
39+
const data = JSON.parse(message.data);
40+
delete data['ssn'];
41+
message.data = JSON.stringify(data);
42+
return message;
43+
}`
44+
transform := &pubsubpb.MessageTransform{
45+
Transform: &pubsubpb.MessageTransform_JavascriptUdf{
46+
JavascriptUdf: &pubsubpb.JavaScriptUDF{
47+
FunctionName: "redactSSN",
48+
Code: code,
49+
},
50+
},
51+
}
52+
53+
topic := &pubsubpb.Topic{
54+
Name: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
55+
MessageTransforms: []*pubsubpb.MessageTransform{transform},
56+
}
57+
58+
topic, err = client.TopicAdminClient.CreateTopic(ctx, topic)
59+
if err != nil {
60+
return fmt.Errorf("CreateTopic: %w", err)
61+
}
62+
63+
fmt.Fprintf(w, "Created topic with message transform: %v\n", topic)
64+
return nil
65+
}
66+
67+
// [END pubsub_create_topic_with_smt]

pubsub/topics/topics_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ import (
3434
"cloud.google.com/go/trace/apiv1/tracepb"
3535
"github.com/GoogleCloudPlatform/golang-samples/internal/testutil"
3636
"google.golang.org/api/iterator"
37+
"google.golang.org/grpc/codes"
38+
"google.golang.org/grpc/status"
3739
)
3840

3941
var topicID string
@@ -350,6 +352,30 @@ func TestPublishWithCompression(t *testing.T) {
350352
}
351353
}
352354

355+
func TestCreateTopicWithSMT(t *testing.T) {
356+
setup(t)
357+
tc := testutil.SystemTest(t)
358+
smtTopicID := topicID + "-smt"
359+
testutil.Retry(t, 10, time.Second, func(r *testutil.R) {
360+
buf := new(bytes.Buffer)
361+
err := createTopicWithSMT(buf, tc.ProjectID, smtTopicID)
362+
if err != nil {
363+
st, ok := status.FromError(err)
364+
if ok && st.Code() == codes.AlreadyExists {
365+
return // This is expected on a retry.
366+
}
367+
r.Errorf("failed to create topic with SMT: %v", err)
368+
return
369+
}
370+
371+
got := buf.String()
372+
want := "Created topic with message transform"
373+
if !strings.Contains(got, want) {
374+
r.Errorf("got %q, want to contain %q", got, want)
375+
}
376+
})
377+
}
378+
353379
func createTopic(ctx context.Context, client *pubsub.Client, topicName string) error {
354380
_, err := client.TopicAdminClient.CreateTopic(ctx, &pubsubpb.Topic{
355381
Name: topicName,

0 commit comments

Comments
 (0)