@alepane21 alepane21 commented Oct 6, 2025

This PR introduces hooks into the subscription lifecycle. We also decided to remove the old pubsub implementation, which is already deprecated and no longer used by the router.

This change is primarily needed for Cosmo Streams support in the router. It allows running a hook during the initialization of a subscription client. The changes are meant to be backwards compatible with the router, so the router can use an engine version with these changes without adjustments.

Summary by CodeRabbit

  • New Features

    • Added startup hooks for GraphQL subscriptions, enabling custom initialization logic to execute when subscriptions start.
    • Enhanced subscription lifecycle management with per-subscription update capabilities.
  • Revert

    • Removed Pub/Sub datasource implementation, including all NATS and Kafka integrations and related event handling functionality.

Checklist

  • I have discussed my proposed changes in an issue and have received approval to proceed.
  • I have followed the coding standards of the project.
  • Tests or benchmarks have been added or updated.

alepane21 and others added 30 commits July 18, 2025 18:56
…resolve at the initialization of each client subscription
…ce to fix when a subscription is completed while a hook is still sending messages)
…tarthandler' into ale/eng-7600-add-subscriptiononstarthandler
coderabbitai bot commented Oct 6, 2025

Walkthrough

Adds a subscription startup hook mechanism and per-subscription updater API to resolver and GraphQL subscription sources; removes the entire pub/sub datasource subsystem (NATS/Kafka) and associated tests.

Changes

  • Subscription hook types & config — v2/pkg/engine/datasource/graphql_datasource/configuration.go: Added SubscriptionOnStartFn type and StartupHooks []SubscriptionOnStartFn to SubscriptionConfiguration; imported resolve.
  • GraphQL subscription source — v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go: Stored startup hook functions on SubscriptionSource and added SubscriptionOnStart(ctx resolve.StartupHookContext, input []byte) error to invoke configured hooks.
  • GraphQL datasource tests — v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go: Adjusted test wiring for the subscription client field; added no-op methods to test doubles; added tests verifying SubscriptionOnStart invocation and error propagation.
  • Resolve subscription hooks & updater API — v2/pkg/engine/resolve/datasource.go, v2/pkg/engine/resolve/resolve.go: Added HookableSubscriptionDataSource interface and StartupHookContext type; extended trigger with updater *subscriptionUpdater; introduced subscriptionUpdater methods (UpdateSubscription, CloseSubscription, Subscriptions), executeStartupHooks, sendUpdateToSubscription, and an UpdateSubscription event kind; wired the updater/subscription-id registry through the trigger lifecycle.
  • Resolver tests — v2/pkg/engine/resolve/resolve_test.go: Extended the createFakeStream signature to accept subscriptionOnStartFn and wired _fakeStream.SubscriptionOnStart to delegate to the provided hook; updated call sites to pass nil where appropriate.
  • Pub/Sub datasource removal — v2/pkg/engine/datasource/pubsub_datasource/{kafka_event_manager.go, nats_event_manager.go, pubsub_datasource.go, pubsub_datasource_test.go, pubsub_kafka.go, pubsub_nats.go}: Deleted the entire pub/sub datasource implementation and tests — removed NATS/Kafka event managers, configurations, planner/factory, data source types, interfaces, helpers, and test coverage.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Resolver
    participant Trigger
    participant DataSource
    participant Updater

    Client->>Resolver: addSubscription(request)
    Resolver->>Trigger: addSubscription
    Trigger->>DataSource: (if Hookable) SubscriptionOnStart(ctx, input)
    alt startup hook succeeds
        DataSource->>Updater: UpdateSubscription(id, data)
        Updater->>Client: push update
        Resolver->>DataSource: Start (after hooks)
        DataSource->>Updater: Update / Complete / Close events
    else startup hook returns error
        DataSource-->>Resolver: error
        Resolver->>Trigger: unsubscribe / propagate error
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Areas to focus on:

  • References/imports and build breaks after removal of pub/sub files (search for removed public types/usages).
  • Correct initialization and propagation of trig.updater and updater.subsFn across trigger creation/reuse.
  • Concurrency and thread-safety when accessing subscription maps in new updater methods and sendUpdateToSubscription.
  • Startup hook error handling and unsubscribe behavior in executeStartupHooks.
  • Tests updated to include new hook/signature correctness and that test doubles match the expanded interface.


Pre-merge checks and finishing touches

✅ Passed checks (2 passed)
  • Description Check — ✅ Passed: Check skipped because CodeRabbit's high-level summary is enabled.
  • Title Check — ✅ Passed: The title 'feat: allows hook in the subscriptions' directly reflects the main change: introducing startup hooks for subscriptions. It accurately captures the core feature being added.

Comment @coderabbitai help to get the list of available commands and usage tips.

## Summary by CodeRabbit

* **Tests**
* Enhanced subscription-related test coverage for the GraphQL data
source, aligning test utilities with current interfaces to improve
reliability and maintainability.
* Validated subscribe/unsubscribe and update flows under various
scenarios to reduce brittleness and increase confidence in behavior.
* Performed minor cleanup in test code to improve readability without
altering functionality.

Implements missing methods to satisfy the `resolve.SubscriptionUpdater` interface, so tests can build. Since these methods are not used by the tests, they are empty. There are already tests in place in resolve_test.go which verify whether the updater can manage subscriptions individually, so I did not add any.
@coderabbitai summary

I adjusted two tests that were failing during GitHub's CI run:

```
--- FAIL: TestResolver_ResolveGraphQLSubscription (31.69s)
    --- FAIL: TestResolver_ResolveGraphQLSubscription/SubscriptionOnStart_ctx_updater_only_updates_the_right_subscription (0.01s)
        resolve_test.go:5604: 
            	Error Trace:	/home/runner/work/graphql-go-tools/graphql-go-tools/v2/pkg/engine/resolve/resolve_test.go:5604
            	Error:      	Should be true
            	Test:       	TestResolver_ResolveGraphQLSubscription/SubscriptionOnStart_ctx_updater_only_updates_the_right_subscription
    --- FAIL: TestResolver_ResolveGraphQLSubscription/SubscriptionOnStart_ctx_updater_on_multiple_subscriptions_with_same_trigger_works (0.01s)
        resolve_test.go:5674: 
            	Error Trace:	/home/runner/work/graphql-go-tools/graphql-go-tools/v2/pkg/engine/resolve/resolve_test.go:5674
            	Error:      	should not be here
            	Test:       	TestResolver_ResolveGraphQLSubscription/SubscriptionOnStart_ctx_updater_on_multiple_subscriptions_with_same_trigger_works
FAIL
```

These tests work locally, at least on my machine. I assumed the failure only manifests on slower CPUs, as is the case on GitHub workers. After experimenting with some time.Sleep() calls here and there, I found that if messages in tests are not emitted fast enough, heartbeat messages might find their way into the recorders' message lists. As these tests have nothing to do with heartbeat testing, I disabled heartbeats here. I am not 100% sure this is the problem, but I am fairly certain, and I think it makes sense anyway.
@coderabbitai summary

# Context

Some notes first:
- It's only happening for tests introduced on the cosmo streams topic branch
- It seems to be a race condition in the tests rather than in actual engine code

I spotted two tests failing on GitHub Actions due to race conditions. They work locally and are CPU-timing related.

