diff --git a/service/common/type.go b/service/common/type.go index d1f5e7db..1883b3c3 100644 --- a/service/common/type.go +++ b/service/common/type.go @@ -99,6 +99,8 @@ type Subscription struct { Match string `json:"match"` // Priority is the priority in which to evaluate the match (lower to higher). Priority int `json:"priority"` + // DisableTopicValidation allows to receive events from publisher topics that differ from the subscribed topic. + DisableTopicValidation bool `json:"disableTopicValidation"` } const ( diff --git a/service/grpc/topic.go b/service/grpc/topic.go index 7b565ef1..e09d8663 100644 --- a/service/grpc/topic.go +++ b/service/grpc/topic.go @@ -85,7 +85,17 @@ func (s *Server) OnTopicEvent(ctx context.Context, in *pb.TopicEventRequest) (*p return &pb.TopicEventResponse{Status: pb.TopicEventResponse_DROP}, errors.New("pub/sub and topic names required") } key := in.PubsubName + "-" + in.Topic - if sub, ok := s.topicRegistrar[key]; ok { + noValidationKey := in.PubsubName + + var sub *internal.TopicRegistration + var ok bool + + sub, ok = s.topicRegistrar[key] + if !ok { + sub, ok = s.topicRegistrar[noValidationKey] + } + + if ok { data := interface{}(in.Data) if len(in.Data) > 0 { mediaType, _, err := mime.ParseMediaType(in.DataContentType) diff --git a/service/grpc/topic_test.go b/service/grpc/topic_test.go index ffddb895..6390aee3 100644 --- a/service/grpc/topic_test.go +++ b/service/grpc/topic_test.go @@ -137,6 +137,36 @@ func TestTopic(t *testing.T) { stopTestServer(t, server) } +func TestTopicWithValidationDisabled(t *testing.T) { + ctx := context.Background() + + sub := &common.Subscription{ + PubsubName: "messages", + Topic: "*", + DisableTopicValidation: true, + } + server := getTestServer() + + err := server.AddTopicEventHandler(sub, eventHandler) + assert.Nil(t, err) + + startTestServer(server) + + in := &runtime.TopicEventRequest{ + Id: "a123", + Source: "test", + Type: "test", + SpecVersion: "v1.0", + DataContentType: "text/plain", + Data: []byte("test"), + Topic: "test", + PubsubName: sub.PubsubName, + } + + _, err = server.OnTopicEvent(ctx, in) + assert.NoError(t, err) +} + func TestTopicWithErrors(t *testing.T) { ctx := context.Background() diff --git a/service/internal/topicregistrar.go b/service/internal/topicregistrar.go index 3ac17658..ec4efb9e 100644 --- a/service/internal/topicregistrar.go +++ b/service/internal/topicregistrar.go @@ -29,7 +29,14 @@ func (m TopicRegistrar) AddSubscription(sub *common.Subscription, fn common.Topi if fn == nil { return fmt.Errorf("topic handler required") } - key := sub.PubsubName + "-" + sub.Topic + + var key string + if !sub.DisableTopicValidation { + key = sub.PubsubName + "-" + sub.Topic + } else { + key = sub.PubsubName + } + ts, ok := m[key] if !ok { ts = &TopicRegistration{