Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/stretchr/testify v1.11.1
golang.org/x/crypto v0.48.0
golang.org/x/net v0.51.0
golang.org/x/sync v0.19.0
gorm.io/driver/mysql v1.6.0
gorm.io/driver/postgres v1.6.0
gorm.io/gorm v1.31.1
Expand Down Expand Up @@ -118,7 +119,6 @@ require (
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/arch v0.22.0 // indirect
golang.org/x/oauth2 v0.34.0 // indirect
golang.org/x/sync v0.19.0 // indirect
golang.org/x/sys v0.41.0 // indirect
golang.org/x/term v0.40.0 // indirect
golang.org/x/text v0.34.0 // indirect
Expand Down
54 changes: 42 additions & 12 deletions pkg/handlers/search_handler.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package handlers

import (
"context"
"fmt"
"log"
"net/http"
"sort"
"strconv"
Expand All @@ -14,6 +16,7 @@ import (
"github.com/zxh326/kite/pkg/handlers/resources"
"github.com/zxh326/kite/pkg/middleware"
"github.com/zxh326/kite/pkg/utils"
"golang.org/x/sync/errgroup"
)

type SearchHandler struct {
Expand Down Expand Up @@ -54,21 +57,53 @@ func (h *SearchHandler) createCacheKey(clusterName, query string, limit int) str
func (h *SearchHandler) Search(c *gin.Context, query string, limit int) ([]common.SearchResult, error) {
query = normalizeSearchQuery(query)
limit = normalizeSearchLimit(limit)
var allResults []common.SearchResult

// Search in different resource types
// Determine which resource types to search
searchFuncs := resources.SearchFuncs
guessSearchResources, q := utils.GuessSearchResources(query)

// Collect the search functions to execute
type searchEntry struct {
name string
fn func(*gin.Context, string, int64) ([]common.SearchResult, error)
}
var entries []searchEntry
for name, searchFunc := range searchFuncs {
if guessSearchResources == "all" || name == guessSearchResources {
results, err := searchFunc(c, q, int64(limit))
if err != nil {
continue
}
allResults = append(allResults, results...)
entries = append(entries, searchEntry{name: name, fn: searchFunc})
}
}

// Execute searches in parallel using errgroup
resultSlices := make([][]common.SearchResult, len(entries))
g, _ := errgroup.WithContext(context.Background())

for i, entry := range entries {
g.Go(func() (err error) {
defer func() {
if r := recover(); r != nil {
log.Printf("search: resource %q panicked: %v", entry.name, r)
err = nil
}
}()
results, searchErr := entry.fn(c, q, int64(limit))
if searchErr != nil {
log.Printf("search: resource %q failed: %v", entry.name, searchErr)
return nil
}
resultSlices[i] = results
return nil
})
}

_ = g.Wait() // all goroutines return nil, error is always nil

// Merge results from all resource types
var allResults []common.SearchResult
for _, slice := range resultSlices {
allResults = append(allResults, slice...)
}

queryLower := strings.ToLower(q)
sortResults(allResults, queryLower)

Expand Down Expand Up @@ -104,11 +139,6 @@ func (h *SearchHandler) GlobalSearch(c *gin.Context) {
Results: cachedResults,
Total: len(cachedResults),
}
copiedCtx := c.Copy()
go func() {
// Perform search in the background to update cache
_, _ = h.Search(copiedCtx, query, limit)
}()
c.JSON(http.StatusOK, response)
return
}
Expand Down
153 changes: 150 additions & 3 deletions pkg/handlers/search_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"sync/atomic"
"testing"
"time"

"github.com/gin-gonic/gin"
"github.com/zxh326/kite/pkg/common"
Expand Down Expand Up @@ -109,7 +111,7 @@ func TestGlobalSearchCacheKeyIncludesClusterAndLimit(t *testing.T) {

handler := NewSearchHandler()

ctx := newSearchContext(t, "cluster-a", "/search")
ctx := newSearchContext(t, "cluster-a")
if _, err := handler.Search(ctx, "po target", 1); err != nil {
t.Fatalf("Search returned error: %v", err)
}
Expand All @@ -128,13 +130,13 @@ func TestGlobalSearchCacheKeyIncludesClusterAndLimit(t *testing.T) {
}
}

func newSearchContext(t *testing.T, clusterName, target string) *gin.Context {
func newSearchContext(t *testing.T, clusterName string) *gin.Context {
t.Helper()

gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
ctx, _ := gin.CreateTestContext(rec)
ctx.Request = httptest.NewRequest(http.MethodGet, target, nil)
ctx.Request = httptest.NewRequest(http.MethodGet, "/search", nil)
if clusterName != "" {
ctx.Set(middleware.ClusterNameKey, clusterName)
}
Expand Down Expand Up @@ -163,3 +165,148 @@ func performGlobalSearch(t *testing.T, handler *SearchHandler, clusterName, targ
}
return resp
}

// TestSearchParallelExecution verifies that multiple resource searches run concurrently.
func TestSearchParallelExecution(t *testing.T) {
oldSearchFuncs := resources.SearchFuncs

// Track concurrent execution: each func sleeps and records max concurrency.
var running atomic.Int32
var maxConcurrent atomic.Int32

slowSearch := func(results []common.SearchResult) func(*gin.Context, string, int64) ([]common.SearchResult, error) {
return func(_ *gin.Context, _ string, _ int64) ([]common.SearchResult, error) {
cur := running.Add(1)
// Update max concurrency seen
for {
old := maxConcurrent.Load()
if cur <= old || maxConcurrent.CompareAndSwap(old, cur) {
break
}
}
time.Sleep(50 * time.Millisecond)
running.Add(-1)
return results, nil
}
}

resources.SearchFuncs = map[string]func(*gin.Context, string, int64) ([]common.SearchResult, error){
"pods": slowSearch([]common.SearchResult{{Name: "nginx", ResourceType: "pods"}}),
"services": slowSearch([]common.SearchResult{{Name: "nginx-svc", ResourceType: "services"}}),
"deployments": slowSearch([]common.SearchResult{{Name: "nginx-deploy", ResourceType: "deployments"}}),
}
t.Cleanup(func() { resources.SearchFuncs = oldSearchFuncs })

handler := NewSearchHandler()
ctx := newSearchContext(t, "test-cluster")

start := time.Now()
results, err := handler.Search(ctx, "nginx", 50)
elapsed := time.Since(start)

if err != nil {
t.Fatalf("Search returned error: %v", err)
}

// With 3 funcs sleeping 50ms each, sequential would take >= 150ms.
// Parallel should complete in ~50-80ms. Allow generous margin.
if elapsed >= 140*time.Millisecond {
t.Errorf("Search took %v, expected < 140ms for parallel execution", elapsed)
}

if maxConcurrent.Load() < 2 {
t.Errorf("maxConcurrent = %d, want >= 2 (proves parallelism)", maxConcurrent.Load())
}

if len(results) != 3 {
t.Fatalf("expected 3 results, got %d", len(results))
}
}

// TestSearchPartialFailure ensures that one failing resource type doesn't break others.
func TestSearchPartialFailure(t *testing.T) {
oldSearchFuncs := resources.SearchFuncs
resources.SearchFuncs = map[string]func(*gin.Context, string, int64) ([]common.SearchResult, error){
"pods": func(_ *gin.Context, _ string, _ int64) ([]common.SearchResult, error) { //nolint:unparam // signature required by SearchFuncs
return []common.SearchResult{{Name: "ok-pod", ResourceType: "pods"}}, nil
},
"services": func(_ *gin.Context, _ string, _ int64) ([]common.SearchResult, error) {
return nil, fmt.Errorf("simulated API server error")
},
"deployments": func(_ *gin.Context, _ string, _ int64) ([]common.SearchResult, error) { //nolint:unparam // signature required by SearchFuncs
return []common.SearchResult{{Name: "ok-deploy", ResourceType: "deployments"}}, nil
},
}
t.Cleanup(func() { resources.SearchFuncs = oldSearchFuncs })

handler := NewSearchHandler()
ctx := newSearchContext(t, "test-cluster")

results, err := handler.Search(ctx, "ok", 50)
if err != nil {
t.Fatalf("Search returned error: %v", err)
}

// Should have results from pods + deployments (services failed gracefully)
if len(results) != 2 {
t.Fatalf("expected 2 results (failed resource skipped), got %d: %+v", len(results), results)
}
}

// TestGlobalSearchCacheDoesNotTriggerBackgroundRefresh validates Solution E:
// a cache hit should NOT invoke Search again (no background goroutine).
// TestGlobalSearchCacheDoesNotTriggerBackgroundRefresh validates Solution E:
// a cache hit should NOT invoke Search again (no background goroutine).
func TestGlobalSearchCacheDoesNotTriggerBackgroundRefresh(t *testing.T) {
	var invocations atomic.Int32

	prevFuncs := resources.SearchFuncs
	resources.SearchFuncs = map[string]func(*gin.Context, string, int64) ([]common.SearchResult, error){
		"pods": func(_ *gin.Context, _ string, _ int64) ([]common.SearchResult, error) { //nolint:unparam // signature required by SearchFuncs
			invocations.Add(1)
			return []common.SearchResult{{Name: "nginx", ResourceType: "pods"}}, nil
		},
	}
	t.Cleanup(func() { resources.SearchFuncs = prevFuncs })

	handler := NewSearchHandler()

	// First call: populates the cache
	if resp := performGlobalSearch(t, handler, "test-cluster", "/search?q=nginx&limit=50"); resp.Total != 1 {
		t.Fatalf("first call: expected 1 result, got %d", resp.Total)
	}

	baseline := invocations.Load()

	// Second call: should serve from cache WITHOUT launching background search
	if resp := performGlobalSearch(t, handler, "test-cluster", "/search?q=nginx&limit=50"); resp.Total != 1 {
		t.Fatalf("second call: expected 1 result, got %d", resp.Total)
	}

	// Give any hypothetical background goroutine time to execute
	time.Sleep(100 * time.Millisecond)

	if after := invocations.Load(); after != baseline {
		t.Fatalf("cache hit triggered %d extra Search calls (background refresh not removed)",
			after-baseline)
	}
}

// TestSearchEmptyResourceFuncs verifies Search handles zero searchable types gracefully.
func TestSearchEmptyResourceFuncs(t *testing.T) {
oldSearchFuncs := resources.SearchFuncs
resources.SearchFuncs = map[string]func(*gin.Context, string, int64) ([]common.SearchResult, error){}
t.Cleanup(func() { resources.SearchFuncs = oldSearchFuncs })

handler := NewSearchHandler()
ctx := newSearchContext(t, "test-cluster")

results, err := handler.Search(ctx, "anything", 50)
if err != nil {
t.Fatalf("Search returned error: %v", err)
}
if len(results) != 0 {
t.Fatalf("expected 0 results with no search funcs, got %d", len(results))
}
}
Loading