Those two tests are
- test 1 `SubscriptionOnStart ctx updater only updates the right
subscription`
- test 2 `SubscriptionOnStart ctx updater on multiple subscriptions with
same trigger works`

### test 1:
There is a race condition going on. Here is the output of the test on
Github runners with engine logs enabled.
```
resolver:trigger:subscription:add:17241709254077376921:1
resolver:create:trigger:17241709254077376921
resolver:trigger:start:17241709254077376921
resolver:subscription_updater:update:17241709254077376921
resolver:trigger:initialized:17241709254077376921
resolver:subscription_updater:update:17241709254077376921
resolver:trigger:subscription:update:17241709254077376921:1,1
resolver:trigger:update:17241709254077376921
resolver:trigger:subscription:add:17241709254077376921:2
resolver:trigger:subscription:added:17241709254077376921:2
resolver:trigger:subscription:update:1
resolver:trigger:subscription:flushed:1
resolver:trigger:subscription:update:1
resolver:trigger:subscription:flushed:1
resolver:trigger:started:17241709254077376921
resolver:subscription_updater:complete:17241709254077376921
resolver:subscription_updater:complete:sent_event:17241709254077376921
resolver:trigger:complete:17241709254077376921
resolver:trigger:complete:17241709254077376921
resolver:trigger:subscription:closed:17241709254077376921:1
resolver:trigger:subscription:closed:17241709254077376921:2

recorder 1 messages: [{"data":{"counter":1000}} {"data":{"counter":0}}]
recorder 2 messages: []
```

As you can see, recorder 2 misses its one expected message. The reason is that we update the trigger with the counter=0 message (line 8) before the second subscriber is added (line 9), so it misses the message. This happens because in the test we don't wait for the subscriber to finish registration on the trigger before sending the counter=0 message. Now we actually wait for that.

### test 2:
Kind of the same error. Here is the engine debug output from a failing
Github Actions run:

```
resolver:trigger:subscription:add:15889878720417707388:1
resolver:create:trigger:15889878720417707388
resolver:trigger:start:15889878720417707388
resolver:subscription_updater:update:15889878720417707388
resolver:trigger:initialized:15889878720417707388
resolver:subscription_updater:update:15889878720417707388
resolver:trigger:subscription:update:15889878720417707388:1,1
resolver:trigger:update:15889878720417707388
resolver:trigger:subscription:add:15889878720417707388:2
resolver:trigger:subscription:added:15889878720417707388:2
resolver:subscription_updater:update:15889878720417707388
resolver:trigger:subscription:update:15889878720417707388:1,2
resolver:trigger:subscription:update:2
resolver:trigger:started:15889878720417707388
resolver:trigger:subscription:update:1
resolver:trigger:subscription:flushed:2
resolver:trigger:subscription:flushed:1
resolver:trigger:subscription:update:1
resolver:trigger:subscription:flushed:1
resolver:subscription_updater:complete:15889878720417707388
resolver:subscription_updater:complete:sent_event:15889878720417707388
resolver:trigger:complete:15889878720417707388
resolver:trigger:complete:15889878720417707388
resolver:trigger:subscription:closed:15889878720417707388:1
resolver:trigger:subscription:closed:15889878720417707388:2

recorder 1 messages: [{"data":{"counter":1000}} {"data":{"counter":0}}]
recorder 2 messages: [{"data":{"counter":1000}}]
```

As you can see, recorder 2 misses the counter=0 message. Both recorders are expected to have the same messages in the same order. Both have the counter=1000 message, which is delivered via the subscription-on-start hook, but recorder 2 misses the counter=0 message, which is delivered via the fake stream. The counter=0 message is delivered (line 8) before recorder 2 is subscribed (line 9). This happens because in this test, like in the other, we don't wait for the recorders to finish subscribing to the trigger and send off the counter=0 message via the fake stream too early. It's fixed by waiting for a complete subscription.
@dkorittki dkorittki requested a review from jensneuse November 10, 2025 14:36
@dkorittki

@coderabbitai review

coderabbitai bot commented Nov 10, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

🧹 Nitpick comments (3)
v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go (3)

8274-8286: Interface shims OK; add compile-time assertion

No-op methods correctly satisfy the expanded resolve.SubscriptionUpdater. Suggest adding a compile-time interface check to avoid future drift.

```diff
 type testSubscriptionUpdaterChan struct {
   updates  chan string
   complete chan struct{}
   closed   chan resolve.SubscriptionCloseKind
 }

+// ensure interface stays satisfied
+var _ resolve.SubscriptionUpdater = (*testSubscriptionUpdaterChan)(nil)
```

8412-8424: Mirror compile-time check for the second test updater

Same suggestion as above to guard against interface changes.

```diff
 type testSubscriptionUpdater struct {
   updates []string
   done    bool
   closed  bool
   mux     sync.Mutex
 }

+// ensure interface stays satisfied
+var _ resolve.SubscriptionUpdater = (*testSubscriptionUpdater)(nil)
```

8998-9051: Good coverage of SubscriptionOnStart; consider two extra cases

Solid tests for invocation and error propagation. Consider:

  • An integration test asserting Start() invokes SubscriptionOnStart and propagates hook errors (e.g., returns resolve.ErrUnableToResolve if that’s the contract).
  • A hook that calls ctx.Updater(...) to verify startup hooks can emit an initial event. Based on learnings.

If helpful, I can draft a small test that fails if SubscriptionOnStart isn’t invoked from Start().

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ecbac93 and 067b413.

📒 Files selected for processing (12)
  • v2/pkg/engine/datasource/graphql_datasource/configuration.go (3 hunks)
  • v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go (3 hunks)
  • v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go (4 hunks)
  • v2/pkg/engine/datasource/pubsub_datasource/kafka_event_manager.go (0 hunks)
  • v2/pkg/engine/datasource/pubsub_datasource/nats_event_manager.go (0 hunks)
  • v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource.go (0 hunks)
  • v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource_test.go (0 hunks)
  • v2/pkg/engine/datasource/pubsub_datasource/pubsub_kafka.go (0 hunks)
  • v2/pkg/engine/datasource/pubsub_datasource/pubsub_nats.go (0 hunks)
  • v2/pkg/engine/resolve/datasource.go (1 hunks)
  • v2/pkg/engine/resolve/resolve.go (11 hunks)
  • v2/pkg/engine/resolve/resolve_test.go (20 hunks)
💤 Files with no reviewable changes (6)
  • v2/pkg/engine/datasource/pubsub_datasource/kafka_event_manager.go
  • v2/pkg/engine/datasource/pubsub_datasource/nats_event_manager.go
  • v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource_test.go
  • v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource.go
  • v2/pkg/engine/datasource/pubsub_datasource/pubsub_kafka.go
  • v2/pkg/engine/datasource/pubsub_datasource/pubsub_nats.go
🧰 Additional context used
🧠 Learnings (5)
📓 Common learnings
Learnt from: endigma
Repo: wundergraph/graphql-go-tools PR: 1298
File: v2/pkg/engine/resolve/resolve.go:339-347
Timestamp: 2025-09-19T14:51:33.724Z
Learning: In the graphql-go-tools resolver, the event loop follows a strict single-threaded design where all subscription lifecycle management (add/remove from triggers) must happen through the events channel to the main processEvents() goroutine. Worker goroutines should not directly modify subscription state or call unsubscribe methods, as this would break the single-threaded event loop architecture and create race conditions.
📚 Learning: 2025-09-19T14:51:33.724Z
Learnt from: endigma
Repo: wundergraph/graphql-go-tools PR: 1298
File: v2/pkg/engine/resolve/resolve.go:339-347
Timestamp: 2025-09-19T14:51:33.724Z
Learning: In the graphql-go-tools resolver, the event loop follows a strict single-threaded design where all subscription lifecycle management (add/remove from triggers) must happen through the events channel to the main processEvents() goroutine. Worker goroutines should not directly modify subscription state or call unsubscribe methods, as this would break the single-threaded event loop architecture and create race conditions.

