Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
38f8826
feat: add subscriptions hook
alepane21 Jul 18, 2025
824362b
chore: add tests of OnSubscriptionStartFn
alepane21 Jul 19, 2025
cdfa9bf
chore: remove duplicated test
alepane21 Jul 19, 2025
f8de362
chore: add test of error with Start method
alepane21 Jul 19, 2025
e500d1c
chore: document OnSubscriptionStartFn
alepane21 Jul 19, 2025
c970c07
Merge branch 'master' into ale/eng-7600-add-subscriptiononstarthandler
alepane21 Jul 19, 2025
2a6b5f0
chore: refactores the OnSubscriptionStart hook to be called from the …
alepane21 Jul 22, 2025
1e6ed5f
fix: behaviour when errors happend have been changed by mistake
alepane21 Jul 22, 2025
d776943
chore: change OnSubscriptionStart hook contract
alepane21 Jul 22, 2025
2f34327
chore: hooks now can decide if the subscription has to end
alepane21 Jul 22, 2025
dc89310
chore: events are written directly on the resolveCtx
alepane21 Jul 23, 2025
3cd5690
Merge branch 'master' into ale/eng-7600-add-subscriptiononstarthandler
alepane21 Jul 23, 2025
bfad223
chore: improve test
alepane21 Jul 23, 2025
91dbadd
chore: improve names to align with the ones in the ADR of the router
alepane21 Jul 23, 2025
207adf7
chore: execute hooks inside worker, so that we don't block the main l…
alepane21 Jul 23, 2025
3257c52
fix: if only one event was to be skipped, every event was skipped! re…
alepane21 Jul 23, 2025
ec79b06
chore: remove now useless handleTriggerUpdateSubscription
alepane21 Jul 23, 2025
e74d079
chore: send updates on subscription on time (there is still a data ra…
alepane21 Jul 24, 2025
41b92a0
chore: avoid workChan races
alepane21 Jul 24, 2025
7bb7431
chore: lint issue
alepane21 Jul 24, 2025
e0845be
Merge branch 'master' into ale/eng-7600-add-subscriptiononstarthandler
alepane21 Jul 29, 2025
3b61ef2
chore: rename SubscriptionOnStartFns to StartupHooks for clarity
alepane21 Jul 29, 2025
ae55c92
Merge remote-tracking branch 'origin/master' into ale/eng-7600-add-su…
alepane21 Jul 29, 2025
87cb3a8
Merge remote-tracking branch 'origin/ale/eng-7600-add-subscriptionons…
alepane21 Jul 29, 2025
ea8ab75
chore: wait for the initial hook execution before starting the dataso…
alepane21 Aug 5, 2025
2ed6134
fix: avoid locking worker goroutine if the trigger was already set up
alepane21 Aug 6, 2025
5fa5a76
chore: add test to verify Resolve beahviour with more than one subscr…
alepane21 Aug 6, 2025
8731ff2
chore: remove return close option
alepane21 Aug 19, 2025
6793f09
chore: HookableSubscriptionDataSource should also be without close
alepane21 Aug 19, 2025
fdf829a
chore: fix initialHooksClose value
alepane21 Aug 19, 2025
7b569b6
chore: simplify hooks implementation
alepane21 Aug 20, 2025
4a94540
chore: remove pinned subscription
alepane21 Aug 20, 2025
a673838
chore: remove unused code, fix naming
alepane21 Aug 20, 2025
46d97b2
chore: fix test
alepane21 Aug 20, 2025
447c103
chore: avoid data source start if the subscription should close
alepane21 Aug 20, 2025
7bb99a5
chore: improve resolve structure
alepane21 Aug 21, 2025
f8c9d06
chore: type
alepane21 Aug 21, 2025
baec1da
chore: readd pinned updater to sub
alepane21 Aug 21, 2025
2100f14
chore: simplify events flow
alepane21 Aug 22, 2025
311b021
chore: fix compile issues
StarpTech Aug 23, 2025
e0af0a0
chore: fix tests
alepane21 Aug 25, 2025
f4eba63
chore: remove pubsub datasource implementations and related tests, it…
alepane21 Aug 25, 2025
0efce3c
test: add new test for handling multiple subscriptions with the same …
alepane21 Aug 25, 2025
6234791
chore: execute startup hooks inside worker go routine
alepane21 Sep 3, 2025
8df1b67
chore: remove parameter not used anymore
alepane21 Sep 3, 2025
a0024f0
chore: add subscription to registry after startup hooks execution
alepane21 Sep 9, 2025
f87d764
feat: add subscription management methods to subscriptionUpdater
alepane21 Sep 24, 2025
ddc652f
Merge branch 'topic/streams-v1' into ale/eng-7600-add-subscriptionons…
dkorittki Sep 30, 2025
1d6b668
Merge branch 'ale/eng-7600-add-subscriptiononstarthandler' into ale/e…
dkorittki Oct 1, 2025
2391c87
fix: implement missing subscription updater methods (#1310)
dkorittki Oct 6, 2025
15d578b
fix: don't send heartbeat on some subscription resolver tests (#1313)
dkorittki Oct 9, 2025
ba9a827
Merge branch 'master' into topic/streams-v1
dkorittki Oct 9, 2025
17f8612
Merge branch 'master' into dominik/merge-upstream-to-cosmo-streams
dkorittki Oct 10, 2025
1353de9
fix: fix flaky engine subscription tests (#1318)
dkorittki Oct 16, 2025
06f55f1
Merge branch 'topic/streams-v1' into dominik/merge-upstream-to-cosmo-…
dkorittki Oct 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/wundergraph/graphql-go-tools/v2/pkg/astparser"
"github.com/wundergraph/graphql-go-tools/v2/pkg/asttransform"
grpcdatasource "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/datasource/grpc_datasource"
"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
"github.com/wundergraph/graphql-go-tools/v2/pkg/federation"
"github.com/wundergraph/graphql-go-tools/v2/pkg/operationreport"
)
Expand Down Expand Up @@ -103,6 +104,11 @@ type SingleTypeField struct {
FieldName string
}

// SubscriptionOnStartFn defines a hook function that is called when a subscription starts.
// It receives the resolve context and the input of the subscription.
// The function can return an error.
type SubscriptionOnStartFn func(ctx resolve.StartupHookContext, input []byte) (err error)

type SubscriptionConfiguration struct {
URL string
Header http.Header
Expand All @@ -119,6 +125,8 @@ type SubscriptionConfiguration struct {
// these headers by itself.
ForwardedClientHeaderRegularExpressions []RegularExpression
WsSubProtocol string
// StartupHooks contains the method called when a subscription is started
StartupHooks []SubscriptionOnStartFn
}

type FetchConfiguration struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,8 @@ func (p *Planner[T]) ConfigureSubscription() plan.SubscriptionConfiguration {
return plan.SubscriptionConfiguration{
Input: string(input),
DataSource: &SubscriptionSource{
client: p.subscriptionClient,
client: p.subscriptionClient,
subscriptionOnStartFns: p.config.subscription.StartupHooks,
},
Variables: p.variables,
PostProcessing: DefaultPostProcessingConfiguration,
Expand Down Expand Up @@ -1953,7 +1954,8 @@ type RegularExpression struct {
}

type SubscriptionSource struct {
client GraphQLSubscriptionClient
client GraphQLSubscriptionClient
subscriptionOnStartFns []SubscriptionOnStartFn
}

func (s *SubscriptionSource) AsyncStart(ctx *resolve.Context, id uint64, input []byte, updater resolve.SubscriptionUpdater) error {
Expand Down Expand Up @@ -2003,3 +2005,12 @@ func (s *SubscriptionSource) UniqueRequestID(ctx *resolve.Context, input []byte,
}
return s.client.UniqueRequestID(ctx, options, xxh)
}

// SubscriptionOnStart is called when a subscription is started.
// Each hook is called in a separate goroutine.
func (s *SubscriptionSource) SubscriptionOnStart(ctx resolve.StartupHookContext, input []byte) (err error) {
for _, fn := range s.subscriptionOnStartFns {
return fn(ctx, input)
}
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -4018,7 +4018,7 @@ func TestGraphQLDataSource(t *testing.T) {
Trigger: resolve.GraphQLSubscriptionTrigger{
Input: []byte(`{"url":"wss://swapi.com/graphql","body":{"query":"subscription{remainingJedis}"}}`),
Source: &SubscriptionSource{
NewGraphQLSubscriptionClient(http.DefaultClient, http.DefaultClient, ctx),
client: NewGraphQLSubscriptionClient(http.DefaultClient, http.DefaultClient, ctx),
},
PostProcessing: DefaultPostProcessingConfiguration,
},
Expand Down Expand Up @@ -8251,7 +8251,6 @@ func (f *FailingSubscriptionClient) SubscribeAsync(ctx *resolve.Context, id uint
}

func (f *FailingSubscriptionClient) Unsubscribe(id uint64) {

}

func (f *FailingSubscriptionClient) Subscribe(ctx *resolve.Context, options GraphQLSubscriptionOptions, updater resolve.SubscriptionUpdater) error {
Expand Down Expand Up @@ -8331,6 +8330,19 @@ func (t *testSubscriptionUpdater) Close(kind resolve.SubscriptionCloseKind) {
t.closed = true
}

// empty method to satisfy the interface, not used in this tests
func (t *testSubscriptionUpdater) CloseSubscription(kind resolve.SubscriptionCloseKind, id resolve.SubscriptionIdentifier) {
}

// empty method to satisfy the interface, not used in this tests
func (t *testSubscriptionUpdater) Subscriptions() map[context.Context]resolve.SubscriptionIdentifier {
return make(map[context.Context]resolve.SubscriptionIdentifier)
}

// empty method to satisfy the interface, not used in this tests
func (t *testSubscriptionUpdater) UpdateSubscription(id resolve.SubscriptionIdentifier, data []byte) {
}

func TestSubscriptionSource_Start(t *testing.T) {
chatServer := httptest.NewServer(subscriptiontesting.ChatGraphQLEndpointHandler())
defer chatServer.Close()
Expand Down Expand Up @@ -8904,6 +8916,60 @@ func TestSanitizeKey(t *testing.T) {
}
}

func TestSubscriptionSource_SubscriptionOnStart(t *testing.T) {

t.Run("SubscriptionOnStart calls subscriptionOnStartFns", func(t *testing.T) {
ctx := resolve.StartupHookContext{
Context: context.Background(),
Updater: func(data []byte) {},
}

type fnData struct {
ctx resolve.StartupHookContext
input []byte
}

startFnCalled := make(chan fnData, 1)
subscriptionSource := SubscriptionSource{
subscriptionOnStartFns: []SubscriptionOnStartFn{
func(ctx resolve.StartupHookContext, input []byte) error {
startFnCalled <- fnData{ctx, input}
return nil
},
},
}

err := subscriptionSource.SubscriptionOnStart(ctx, []byte(`{"variables": {}, "extensions": {}, "operationName": "LiveMessages", "query": "subscription LiveMessages { messageAdded(roomName: \"#test\") { text createdBy } }"}`))
require.NoError(t, err)
var called fnData
select {
case called = <-startFnCalled:
case <-time.After(1 * time.Second):
t.Fatal("SubscriptionOnStartFn was not called")
}
assert.Equal(t, []byte(`{"variables": {}, "extensions": {}, "operationName": "LiveMessages", "query": "subscription LiveMessages { messageAdded(roomName: \"#test\") { text createdBy } }"}`), called.input)
})

t.Run("SubscriptionOnStart calls subscriptionOnStartFns and returns error if one of the functions returns an error", func(t *testing.T) {
ctx := resolve.StartupHookContext{
Context: context.Background(),
Updater: func(data []byte) {},
}

subscriptionSource := SubscriptionSource{
subscriptionOnStartFns: []SubscriptionOnStartFn{
func(ctx resolve.StartupHookContext, input []byte) error {
return errors.New("test error")
},
},
}

err := subscriptionSource.SubscriptionOnStart(ctx, []byte(`{"variables": {}, "extensions": {}, "operationName": "LiveMessages", "query": "subscription LiveMessages { messageAdded(roomName: \"#test\") { text createdBy } }"}`))
require.Error(t, err)
assert.ErrorContains(t, err, "test error")
})
}

const interfaceSelectionSchema = `

scalar String
Expand Down

This file was deleted.

Loading