Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions internal/test/integration/rpm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package integration
import (
"context"
"math/rand"
"strings"
"testing"

"github.com/content-services/tang/internal/config"
"github.com/content-services/tang/internal/zestwrapper"
"github.com/content-services/tang/pkg/tangy"
"github.com/jackc/pgx/v5"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -148,6 +150,43 @@ func (r *RpmSuite) TestRpmRepositoryVersionPackageSearch() {
assert.Len(r.T(), search, 0)
}

func getDBConnection(t *testing.T) *pgx.Conn {
dbConfig := config.Get().Database
db := tangy.Database{
Name: dbConfig.Name,
Host: dbConfig.Host,
Port: dbConfig.Port,
User: dbConfig.User,
Password: dbConfig.Password,
}
conn, err := pgx.Connect(context.Background(), db.Url())
require.NoError(t, err)
return conn
}

func (r *RpmSuite) TestRpmRepositoryVersionPackageSearchOldMethod() {
firstVersionHref := &r.firstVersionHref

conn := getDBConnection(r.T())
defer conn.Close(context.Background())

// Update the repository version to use the old method
splitHref := strings.Split(*firstVersionHref, "/")
repoId := splitHref[len(splitHref)-4] // ignore trailing versions//1/
_, err := conn.Exec(context.Background(), "UPDATE core_repositoryversion SET content_ids = null WHERE repository_id = $1", repoId)
require.NoError(r.T(), err)

search, err := r.tangy.RpmRepositoryVersionPackageSearch(context.Background(), []string{*firstVersionHref}, "peng", 100)
assert.NoError(r.T(), err)
assert.Equal(r.T(), search[0].Name, "penguin")
search, err = r.tangy.RpmRepositoryVersionPackageSearch(context.Background(), []string{*firstVersionHref}, "enguin", 100)
assert.NoError(r.T(), err)
assert.Len(r.T(), search, 0)
search, err = r.tangy.RpmRepositoryVersionPackageSearch(context.Background(), []string{*firstVersionHref}, "bea", 100)
assert.NoError(r.T(), err)
assert.Empty(r.T(), search)
}

func (r *RpmSuite) TestRpmRepositoryVersionPackageGroupSearch() {
firstVersionHref := &r.firstVersionHref
secondVersionHref := &r.secondVersionHref
Expand Down
107 changes: 102 additions & 5 deletions pkg/tangy/queries.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
package tangy

import (
"context"
"fmt"
"math/rand/v2"
"strings"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)

// contentIdsInVersion forms a single query to fetch a list of content ids in a repository version
// contentIdsInVersionNew forms a single query to fetch a list of content ids in a repository version
// using the new content_ids array field (only works for versions created after August 1st, 2025)
//
// It uses randomized query parameter names and modifies the passed in namedArgs to include the key/values for these named query parameters.
// By using randomized query parameter names, this query can be included multiple times with different repository
// versions as multiple subqueries.
func contentIdsInVersion(repoId string, versionNum int, namedArgs *pgx.NamedArgs) string {
func contentIdsInVersionNew(repoId string, versionNum int, namedArgs *pgx.NamedArgs) string {
ran := rand.Int()
repoIdName := fmt.Sprintf("%v%v", "repoName", ran)
versionNumName := fmt.Sprintf("%v%v", "versionNum", ran)
Expand All @@ -25,19 +28,113 @@ func contentIdsInVersion(repoId string, versionNum int, namedArgs *pgx.NamedArgs
return fmt.Sprintf(query, repoIdName, versionNumName)
}

// returns part of a query that joins a table to the needed tables to select content units in a given set of versions
// contentIdsInVersionOld forms a single query to fetch a list of content ids in a repository version
// using the old method with core_repositorycontent table (works for all versions)
//
// TODO: DELETE THIS FUNCTION after August 1st, 2026 when all repository versions use content_ids
// It uses randomized query parameter names and modifies the passed in namedArgs to include the key/values for these named query parameters.
// By using randomized query parameter names, this query can be included multiple times with different repository
// versions as multiple subqueries.
func contentIdsInVersionOld(repoId string, versionNum int, namedArgs *pgx.NamedArgs) string {
ran := rand.Int()
repoIdName := fmt.Sprintf("%v%v", "repoName", ran)
versionNumName := fmt.Sprintf("%v%v", "versionNum", ran)
query := `
(crv.repository_id = @%v AND crv.number <= @%v AND NOT (crv2.number <= @%v AND crv2.number IS NOT NULL))
`
(*namedArgs)[repoIdName] = repoId
(*namedArgs)[versionNumName] = versionNum
return fmt.Sprintf(query, repoIdName, versionNumName, versionNumName)
}

// contentIdsInVersionsNew returns part of a query that joins a table to the needed tables to select content units
// in a given set of versions using the new content_ids array field
//
// TODO: DELETE THIS FUNCTION after August 1st, 2026 when all repository versions use content_ids
// The return of this functions should be added to a query such as "select ** from TABLE rp" query,
// Where rp has a column 'content_ptr_id', such as rpm_updaterecord, rpm_package, etc.
// Takes in a pointer to Named args in order to add required named arguments for the query.
func contentIdsInVersions(repoVerMap []ParsedRepoVersion, namedArgs *pgx.NamedArgs) string {
func contentIdsInVersionsNew(repoVerMap []ParsedRepoVersion, namedArgs *pgx.NamedArgs) string {
mainQuery := `
INNER JOIN core_repositoryversion crv ON (rp.content_ptr_id = ANY(crv.content_ids))
WHERE
`
queries := []string{}
for _, parsed := range repoVerMap {
queries = append(queries, contentIdsInVersion(parsed.RepositoryUUID, parsed.Version, namedArgs))
queries = append(queries, contentIdsInVersionNew(parsed.RepositoryUUID, parsed.Version, namedArgs))
}
return fmt.Sprintf("%v (%v)", mainQuery, strings.Join(queries, " OR "))
}

// contentIdsInVersionsOld returns part of a query that joins a table to the needed tables to select content units
// in a given set of versions using the old method with core_repositorycontent table
//
// TODO: DELETE THIS FUNCTION after August 1st, 2025 when all repository versions use content_ids
// The return of this functions should be added to a query such as "select ** from TABLE rp" query,
// Where rp has a column 'content_ptr_id', such as rpm_updaterecord, rpm_package, etc.
// Takes in a pointer to Named args in order to add required named arguments for the query.
func contentIdsInVersionsOld(repoVerMap []ParsedRepoVersion, namedArgs *pgx.NamedArgs) string {
mainQuery := `
INNER JOIN core_repositorycontent crc on rp.content_ptr_id = crc.content_id
INNER JOIN core_repositoryversion crv ON (crc.version_added_id = crv.pulp_id)
LEFT OUTER JOIN core_repositoryversion crv2 ON (crc.version_removed_id = crv2.pulp_id)
WHERE
`
queries := []string{}
for _, parsed := range repoVerMap {
queries = append(queries, contentIdsInVersionOld(parsed.RepositoryUUID, parsed.Version, namedArgs))
}
return fmt.Sprintf("%v (%v)", mainQuery, strings.Join(queries, " OR "))
}

// checkAllVersionsWithContentIds checks if all repository versions in the given map were created after the specified date
func checkAllVersionsWithContentIds(ctx context.Context, conn *pgxpool.Conn, repoVerMap []ParsedRepoVersion) (bool, error) {
if len(repoVerMap) == 0 {
return true, nil
}

// Build query to check content_ids dates of all repository versions
queryParts := []string{}
args := pgx.NamedArgs{}
for i, parsed := range repoVerMap {
repoIdParam := fmt.Sprintf("repoId%d", i)
versionNumParam := fmt.Sprintf("versionNum%d", i)
queryParts = append(queryParts, fmt.Sprintf("(crv.repository_id = @%s AND crv.number = @%s)", repoIdParam, versionNumParam))
args[repoIdParam] = parsed.RepositoryUUID
args[versionNumParam] = parsed.Version
}

query := fmt.Sprintf(`
SELECT COUNT(*)
FROM core_repositoryversion crv
WHERE (%s) AND crv.content_ids is null
`, strings.Join(queryParts, " OR "))

var count int
err := conn.QueryRow(ctx, query, args).Scan(&count)
if err != nil {
return false, err
}

// If count is 0, all versions are after the cutoff date
return count == 0, nil
}

// returns part of a query that joins a table to the needed tables to select content units in a given set of versions
//
// The return of this functions should be added to a query such as "select ** from TABLE rp" query,
// Where rp has a column 'content_ptr_id', such as rpm_updaterecord, rpm_package, etc.
// Takes in a pointer to Named args in order to add required named arguments for the query.
// This function automatically chooses between the old and new query methods based on repository version creation dates.
func contentIdsInVersions(ctx context.Context, conn *pgxpool.Conn, repoVerMap []ParsedRepoVersion, namedArgs *pgx.NamedArgs) (string, error) {
// Check if all versions are after the cutoff date, not needed after August 1st, 2026
useNewMethod, err := checkAllVersionsWithContentIds(ctx, conn, repoVerMap)
if err != nil {
return "", fmt.Errorf("error checking repository version dates: %w", err)
}

if useNewMethod {
return contentIdsInVersionsNew(repoVerMap, namedArgs), nil
}
return contentIdsInVersionsOld(repoVerMap, namedArgs), nil
}
30 changes: 24 additions & 6 deletions pkg/tangy/rpm.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,10 @@ func (t *tangyImpl) RpmRepositoryVersionPackageSearch(ctx context.Context, hrefs
}

args := pgx.NamedArgs{"nameFilter": search + "%", "limit": limit}
innerUnion := contentIdsInVersions(repoVerMap, &args)
innerUnion, err := contentIdsInVersions(ctx, conn, repoVerMap, &args)
if err != nil {
return nil, err
}

query := `SELECT DISTINCT ON (rp.name) rp.name, rp.summary
FROM rpm_package rp `
Expand Down Expand Up @@ -152,7 +155,10 @@ func (t *tangyImpl) RpmRepositoryVersionPackageGroupSearch(ctx context.Context,
}

args := pgx.NamedArgs{"nameFilter": "%" + search + "%"}
innerUnion := contentIdsInVersions(repoVerMap, &args)
innerUnion, err := contentIdsInVersions(ctx, conn, repoVerMap, &args)
if err != nil {
return nil, err
}

query := `SELECT DISTINCT ON (rp.name, rp.id, rp.packages) rp.name, rp.id, rp.description, rp.packages
FROM rpm_packagegroup rp
Expand Down Expand Up @@ -233,7 +239,10 @@ func (t *tangyImpl) RpmRepositoryVersionEnvironmentSearch(ctx context.Context, h
}

args := pgx.NamedArgs{"nameFilter": "%" + search + "%", "limit": limit}
innerUnion := contentIdsInVersions(repoVerMap, &args)
innerUnion, err := contentIdsInVersions(ctx, conn, repoVerMap, &args)
if err != nil {
return nil, err
}

query := `SELECT DISTINCT ON (rp.name, rp.id) rp.name, rp.id, rp.description
FROM rpm_packageenvironment rp
Expand Down Expand Up @@ -311,7 +320,10 @@ func (t *tangyImpl) RpmRepositoryVersionErrataList(ctx context.Context, hrefs []
}
filterQuery := concatFilter.String()

innerUnion := contentIdsInVersions(repoVerMap, &args)
innerUnion, err := contentIdsInVersions(ctx, conn, repoVerMap, &args)
if err != nil {
return nil, 0, err
}

var countTotal int
err = conn.QueryRow(ctx, countQueryOpen+innerUnion+filterQuery,
Expand Down Expand Up @@ -403,7 +415,10 @@ func (t *tangyImpl) RpmRepositoryVersionModuleStreamsList(ctx context.Context, h
INNER JOIN rpm_modulemd_packages rmp on rmp.modulemd_id = rp.content_ptr_id
INNER JOIN rpm_package pack on pack.content_ptr_id = rmp.package_id `

innerUnion := contentIdsInVersions(repoVerMap, &args)
innerUnion, err := contentIdsInVersions(ctx, conn, repoVerMap, &args)
if err != nil {
return nil, err
}

rpmNameFilter := ""

Expand Down Expand Up @@ -451,7 +466,10 @@ func (t *tangyImpl) RpmRepositoryVersionPackageList(ctx context.Context, hrefs [

countQueryOpen := "select count(distinct(rp.content_ptr_id)) as total FROM rpm_package rp "
args := pgx.NamedArgs{"nameFilter": filterOpts.Name + "%"}
innerUnion := contentIdsInVersions(repoVerMap, &args)
innerUnion, err := contentIdsInVersions(ctx, conn, repoVerMap, &args)
if err != nil {
return nil, 0, err
}

var countTotal int
err = conn.QueryRow(ctx, countQueryOpen+innerUnion+" AND rp.name ILIKE CONCAT( @nameFilter::text, '%')", args).Scan(&countTotal)
Expand Down
Loading