Applied to files:

  • v2/pkg/engine/resolve/resolve.go
  • v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go
  • v2/pkg/engine/resolve/resolve_test.go
📚 Learning: 2025-09-19T14:50:19.528Z
Learnt from: endigma
Repo: wundergraph/graphql-go-tools PR: 1298
File: v2/pkg/engine/resolve/resolve.go:999-1011
Timestamp: 2025-09-19T14:50:19.528Z
Learning: In the graphql-go-tools resolver, when handling backpressure for async unsubscribe operations, the team prefers using individual goroutines over bounded deferrer implementations. The rationale is that bounded implementations can also overflow, and the preferred solution for overload is scaling the deployment rather than internal buffering. The current approach prevents deadlocks at the cost of goroutine creation, which is an acceptable tradeoff.

Applied to files:

  • v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go
  • v2/pkg/engine/resolve/resolve_test.go
📚 Learning: 2025-08-29T09:35:47.969Z
Learnt from: ysmolski
Repo: wundergraph/graphql-go-tools PR: 1282
File: v2/pkg/engine/plan/visitor.go:5-5
Timestamp: 2025-08-29T09:35:47.969Z
Learning: The wundergraph/graphql-go-tools project does not support Go versions < 1.23, so compatibility concerns for features available in Go 1.21+ (like cmp.Or) should not be raised.

Applied to files:

  • v2/pkg/engine/datasource/graphql_datasource/configuration.go
📚 Learning: 2025-10-16T13:05:19.838Z
Learnt from: Noroth
Repo: wundergraph/graphql-go-tools PR: 1323
File: v2/pkg/engine/datasource/grpc_datasource/compiler.go:683-702
Timestamp: 2025-10-16T13:05:19.838Z
Learning: In GraphQL field resolver context resolution (v2/pkg/engine/datasource/grpc_datasource/compiler.go), when traversing paths in resolveContextDataForPath, the code can safely assume that intermediate path segments will only be messages or lists, never scalars. This is because field resolvers are only defined on GraphQL object types, not scalar types, so the parent function must return either a message or a list. This invariant is enforced by the GraphQL type system design.

Applied to files:

  • v2/pkg/engine/datasource/graphql_datasource/configuration.go
🔇 Additional comments (2)
v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go (1)

4009-4010: LGTM: test wiring matches new client field

Using SubscriptionSource.client here reflects the new startup-hook wiring path and keeps tests realistic.

v2/pkg/engine/resolve/resolve_test.go (1)

797-824: Good defensive programming with nil check

The implementation properly checks for nil before invoking the subscriptionOnStartFn callback, which maintains backward compatibility and prevents panics when the hook is not provided.

Comment on lines 2010 to 2016
```go
// Each hook is called in a separate goroutine.
func (s *SubscriptionSource) SubscriptionOnStart(ctx resolve.StartupHookContext, input []byte) (err error) {
	for _, fn := range s.subscriptionOnStartFns {
		return fn(ctx, input)
	}
	return nil
}
```
@coderabbitai coderabbitai bot Nov 10, 2025

⚠️ Potential issue | 🟠 Major

Run every configured subscription startup hook

Returning from inside the loop means only the first hook in StartupHooks ever runs, silently skipping the rest. That breaks configurations that rely on multiple hooks (logging, auditing, metrics, etc.) even though the API exposes a slice. Execute all hooks sequentially, short‑circuiting only on error, and update the comment to match the behavior.

Apply this diff to fix the logic:

```diff
-// SubscriptionOnStart is called when a subscription is started.
-// Each hook is called in a separate goroutine.
-//
-func (s *SubscriptionSource) SubscriptionOnStart(ctx resolve.StartupHookContext, input []byte) (err error) {
-	for _, fn := range s.subscriptionOnStartFns {
-		return fn(ctx, input)
-	}
-	return nil
+// SubscriptionOnStart is called when a subscription is started.
+// Hooks are invoked sequentially so errors can be propagated back to the caller.
+func (s *SubscriptionSource) SubscriptionOnStart(ctx resolve.StartupHookContext, input []byte) error {
+	for _, fn := range s.subscriptionOnStartFns {
+		if err := fn(ctx, input); err != nil {
+			return err
+		}
+	}
+	return nil
 }
```

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go around
lines 2010 to 2016, the loop currently returns on the first iteration so only
the first StartupHook runs; change the loop to call each fn sequentially and
only return if a hook returns a non‑nil error (i.e., for _, fn := range
s.subscriptionOnStartFns { if err = fn(ctx, input); err != nil { return err } }
return nil), and update the comment above the function to state that hooks are
executed sequentially and the function short‑circuits on the first error.

Fixed

🧩 Analysis chain

🏁 Script executed:

```shell
#!/bin/bash
# Check the current implementation of SubscriptionOnStart in the PR branch
echo "=== Current SubscriptionOnStart implementation ==="
ast-grep --pattern $'func (s *SubscriptionSource) SubscriptionOnStart($$$) $$$'
```

Length of output: 988


@dkorittki Thanks for the fix! ✅

I've confirmed the implementation now correctly iterates through all startup hooks sequentially, returning early only if a hook returns an error. This ensures all configured hooks will execute as expected.


🧠 Learnings used
Learnt from: endigma
Repo: wundergraph/graphql-go-tools PR: 1298
File: v2/pkg/engine/resolve/resolve.go:339-347
Timestamp: 2025-09-19T14:51:33.724Z
Learning: In the graphql-go-tools resolver, the event loop follows a strict single-threaded design where all subscription lifecycle management (add/remove from triggers) must happen through the events channel to the main processEvents() goroutine. Worker goroutines should not directly modify subscription state or call unsubscribe methods, as this would break the single-threaded event loop architecture and create race conditions.

Comment on lines 5459 to 5871
t.Run("should call SubscriptionOnStart hook", func(t *testing.T) {
c, cancel := context.WithCancel(context.Background())
defer cancel()

called := make(chan bool, 1)

fakeStream := createFakeStream(func(counter int) (message string, done bool) {
return fmt.Sprintf(`{"data":{"counter":%d}}`, counter), counter == 0
}, 1*time.Millisecond, func(input []byte) {
assert.Equal(t, `{"method":"POST","url":"http://localhost:4000","body":{"query":"subscription { counter }"}}`, string(input))
}, func(ctx StartupHookContext, input []byte) (err error) {
called <- true
return nil
})

resolver, plan, recorder, id := setup(c, fakeStream)

ctx := &Context{
ctx: context.Background(),
}

err := resolver.AsyncResolveGraphQLSubscription(ctx, plan, recorder, id)
assert.NoError(t, err)

select {
case <-called:
t.Log("SubscriptionOnStart hook was called")
case <-time.After(defaultTimeout):
t.Fatal("SubscriptionOnStart hook was not called")
}

recorder.AwaitComplete(t, defaultTimeout)
})

