diff --git a/catalogd/cmd/catalogd/main.go b/catalogd/cmd/catalogd/main.go index 8ab76aa32..bcabfa644 100644 --- a/catalogd/cmd/catalogd/main.go +++ b/catalogd/cmd/catalogd/main.go @@ -302,9 +302,13 @@ func main() { os.Exit(1) } - localStorage = storage.LocalDirV1{RootDir: storeDir, RootURL: baseStorageURL} + localStorage = &storage.LocalDirV1{ + RootDir: storeDir, + RootURL: baseStorageURL, + EnableQueryHandler: features.CatalogdFeatureGate.Enabled(features.APIV1QueryHandler), + } - // Config for the the catalogd web server + // Config for the catalogd web server catalogServerConfig := serverutil.CatalogServerConfig{ ExternalAddr: externalAddr, CatalogAddr: catalogServerAddr, diff --git a/catalogd/internal/features/features.go b/catalogd/internal/features/features.go index 8f67b1689..1ab490854 100644 --- a/catalogd/internal/features/features.go +++ b/catalogd/internal/features/features.go @@ -5,7 +5,13 @@ import ( "k8s.io/component-base/featuregate" ) -var catalogdFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{} +const ( + APIV1QueryHandler = featuregate.Feature("APIV1QueryHandler") +) + +var catalogdFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{ + APIV1QueryHandler: {Default: false, PreRelease: featuregate.Alpha}, +} var CatalogdFeatureGate featuregate.MutableFeatureGate = featuregate.NewFeatureGate() diff --git a/catalogd/internal/serverutil/serverutil.go b/catalogd/internal/serverutil/serverutil.go index 1dcaa9282..2d84b46d1 100644 --- a/catalogd/internal/serverutil/serverutil.go +++ b/catalogd/internal/serverutil/serverutil.go @@ -10,6 +10,7 @@ import ( "github.com/go-logr/logr" "github.com/gorilla/handlers" + "github.com/klauspost/compress/gzhttp" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/certwatcher" "sigs.k8s.io/controller-runtime/pkg/manager" @@ -42,17 +43,12 @@ func AddCatalogServerToManager(mgr ctrl.Manager, cfg CatalogServerConfig, tlsFil } shutdownTimeout := 30 * time.Second - - l := mgr.GetLogger().WithName("catalogd-http-server") - handler := catalogdmetrics.AddMetricsToHandler(cfg.LocalStorage.StorageServerHandler()) - handler = logrLoggingHandler(l, handler) - catalogServer := manager.Server{ Name: "catalogs", OnlyServeWhenLeader: true, Server: &http.Server{ Addr: cfg.CatalogAddr, - Handler: handler, + Handler: storageServerHandlerWrapped(mgr.GetLogger().WithName("catalogd-http-server"), cfg), ReadTimeout: 5 * time.Second, // TODO: Revert this to 10 seconds if/when the API // evolves to have significantly smaller responses @@ -97,3 +93,12 @@ func logrLoggingHandler(l logr.Logger, handler http.Handler) http.Handler { l.Info("handled request", "host", host, "username", username, "method", params.Request.Method, "uri", uri, "protocol", params.Request.Proto, "status", params.StatusCode, "size", params.Size) }) } + +func storageServerHandlerWrapped(l logr.Logger, cfg CatalogServerConfig) http.Handler { + handler := cfg.LocalStorage.StorageServerHandler() + handler = gzhttp.GzipHandler(handler) + handler = catalogdmetrics.AddMetricsToHandler(handler) + + handler = logrLoggingHandler(l, handler) + return handler +} diff --git a/catalogd/internal/serverutil/serverutil_test.go b/catalogd/internal/serverutil/serverutil_test.go new file mode 100644 index 000000000..183bf97f1 --- /dev/null +++ b/catalogd/internal/serverutil/serverutil_test.go @@ -0,0 +1,128 @@ +package serverutil + +import ( + "compress/gzip" + "context" + "io" + "io/fs" + "net/http" + "net/http/httptest" + "strings" + "testing" + + 
"github.com/go-logr/logr" + "github.com/stretchr/testify/require" +) + +func TestStorageServerHandlerWrapped_Gzip(t *testing.T) { + var generatedJSON = func(size int) string { + return "{\"data\":\"" + strings.Repeat("test data ", size) + "\"}" + } + tests := []struct { + name string + acceptEncoding string + responseContent string + expectCompressed bool + expectedStatus int + }{ + { + name: "compresses large response when client accepts gzip", + acceptEncoding: "gzip", + responseContent: generatedJSON(1000), + expectCompressed: true, + expectedStatus: http.StatusOK, + }, + { + name: "does not compress small response even when client accepts gzip", + acceptEncoding: "gzip", + responseContent: `{"foo":"bar"}`, + expectCompressed: false, + expectedStatus: http.StatusOK, + }, + { + name: "does not compress when client doesn't accept gzip", + acceptEncoding: "", + responseContent: generatedJSON(1000), + expectCompressed: false, + expectedStatus: http.StatusOK, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create a mock storage instance that returns our test content + mockStorage := &mockStorageInstance{ + content: tt.responseContent, + } + + cfg := CatalogServerConfig{ + LocalStorage: mockStorage, + } + handler := storageServerHandlerWrapped(logr.Logger{}, cfg) + + // Create test request + req := httptest.NewRequest("GET", "/test", nil) + if tt.acceptEncoding != "" { + req.Header.Set("Accept-Encoding", tt.acceptEncoding) + } + + // Create response recorder + rec := httptest.NewRecorder() + + // Handle the request + handler.ServeHTTP(rec, req) + + // Check status code + require.Equal(t, tt.expectedStatus, rec.Code) + + // Check if response was compressed + wasCompressed := rec.Header().Get("Content-Encoding") == "gzip" + require.Equal(t, tt.expectCompressed, wasCompressed) + + // Get the response body + var responseBody []byte + if wasCompressed { + // Decompress the response + gzipReader, err := gzip.NewReader(rec.Body) + require.NoError(t, err) + responseBody, err = io.ReadAll(gzipReader) + require.NoError(t, err) + require.NoError(t, gzipReader.Close()) + } else { + responseBody = rec.Body.Bytes() + } + + // Verify the response content + require.Equal(t, tt.responseContent, string(responseBody)) + }) + } +} + +// mockStorageInstance implements storage.Instance interface for testing +type mockStorageInstance struct { + content string +} + +func (m *mockStorageInstance) StorageServerHandler() http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, err := w.Write([]byte(m.content)) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } + }) +} + +func (m *mockStorageInstance) Store(ctx context.Context, catalogName string, fs fs.FS) error { + return nil +} + +func (m *mockStorageInstance) Delete(catalogName string) error { + return nil +} + +func (m *mockStorageInstance) ContentExists(catalog string) bool { + return true +} +func (m *mockStorageInstance) BaseURL(catalog string) string { + return "" +} diff --git a/catalogd/internal/storage/http_precoditions_check.go b/catalogd/internal/storage/http_precoditions_check.go new file mode 100644 index 000000000..c4ee083ed --- /dev/null +++ b/catalogd/internal/storage/http_precoditions_check.go @@ -0,0 +1,226 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. 
+// +// Source: Originally from Go's net/http/fs.go +// https://cs.opensource.google/go/go/+/master:src/net/http/fs.go + +package storage + +import ( + "net/http" + "net/textproto" + "strings" + "time" +) + +type condResult int + +const ( + condNone condResult = iota + condTrue + condFalse +) + +// checkPreconditions evaluates request preconditions and reports whether a precondition +// resulted in sending StatusNotModified or StatusPreconditionFailed. +func checkPreconditions(w http.ResponseWriter, r *http.Request, modtime time.Time) bool { + // This function carefully follows RFC 7232 section 6. + ch := checkIfMatch(r) + if ch == condNone { + ch = checkIfUnmodifiedSince(r, modtime) + } + if ch == condFalse { + w.WriteHeader(http.StatusPreconditionFailed) + return true + } + switch checkIfNoneMatch(r) { + case condFalse: + if r.Method == "GET" || r.Method == "HEAD" { + writeNotModified(w) + return true + } else { + w.WriteHeader(http.StatusPreconditionFailed) + return true + } + case condNone: + if checkIfModifiedSince(r, w, modtime) == condFalse { + writeNotModified(w) + return true + } + } + return false +} + +func checkIfModifiedSince(r *http.Request, w http.ResponseWriter, modtime time.Time) condResult { + ims := r.Header.Get("If-Modified-Since") + if ims == "" || isZeroTime(modtime) { + return condTrue + } + t, err := ParseTime(ims) + if err != nil { + httpError(w, err) + return condNone + } + // The Last-Modified header truncates sub-second precision so + // the modtime needs to be truncated too. + modtime = modtime.Truncate(time.Second) + if modtime.Compare(t) <= 0 { + return condFalse + } + return condTrue +} + +func checkIfUnmodifiedSince(r *http.Request, modtime time.Time) condResult { + ius := r.Header.Get("If-Unmodified-Since") + if ius == "" || isZeroTime(modtime) { + return condNone + } + t, err := ParseTime(ius) + if err != nil { + return condNone + } + + // The Last-Modified header truncates sub-second precision so + // the modtime needs to be truncated too. + modtime = modtime.Truncate(time.Second) + if ret := modtime.Compare(t); ret <= 0 { + return condTrue + } + return condFalse +} + +// TimeFormat is the time format to use when generating times in HTTP +// headers. It is like [time.RFC1123] but hard-codes GMT as the time +// zone. The time being formatted must be in UTC for Format to +// generate the correct format. +// +// For parsing this time format, see [ParseTime]. +const TimeFormat = "Mon, 02 Jan 2006 15:04:05 GMT" + +var ( + unixEpochTime = time.Unix(0, 0) + timeFormats = []string{ + TimeFormat, + time.RFC850, + time.ANSIC, + } +) + +// isZeroTime reports whether t is obviously unspecified (either zero or Unix()=0). +func isZeroTime(t time.Time) bool { + return t.IsZero() || t.Equal(unixEpochTime) +} + +func writeNotModified(w http.ResponseWriter) { + // RFC 7232 section 4.1: + // a sender SHOULD NOT generate representation metadata other than the + // above listed fields unless said metadata exists for the purpose of + // guiding cache updates (e.g., Last-Modified might be useful if the + // response does not have an ETag field). 
+ h := w.Header() + delete(h, "Content-Type") + delete(h, "Content-Length") + delete(h, "Content-Encoding") + if h.Get("Etag") != "" { + delete(h, "Last-Modified") + } + w.WriteHeader(http.StatusNotModified) +} + +func checkIfNoneMatch(r *http.Request) condResult { + inm := r.Header.Get("If-None-Match") + if inm == "" { + return condNone + } + buf := inm + for { + buf = textproto.TrimString(buf) + if len(buf) == 0 { + break + } + if buf[0] == ',' { + buf = buf[1:] + continue + } + if buf[0] == '*' { + return condFalse + } + etag, remain := scanETag(buf) + if etag == "" { + break + } + buf = remain + } + return condTrue +} + +// ParseTime parses a time header (such as the Date: header), +// trying each of the three formats allowed by HTTP/1.1: +// [TimeFormat], [time.RFC850], and [time.ANSIC]. +// nolint:nonamedreturns +func ParseTime(text string) (t time.Time, err error) { + for _, layout := range timeFormats { + t, err = time.Parse(layout, text) + if err == nil { + return + } + } + return +} + +func checkIfMatch(r *http.Request) condResult { + im := r.Header.Get("If-Match") + if im == "" { + return condNone + } + for { + im = textproto.TrimString(im) + if len(im) == 0 { + break + } + if im[0] == ',' { + im = im[1:] + continue + } + if im[0] == '*' { + return condTrue + } + etag, remain := scanETag(im) + if etag == "" { + break + } + im = remain + } + + return condFalse +} + +// scanETag determines if a syntactically valid ETag is present at s. If so, +// the ETag and remaining text after consuming ETag is returned. Otherwise, +// it returns "", "". +// nolint:nonamedreturns +func scanETag(s string) (etag string, remain string) { + s = textproto.TrimString(s) + start := 0 + if strings.HasPrefix(s, "W/") { + start = 2 + } + if len(s[start:]) < 2 || s[start] != '"' { + return "", "" + } + // ETag is either W/"text" or "text". + // See RFC 7232 2.3. + for i := start + 1; i < len(s); i++ { + c := s[i] + switch { + // Character values allowed in ETags. + case c == 0x21 || c >= 0x23 && c <= 0x7E || c >= 0x80: + case c == '"': + return s[:i+1], s[i+1:] + default: + return "", "" + } + } + return "", "" +} diff --git a/catalogd/internal/storage/index.go b/catalogd/internal/storage/index.go new file mode 100644 index 000000000..c40ac4b3e --- /dev/null +++ b/catalogd/internal/storage/index.go @@ -0,0 +1,133 @@ +package storage + +import ( + "cmp" + "encoding/json" + "fmt" + "io" + "slices" + + "k8s.io/apimachinery/pkg/util/sets" + + "github.com/operator-framework/operator-registry/alpha/declcfg" +) + +// index is an index of sections of an FBC file used to lookup FBC blobs that +// match any combination of their schema, package, and name fields. + +// This index strikes a balance between space and performance. It indexes each field +// separately, and performs logical set intersections at lookup time in order to implement +// a multi-parameter query. +// +// Note: it is permissible to change the indexing algorithm later if it is necessary to +// tune the space / performance tradeoff. However care should be taken to ensure +// that the actual content returned by the index remains identical, as users of the index +// may be sensitive to differences introduced by index algorithm changes (e.g. if the +// order of the returned sections changes). +type index struct { + BySchema map[string][]section `json:"by_schema"` + ByPackage map[string][]section `json:"by_package"` + ByName map[string][]section `json:"by_name"` +} + +// A section is the byte offset and length of an FBC blob within the file. 
+type section struct { + offset int64 + length int64 +} + +func (s *section) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf(`[%d,%d]`, s.offset, s.length)), nil +} + +func (s *section) UnmarshalJSON(b []byte) error { + vals := [2]int64{} + if err := json.Unmarshal(b, &vals); err != nil { + return err + } + s.offset = vals[0] + s.length = vals[1] + return nil +} + +func (i index) Size() int64 { + size := 0 + for k, v := range i.BySchema { + size += len(k) + len(v)*16 + } + for k, v := range i.ByPackage { + size += len(k) + len(v)*16 + } + for k, v := range i.ByName { + size += len(k) + len(v)*16 + } + return int64(size) +} + +func (i index) Get(r io.ReaderAt, schema, packageName, name string) io.Reader { + sectionSet := i.getSectionSet(schema, packageName, name) + + sections := sectionSet.UnsortedList() + slices.SortFunc(sections, func(a, b section) int { + return cmp.Compare(a.offset, b.offset) + }) + + srs := make([]io.Reader, 0, len(sections)) + for _, s := range sections { + sr := io.NewSectionReader(r, s.offset, s.length) + srs = append(srs, sr) + } + return io.MultiReader(srs...) +} + +func (i *index) getSectionSet(schema, packageName, name string) sets.Set[section] { + // Initialize with all sections if no schema specified, otherwise use schema sections + sectionSet := sets.New[section]() + if schema == "" { + for _, s := range i.BySchema { + sectionSet.Insert(s...) + } + } else { + sectionSet = sets.New[section](i.BySchema[schema]...) + } + + // Filter by package name if specified + if packageName != "" { + packageSections := sets.New[section](i.ByPackage[packageName]...) + sectionSet = sectionSet.Intersection(packageSections) + } + + // Filter by name if specified + if name != "" { + nameSections := sets.New[section](i.ByName[name]...) + sectionSet = sectionSet.Intersection(nameSections) + } + + return sectionSet +} + +func newIndex(metasChan <-chan *declcfg.Meta) *index { + idx := &index{ + BySchema: make(map[string][]section), + ByPackage: make(map[string][]section), + ByName: make(map[string][]section), + } + offset := int64(0) + for meta := range metasChan { + start := offset + length := int64(len(meta.Blob)) + offset += length + + s := section{offset: start, length: length} + if meta.Schema != "" { + idx.BySchema[meta.Schema] = append(idx.BySchema[meta.Schema], s) + } + if meta.Package != "" { + idx.ByPackage[meta.Package] = append(idx.ByPackage[meta.Package], s) + } + if meta.Name != "" { + idx.ByName[meta.Name] = append(idx.ByName[meta.Name], s) + } + } + return idx +} diff --git a/catalogd/internal/storage/localdir.go b/catalogd/internal/storage/localdir.go index dd06729ea..b34428940 100644 --- a/catalogd/internal/storage/localdir.go +++ b/catalogd/internal/storage/localdir.go @@ -2,113 +2,316 @@ package storage import ( "context" + "encoding/json" + "errors" "fmt" + "io" "io/fs" "net/http" "net/url" "os" "path/filepath" + "sync" - "github.com/klauspost/compress/gzhttp" + "golang.org/x/sync/errgroup" + "golang.org/x/sync/singleflight" + "k8s.io/apimachinery/pkg/util/sets" "github.com/operator-framework/operator-registry/alpha/declcfg" ) // LocalDirV1 is a storage Instance. When Storing a new FBC contained in // fs.FS, the content is first written to a temporary file, after which -// it is copied to its final destination in RootDir/catalogName/. This is -// done so that clients accessing the content stored in RootDir/catalogName have -// atomic view of the content for a catalog. +// it is copied to its final destination in RootDir/.jsonl. 
This is +// done so that clients accessing the content stored in RootDir/.jsonl +// have an atomic view of the content for a catalog. type LocalDirV1 struct { - RootDir string - RootURL *url.URL + RootDir string + RootURL *url.URL + EnableQueryHandler bool + + m sync.RWMutex + // this singleflight Group is used in `getIndex()` to handle concurrent HTTP requests + // optimally. With the use of this singleflight group, the index is loaded from disk + // once per concurrent group of HTTP requests being handled by the query handler. + // The singleflight instance gives us a way to load the index from disk exactly once + // per concurrent group of callers, and then let every concurrent caller have access to + // the loaded index. This avoids lots of unnecessary open/decode/close cycles when concurrent + // requests are being handled, which improves overall performance and decreases response latency. + sf singleflight.Group } -const ( - v1ApiPath = "api/v1" - v1ApiData = "all" +var ( + _ Instance = (*LocalDirV1)(nil) + errInvalidParams = errors.New("invalid parameters") ) -func (s LocalDirV1) Store(ctx context.Context, catalog string, fsys fs.FS) error { - fbcDir := filepath.Join(s.RootDir, catalog, v1ApiPath) - if err := os.MkdirAll(fbcDir, 0700); err != nil { +func (s *LocalDirV1) Store(ctx context.Context, catalog string, fsys fs.FS) error { + s.m.Lock() + defer s.m.Unlock() + + if err := os.MkdirAll(s.RootDir, 0700); err != nil { return err } - tempFile, err := os.CreateTemp(s.RootDir, fmt.Sprint(catalog)) + tmpCatalogDir, err := os.MkdirTemp(s.RootDir, fmt.Sprintf(".%s-*", catalog)) if err != nil { return err } - defer os.Remove(tempFile.Name()) - if err := declcfg.WalkMetasFS(ctx, fsys, func(path string, meta *declcfg.Meta, err error) error { + defer os.RemoveAll(tmpCatalogDir) + + storeMetaFuncs := []storeMetasFunc{storeCatalogData} + if s.EnableQueryHandler { + storeMetaFuncs = append(storeMetaFuncs, storeIndexData) + } + + eg, egCtx := errgroup.WithContext(ctx) + metaChans := []chan *declcfg.Meta{} + + for range storeMetaFuncs { + metaChans = append(metaChans, make(chan *declcfg.Meta, 1)) + } + for i, f := range storeMetaFuncs { + eg.Go(func() error { + return f(tmpCatalogDir, metaChans[i]) + }) + } + err = declcfg.WalkMetasFS(egCtx, fsys, func(path string, meta *declcfg.Meta, err error) error { if err != nil { return err } - _, err = tempFile.Write(meta.Blob) - return err - }); err != nil { + for _, ch := range metaChans { + select { + case ch <- meta: + case <-egCtx.Done(): + return egCtx.Err() + } + } + return nil + }, declcfg.WithConcurrency(1)) + for _, ch := range metaChans { + close(ch) + } + if err != nil { return fmt.Errorf("error walking FBC root: %w", err) } - fbcFile := filepath.Join(fbcDir, v1ApiData) - return os.Rename(tempFile.Name(), fbcFile) + + if err := eg.Wait(); err != nil { + return err + } + + catalogDir := s.catalogDir(catalog) + return errors.Join( + os.RemoveAll(catalogDir), + os.Rename(tmpCatalogDir, catalogDir), + ) +} + +func (s *LocalDirV1) Delete(catalog string) error { + s.m.Lock() + defer s.m.Unlock() + + return os.RemoveAll(s.catalogDir(catalog)) +} + +func (s *LocalDirV1) ContentExists(catalog string) bool { + s.m.RLock() + defer s.m.RUnlock() + + catalogFileStat, err := os.Stat(catalogFilePath(s.catalogDir(catalog))) + if err != nil { + return false + } + if !catalogFileStat.Mode().IsRegular() { + // path is not valid content + return false + } + + if s.EnableQueryHandler { + indexFileStat, err := os.Stat(catalogIndexFilePath(s.catalogDir(catalog)))
+ if err != nil { + return false + } + if !indexFileStat.Mode().IsRegular() { + return false + } + } + return true +} + +func (s *LocalDirV1) catalogDir(catalog string) string { + return filepath.Join(s.RootDir, catalog) +} + +func catalogFilePath(catalogDir string) string { + return filepath.Join(catalogDir, "catalog.jsonl") +} + +func catalogIndexFilePath(catalogDir string) string { + return filepath.Join(catalogDir, "index.json") +} + +type storeMetasFunc func(catalogDir string, metaChan <-chan *declcfg.Meta) error + +func storeCatalogData(catalogDir string, metas <-chan *declcfg.Meta) error { + f, err := os.Create(catalogFilePath(catalogDir)) + if err != nil { + return err + } + defer f.Close() + + for m := range metas { + if _, err := f.Write(m.Blob); err != nil { + return err + } + } + return nil } -func (s LocalDirV1) Delete(catalog string) error { - return os.RemoveAll(filepath.Join(s.RootDir, catalog)) +func storeIndexData(catalogDir string, metas <-chan *declcfg.Meta) error { + idx := newIndex(metas) + + f, err := os.Create(catalogIndexFilePath(catalogDir)) + if err != nil { + return err + } + defer f.Close() + + enc := json.NewEncoder(f) + enc.SetEscapeHTML(false) + return enc.Encode(idx) } -func (s LocalDirV1) BaseURL(catalog string) string { +func (s *LocalDirV1) BaseURL(catalog string) string { return s.RootURL.JoinPath(catalog).String() } -func (s LocalDirV1) StorageServerHandler() http.Handler { +func (s *LocalDirV1) StorageServerHandler() http.Handler { mux := http.NewServeMux() - fsHandler := http.FileServer(http.FS(&filesOnlyFilesystem{os.DirFS(s.RootDir)})) - spHandler := http.StripPrefix(s.RootURL.Path, fsHandler) - gzHandler := gzhttp.GzipHandler(spHandler) - typeHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + mux.HandleFunc(s.RootURL.JoinPath("{catalog}", "api", "v1", "all").Path, s.handleV1All) + if s.EnableQueryHandler { + mux.HandleFunc(s.RootURL.JoinPath("{catalog}", "api", "v1", "query").Path, s.handleV1Query) + } + allowedMethodsHandler := func(next http.Handler, allowedMethods ...string) http.Handler { + allowedMethodSet := sets.New[string](allowedMethods...) 
+ return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if !allowedMethodSet.Has(r.Method) { + http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed) + return + } + next.ServeHTTP(w, r) + }) + } + return allowedMethodsHandler(mux, http.MethodGet, http.MethodHead) +} + +func (s *LocalDirV1) handleV1All(w http.ResponseWriter, r *http.Request) { + s.m.RLock() + defer s.m.RUnlock() + + catalog := r.PathValue("catalog") + catalogFile, catalogStat, err := s.catalogData(catalog) + if err != nil { + httpError(w, err) + return + } + w.Header().Add("Content-Type", "application/jsonl") + http.ServeContent(w, r, "", catalogStat.ModTime(), catalogFile) +} + +func (s *LocalDirV1) handleV1Query(w http.ResponseWriter, r *http.Request) { + s.m.RLock() + defer s.m.RUnlock() + + catalog := r.PathValue("catalog") + catalogFile, catalogStat, err := s.catalogData(catalog) + if err != nil { + httpError(w, err) + return + } + defer catalogFile.Close() + + w.Header().Set("Last-Modified", catalogStat.ModTime().UTC().Format(TimeFormat)) + if checkPreconditions(w, r, catalogStat.ModTime()) { + w.WriteHeader(http.StatusNotModified) + return + } + + schema := r.URL.Query().Get("schema") + pkg := r.URL.Query().Get("package") + name := r.URL.Query().Get("name") + + if schema == "" && pkg == "" && name == "" { + // If no parameters are provided, return the entire catalog (this is the same as /api/v1/all) w.Header().Add("Content-Type", "application/jsonl") - gzHandler.ServeHTTP(w, r) - }) - mux.Handle(s.RootURL.Path, typeHandler) - return mux + http.ServeContent(w, r, "", catalogStat.ModTime(), catalogFile) + return + } + idx, err := s.getIndex(catalog) + if err != nil { + httpError(w, err) + return + } + indexReader := idx.Get(catalogFile, schema, pkg, name) + serveJSONLines(w, r, indexReader) } -func (s LocalDirV1) ContentExists(catalog string) bool { - file, err := os.Stat(filepath.Join(s.RootDir, catalog, v1ApiPath, v1ApiData)) +func (s *LocalDirV1) catalogData(catalog string) (*os.File, os.FileInfo, error) { + catalogFile, err := os.Open(catalogFilePath(s.catalogDir(catalog))) if err != nil { - return false + return nil, nil, err } - if !file.Mode().IsRegular() { - // path is not valid content - return false + catalogFileStat, err := catalogFile.Stat() + if err != nil { + return nil, nil, err } - return true + return catalogFile, catalogFileStat, nil } -// filesOnlyFilesystem is a file system that can open only regular -// files from the underlying filesystem. All other file types result -// in os.ErrNotExists -type filesOnlyFilesystem struct { - FS fs.FS +func httpError(w http.ResponseWriter, err error) { + var code int + switch { + case errors.Is(err, fs.ErrNotExist): + code = http.StatusNotFound + case errors.Is(err, fs.ErrPermission): + code = http.StatusForbidden + case errors.Is(err, errInvalidParams): + code = http.StatusBadRequest + default: + code = http.StatusInternalServerError + } + http.Error(w, fmt.Sprintf("%d %s", code, http.StatusText(code)), code) } -// Open opens a named file from the underlying filesystem. If the file -// is not a regular file, it return os.ErrNotExists. Callers are resposible -// for closing the file returned. 
-func (f *filesOnlyFilesystem) Open(name string) (fs.File, error) { - file, err := f.FS.Open(name) +func serveJSONLines(w http.ResponseWriter, r *http.Request, rs io.Reader) { + w.Header().Add("Content-Type", "application/jsonl") + // Copy the content of the reader to the response writer + // only if it's a Get request + if r.Method == http.MethodHead { + return + } + _, err := io.Copy(w, rs) if err != nil { - return nil, err + httpError(w, err) + return } - stat, err := file.Stat() +} + +func (s *LocalDirV1) getIndex(catalog string) (*index, error) { + idx, err, _ := s.sf.Do(catalog, func() (interface{}, error) { + indexFile, err := os.Open(catalogIndexFilePath(s.catalogDir(catalog))) + if err != nil { + return nil, err + } + defer indexFile.Close() + var idx index + if err := json.NewDecoder(indexFile).Decode(&idx); err != nil { + return nil, err + } + return &idx, nil + }) if err != nil { - _ = file.Close() return nil, err } - if !stat.Mode().IsRegular() { - _ = file.Close() - return nil, os.ErrNotExist - } - return file, nil + return idx.(*index), nil } diff --git a/catalogd/internal/storage/localdir_test.go b/catalogd/internal/storage/localdir_test.go index c975c8fc9..400d2236e 100644 --- a/catalogd/internal/storage/localdir_test.go +++ b/catalogd/internal/storage/localdir_test.go @@ -1,10 +1,7 @@ package storage import ( - "bytes" - "compress/gzip" "context" - "encoding/json" "errors" "fmt" "io" @@ -13,219 +10,498 @@ import ( "net/http/httptest" "net/url" "os" - "path/filepath" "strings" + "sync" + "testing" "testing/fstest" - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - - "github.com/google/go-cmp/cmp" - "sigs.k8s.io/yaml" - - "github.com/operator-framework/operator-registry/alpha/declcfg" + "github.com/stretchr/testify/require" ) const urlPrefix = "/catalogs/" -var ctx = context.Background() - -var _ = Describe("LocalDir Storage Test", func() { - var ( - catalog = "test-catalog" - store Instance - rootDir string - baseURL *url.URL - testBundleName = "bundle.v0.0.1" - testBundleImage = "quaydock.io/namespace/bundle:0.0.3" - testBundleRelatedImageName = "test" - testBundleRelatedImageImage = "testimage:latest" - testBundleObjectData = "dW5pbXBvcnRhbnQK" - testPackageDefaultChannel = "preview_test" - testPackageName = "webhook_operator_test" - testChannelName = "preview_test" - testPackage = fmt.Sprintf(testPackageTemplate, testPackageDefaultChannel, testPackageName) - testBundle = fmt.Sprintf(testBundleTemplate, testBundleImage, testBundleName, testPackageName, testBundleRelatedImageName, testBundleRelatedImageImage, testBundleObjectData) - testChannel = fmt.Sprintf(testChannelTemplate, testPackageName, testChannelName, testBundleName) - - unpackResultFS fs.FS - ) - BeforeEach(func() { - d, err := os.MkdirTemp(GinkgoT().TempDir(), "cache") - Expect(err).ToNot(HaveOccurred()) - rootDir = d - - baseURL = &url.URL{Scheme: "http", Host: "test-addr", Path: urlPrefix} - store = LocalDirV1{RootDir: rootDir, RootURL: baseURL} - unpackResultFS = &fstest.MapFS{ - "bundle.yaml": &fstest.MapFile{Data: []byte(testBundle), Mode: os.ModePerm}, - "package.yaml": &fstest.MapFile{Data: []byte(testPackage), Mode: os.ModePerm}, - "channel.yaml": &fstest.MapFile{Data: []byte(testChannel), Mode: os.ModePerm}, - } - }) - When("An unpacked FBC is stored using LocalDir", func() { - BeforeEach(func() { - err := store.Store(context.Background(), catalog, unpackResultFS) - Expect(err).To(Not(HaveOccurred())) - }) - It("should store the content in the RootDir correctly", func() { - fbcDir := 
filepath.Join(rootDir, catalog, v1ApiPath) - fbcFile := filepath.Join(fbcDir, v1ApiData) - _, err := os.Stat(fbcFile) - Expect(err).To(Not(HaveOccurred())) - - gotConfig, err := declcfg.LoadFS(ctx, unpackResultFS) - Expect(err).To(Not(HaveOccurred())) - storedConfig, err := declcfg.LoadFile(os.DirFS(fbcDir), v1ApiData) - Expect(err).To(Not(HaveOccurred())) - diff := cmp.Diff(gotConfig, storedConfig) - Expect(diff).To(Equal("")) - }) - It("should form the content URL correctly", func() { - Expect(store.BaseURL(catalog)).To(Equal(baseURL.JoinPath(catalog).String())) +func TestLocalDirStoraget(t *testing.T) { + tests := []struct { + name string + setup func(*testing.T) (*LocalDirV1, fs.FS) + test func(*testing.T, *LocalDirV1, fs.FS) + cleanup func(*testing.T, *LocalDirV1) + }{ + { + name: "store and retrieve catalog content", + setup: func(t *testing.T) (*LocalDirV1, fs.FS) { + s := &LocalDirV1{ + RootDir: t.TempDir(), + RootURL: &url.URL{Scheme: "http", Host: "test-addr", Path: urlPrefix}, + } + return s, createTestFS(t) + }, + test: func(t *testing.T, s *LocalDirV1, fsys fs.FS) { + const catalog = "test-catalog" + + // Initially content should not exist + if s.ContentExists(catalog) { + t.Fatal("content should not exist before store") + } + + // Store the content + if err := s.Store(context.Background(), catalog, fsys); err != nil { + t.Fatal(err) + } + + // Verify content exists after store + if !s.ContentExists(catalog) { + t.Fatal("content should exist after store") + } + + // Delete the content + if err := s.Delete(catalog); err != nil { + t.Fatal(err) + } + + // Verify content no longer exists + if s.ContentExists(catalog) { + t.Fatal("content should not exist after delete") + } + }, + }, + { + name: "storing with query handler enabled should create indexes", + setup: func(t *testing.T) (*LocalDirV1, fs.FS) { + s := &LocalDirV1{ + RootDir: t.TempDir(), + EnableQueryHandler: true, + } + return s, createTestFS(t) + }, + test: func(t *testing.T, s *LocalDirV1, fsys fs.FS) { + err := s.Store(context.Background(), "test-catalog", fsys) + if err != nil { + t.Fatal(err) + } + + if !s.ContentExists("test-catalog") { + t.Error("content should exist after store") + } + + // Verify index file was created + indexPath := catalogIndexFilePath(s.catalogDir("test-catalog")) + if _, err := os.Stat(indexPath); err != nil { + t.Errorf("index file should exist: %v", err) + } + }, + }, + { + name: "concurrent reads during write should not cause data race", + setup: func(t *testing.T) (*LocalDirV1, fs.FS) { + dir := t.TempDir() + s := &LocalDirV1{RootDir: dir} + return s, createTestFS(t) + }, + test: func(t *testing.T, s *LocalDirV1, fsys fs.FS) { + const catalog = "test-catalog" + var wg sync.WaitGroup + + // Start multiple concurrent readers + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Add(-1) + for j := 0; j < 100; j++ { + s.ContentExists(catalog) + } + }() + } + + // Write while readers are active + err := s.Store(context.Background(), catalog, fsys) + if err != nil { + t.Fatal(err) + } + + wg.Wait() + }, + }, + { + name: "delete nonexistent catalog", + setup: func(t *testing.T) (*LocalDirV1, fs.FS) { + return &LocalDirV1{RootDir: t.TempDir()}, nil + }, + test: func(t *testing.T, s *LocalDirV1, _ fs.FS) { + err := s.Delete("nonexistent") + if err != nil { + t.Errorf("expected no error deleting nonexistent catalog, got: %v", err) + } + }, + }, + { + name: "store with invalid permissions", + setup: func(t *testing.T) (*LocalDirV1, fs.FS) { + dir := t.TempDir() + // Set directory 
permissions to deny access + if err := os.Chmod(dir, 0000); err != nil { + t.Fatal(err) + } + return &LocalDirV1{RootDir: dir}, createTestFS(t) + }, + test: func(t *testing.T, s *LocalDirV1, fsys fs.FS) { + err := s.Store(context.Background(), "test-catalog", fsys) + if !errors.Is(err, fs.ErrPermission) { + t.Errorf("expected permission error, got: %v", err) + } + }, + cleanup: func(t *testing.T, s *LocalDirV1) { + // Restore permissions so cleanup can succeed + require.NoError(t, os.Chmod(s.RootDir, 0700)) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s, fsys := tt.setup(t) + tt.test(t, s, fsys) + if tt.cleanup != nil { + tt.cleanup(t, s) + } }) - It("should report content exists", func() { - Expect(store.ContentExists(catalog)).To(BeTrue()) + } +} + +func TestLocalDirServerHandler(t *testing.T) { + store := &LocalDirV1{RootDir: t.TempDir(), RootURL: &url.URL{Path: urlPrefix}} + testFS := fstest.MapFS{ + "meta.json": &fstest.MapFile{ + Data: []byte(`{"foo":"bar"}`), + }, + } + if store.Store(context.Background(), "test-catalog", testFS) != nil { + t.Fatal("failed to store test catalog and start server") + } + testServer := httptest.NewServer(store.StorageServerHandler()) + defer testServer.Close() + + for _, tc := range []struct { + name string + URLPath string + expectedStatusCode int + expectedContent string + }{ + { + name: "Server returns 404 when root URL is queried", + expectedStatusCode: http.StatusNotFound, + expectedContent: "404 page not found", + URLPath: "", + }, + { + name: "Server returns 404 when path '/' is queried", + expectedStatusCode: http.StatusNotFound, + expectedContent: "404 page not found", + URLPath: "/", + }, + { + name: "Server returns 404 when path '/catalogs/' is queried", + expectedStatusCode: http.StatusNotFound, + expectedContent: "404 page not found", + URLPath: "/catalogs/", + }, + { + name: "Server returns 404 when path '/catalogs//' is queried", + expectedStatusCode: http.StatusNotFound, + expectedContent: "404 page not found", + URLPath: "/catalogs/test-catalog/", + }, + { + name: "Server returns 404 when path '/catalogs//api/' is queried", + expectedStatusCode: http.StatusNotFound, + expectedContent: "404 page not found", + URLPath: "/catalogs/test-catalog/api/", + }, + { + name: "Server returns 404 when path '/catalogs//api/v1' is queried", + expectedStatusCode: http.StatusNotFound, + expectedContent: "404 page not found", + URLPath: "/catalogs/test-catalog/api/v1c", + }, + { + name: "Server returns 404 when path '/catalogs//non-existent.txt' is queried", + expectedStatusCode: http.StatusNotFound, + expectedContent: "404 page not found", + URLPath: "/catalogs/test-catalog/non-existent.txt", + }, + { + name: "Server returns 404 when path '/catalogs/.jsonl' is queried even if the file exists, since we don't serve the filesystem and serve an API instead", + expectedStatusCode: http.StatusNotFound, + expectedContent: "404 page not found", + URLPath: "/catalogs/test-catalog.jsonl", + }, + { + name: "Server returns 404 when non-existent catalog is queried", + expectedStatusCode: http.StatusNotFound, + expectedContent: "404 Not Found", + URLPath: "/catalogs/non-existent-catalog/api/v1/all", + }, + { + name: "Server returns 200 when path '/catalogs//api/v1/all' is queried, when catalog exists", + expectedStatusCode: http.StatusOK, + expectedContent: `{"foo":"bar"}`, + URLPath: "/catalogs/test-catalog/api/v1/all", + }, + } { + t.Run(tc.name, func(t *testing.T) { + req, err := http.NewRequest(http.MethodGet,
fmt.Sprintf("%s/%s", testServer.URL, tc.URLPath), nil) + require.NoError(t, err) + req.Header.Set("Accept-Encoding", "gzip") + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + + require.Equal(t, tc.expectedStatusCode, resp.StatusCode) + + var actualContent []byte + actualContent, err = io.ReadAll(resp.Body) + require.NoError(t, err) + require.Equal(t, tc.expectedContent, strings.TrimSpace(string(actualContent))) + require.NoError(t, resp.Body.Close()) }) - When("The stored content is deleted", func() { - BeforeEach(func() { - err := store.Delete(catalog) - Expect(err).To(Not(HaveOccurred())) - }) - It("should delete the FBC from the cache directory", func() { - fbcFile := filepath.Join(rootDir, catalog) - _, err := os.Stat(fbcFile) - Expect(err).To(HaveOccurred()) - Expect(os.IsNotExist(err)).To(BeTrue()) - }) - It("should report content does not exist", func() { - Expect(store.ContentExists(catalog)).To(BeFalse()) - }) + } +} + +// Tests to verify the behavior of the query endpoint, as described in +// https://docs.google.com/document/d/1s6_9IFEKGQLNh3ueH7SF4Yrx4PW9NSiNFqFIJx0pU-8/edit?usp=sharing +func TestQueryEndpoint(t *testing.T) { + store := &LocalDirV1{ + RootDir: t.TempDir(), + RootURL: &url.URL{Path: urlPrefix}, + EnableQueryHandler: true, + } + if store.Store(context.Background(), "test-catalog", createTestFS(t)) != nil { + t.Fatal("failed to store test catalog") + } + testServer := httptest.NewServer(store.StorageServerHandler()) + + testCases := []struct { + name string + setupStore func() (*httptest.Server, error) + queryParams string + expectedStatusCode int + expectedContent string + }{ + { + name: "valid query with package schema", + queryParams: "?schema=olm.package", + expectedStatusCode: http.StatusOK, + expectedContent: `{"defaultChannel":"preview_test","name":"webhook_operator_test","schema":"olm.package"}`, + }, + { + name: "valid query with schema and name combination", + queryParams: "?schema=olm.package&name=webhook_operator_test", + expectedStatusCode: http.StatusOK, + expectedContent: `{"defaultChannel":"preview_test","name":"webhook_operator_test","schema":"olm.package"}`, + }, + { + name: "valid query with channel schema and package name combination", + queryParams: "?schema=olm.channel&package=webhook_operator_test", + expectedStatusCode: http.StatusOK, + expectedContent: `{"entries":[{"name":"bundle.v0.0.1"}],"name":"preview_test","package":"webhook_operator_test","schema":"olm.channel"}`, + }, + { + name: "query with all meta fields", + queryParams: "?schema=olm.bundle&package=webhook_operator_test&name=bundle.v0.0.1", + expectedStatusCode: http.StatusOK, + expectedContent: `{"image":"quaydock.io/namespace/bundle:0.0.3","name":"bundle.v0.0.1","package":"webhook_operator_test","properties":[{"type":"olm.bundle.object","value":{"data":"dW5pbXBvcnRhbnQK"}},{"type":"some.other","value":{"data":"arbitrary-info"}}],"relatedImages":[{"image":"testimage:latest","name":"test"}],"schema":"olm.bundle"}`, + }, + { + name: "valid query for package schema for a package that does not exist", + queryParams: "?schema=olm.package&name=not-present", + expectedStatusCode: http.StatusOK, + expectedContent: "", + }, + { + name: "valid query with package and name", + queryParams: "?package=webhook_operator_test&name=bundle.v0.0.1", + expectedStatusCode: http.StatusOK, + expectedContent: 
`{"image":"quaydock.io/namespace/bundle:0.0.3","name":"bundle.v0.0.1","package":"webhook_operator_test","properties":[{"type":"olm.bundle.object","value":{"data":"dW5pbXBvcnRhbnQK"}},{"type":"some.other","value":{"data":"arbitrary-info"}}],"relatedImages":[{"image":"testimage:latest","name":"test"}],"schema":"olm.bundle"}`, + }, + { + name: "query with non-existent schema", + queryParams: "?schema=non_existent_schema", + expectedStatusCode: http.StatusOK, + expectedContent: "", + }, + { + name: "cached response with If-Modified-Since", + queryParams: "?schema=olm.package", + expectedStatusCode: http.StatusNotModified, + expectedContent: "", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/catalogs/test-catalog/api/v1/query%s", testServer.URL, tc.queryParams), nil) + require.NoError(t, err) + + if strings.Contains(tc.name, "If-Modified-Since") { + // Do an initial request to get a Last-Modified timestamp + // for the actual request + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + resp.Body.Close() + req.Header.Set("If-Modified-Since", resp.Header.Get("Last-Modified")) + } + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + require.Equal(t, tc.expectedStatusCode, resp.StatusCode) + + actualContent, err := io.ReadAll(resp.Body) + require.NoError(t, err) + require.Equal(t, tc.expectedContent, strings.TrimSpace(string(actualContent))) }) - }) -}) - -var _ = Describe("LocalDir Server Handler tests", func() { - var ( - testServer *httptest.Server - store LocalDirV1 - ) - BeforeEach(func() { - d, err := os.MkdirTemp(GinkgoT().TempDir(), "cache") - Expect(err).ToNot(HaveOccurred()) - Expect(os.MkdirAll(filepath.Join(d, "test-catalog", v1ApiPath), 0700)).To(Succeed()) - store = LocalDirV1{RootDir: d, RootURL: &url.URL{Path: urlPrefix}} - testServer = httptest.NewServer(store.StorageServerHandler()) - - }) - It("gets 404 for the path /", func() { - expectNotFound(testServer.URL) - }) - It("gets 404 for the path /catalogs/", func() { - expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/")) - }) - It("gets 404 for the path /catalogs/test-catalog/", func() { - expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/")) - }) - It("gets 404 for the path /test-catalog/foo.txt", func() { - // This ensures that even if the file exists, the URL must contain the /catalogs/ prefix - Expect(os.WriteFile(filepath.Join(store.RootDir, "test-catalog", "foo.txt"), []byte("bar"), 0600)).To(Succeed()) - expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/test-catalog/foo.txt")) - }) - It("gets 404 for the path /catalogs/test-catalog/non-existent.txt", func() { - expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/non-existent.txt")) - }) - It("gets 200 for the path /catalogs/foo.txt", func() { - expectedContent := []byte("bar") - Expect(os.WriteFile(filepath.Join(store.RootDir, "foo.txt"), expectedContent, 0600)).To(Succeed()) - expectFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/foo.txt"), expectedContent) - }) - It("gets 200 for the path /catalogs/test-catalog/foo.txt", func() { - expectedContent := []byte("bar") - Expect(os.WriteFile(filepath.Join(store.RootDir, "test-catalog", "foo.txt"), expectedContent, 0600)).To(Succeed()) - expectFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/foo.txt"), expectedContent) - }) - It("ignores accept-encoding for the path 
/catalogs/test-catalog/api/v1/all with size < 1400 bytes", func() { - expectedContent := []byte("bar") - Expect(os.WriteFile(filepath.Join(store.RootDir, "test-catalog", v1ApiPath, v1ApiData), expectedContent, 0600)).To(Succeed()) - expectFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/api/v1/all"), expectedContent) - }) - It("provides gzipped content for the path /catalogs/test-catalog/api/v1/all with size > 1400 bytes", func() { - expectedContent := []byte(testCompressableJSON) - Expect(os.WriteFile(filepath.Join(store.RootDir, "test-catalog", v1ApiPath, v1ApiData), expectedContent, 0600)).To(Succeed()) - expectFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/api/v1/all"), expectedContent) - }) - It("provides json-lines format for the served JSON catalog", func() { - catalog := "test-catalog" - unpackResultFS := &fstest.MapFS{ - "catalog.json": &fstest.MapFile{Data: []byte(testCompressableJSON), Mode: os.ModePerm}, - } - err := store.Store(context.Background(), catalog, unpackResultFS) - Expect(err).To(Not(HaveOccurred())) - - expectedContent, err := generateJSONLines([]byte(testCompressableJSON)) - Expect(err).To(Not(HaveOccurred())) - path, err := url.JoinPath(testServer.URL, urlPrefix, catalog, v1ApiPath, v1ApiData) - Expect(err).To(Not(HaveOccurred())) - expectFound(path, []byte(expectedContent)) - }) - It("provides json-lines format for the served YAML catalog", func() { - catalog := "test-catalog" - yamlData, err := makeYAMLFromConcatenatedJSON([]byte(testCompressableJSON)) - Expect(err).To(Not(HaveOccurred())) - unpackResultFS := &fstest.MapFS{ - "catalog.yaml": &fstest.MapFile{Data: yamlData, Mode: os.ModePerm}, - } - err = store.Store(context.Background(), catalog, unpackResultFS) - Expect(err).To(Not(HaveOccurred())) - - expectedContent, err := generateJSONLines(yamlData) - Expect(err).To(Not(HaveOccurred())) - path, err := url.JoinPath(testServer.URL, urlPrefix, catalog, v1ApiPath, v1ApiData) - Expect(err).To(Not(HaveOccurred())) - expectFound(path, []byte(expectedContent)) - }) - AfterEach(func() { - testServer.Close() - }) -}) - -func expectNotFound(url string) { - resp, err := http.Get(url) //nolint:gosec - Expect(err).To(Not(HaveOccurred())) - Expect(resp.StatusCode).To(Equal(http.StatusNotFound)) - Expect(resp.Body.Close()).To(Succeed()) + } } -func expectFound(url string, expectedContent []byte) { - req, err := http.NewRequest(http.MethodGet, url, nil) - Expect(err).To(Not(HaveOccurred())) - req.Header.Set("Accept-Encoding", "gzip") - resp, err := http.DefaultClient.Do(req) - Expect(err).To(Not(HaveOccurred())) - Expect(resp.StatusCode).To(Equal(http.StatusOK)) - - var actualContent []byte - switch resp.Header.Get("Content-Encoding") { - case "gzip": - Expect(len(expectedContent)).To(BeNumerically(">", 1400), - fmt.Sprintf("gzipped content should only be provided for content larger than 1400 bytes, but our expected content is only %d bytes", len(expectedContent))) - gz, err := gzip.NewReader(resp.Body) - Expect(err).To(Not(HaveOccurred())) - actualContent, err = io.ReadAll(gz) - Expect(err).To(Not(HaveOccurred())) - default: - actualContent, err = io.ReadAll(resp.Body) - Expect(len(expectedContent)).To(BeNumerically("<", 1400), - fmt.Sprintf("plaintext content should only be provided for content smaller than 1400 bytes, but we received plaintext for %d bytes\n expectedContent:\n%s\n", len(expectedContent), expectedContent)) - Expect(err).To(Not(HaveOccurred())) +func TestServerLoadHandling(t *testing.T) { + store := &LocalDirV1{ + 
RootDir: t.TempDir(), + RootURL: &url.URL{Path: urlPrefix}, + EnableQueryHandler: true, + } + + // Create large test data + largeFS := fstest.MapFS{} + for i := 0; i < 1000; i++ { + largeFS[fmt.Sprintf("meta_%d.json", i)] = &fstest.MapFile{ + Data: []byte(fmt.Sprintf(`{"schema":"olm.bundle","package":"test-op-%d","name":"test-op.v%d.0"}`, i, i)), + } + } + + if err := store.Store(context.Background(), "test-catalog", largeFS); err != nil { + t.Fatal("failed to store test catalog") } - Expect(actualContent).To(Equal(expectedContent)) - Expect(resp.Body.Close()).To(Succeed()) + testServer := httptest.NewServer(store.StorageServerHandler()) + defer testServer.Close() + + tests := []struct { + name string + concurrent int + requests func(baseURL string) []*http.Request + validateFunc func(t *testing.T, responses []*http.Response, errs []error) + }{ + { + name: "concurrent identical queries", + concurrent: 100, + requests: func(baseURL string) []*http.Request { + var reqs []*http.Request + for i := 0; i < 100; i++ { + req, _ := http.NewRequest(http.MethodGet, + fmt.Sprintf("%s/catalogs/test-catalog/api/v1/query?schema=olm.bundle", baseURL), + nil) + req.Header.Set("Accept", "application/jsonl") + reqs = append(reqs, req) + } + return reqs + }, + validateFunc: func(t *testing.T, responses []*http.Response, errs []error) { + for _, err := range errs { + require.NoError(t, err) + } + for _, resp := range responses { + require.Equal(t, http.StatusOK, resp.StatusCode) + require.Equal(t, "application/jsonl", resp.Header.Get("Content-Type")) + resp.Body.Close() + } + }, + }, + { + name: "concurrent different queries", + concurrent: 50, + requests: func(baseURL string) []*http.Request { + var reqs []*http.Request + for i := 0; i < 50; i++ { + req, _ := http.NewRequest(http.MethodGet, + fmt.Sprintf("%s/catalogs/test-catalog/api/v1/query?package=test-op-%d", baseURL, i), + nil) + req.Header.Set("Accept", "application/jsonl") + reqs = append(reqs, req) + } + return reqs + }, + validateFunc: func(t *testing.T, responses []*http.Response, errs []error) { + for _, err := range errs { + require.NoError(t, err) + } + for _, resp := range responses { + require.Equal(t, http.StatusOK, resp.StatusCode) + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + require.Contains(t, string(body), "test-op-") + resp.Body.Close() + } + }, + }, + { + name: "mixed all and query endpoints", + concurrent: 40, + requests: func(baseURL string) []*http.Request { + var reqs []*http.Request + for i := 0; i < 20; i++ { + allReq, _ := http.NewRequest(http.MethodGet, + fmt.Sprintf("%s/catalogs/test-catalog/api/v1/all", baseURL), + nil) + queryReq, _ := http.NewRequest(http.MethodGet, + fmt.Sprintf("%s/catalogs/test-catalog/api/v1/query?schema=olm.bundle", baseURL), + nil) + allReq.Header.Set("Accept", "application/jsonl") + queryReq.Header.Set("Accept", "application/jsonl") + reqs = append(reqs, allReq, queryReq) + } + return reqs + }, + validateFunc: func(t *testing.T, responses []*http.Response, errs []error) { + for _, err := range errs { + require.NoError(t, err) + } + for _, resp := range responses { + require.Equal(t, http.StatusOK, resp.StatusCode) + resp.Body.Close() + } + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var ( + wg sync.WaitGroup + responses = make([]*http.Response, tt.concurrent) + errs = make([]error, tt.concurrent) + ) + + requests := tt.requests(testServer.URL) + for i := 0; i < tt.concurrent; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + // 
nolint:bodyclose + // the response body is closed in the validateFunc + resp, err := http.DefaultClient.Do(requests[idx]) + responses[idx] = resp + errs[idx] = err + }(i) + } + + wg.Wait() + tt.validateFunc(t, responses, errs) + }) + } } -const testBundleTemplate = `--- +func createTestFS(t *testing.T) fs.FS { + t.Helper() + testBundleTemplate := `--- image: %s name: %s schema: olm.bundle @@ -242,197 +518,34 @@ properties: data: arbitrary-info ` -const testPackageTemplate = `--- + testPackageTemplate := `--- defaultChannel: %s name: %s schema: olm.package ` -const testChannelTemplate = `--- + testChannelTemplate := `--- schema: olm.channel package: %s name: %s entries: - name: %s ` + testBundleName := "bundle.v0.0.1" + testBundleImage := "quaydock.io/namespace/bundle:0.0.3" + testBundleRelatedImageName := "test" + testBundleRelatedImageImage := "testimage:latest" + testBundleObjectData := "dW5pbXBvcnRhbnQK" + testPackageDefaultChannel := "preview_test" + testPackageName := "webhook_operator_test" + testChannelName := "preview_test" -// by default the compressor will only trigger for files larger than 1400 bytes -const testCompressableJSON = `{ - "defaultChannel": "stable-v6.x", - "name": "cockroachdb", - "schema": "olm.package" -} -{ - "entries": [ - { - "name": "cockroachdb.v5.0.3" - }, - { - "name": "cockroachdb.v5.0.4", - "replaces": "cockroachdb.v5.0.3" - } - ], - "name": "stable-5.x", - "package": "cockroachdb", - "schema": "olm.channel" -} -{ - "entries": [ - { - "name": "cockroachdb.v6.0.0", - "skipRange": "<6.0.0" - } - ], - "name": "stable-v6.x", - "package": "cockroachdb", - "schema": "olm.channel" -} -{ - "image": "quay.io/openshift-community-operators/cockroachdb@sha256:a5d4f4467250074216eb1ba1c36e06a3ab797d81c431427fc2aca97ecaf4e9d8", - "name": "cockroachdb.v5.0.3", - "package": "cockroachdb", - "properties": [ - { - "type": "olm.gvk", - "value": { - "group": "charts.operatorhub.io", - "kind": "Cockroachdb", - "version": "v1alpha1" - } - }, - { - "type": "olm.package", - "value": { - "packageName": "cockroachdb", - "version": "5.0.3" - } - } - ], - "relatedImages": [ - { - "name": "", - "image": "quay.io/helmoperators/cockroachdb:v5.0.3" - }, - { - "name": "", - "image": "quay.io/openshift-community-operators/cockroachdb@sha256:a5d4f4467250074216eb1ba1c36e06a3ab797d81c431427fc2aca97ecaf4e9d8" - } - ], - "schema": "olm.bundle" -} -{ - "image": "quay.io/openshift-community-operators/cockroachdb@sha256:f42337e7b85a46d83c94694638e2312e10ca16a03542399a65ba783c94a32b63", - "name": "cockroachdb.v5.0.4", - "package": "cockroachdb", - "properties": [ - { - "type": "olm.gvk", - "value": { - "group": "charts.operatorhub.io", - "kind": "Cockroachdb", - "version": "v1alpha1" - } - }, - { - "type": "olm.package", - "value": { - "packageName": "cockroachdb", - "version": "5.0.4" - } - } - ], - "relatedImages": [ - { - "name": "", - "image": "quay.io/helmoperators/cockroachdb:v5.0.4" - }, - { - "name": "", - "image": "quay.io/openshift-community-operators/cockroachdb@sha256:f42337e7b85a46d83c94694638e2312e10ca16a03542399a65ba783c94a32b63" - } - ], - "schema": "olm.bundle" -} -{ - "image": "quay.io/openshift-community-operators/cockroachdb@sha256:d3016b1507515fc7712f9c47fd9082baf9ccb070aaab58ed0ef6e5abdedde8ba", - "name": "cockroachdb.v6.0.0", - "package": "cockroachdb", - "properties": [ - { - "type": "olm.gvk", - "value": { - "group": "charts.operatorhub.io", - "kind": "Cockroachdb", - "version": "v1alpha1" - } - }, - { - "type": "olm.package", - "value": { - "packageName": "cockroachdb", - 
"version": "6.0.0" - } - } - ], - "relatedImages": [ - { - "name": "", - "image": "quay.io/cockroachdb/cockroach-helm-operator:6.0.0" - }, - { - "name": "", - "image": "quay.io/openshift-community-operators/cockroachdb@sha256:d3016b1507515fc7712f9c47fd9082baf9ccb070aaab58ed0ef6e5abdedde8ba" - } - ], - "schema": "olm.bundle" -} -` - -// makeYAMLFromConcatenatedJSON takes a byte slice of concatenated JSON objects and returns a byte slice of concatenated YAML objects. -func makeYAMLFromConcatenatedJSON(data []byte) ([]byte, error) { - var msg json.RawMessage - var delimiter = []byte("---\n") - var yamlData []byte - - yamlData = append(yamlData, delimiter...) - - dec := json.NewDecoder(bytes.NewReader(data)) - for { - err := dec.Decode(&msg) - if errors.Is(err, io.EOF) { - break - } - y, err := yaml.JSONToYAML(msg) - if err != nil { - return []byte{}, err - } - yamlData = append(yamlData, delimiter...) - yamlData = append(yamlData, y...) + testPackage := fmt.Sprintf(testPackageTemplate, testPackageDefaultChannel, testPackageName) + testBundle := fmt.Sprintf(testBundleTemplate, testBundleImage, testBundleName, testPackageName, testBundleRelatedImageName, testBundleRelatedImageImage, testBundleObjectData) + testChannel := fmt.Sprintf(testChannelTemplate, testPackageName, testChannelName, testBundleName) + return &fstest.MapFS{ + "bundle.yaml": {Data: []byte(testBundle), Mode: os.ModePerm}, + "package.yaml": {Data: []byte(testPackage), Mode: os.ModePerm}, + "channel.yaml": {Data: []byte(testChannel), Mode: os.ModePerm}, } - return yamlData, nil -} - -// generateJSONLines takes a byte slice of concatenated JSON objects and returns a JSONlines-formatted string. -func generateJSONLines(in []byte) (string, error) { - var out strings.Builder - reader := bytes.NewReader(in) - - err := declcfg.WalkMetasReader(reader, func(meta *declcfg.Meta, err error) error { - if err != nil { - return err - } - - if meta != nil && meta.Blob != nil { - if meta.Blob[len(meta.Blob)-1] != '\n' { - return fmt.Errorf("blob does not end with newline") - } - } - - _, err = out.Write(meta.Blob) - if err != nil { - return err - } - return nil - }) - return out.String(), err } diff --git a/catalogd/internal/storage/storage.go b/catalogd/internal/storage/storage.go index 458ff040b..af78a669f 100644 --- a/catalogd/internal/storage/storage.go +++ b/catalogd/internal/storage/storage.go @@ -13,7 +13,8 @@ import ( type Instance interface { Store(ctx context.Context, catalog string, fsys fs.FS) error Delete(catalog string) error + ContentExists(catalog string) bool + BaseURL(catalog string) string StorageServerHandler() http.Handler - ContentExists(catalog string) bool } diff --git a/go.mod b/go.mod index 3a0581966..67a43e9fc 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/spf13/pflag v1.0.6 github.com/stretchr/testify v1.10.0 golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c + golang.org/x/sync v0.10.0 gopkg.in/yaml.v2 v2.4.0 helm.sh/helm/v3 v3.17.0 k8s.io/api v0.32.0 @@ -227,7 +228,6 @@ require ( golang.org/x/crypto v0.32.0 // indirect golang.org/x/net v0.34.0 // indirect golang.org/x/oauth2 v0.25.0 // indirect - golang.org/x/sync v0.10.0 // indirect golang.org/x/sys v0.29.0 // indirect golang.org/x/term v0.28.0 // indirect golang.org/x/text v0.21.0 // indirect