Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get all the indices from Opensearch eligible for reindexing #8817

Merged
merged 14 commits into from
Mar 20, 2025
24 changes: 24 additions & 0 deletions components/ingest-service/backend/chef_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,27 @@ type InternalChefAction struct {
Data string `json:"data,omitempty"`
Projects []string `json:"projects"`
}

type Indices []struct {
Health string `json:"health"`
Status string `json:"status"`
Index string `json:"index"`
UUID string `json:"uuid"`
Pri string `json:"pri"`
Rep string `json:"rep"`
DocsCount string `json:"docs.count"`
DocsDeleted string `json:"docs.deleted"`
StoreSize string `json:"store.size"`
PriStoreSize string `json:"pri.store.size"`
}

type IndexSettingsVersion struct {
Settings struct {
Index struct {
Version struct {
CreatedString string `json:"created"`
UpgradedString string `json:"upgraded"`
} `json:"version"`
} `json:"index"`
} `json:"settings"`
}
5 changes: 5 additions & 0 deletions components/ingest-service/backend/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,9 @@ type Client interface {
ReindexNodeStateToLatest(context.Context, string) (string, error)
GetActions(string, int, time.Time, string, bool) ([]InternalChefAction, int64, error)
DeleteAllIndexesWithPrefix(string, context.Context) error

GetIndices(ctx context.Context) (Indices, error)
GetIndexSettingsVersion(index string) (*IndexSettingsVersion, error)

TriggerReindex(index string) error
}
52 changes: 51 additions & 1 deletion components/ingest-service/backend/elastic/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package elastic
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

elastic "github.com/olivere/elastic/v7"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"

"github.com/chef/automate/components/ingest-service/backend"
Expand Down Expand Up @@ -478,7 +480,7 @@ func (es *Backend) RemoveAlias(ctx context.Context, aliasName string, indexName
return err
}