t.Run("SubscriptionOnStart ctx has a working subscription updater", func(t *testing.T) {
c, cancel := context.WithCancel(context.Background())
defer cancel()

fakeStream := createFakeStream(func(counter int) (message string, done bool) {
return fmt.Sprintf(`{"data":{"counter":%d}}`, counter), counter == 0
}, 1*time.Millisecond, func(input []byte) {
assert.Equal(t, `{"method":"POST","url":"http://localhost:4000","body":{"query":"subscription { counter }"}}`, string(input))
}, func(ctx StartupHookContext, input []byte) (err error) {
ctx.Updater([]byte(`{"data":{"counter":1000}}`))
return nil
})

resolver, plan, recorder, id := setup(c, fakeStream)

ctx := &Context{
ctx: context.Background(),
ExecutionOptions: ExecutionOptions{
SendHeartbeat: true,
},
}

err := resolver.AsyncResolveGraphQLSubscription(ctx, plan, recorder, id)
assert.NoError(t, err)

recorder.AwaitComplete(t, defaultTimeout)
assert.Equal(t, 2, len(recorder.Messages()))
assert.Equal(t, `{"data":{"counter":1000}}`, recorder.Messages()[0])
assert.Equal(t, `{"data":{"counter":0}}`, recorder.Messages()[1])
})

t.Run("SubscriptionOnStart ctx updater only updates the right subscription", func(t *testing.T) {
c, cancel := context.WithCancel(context.Background())
defer cancel()

executed := atomic.Bool{}

id2 := SubscriptionIdentifier{
ConnectionID: 1,
SubscriptionID: 2,
}

streamCanStart := make(chan struct{})
startupHookWaitGroup := sync.WaitGroup{}
startupHookWaitGroup.Add(2)

// this message must come as last, on both recorders.
messageFn := func(counter int) (message string, done bool) {
<-streamCanStart
return `{"data":{"counter":0}}`, true
}

onStartFn := func(input []byte) {
assert.Equal(t, `{"method":"POST","url":"http://localhost:4000","body":{"query":"subscription { counter }"}}`, string(input))
}

// this message must come first on the first recorder to be added to the trigger.
subscriptionOnStartFn := func(ctx StartupHookContext, input []byte) (err error) {
defer startupHookWaitGroup.Done()
if executed.Load() {
return
}
executed.Store(true)
ctx.Updater([]byte(`{"data":{"counter":1000}}`))
return nil
}

fakeStream := createFakeStream(messageFn, time.Millisecond, onStartFn, subscriptionOnStartFn)
fakeStream.uniqueRequestFn = func(ctx *Context, input []byte, xxh *xxhash.Digest) (err error) {
return nil
}

resolver, plan, recorder, id := setup(c, fakeStream)

recorder2 := &SubscriptionRecorder{
buf: &bytes.Buffer{},
messages: []string{},
complete: atomic.Bool{},
}
recorder2.complete.Store(false)

ctx := &Context{
ctx: context.Background(),
ExecutionOptions: ExecutionOptions{
SendHeartbeat: false,
},
}

ctx2 := &Context{
ctx: context.Background(),
ExecutionOptions: ExecutionOptions{
SendHeartbeat: false,
},
}

err := resolver.AsyncResolveGraphQLSubscription(ctx, plan, recorder, id)
assert.NoError(t, err)

err2 := resolver.AsyncResolveGraphQLSubscription(ctx2, plan, recorder2, id2)
assert.NoError(t, err2)

done := make(chan struct{})
go func() {
startupHookWaitGroup.Wait()
close(done)
}()
select {
case <-done:
case <-time.After(defaultTimeout):
t.Fatal("timed out waiting for subscription startup hooks")
}

// Wait for both subscriptions startup hooks to be executed
startupHookWaitGroup.Wait()

// Signal the stream to send its message now that both subscriptions are ready
close(streamCanStart)

recorder.AwaitComplete(t, defaultTimeout)
recorder2.AwaitComplete(t, defaultTimeout)

recorderWith1Message := false
recorderWith2Messages := false

recorders := []*SubscriptionRecorder{recorder, recorder2}

for _, r := range recorders {
if len(r.Messages()) == 2 {
recorderWith2Messages = true
assert.Equal(t, `{"data":{"counter":1000}}`, r.Messages()[0])
assert.Equal(t, `{"data":{"counter":0}}`, r.Messages()[1])
}
if len(r.Messages()) == 1 {
recorderWith1Message = true
assert.Equal(t, `{"data":{"counter":0}}`, r.Messages()[0])
}
}

assert.True(t, recorderWith1Message, "recorder 1: %v, recorder 2: %v", recorder.messages, recorder2.messages)
assert.True(t, recorderWith2Messages, "recorder 1: %v, recorder 2: %v", recorder.messages, recorder2.messages)
})

t.Run("SubscriptionOnStart ctx updater on multiple subscriptions with same trigger works", func(t *testing.T) {
c, cancel := context.WithCancel(context.Background())
defer cancel()

id2 := SubscriptionIdentifier{
ConnectionID: 1,
SubscriptionID: 2,
}

streamCanStart := make(chan struct{})

// Message function that waits for signal before sending the final message
messageFn := func(counter int) (message string, done bool) {
<-streamCanStart
return `{"data":{"counter":0}}`, true
}

	onStartFn := func(input []byte) {
		assert.Equal(t, `{"method":"POST","url":"http://localhost:4000","body":{"query":"subscription { counter }"}}`, string(input))
	}

	// this handler pushes the first message to both subscribers via subscription-on-start hook.
	subscriptionOnStartFn := func(ctx StartupHookContext, input []byte) (err error) {
		ctx.Updater([]byte(`{"data":{"counter":1000}}`))
		return nil
	}

	fakeStream := createFakeStream(messageFn, 1*time.Millisecond, onStartFn, subscriptionOnStartFn)
	fakeStream.uniqueRequestFn = func(ctx *Context, input []byte, xxh *xxhash.Digest) (err error) {
		_, err = xxh.WriteString("unique")
		return
	}

	resolver, plan, recorder, id := setup(c, fakeStream)

	recorder2 := &SubscriptionRecorder{
		buf:      &bytes.Buffer{},
		messages: []string{},
		complete: atomic.Bool{},
	}
	recorder2.complete.Store(false)

	ctx := &Context{
		ctx: context.Background(),
		ExecutionOptions: ExecutionOptions{
			SendHeartbeat: false,
		},
	}

	ctx2 := &Context{
		ctx: context.Background(),
		ExecutionOptions: ExecutionOptions{
			SendHeartbeat: false,
		},
	}

	err := resolver.AsyncResolveGraphQLSubscription(ctx, plan, recorder, id)
	assert.NoError(t, err)

	err2 := resolver.AsyncResolveGraphQLSubscription(ctx2, plan, recorder2, id2)
	assert.NoError(t, err2)

	// Wait for subscriptions to receive their initial message from the subscription-on-start hook.
	// Then we know the subscriptions are fully registered on the trigger and then we can send
	// the next message (by closing the streamCanStart channel).
	recorder.AwaitAnyMessageCount(t, defaultTimeout)
	recorder2.AwaitAnyMessageCount(t, defaultTimeout)
	close(streamCanStart)

	recorder.AwaitComplete(t, defaultTimeout)
	recorder2.AwaitComplete(t, defaultTimeout)

	// Both recorders should have received both messages in the correct order.
	for _, r := range []*SubscriptionRecorder{recorder, recorder2} {
		assert.Len(t, r.Messages(), 2, "recorder messages: %v", r.messages)
		assert.Equal(t, `{"data":{"counter":1000}}`, r.Messages()[0])
		assert.Equal(t, `{"data":{"counter":0}}`, r.Messages()[1])
	}
})

