232 changes: 195 additions & 37 deletions pkg/test/ginkgo/queue.go
@@ -6,6 +6,8 @@ import (
"io"
"strings"
"sync"

"k8s.io/apimachinery/pkg/util/sets"
)

// parallelByFileTestQueue runs tests in parallel unless they have
@@ -16,7 +18,165 @@ type parallelByFileTestQueue struct {
commandContext *commandContext
}

type TestFunc func(ctx context.Context, test *testCase)
// getTestConflictGroup returns the conflict group for a test.
// Conflicts are only checked within the same conflict group.
func getTestConflictGroup(test *testCase) string {
return "default"
Member:
This needs to get replaced with the content from the spec, right?

Contributor Author:
The spec doesn't define a group. The group is designed to support mode.

Member:
Sorry, I'm not quite understanding what this function does. Isolation has two relevant fields, Conflicts and Mode. I think we agreed to only support mode=exec for now, but shouldn't we get the Conflicts out of the Isolation struct? Why does this always return "default"?

Contributor Author:
The idea was to have a framework that can support mode in the future. Conflicts are only checked within a conflict group. Right now all tests belong to the default group and therefore behave like mode=exec, but if another mode is ever needed, additional conflict groups can be created for it.

Member:
Is there a reason not to call it mode here instead of conflict group, and use the name we're implementing ("exec")? It's not clear to me that "group" is linked to "mode".

Contributor Author:
Mode doesn't clearly convey a grouping mechanism, whereas conflictGroup makes the grouping purpose clear, and conflictGroup can still be used to implement mode. So I think conflictGroup is the better name.
}
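
A minimal sketch of how getTestConflictGroup could later derive the group from an isolation mode, as discussed in the thread above; the Mode field on isolation is an assumption for illustration and is not part of this PR:

func getTestConflictGroupFromMode(test *testCase) string {
    // Hypothetical: assumes isolation carries a Mode string ("" or "exec" today).
    if mode := test.isolation.Mode; mode != "" && mode != "exec" {
        // A future non-exec mode would get its own conflict group, so its
        // conflicts would be tracked separately from the default group.
        return mode
    }
    return "default"
}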

// TestScheduler defines the interface for test scheduling
// Different implementations can provide various scheduling strategies
type TestScheduler interface {
// GetNextTestToRun blocks until a test is available, then returns it.
// Returns nil when all tests have been distributed (queue is empty) or context is cancelled.
// When a test is returned, it is atomically removed from queue and marked as running.
// This method can be safely called from multiple goroutines concurrently.
GetNextTestToRun(ctx context.Context) *testCase

// MarkTestComplete marks a test as complete, cleaning up its conflicts and taints.
// This may unblock other tests that were waiting.
// This method can be safely called from multiple goroutines concurrently.
MarkTestComplete(test *testCase)
}

// testScheduler manages test scheduling based on conflicts, taints, and tolerations
// It maintains an ordered queue of tests and provides thread-safe scheduling operations
type testScheduler struct {
mu sync.Mutex
cond *sync.Cond // condition variable to signal when tests complete
tests []*testCase // ordered queue of tests to execute
runningConflicts map[string]sets.Set[string] // tracks which conflicts are running per group: group -> set of conflicts
activeTaints map[string]int // tracks how many tests are currently applying each taint
}

// newTestScheduler creates a test scheduler. Future implementations could
// reorder the tests and schedule them based on the resulting order.
func newTestScheduler(tests []*testCase) TestScheduler {
ts := &testScheduler{
tests: tests,
runningConflicts: make(map[string]sets.Set[string]),
activeTaints: make(map[string]int),
}
ts.cond = sync.NewCond(&ts.mu)
return ts
}

// GetNextTestToRun blocks until a test is available to run, or returns nil if all tests have been distributed
// or the context is cancelled. It continuously scans the queue and waits for state changes when no tests are runnable.
// When a test is returned, it is atomically removed from queue and marked as running.
func (ts *testScheduler) GetNextTestToRun(ctx context.Context) *testCase {
ts.mu.Lock()
defer ts.mu.Unlock()

for {
// Check if context is cancelled
if ctx.Err() != nil {
return nil
}

// Check if all tests have been distributed
if len(ts.tests) == 0 {
return nil
}

// Scan from beginning to find first runnable test
for i, test := range ts.tests {
conflictGroup := getTestConflictGroup(test)

// Ensure the conflict group set exists
if ts.runningConflicts[conflictGroup] == nil {
ts.runningConflicts[conflictGroup] = sets.New[string]()
}

// Check if any of the test's conflicts are currently running within its group
hasConflict := false
for _, conflict := range test.isolation.Conflict {
if ts.runningConflicts[conflictGroup].Has(conflict) {
hasConflict = true
break
}
}

// Check if test can tolerate all currently active taints
canTolerate := ts.canTolerateTaints(test)

if !hasConflict && canTolerate {
// Found a runnable test - ATOMICALLY:
// 1. Mark conflicts as running
for _, conflict := range test.isolation.Conflict {
ts.runningConflicts[conflictGroup].Insert(conflict)
}

// 2. Activate taints
for _, taint := range test.isolation.Taint {
ts.activeTaints[taint]++
}

// 3. Remove test from queue
ts.tests = append(ts.tests[:i], ts.tests[i+1:]...)

// 4. Return the test (now safe to run)
return test
}
}

// No runnable test found, but tests still exist in queue - wait for state change
ts.cond.Wait()
}
}

// canTolerateTaints checks if a test can tolerate all currently active taints
func (ts *testScheduler) canTolerateTaints(test *testCase) bool {
// Check if test tolerates all active taints
for taint, count := range ts.activeTaints {
// Skip taints with zero count (should be cleaned up but being defensive)
if count <= 0 {
continue
}

tolerated := false
for _, toleration := range test.isolation.Toleration {
if toleration == taint {
tolerated = true
break
}
}
if !tolerated {
return false // Test cannot tolerate this active taint
}
}
return true
}
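
A rough sketch of how the toleration check behaves, written as a test that would live alongside this file; the testCase/isolation field layout used here is an assumption based on the accesses above:

func TestCanTolerateTaints(t *testing.T) {
    ts := &testScheduler{
        runningConflicts: make(map[string]sets.Set[string]),
        activeTaints:     map[string]int{"etcd-disruption": 1},
    }

    // Hypothetical construction: assumes isolation is a value field exposing
    // Toleration/Taint/Conflict string slices, as used by the scheduler above.
    tolerant := &testCase{}
    tolerant.isolation.Toleration = []string{"etcd-disruption"}
    if !ts.canTolerateTaints(tolerant) {
        t.Error("test with a matching toleration should be schedulable")
    }

    intolerant := &testCase{}
    if ts.canTolerateTaints(intolerant) {
        t.Error("test without the toleration should be blocked while the taint is active")
    }
}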

// MarkTestComplete marks all conflicts and taints of a test as no longer running/active
// and signals waiting workers that blocked tests may now be runnable
// This should be called after a test completes execution
func (ts *testScheduler) MarkTestComplete(test *testCase) {
ts.mu.Lock()
defer ts.mu.Unlock()

// Get the conflict group for this test
conflictGroup := getTestConflictGroup(test)

// Clean up conflicts within this group
if groupConflicts, exists := ts.runningConflicts[conflictGroup]; exists {
for _, conflict := range test.isolation.Conflict {
groupConflicts.Delete(conflict)
}
}

// Clean up taints with reference counting
for _, taint := range test.isolation.Taint {
ts.activeTaints[taint]--
if ts.activeTaints[taint] <= 0 {
delete(ts.activeTaints, taint)
}
}

// Signal waiting workers that the state has changed
// Some blocked tests might now be runnable
ts.cond.Broadcast()
}
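
And a sketch of the end-to-end conflict flow: two tests declaring the same conflict key are never handed out concurrently, and MarkTestComplete is what unblocks the second one (again, the testCase/isolation construction is an assumption):

func TestConflictingTestsRunSequentially(t *testing.T) {
    a := &testCase{}
    a.isolation.Conflict = []string{"shared-fixture"}
    b := &testCase{}
    b.isolation.Conflict = []string{"shared-fixture"}

    scheduler := newTestScheduler([]*testCase{a, b})
    ctx := context.Background()

    first := scheduler.GetNextTestToRun(ctx) // returns a and marks "shared-fixture" as running

    done := make(chan *testCase)
    go func() {
        done <- scheduler.GetNextTestToRun(ctx) // blocks until the conflict is released
    }()

    select {
    case <-done:
        t.Fatal("b should not be schedulable while a holds the conflict")
    case <-time.After(100 * time.Millisecond):
    }

    scheduler.MarkTestComplete(first)
    if got := <-done; got != b {
        t.Fatal("expected b to be scheduled after a completed")
    }
}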

func newParallelTestQueue(commandContext *commandContext) *parallelByFileTestQueue {
return &parallelByFileTestQueue{
@@ -54,35 +214,24 @@ func abortOnFailure(parentContext context.Context) (testAbortFunc, context.Conte
}, testCtx
}

// queueAllTests writes all the tests to the channel and closes it when all are finished
// even with buffering, this can take a while since we don't infinitely buffer.
func queueAllTests(remainingParallelTests chan *testCase, tests []*testCase) {
for i := range tests {
curr := tests[i]
remainingParallelTests <- curr
}

close(remainingParallelTests)
}

// runTestsUntilChannelEmpty reads from the channel to consume tests, run them, and return when the channel is closed.
func runTestsUntilChannelEmpty(ctx context.Context, remainingParallelTests chan *testCase, testSuiteRunner testSuiteRunner) {
// runTestsUntilDone continuously gets tests from the scheduler, runs them, and marks them complete.
// GetNextTestToRun() blocks internally when no tests are runnable and returns nil when all tests are distributed
// or context is cancelled. Returns when there are no more tests to take from the queue or context is cancelled.
func runTestsUntilDone(ctx context.Context, scheduler TestScheduler, testSuiteRunner testSuiteRunner) {
for {
select {
// if the context is finished, simply return
case <-ctx.Done():
return
// Get next test - this blocks until a test is available, queue is empty, or context is cancelled
test := scheduler.GetNextTestToRun(ctx)

case test, ok := <-remainingParallelTests:
if !ok { // channel closed, then we're done
return
}
// if the context is finished, simply return
if ctx.Err() != nil {
return
}
testSuiteRunner.RunOneTest(ctx, test)
if test == nil {
// No more tests to take from queue or context cancelled
return
}

// Run the test
testSuiteRunner.RunOneTest(ctx, test)

// Mark test as complete (clean up conflicts/taints and signal waiting workers)
scheduler.MarkTestComplete(test)
}
}

@@ -105,21 +254,30 @@ func execute(ctx context.Context, testSuiteRunner testSuiteRunner, tests []*test
return
}

// Split tests into two categories: serial and parallel (including isolated)
serial, parallel := splitTests(tests, isSerialTest)

remainingParallelTests := make(chan *testCase, 100)
go queueAllTests(remainingParallelTests, parallel)
if len(parallel) > 0 {
// Create test scheduler with all parallel tests
// TestScheduler encapsulates the queue and scheduling logic
var scheduler TestScheduler = newTestScheduler(parallel)

var wg sync.WaitGroup

// Run all non-serial tests with conflict-aware workers
// Each worker polls the scheduler for the next runnable test in order
for i := 0; i < parallelism; i++ {
wg.Add(1)
go func(ctx context.Context) {
defer wg.Done()
runTestsUntilDone(ctx, scheduler, testSuiteRunner)
}(ctx)
}

var wg sync.WaitGroup
for i := 0; i < parallelism; i++ {
wg.Add(1)
go func(ctx context.Context) {
defer wg.Done()
runTestsUntilChannelEmpty(ctx, remainingParallelTests, testSuiteRunner)
}(ctx)
wg.Wait()
}
wg.Wait()

// Run serial tests sequentially at the end
for _, test := range serial {
if ctx.Err() != nil {
return