//GetNodeCount - count how many node-state documents are in an index
// GetNodeCount - count how many node-state documents are in an index
func (es *Backend) GetNodeCount(ctx context.Context, indexName string) (int64, error) {
count, err := es.client.Count(indexName).Type("node-state").Do(ctx)
if err != nil {
Expand All @@ -487,3 +489,51 @@ func (es *Backend) GetNodeCount(ctx context.Context, indexName string) (int64, e
}
return count, err
}

func (es *Backend) GetIndices(ctx context.Context) (backend.Indices, error) {
catIndicesService := es.client.CatIndices()
resp, err := catIndicesService.Do(ctx)
if err != nil {
return nil, fmt.Errorf("failed to fetch indices: %w", err)
}

bx, err := json.Marshal(resp)
if err != nil {
return nil, fmt.Errorf("failed to read indices response: %w", err)
}

var indices backend.Indices
if err = json.Unmarshal(bx, &indices); err != nil {
return nil, fmt.Errorf("failed to unmarshal indices response: %w", err)
}

return indices, nil
}

func (es *Backend) GetIndexSettingsVersion(index string) (*backend.IndexSettingsVersion, error) {
resp, err := es.client.IndexGetSettings(index).Do(context.Background())
if err != nil {
return nil, fmt.Errorf("failed to fetch settings for index %s: %w", index, err)
}

settings, exists := resp[index]
if !exists {
return nil, errors.New("index settings not found in response")
}

settingsJSON, err := json.Marshal(settings)
if err != nil {
return nil, fmt.Errorf("failed to marshal settings for index %s: %w", index, err)
}

var setting backend.IndexSettingsVersion
if err := json.Unmarshal(settingsJSON, &setting); err != nil {
return nil, fmt.Errorf("failed to unmarshal settings for index %s: %w", index, err)
}

return &setting, nil
}

func (es *Backend) TriggerReindex(index string) error {
return nil
}
79 changes: 77 additions & 2 deletions components/ingest-service/server/chef.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package server
import (
"context"
"encoding/json"
"strings"
"time"

log "github.com/sirupsen/logrus"
Expand All @@ -21,6 +22,13 @@ import (
"github.com/chef/automate/lib/version"
)

var skipIndices = map[string]bool{
"security-auditlog": true,
".opendistro": true,
".plugins-ml-config": true,
".opensearch-observability": true,
}

type ChefIngestServer struct {
chefRunPipeline pipeline.ChefRunPipeline
chefActionPipeline pipeline.ChefActionPipeline
Expand Down Expand Up @@ -239,8 +247,75 @@ func (s *ChefIngestServer) ProcessNodeDelete(ctx context.Context,
return &response.ProcessNodeDeleteResponse{}, nil
}

func (s *ChefIngestServer) GetIndicesEligableForReindexing(ctx context.Context) (map[string]backend.IndexSettingsVersion, error) {
var eligableIndices = make(map[string]backend.IndexSettingsVersion)
indices, err := s.client.GetIndices(ctx)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to fetch indices: %s", err)
}

OuterLoop:
for _, index := range indices {
for prefix := range skipIndices {
if strings.HasPrefix(index.Index, prefix) {
log.WithFields(log.Fields{"index": index.Index}).Info("Skipping index")
continue OuterLoop
}
}

versionSettings, err := s.client.GetIndexSettingsVersion(index.Index)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to fetch settings for index %s: %s", index.Index, err)
}

// Is reindexing needed?
if versionSettings.Settings.Index.Version.CreatedString == versionSettings.Settings.Index.Version.UpgradedString {
log.WithFields(log.Fields{"index": index.Index}).Info("Skipping index as it is already up to date")
continue
}

eligableIndices[index.Index] = *versionSettings
}

if len(eligableIndices) == 0 {
log.Info("No indices are eligable for reindexing")
return nil, status.Errorf(codes.NotFound, "no indices are eligable for reindexing")
}

return eligableIndices, nil
}

func (s *ChefIngestServer) StartReindex(ctx context.Context, req *ingest.StartReindexRequest) (*ingest.StartReindexResponse, error) {
log.Info("Received request to start reindexing")

indices, err := s.GetIndicesEligableForReindexing(ctx)

if err != nil {
return nil, status.Errorf(codes.Internal, "failed to fetch indices: %s", err)
}

// Add to the database that indexing request is running
requestID, err := s.db.InsertReindexRequest("running", time.Now())
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to add reindex request: %s", err)
}

for key, value := range indices {
if err := s.db.InsertReindexRequestDetailed(storage.ReindexRequestDetailed{
RequestID: requestID,
Index: key,
FromVersion: value.Settings.Index.Version.CreatedString,
ToVersion: value.Settings.Index.Version.UpgradedString,
OsTaskID: "",
Heartbeat: time.Now(),
HavingAlias: false,
AliasList: "",
}, time.Now()); err != nil {
return nil, status.Errorf(codes.Internal, "failed to add reindex request: %s", err)
}
}

log.Info("Reindexing started successfully")
return &ingest.StartReindexResponse{
Message: "Reindexing started successfully",
}, nil
Expand All @@ -254,16 +329,16 @@ func (s *ChefIngestServer) GetReindexStatus(ctx context.Context, req *ingest.Get
return nil, status.Errorf(codes.Internal, "%s", errMsg)
}
var requestID int
var err error
// If RequestId is missing (0), fetch the latest request ID
if req == nil || req.RequestId == 0 {
log.Debug("RequestId is missing, fetching the latest request ID")

latestRequestID, err := s.db.GetLatestReindexRequestID()
requestID, err = s.db.GetLatestReindexRequestID()
if err != nil {
log.WithFields(log.Fields{"error": err.Error()}).Error("Failed to fetch latest reindex request ID")
return nil, status.Errorf(codes.Internal, "failed to fetch latest reindex request ID: %v", err)
}
requestID = latestRequestID
log.WithFields(log.Fields{"requestID": requestID}).Debug("Fetched latest request ID successfully")
} else {
requestID = int(req.RequestId)
Expand Down
31 changes: 17 additions & 14 deletions components/ingest-service/storage/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package storage
import (
"database/sql"
"encoding/json"
"math/rand"
"sort"
"time"

Expand All @@ -26,7 +25,7 @@ func NewDB(dbConn *sql.DB) *DB {

// ReindexRequest represents the reindex_requests table
type ReindexRequest struct {
RequestID int `db:"request_id"`
ID int `db:"id"`
Status string `db:"status"`
CreatedAt time.Time `db:"created_at"`
LastUpdated time.Time `db:"last_updated"`
Expand Down Expand Up @@ -76,9 +75,12 @@ func RunMigrations(dbConf *config.Storage) error {

// Create a new reindex request with a random request_id
func (db *DB) InsertReindexRequest(status string, currentTime time.Time) (int, error) {
requestID := rand.Int()
_, err := db.Exec(insertReindexRequest, requestID, status, currentTime, currentTime)
return requestID, err
var requestID int
err := db.QueryRow(insertReindexRequest, status, currentTime, currentTime).Scan(&requestID)
if err != nil {
return 0, errors.Wrap(err, "failed to insert reindex request")
}
return requestID, nil
}

// Update an existing reindex request
Expand Down Expand Up @@ -182,7 +184,7 @@ func (db *DB) GetReindexStatus(requestID int) (*StatusResponse, error) {
// If no details are found, return a JSON response with an empty indexes array
if len(details) == 0 {
return &StatusResponse{
RequestID: request.RequestID,
RequestID: request.ID,
Status: request.Status,
Indexes: []IndexStatusDetail{},
}, nil
Expand Down Expand Up @@ -218,7 +220,7 @@ func (db *DB) GetReindexStatus(requestID int) (*StatusResponse, error) {
}

statusResponse := &StatusResponse{
RequestID: request.RequestID,
RequestID: request.ID,
Status: overallStatus,
Indexes: indexes,
}
Expand All @@ -238,7 +240,7 @@ func (db *DB) GetLatestReindexRequestID() (int, error) {
return 0, errors.New("database connection is not initialized")
}
var requestID int
err := db.QueryRow("SELECT request_id FROM reindex_requests ORDER BY created_at DESC LIMIT 1").Scan(&requestID)
err := db.QueryRow("SELECT id FROM reindex_requests ORDER BY created_at DESC LIMIT 1").Scan(&requestID)
if err != nil {
if err == sql.ErrNoRows {
logrus.Error("No reindex requests found in the database")
Expand All @@ -256,16 +258,17 @@ func (db *DB) GetLatestReindexRequestID() (int, error) {

// SQL Queries
const insertReindexRequest = `
INSERT INTO reindex_requests(request_id, status, created_at, last_updated)
VALUES ($1, $2, $3, $4);`
INSERT INTO reindex_requests(status, created_at, last_updated)
VALUES ($1, $2, $3)
RETURNING id;`

const updateReindexRequest = `
UPDATE reindex_requests SET status = $1, last_updated = $2 WHERE request_id = $3;`
UPDATE reindex_requests SET status = $1, last_updated = $2 WHERE id = $3;`

const getLatestReindexRequest = `
SELECT request_id, status, created_at, last_updated
SELECT id, status, created_at, last_updated
FROM reindex_requests
WHERE request_id = $1
WHERE id = $1
ORDER BY last_updated DESC
LIMIT 1;`

Expand All @@ -280,7 +283,7 @@ WHERE request_id = $1
ORDER BY updated_at DESC;`

const deleteReindexRequest = `
DELETE FROM reindex_requests WHERE request_id = $1;`
DELETE FROM reindex_requests WHERE id = $1;`

const deleteReindexRequestDetail = `
DELETE FROM reindex_request_detailed WHERE id = $1;`
Loading
Loading