t.Run("SubscriptionOnStart can send a lot of updates without blocking", func(t *testing.T) {
	c, cancel := context.WithCancel(context.Background())
	defer cancel()
	workChanBufferSize := 10000

	fakeStream := createFakeStream(func(counter int) (message string, done bool) {
		return fmt.Sprintf(`{"data":{"counter":%d}}`, counter), counter == 0
	}, 1*time.Millisecond, func(input []byte) {
		assert.Equal(t, `{"method":"POST","url":"http://localhost:4000","body":{"query":"subscription { counter }"}}`, string(input))
	}, func(ctx StartupHookContext, input []byte) (err error) {
		for i := 0; i < workChanBufferSize+1; i++ {
			ctx.Updater([]byte(fmt.Sprintf(`{"data":{"counter":%d}}`, i+100)))
		}
		return nil
	})

	resolver, plan, recorder, id := setup(c, fakeStream)

	ctx := &Context{
		ctx: context.Background(),
		ExecutionOptions: ExecutionOptions{
			SendHeartbeat: true,
		},
	}

	err := resolver.AsyncResolveGraphQLSubscription(ctx, plan, recorder, id)
	assert.NoError(t, err)

	recorder.AwaitComplete(t, defaultTimeout)
	assert.Equal(t, workChanBufferSize+2, len(recorder.Messages()))
	for i := 0; i < workChanBufferSize; i++ {
		assert.Equal(t, fmt.Sprintf(`{"data":{"counter":%d}}`, i+100), recorder.Messages()[i])
	}
	assert.Equal(t, `{"data":{"counter":0}}`, recorder.Messages()[workChanBufferSize+1])
})

t.Run("SubscriptionOnStart can send a lot of updates in a go routine while updates are coming from other sources", func(t *testing.T) {
	c, cancel := context.WithCancel(context.Background())
	defer cancel()

	messagesToSendFromHook := int32(100)
	messagesDroppedFromHook := &atomic.Int32{}
	messagesToSendFromOtherSources := int32(100)

	firstMessageArrived := make(chan bool, 1)
	hookCompleted := make(chan bool, 1)
	fakeStream := createFakeStream(func(counter int) (message string, done bool) {
		if counter == 0 {
			select {
			case firstMessageArrived <- true:
			default:
			}
		}
		if counter == int(messagesToSendFromOtherSources)-1 {
			select {
			case hookCompleted <- true:
			case <-time.After(defaultTimeout):
			}
		}
		return fmt.Sprintf(`{"data":{"counter":%d}}`, counter), counter == int(messagesToSendFromOtherSources)-1
	}, 1*time.Millisecond, func(input []byte) {
		assert.Equal(t, `{"method":"POST","url":"http://localhost:4000","body":{"query":"subscription { counter }"}}`, string(input))
	}, func(ctx StartupHookContext, input []byte) (err error) {
		// send the first update immediately
		ctx.Updater([]byte(fmt.Sprintf(`{"data":{"counter":%d}}`, 0+20000)))

		// start a go routine to send the updates after the source started emitting messages
		go func() {
			// Wait for the first message to arrive before sending updates
			select {
			case <-firstMessageArrived:
				for i := 1; i < int(messagesToSendFromHook); i++ {
					ctx.Updater([]byte(fmt.Sprintf(`{"data":{"counter":%d}}`, i+20000)))
				}
				hookCompleted <- true
			case <-time.After(defaultTimeout):
				// if the first message did not arrive, do not send any updates
				return
			}
		}()

		return nil
	})

	resolver, plan, recorder, id := setup(c, fakeStream)

	ctx := &Context{
		ctx: context.Background(),
		ExecutionOptions: ExecutionOptions{
			SendHeartbeat: false,
		},
	}

	err := resolver.AsyncResolveGraphQLSubscription(ctx, plan, recorder, id)
	assert.NoError(t, err)

	recorder.AwaitComplete(t, defaultTimeout*2)

	var messagesHeartbeat int32
	for _, m := range recorder.Messages() {
		if m == "{}" {
			messagesHeartbeat++
		}
	}
	assert.Equal(t, int32(messagesToSendFromHook+messagesToSendFromOtherSources-messagesDroppedFromHook.Load()+messagesHeartbeat), int32(len(recorder.Messages())))
	assert.Equal(t, `{"data":{"counter":20000}}`, recorder.Messages()[0])
})

t.Run("it is possible to have two subscriptions to the same trigger", func(t *testing.T) {
	c, cancel := context.WithCancel(context.Background())
	defer cancel()

	fakeStream := createFakeStream(func(counter int) (message string, done bool) {
		return fmt.Sprintf(`{"data":{"counter":%d}}`, counter), counter == 100
	}, 1*time.Millisecond, func(input []byte) {
		assert.Equal(t, `{"method":"POST","url":"http://localhost:4000","body":{"query":"subscription { counter }"}}`, string(input))
	}, func(ctx StartupHookContext, input []byte) (err error) {
		return nil
	})
	fakeStream.uniqueRequestFn = func(ctx *Context, input []byte, xxh *xxhash.Digest) (err error) {
		_, err = xxh.WriteString("unique")
		if err != nil {
			return
		}
		_, err = xxh.Write(input)
		return err
	}

	resolver1, plan1, recorder1, id1 := setup(c, fakeStream)
	_, _, recorder2, id2 := setup(c, fakeStream)
	id2.ConnectionID = id1.ConnectionID + 1
	id2.SubscriptionID = id1.SubscriptionID + 1

	ctx1 := &Context{
		ctx: context.Background(),
	}
	ctx2 := &Context{
		ctx: context.Background(),
	}

	err1 := resolver1.AsyncResolveGraphQLSubscription(ctx1, plan1, recorder1, id1)
	assert.NoError(t, err1)

	err2 := resolver1.AsyncResolveGraphQLSubscription(ctx2, plan1, recorder2, id2)
	assert.NoError(t, err2)

	// complete is called only on the last recorder
	recorder1.AwaitComplete(t, defaultTimeout)
	require.Equal(t, 101, len(recorder1.Messages()))
	assert.Equal(t, `{"data":{"counter":0}}`, recorder1.Messages()[0])
	assert.Equal(t, `{"data":{"counter":100}}`, recorder1.Messages()[100])

	recorder2.AwaitComplete(t, defaultTimeout)
	require.Equal(t, 101, len(recorder2.Messages()))
	assert.Equal(t, `{"data":{"counter":0}}`, recorder2.Messages()[0])
	assert.Equal(t, `{"data":{"counter":100}}`, recorder2.Messages()[100])
})
Contributor

@coderabbitai coderabbitai bot Nov 10, 2025

⚠️ Potential issue | 🟠 Major

Missing test case: error propagation from SubscriptionOnStart hook

The AI summary indicates this PR "validates propagation of errors from the hook," but none of the seven new tests verify what happens when SubscriptionOnStart returns an error. All tests either pass nil for the hook function or explicitly return nil from it.

Add a test case to verify error handling:

t.Run("should propagate errors from SubscriptionOnStart hook", func(t *testing.T) {
	c, cancel := context.WithCancel(context.Background())
	defer cancel()

	expectedErr := errors.New("startup hook failed")
	fakeStream := createFakeStream(func(counter int) (message string, done bool) {
		return fmt.Sprintf(`{"data":{"counter":%d}}`, counter), counter == 0
	}, 1*time.Millisecond, func(input []byte) {
		assert.Equal(t, `{"method":"POST","url":"http://localhost:4000","body":{"query":"subscription { counter }"}}`, string(input))
	}, func(ctx StartupHookContext, input []byte) (err error) {
		return expectedErr
	})

	resolver, plan, recorder, id := setup(c, fakeStream)

	ctx := &Context{
		ctx: context.Background(),
	}

	err := resolver.AsyncResolveGraphQLSubscription(ctx, plan, recorder, id)
	// Verify the error from the hook is properly propagated
	assert.Error(t, err)
	// Optionally verify it's the expected error or check error message in recorder
})
🤖 Prompt for AI Agents
In v2/pkg/engine/resolve/resolve_test.go around lines 5459-5871, add a new
subtest that ensures errors returned by the SubscriptionOnStart hook are
propagated: create a fakeStream whose startup hook returns a sentinel error
(e.g. errors.New("startup hook failed")), call setup(...) to get resolver, plan,
recorder and id, call resolver.AsyncResolveGraphQLSubscription(ctx, plan,
recorder, id) and assert that an error is returned (and optionally that it
matches the sentinel error), and assert that no successful subscription messages
are produced/recorder did not complete if applicable; keep the test pattern and
timeouts consistent with the surrounding tests.

Contributor

Added such a test

Contributor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Search for the test case that propagates errors from SubscriptionOnStart hook
echo "=== Searching for error propagation test by name ==="
rg -n "should propagate errors from SubscriptionOnStart" --type go

echo -e "\n=== Searching for error propagation test patterns ==="
rg -n -A5 "SubscriptionOnStart.*error" v2/pkg/engine/resolve/resolve_test.go v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go

echo -e "\n=== Searching for test functions mentioning error and SubscriptionOnStart ==="
rg -n -B2 -A10 't\.Run.*SubscriptionOnStart.*(?:error|propagat)' --type go v2/pkg/engine/resolve/ v2/pkg/engine/datasource/graphql_datasource/

Length of output: 2918


@dkorittki Thank you for adding the test! I can confirm that the test case "should propagate errors from SubscriptionOnStart hook" has been added to v2/pkg/engine/resolve/resolve_test.go at line 5872. This properly validates that errors returned by the SubscriptionOnStart hook are correctly propagated.

The test coverage for this feature now looks comprehensive with error handling tested at both:

  • The datasource level in graphql_datasource_test.go
  • The resolver integration level in resolve_test.go

Great work addressing the feedback! ✅


🧬 Code Graph Analysis Results
  • File: v2/pkg/engine/resolve/datasource.go
    Lines 12-15:

    DataSource interface {
    	Load(ctx context.Context, input []byte, out *bytes.Buffer) (err error)
    	LoadWithFiles(ctx context.Context, input []byte, files []*httpclient.FileUpload, out *bytes.Buffer) (err error)
    }
    
  • File: v2/pkg/engine/resolve/inputtemplate.go
    Lines 22-29:

    TemplateSegment struct {
    	SegmentType        SegmentType
    	Data               []byte
    	VariableKind       VariableKind
    	VariableSourcePath []string
    	Renderer           VariableRenderer
    	Segments           []TemplateSegment
    }
    

    Lines 31-37:

    InputTemplate struct {
    	Segments []TemplateSegment
    	// SetTemplateOutputToNullOnVariableNull will safely return "null" if one of the template variables renders to null
    	// This is the case, e.g. when using batching and one sibling is null, resulting in a null value for one batch item
    	// Returning null in this case tells the batch implementation to skip this item
      SetTemplateOutputToNullOnVariableNull bool
    }
    
  • File: v2/pkg/engine/resolve/fetchtree.go
    Lines 59-67:

    func Single(fetch Fetch, path ...FetchItemPathElement) *FetchTreeNode {
    	return &FetchTreeNode{
    		Kind: FetchTreeNodeKindSingle,
    		Item: &FetchItem{
    			Fetch:     fetch,
    			FetchPath: path,
    		},
    	}
    }
    

    Lines 69-82:

    func SingleWithPath(fetch Fetch, responsePath string, path ...FetchItemPathElement) *FetchTreeNode {
    	node := &FetchTreeNode{
    		Kind: FetchTreeNodeKindSingle,
    		Item: &FetchItem{
    			Fetch:        fetch,
    			FetchPath:    path,
    			ResponsePath: responsePath,
    		},
    	}
    	if responsePath != "" {
    		node.Item.ResponsePathElements = strings.Split(responsePath, ".")
    	}
    	return node
    }
    
  • File: v2/pkg/engine/resolve/resolve.go
    Lines 173-234:

    func New(ctx context.Context, options ResolverOptions) *Resolver {
    	// options.Debug = true
    	if options.MaxConcurrency <= 0 {
    		options.MaxConcurrency = 32
    	}
    
    	if options.SubscriptionHeartbeatInterval <= 0 {
    		options.SubscriptionHeartbeatInterval = DefaultHeartbeatInterval
    	}
    
    	// We transform the allowed fields into a map for faster lookups
    	allowedExtensionFields := make(map[string]struct{}, len(options.AllowedErrorExtensionFields))
    	for _, field := range options.AllowedErrorExtensionFields {
    		allowedExtensionFields[field] = struct{}{}
    	}
    
    	// always allow "message" and "path"
    	allowedErrorFields := map[string]struct{}{
    		"message": {},
    		"path":    {},
    	}
    
    	if options.MaxSubscriptionFetchTimeout == 0 {
    		options.MaxSubscriptionFetchTimeout = 30 * time.Second
    	}
    
    	if !options.OmitSubgraphErrorExtensions {
    		allowedErrorFields["extensions"] = struct{}{}
    	}
    
    	if !options.OmitSubgraphErrorLocations {
    		allowedErrorFields["locations"] = struct{}{}
    	}
    
    	for _, field := range options.AllowedSubgraphErrorFields {
    		allowedErrorFields[field] = struct{}{}
    	}
    
    	resolver := &Resolver{
    		ctx:                          ctx,
    		options:                      options,
    		propagateSubgraphErrors:      options.PropagateSubgraphErrors,
    		propagateSubgraphStatusCodes: options.PropagateSubgraphStatusCodes,
    		events:                       make(chan subscriptionEvent),
    		triggers:                     make(map[uint64]*trigger),
    		reporter:                     options.Reporter,
    		asyncErrorWriter:             options.AsyncErrorWriter,
    		triggerUpdateBuf:             bytes.NewBuffer(make([]byte, 0, 1024)),
    		allowedErrorExtensionFields:  allowedExtensionFields,
    		allowedErrorFields:           allowedErrorFields,
    		heartbeatInterval:            options.SubscriptionHeartbeatInterval,
    		maxSubscriptionFetchTimeout:  options.MaxSubscriptionFetchTimeout,
    	}
    	resolver.maxConcurrency = make(chan struct{}, options.MaxConcurrency)
    	for i := 0; i < options.MaxConcurrency; i++ {
    		resolver.maxConcurrency <- struct{}{}
    	}
    
    	go resolver.processEvents()
    
    	return resolver
    }
    
  • File: v2/pkg/engine/resolve/response.go
    Lines 34-42:

    GraphQLResponse struct {
    	Data *Object
    
    	RawFetches []*FetchItem
    	Fetches    *FetchTreeNode
    
    	Info        *GraphQLResponseInfo
    	DataSources []DataSourceInfo
    }
    

    Lines 44-50:

    GraphQLResponseInfo struct {
    	OperationType ast.OperationType
    }
    
  • File: v2/pkg/engine/resolve/variables.go
    Lines 27-31:

    Variables []Variable
    
    func NewVariables(variables ...Variable) Variables {
    	return variables
    }
    

    Lines 51-54:

    ContextVariable struct {
    	Path     []string
    	Renderer VariableRenderer
    }
    

🧠 Learnings used
Learnt from: endigma
Repo: wundergraph/graphql-go-tools PR: 1298
File: v2/pkg/engine/resolve/resolve.go:339-347
Timestamp: 2025-09-19T14:51:33.724Z
Learning: In the graphql-go-tools resolver, the event loop follows a strict single-threaded design where all subscription lifecycle management (add/remove from triggers) must happen through the events channel to the main processEvents() goroutine. Worker goroutines should not directly modify subscription state or call unsubscribe methods, as this would break the single-threaded event loop architecture and create race conditions.

Learnt from: endigma
Repo: wundergraph/graphql-go-tools PR: 1298
File: v2/pkg/engine/resolve/resolve.go:999-1011
Timestamp: 2025-09-19T14:50:19.528Z
Learning: In the graphql-go-tools resolver, when handling backpressure for async unsubscribe operations, the team prefers using individual goroutines over bounded deferrer implementations. The rationale is that bounded implementations can also overflow, and the preferred solution for overload is scaling the deployment rather than internal buffering. The current approach prevents deadlocks at the cost of goroutine creation, which is an acceptable tradeoff.

@dkorittki dkorittki marked this pull request as ready for review November 10, 2025 15:26
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (2)
v2/pkg/engine/resolve/resolve_test.go (2)

4797-4803: Startup hook plumbing in test fake stream: good; consider options for ergonomics

Adding subscriptionOnStartFn to createFakeStream is correct and keeps tests focused. To reduce widespread nils and improve readability, consider an options struct (e.g., fakeStreamOpts{messageFn, delay, onStart, onStartHook}) instead of multiple positional params.


5561-5564: Make UniqueRequestID overrides write to the digest for deterministic grouping

Returning nil without writing to xxh relies on an all-zero digest. Write a constant to ensure stable “same trigger” semantics and future-proof against internal changes.

Apply this diff:

-        fakeStream.uniqueRequestFn = func(ctx *Context, input []byte, xxh *xxhash.Digest) (err error) {
-            return nil
-        }
+        fakeStream.uniqueRequestFn = func(ctx *Context, input []byte, xxh *xxhash.Digest) error {
+            if _, err := xxh.WriteString("same-trigger"); err != nil {
+                return err
+            }
+            _, err := xxh.Write(input)
+            return err
+        }
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 067b413 and 423a60c.

📒 Files selected for processing (2)
  • v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go (3 hunks)
  • v2/pkg/engine/resolve/resolve_test.go (20 hunks)
🧰 Additional context used
🧠 Learnings (7)
📓 Common learnings
Learnt from: endigma
Repo: wundergraph/graphql-go-tools PR: 1298
File: v2/pkg/engine/resolve/resolve.go:339-347
Timestamp: 2025-09-19T14:51:33.724Z
Learning: In the graphql-go-tools resolver, the event loop follows a strict single-threaded design where all subscription lifecycle management (add/remove from triggers) must happen through the events channel to the main processEvents() goroutine. Worker goroutines should not directly modify subscription state or call unsubscribe methods, as this would break the single-threaded event loop architecture and create race conditions.
📚 Learning: 2025-09-19T14:51:33.724Z
Learnt from: endigma
Repo: wundergraph/graphql-go-tools PR: 1298
File: v2/pkg/engine/resolve/resolve.go:339-347
Timestamp: 2025-09-19T14:51:33.724Z
Learning: In the graphql-go-tools resolver, the event loop follows a strict single-threaded design where all subscription lifecycle management (add/remove from triggers) must happen through the events channel to the main processEvents() goroutine. Worker goroutines should not directly modify subscription state or call unsubscribe methods, as this would break the single-threaded event loop architecture and create race conditions.

Applied to files:

  • v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go
  • v2/pkg/engine/resolve/resolve_test.go
📚 Learning: 2025-09-19T14:50:19.528Z
Learnt from: endigma
Repo: wundergraph/graphql-go-tools PR: 1298
File: v2/pkg/engine/resolve/resolve.go:999-1011
Timestamp: 2025-09-19T14:50:19.528Z
Learning: In the graphql-go-tools resolver, when handling backpressure for async unsubscribe operations, the team prefers using individual goroutines over bounded deferrer implementations. The rationale is that bounded implementations can also overflow, and the preferred solution for overload is scaling the deployment rather than internal buffering. The current approach prevents deadlocks at the cost of goroutine creation, which is an acceptable tradeoff.

Applied to files:

  • v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go
  • v2/pkg/engine/resolve/resolve_test.go
📚 Learning: 2025-10-16T13:05:19.838Z
Learnt from: Noroth
Repo: wundergraph/graphql-go-tools PR: 1323
File: v2/pkg/engine/datasource/grpc_datasource/compiler.go:683-702
Timestamp: 2025-10-16T13:05:19.838Z
Learning: In GraphQL field resolver context resolution (v2/pkg/engine/datasource/grpc_datasource/compiler.go), when traversing paths in resolveContextDataForPath, the code can safely assume that intermediate path segments will only be messages or lists, never scalars. This is because field resolvers are only defined on GraphQL object types, not scalar types, so the parent function must return either a message or a list. This invariant is enforced by the GraphQL type system design.

Applied to files:

  • v2/pkg/engine/resolve/resolve_test.go
📚 Learning: 2025-10-15T13:34:15.892Z
Learnt from: ysmolski
Repo: wundergraph/graphql-go-tools PR: 1322
File: v2/pkg/astvalidation/operation_rule_defer_stream_on_root_fields.go:92-127
Timestamp: 2025-10-15T13:34:15.892Z
Learning: In the graphql-go-tools repository, validation for defer and stream directives runs after normalization, which performs fragment inlining. Therefore, fragment spreads don't exist in the AST when these validation rules execute—they're already expanded into inline fragments or fields.

Applied to files:

  • v2/pkg/engine/resolve/resolve_test.go
📚 Learning: 2025-08-08T09:43:07.433Z
Learnt from: Noroth
Repo: wundergraph/graphql-go-tools PR: 1262
File: v2/pkg/engine/datasource/grpc_datasource/json_builder.go:0-0
Timestamp: 2025-08-08T09:43:07.433Z
Learning: In v2/pkg/engine/datasource/grpc_datasource/json_builder.go, mergeEntities intentionally uses the loop index when calling indexMap.getResultIndex because the index map is type-aware, making per-type counters unnecessary under the current assumptions. Avoid suggesting per-type ordinal counters for this path in future reviews.

Applied to files:

  • v2/pkg/engine/resolve/resolve_test.go
📚 Learning: 2025-07-02T15:28:02.122Z
Learnt from: SkArchon
Repo: wundergraph/graphql-go-tools PR: 1203
File: v2/pkg/engine/resolve/loader.go:63-67
Timestamp: 2025-07-02T15:28:02.122Z
Learning: In the graphql-go-tools codebase, result structs are consistently initialized with non-nil bytes.Buffer instances, making additional nil checks for res.out unnecessary defensive programming.

Applied to files:

  • v2/pkg/engine/resolve/resolve_test.go
🧬 Code graph analysis (2)
v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go (2)
v2/pkg/engine/datasource/graphql_datasource/configuration.go (1)
  • SubscriptionOnStartFn (110-110)
v2/pkg/engine/resolve/resolve.go (1)
  • StartupHookContext (595-598)
v2/pkg/engine/resolve/resolve_test.go (2)
v2/pkg/engine/resolve/resolve.go (2)
  • StartupHookContext (595-598)
  • SubscriptionIdentifier (1017-1020)
v2/pkg/engine/resolve/context.go (2)
  • Context (16-35)
  • ExecutionOptions (37-41)
🔇 Additional comments (5)
v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go (3)

452-453: LGTM!

The subscription startup hooks are correctly wired from the configuration into the SubscriptionSource during subscription setup.


1956-1959: LGTM!

The struct fields appropriately store the subscription client and startup hook functions for later invocation.


2011-2020: LGTM!

The implementation correctly executes all registered startup hooks sequentially, returning early only if a hook returns an error. This matches the fix confirmed in the past review comments.

v2/pkg/engine/resolve/resolve_test.go (2)

4819-4824: No-op safe SubscriptionOnStart

Nil-guard with delegation is correct and keeps fake compliant with the new interface. LGTM.


5872-5901: Error propagation from SubscriptionOnStart hook is validated

Good addition. Confirms hook errors surface as GraphQL errors without failing the resolver call.

Comment on lines +2009 to +2010
// SubscriptionOnStart is called when a subscription is started.
// Each hook is called in a separate goroutine.
Contributor

⚠️ Potential issue | 🟡 Minor

Update the comment to match the implementation.

The comment states "Each hook is called in a separate goroutine," but the implementation executes hooks sequentially on lines 2012-2017. This mismatch could confuse future maintainers about the execution model.

Apply this diff to fix the comment:

-// SubscriptionOnStart is called when a subscription is started.
-// Each hook is called in a separate goroutine.
+// SubscriptionOnStart is called when a subscription is started.
+// Hooks are invoked sequentially, short-circuiting on the first error.
🤖 Prompt for AI Agents
In v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go around
lines 2009–2010, update the comment for SubscriptionOnStart to reflect the
actual behavior: replace "Each hook is called in a separate goroutine." with
wording that states hooks are executed sequentially (e.g., "Each hook is
executed sequentially." or "Hooks are executed sequentially, not concurrently.")
so the comment matches the implementation on lines 2012–2017.

Comment on lines +5751 to +5821
t.Run("SubscriptionOnStart can send a lot of updates in a go routine while updates are coming from other sources", func(t *testing.T) {
c, cancel := context.WithCancel(context.Background())
defer cancel()

messagesToSendFromHook := int32(100)
messagesToSendFromOtherSources := int32(100)

firstMessageArrived := make(chan bool, 1)
hookCompleted := make(chan bool, 1)
fakeStream := createFakeStream(func(counter int) (message string, done bool) {
if counter == 0 {
select {
case firstMessageArrived <- true:
default:
}
}
if counter == int(messagesToSendFromOtherSources)-1 {
select {
case hookCompleted <- true:
case <-time.After(defaultTimeout):
}
}
return fmt.Sprintf(`{"data":{"counter":%d}}`, counter), counter == int(messagesToSendFromOtherSources)-1
}, 1*time.Millisecond, func(input []byte) {
assert.Equal(t, `{"method":"POST","url":"http://localhost:4000","body":{"query":"subscription { counter }"}}`, string(input))
}, func(ctx StartupHookContext, input []byte) (err error) {
// send the first update immediately
ctx.Updater([]byte(fmt.Sprintf(`{"data":{"counter":%d}}`, 0+20000)))

// start a go routine to send the updates after the source started emitting messages
go func() {
// Wait for the first message to arrive before sending updates
select {
case <-firstMessageArrived:
for i := 1; i < int(messagesToSendFromHook); i++ {
ctx.Updater([]byte(fmt.Sprintf(`{"data":{"counter":%d}}`, i+20000)))
}
hookCompleted <- true
case <-time.After(defaultTimeout):
// if the first message did not arrive, do not send any updates
return
}
}()

return nil
})

resolver, plan, recorder, id := setup(c, fakeStream)

ctx := &Context{
ctx: context.Background(),
ExecutionOptions: ExecutionOptions{
SendHeartbeat: false,
},
}

err := resolver.AsyncResolveGraphQLSubscription(ctx, plan, recorder, id)
assert.NoError(t, err)

recorder.AwaitComplete(t, defaultTimeout*2)

var messagesHeartbeat int32
for _, m := range recorder.Messages() {
if m == "{}" {
messagesHeartbeat++
}
}
assert.Equal(t, int32(messagesToSendFromHook+messagesToSendFromOtherSources+messagesHeartbeat), int32(len(recorder.Messages())))
assert.Equal(t, `{"data":{"counter":20000}}`, recorder.Messages()[0])
})

Contributor

⚠️ Potential issue | 🟠 Major

Blocked goroutine risk: write-only channel ‘hookCompleted’ has two senders and no receivers

hookCompleted is never read. The hook goroutine sends to it unconditionally, while messageFunc also tries to send (buffer=1). This can block the goroutine and leak. Also, heartbeat counting looks for "{}", but SubscriptionRecorder.Heartbeat() appends "heartbeat".

Recommend removing hookCompleted entirely and matching heartbeat tokens:

-        firstMessageArrived := make(chan bool, 1)
-        hookCompleted := make(chan bool, 1)
+        firstMessageArrived := make(chan bool, 1)
@@
-            if counter == int(messagesToSendFromOtherSources)-1 {
-                select {
-                case hookCompleted <- true:
-                case <-time.After(defaultTimeout):
-                }
-            }
+            // no-op; completion is determined by counter check
@@
-                    hookCompleted <- true
+                    // no-op: do not signal; no receiver
@@
-        var messagesHeartbeat int32
-        for _, m := range recorder.Messages() {
-            if m == "{}" {
-                messagesHeartbeat++
-            }
-        }
+        var messagesHeartbeat int32
+        for _, m := range recorder.Messages() {
+            if m == "heartbeat" {
+                messagesHeartbeat++
+            }
+        }

Based on learnings

🤖 Prompt for AI Agents
In v2/pkg/engine/resolve/resolve_test.go around lines 5751-5821, the test
creates a write-only channel hookCompleted that is never read and is written to
from two places which can block goroutines; also the test counts heartbeats by
looking for "{}" while SubscriptionRecorder.Heartbeat() appends the literal
"heartbeat". Remove the hookCompleted channel and all sends to it (use
firstMessageArrived only to coordinate), update the fakeStream/messageFunc so it
no longer writes to hookCompleted, and change the heartbeat counting/assertion
to look for "heartbeat" (and adjust the expected total accordingly).
