diff --git a/components/ingest-service/backend/chef_action.go b/components/ingest-service/backend/chef_action.go index ad4793410f..8529e58e39 100644 --- a/components/ingest-service/backend/chef_action.go +++ b/components/ingest-service/backend/chef_action.go @@ -26,3 +26,27 @@ type InternalChefAction struct { Data string `json:"data,omitempty"` Projects []string `json:"projects"` } + +type Index struct { + Health string `json:"health"` + Status string `json:"status"` + Index string `json:"index"` + UUID string `json:"uuid"` + Pri string `json:"pri"` + Rep string `json:"rep"` + DocsCount string `json:"docs.count"` + DocsDeleted string `json:"docs.deleted"` + StoreSize string `json:"store.size"` + PriStoreSize string `json:"pri.store.size"` +} + +type IndexSettingsVersion struct { + Settings struct { + Index struct { + Version struct { + CreatedString string `json:"created"` + UpgradedString string `json:"upgraded"` + } `json:"version"` + } `json:"index"` + } `json:"settings"` +} diff --git a/components/ingest-service/backend/client.go b/components/ingest-service/backend/client.go index b58b2b31c7..f60a6a39bb 100644 --- a/components/ingest-service/backend/client.go +++ b/components/ingest-service/backend/client.go @@ -94,4 +94,9 @@ type Client interface { ReindexNodeStateToLatest(context.Context, string) (string, error) GetActions(string, int, time.Time, string, bool) ([]InternalChefAction, int64, error) DeleteAllIndexesWithPrefix(string, context.Context) error + + GetIndices(ctx context.Context) ([]Index, error) + GetIndexVersionSettings(index string) (*IndexSettingsVersion, error) + + TriggerReindex(index string) error } diff --git a/components/ingest-service/backend/elastic/migration.go b/components/ingest-service/backend/elastic/migration.go index 18e1e663a9..e04388d870 100644 --- a/components/ingest-service/backend/elastic/migration.go +++ b/components/ingest-service/backend/elastic/migration.go @@ -3,10 +3,12 @@ package elastic import ( "context" "encoding/json" + "fmt" "strings" "time" elastic "github.com/olivere/elastic/v7" + "github.com/pkg/errors" log "github.com/sirupsen/logrus" "github.com/chef/automate/components/ingest-service/backend" @@ -478,7 +480,7 @@ func (es *Backend) RemoveAlias(ctx context.Context, aliasName string, indexName return err } -//GetNodeCount - count how many node-state documents are in an index +// GetNodeCount - count how many node-state documents are in an index func (es *Backend) GetNodeCount(ctx context.Context, indexName string) (int64, error) { count, err := es.client.Count(indexName).Type("node-state").Do(ctx) if err != nil { @@ -487,3 +489,51 @@ func (es *Backend) GetNodeCount(ctx context.Context, indexName string) (int64, e } return count, err } + +func (es *Backend) GetIndices(ctx context.Context) ([]backend.Index, error) { + catIndicesService := es.client.CatIndices() + resp, err := catIndicesService.Do(ctx) + if err != nil { + return nil, fmt.Errorf("failed to fetch indices: %w", err) + } + + bx, err := json.Marshal(resp) + if err != nil { + return nil, fmt.Errorf("failed to read indices response: %w", err) + } + + var indices []backend.Index + if err = json.Unmarshal(bx, &indices); err != nil { + return nil, fmt.Errorf("failed to unmarshal indices response: %w", err) + } + + return indices, nil +} + +func (es *Backend) GetIndexVersionSettings(index string) (*backend.IndexSettingsVersion, error) { + resp, err := es.client.IndexGetSettings(index).Do(context.Background()) + if err != nil { + return nil, fmt.Errorf("failed to fetch settings for index %s: %w", index, err) + } + + settings, exists := resp[index] + if !exists { + return nil, errors.New("index settings not found in response") + } + + settingsJSON, err := json.Marshal(settings) + if err != nil { + return nil, fmt.Errorf("failed to marshal settings for index %s: %w", index, err) + } + + var setting backend.IndexSettingsVersion + if err := json.Unmarshal(settingsJSON, &setting); err != nil { + return nil, fmt.Errorf("failed to unmarshal settings for index %s: %w", index, err) + } + + return &setting, nil +} + +func (es *Backend) TriggerReindex(index string) error { + return nil +} diff --git a/components/ingest-service/server/chef.go b/components/ingest-service/server/chef.go index 8b3b5512a7..88e669d1e5 100644 --- a/components/ingest-service/server/chef.go +++ b/components/ingest-service/server/chef.go @@ -3,6 +3,7 @@ package server import ( "context" "encoding/json" + "strings" "time" log "github.com/sirupsen/logrus" @@ -21,6 +22,13 @@ import ( "github.com/chef/automate/lib/version" ) +var skipIndices = map[string]bool{ + "security-auditlog": true, + ".opendistro": true, + ".plugins-ml-config": true, + ".opensearch-observability": true, +} + type ChefIngestServer struct { chefRunPipeline pipeline.ChefRunPipeline chefActionPipeline pipeline.ChefActionPipeline @@ -239,8 +247,76 @@ func (s *ChefIngestServer) ProcessNodeDelete(ctx context.Context, return &response.ProcessNodeDeleteResponse{}, nil } +func (s *ChefIngestServer) GetIndicesEligableForReindexing(ctx context.Context) (map[string]backend.IndexSettingsVersion, error) { + var eligableIndices = make(map[string]backend.IndexSettingsVersion) + indices, err := s.client.GetIndices(ctx) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to fetch indices: %s", err) + } + +OuterLoop: + for _, index := range indices { + for prefix := range skipIndices { + if strings.HasPrefix(index.Index, prefix) { + log.WithFields(log.Fields{"index": index.Index}).Info("Skipping index") + continue OuterLoop + } + } + + versionSettings, err := s.client.GetIndexVersionSettings(index.Index) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to fetch settings for index %s: %s", index.Index, err) + } + + // Is reindexing needed? + if versionSettings.Settings.Index.Version.CreatedString == versionSettings.Settings.Index.Version.UpgradedString { + log.WithFields(log.Fields{"index": index.Index}).Info("Skipping index as it is already up to date") + continue + } + + eligableIndices[index.Index] = *versionSettings + } + + return eligableIndices, nil +} + func (s *ChefIngestServer) StartReindex(ctx context.Context, req *ingest.StartReindexRequest) (*ingest.StartReindexResponse, error) { log.Info("Received request to start reindexing") + + indices, err := s.GetIndicesEligableForReindexing(ctx) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to fetch indices: %s", err) + } + + if len(indices) == 0 { + log.Info("No indices found that need reindexing") + return &ingest.StartReindexResponse{ + Message: "No indices found that need reindexing", + }, nil + } + + // Add to the database that indexing request is running + requestID, err := s.db.InsertReindexRequest("running", time.Now()) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to add reindex request: %s", err) + } + + for key, value := range indices { + if err := s.db.InsertReindexRequestDetailed(storage.ReindexRequestDetailed{ + RequestID: requestID, + Index: key, + FromVersion: value.Settings.Index.Version.CreatedString, + ToVersion: value.Settings.Index.Version.UpgradedString, + OsTaskID: "", + Heartbeat: time.Now(), + HavingAlias: false, + AliasList: "", + }, time.Now()); err != nil { + return nil, status.Errorf(codes.Internal, "failed to add reindex request: %s", err) + } + } + + log.Info("Reindexing started successfully") return &ingest.StartReindexResponse{ Message: "Reindexing started successfully", }, nil @@ -254,16 +330,16 @@ func (s *ChefIngestServer) GetReindexStatus(ctx context.Context, req *ingest.Get return nil, status.Errorf(codes.Internal, "%s", errMsg) } var requestID int + var err error // If RequestId is missing (0), fetch the latest request ID if req == nil || req.RequestId == 0 { log.Debug("RequestId is missing, fetching the latest request ID") - latestRequestID, err := s.db.GetLatestReindexRequestID() + requestID, err = s.db.GetLatestReindexRequestID() if err != nil { log.WithFields(log.Fields{"error": err.Error()}).Error("Failed to fetch latest reindex request ID") return nil, status.Errorf(codes.Internal, "failed to fetch latest reindex request ID: %v", err) } - requestID = latestRequestID log.WithFields(log.Fields{"requestID": requestID}).Debug("Fetched latest request ID successfully") } else { requestID = int(req.RequestId) diff --git a/components/ingest-service/storage/db.go b/components/ingest-service/storage/db.go index 8afc0f8eee..20542a292a 100644 --- a/components/ingest-service/storage/db.go +++ b/components/ingest-service/storage/db.go @@ -3,7 +3,6 @@ package storage import ( "database/sql" "encoding/json" - "math/rand" "sort" "time" @@ -26,7 +25,7 @@ func NewDB(dbConn *sql.DB) *DB { // ReindexRequest represents the reindex_requests table type ReindexRequest struct { - RequestID int `db:"request_id"` + ID int `db:"id"` Status string `db:"status"` CreatedAt time.Time `db:"created_at"` LastUpdated time.Time `db:"last_updated"` @@ -76,9 +75,12 @@ func RunMigrations(dbConf *config.Storage) error { // Create a new reindex request with a random request_id func (db *DB) InsertReindexRequest(status string, currentTime time.Time) (int, error) { - requestID := rand.Int() - _, err := db.Exec(insertReindexRequest, requestID, status, currentTime, currentTime) - return requestID, err + var requestID int + err := db.QueryRow(insertReindexRequest, status, currentTime, currentTime).Scan(&requestID) + if err != nil { + return 0, errors.Wrap(err, "failed to insert reindex request") + } + return requestID, nil } // Update an existing reindex request @@ -182,7 +184,7 @@ func (db *DB) GetReindexStatus(requestID int) (*StatusResponse, error) { // If no details are found, return a JSON response with an empty indexes array if len(details) == 0 { return &StatusResponse{ - RequestID: request.RequestID, + RequestID: request.ID, Status: request.Status, Indexes: []IndexStatusDetail{}, }, nil @@ -218,7 +220,7 @@ func (db *DB) GetReindexStatus(requestID int) (*StatusResponse, error) { } statusResponse := &StatusResponse{ - RequestID: request.RequestID, + RequestID: request.ID, Status: overallStatus, Indexes: indexes, } @@ -238,7 +240,7 @@ func (db *DB) GetLatestReindexRequestID() (int, error) { return 0, errors.New("database connection is not initialized") } var requestID int - err := db.QueryRow("SELECT request_id FROM reindex_requests ORDER BY created_at DESC LIMIT 1").Scan(&requestID) + err := db.QueryRow("SELECT id FROM reindex_requests ORDER BY created_at DESC LIMIT 1").Scan(&requestID) if err != nil { if err == sql.ErrNoRows { logrus.Error("No reindex requests found in the database") @@ -256,16 +258,17 @@ func (db *DB) GetLatestReindexRequestID() (int, error) { // SQL Queries const insertReindexRequest = ` -INSERT INTO reindex_requests(request_id, status, created_at, last_updated) -VALUES ($1, $2, $3, $4);` +INSERT INTO reindex_requests(status, created_at, last_updated) +VALUES ($1, $2, $3) +RETURNING id;` const updateReindexRequest = ` -UPDATE reindex_requests SET status = $1, last_updated = $2 WHERE request_id = $3;` +UPDATE reindex_requests SET status = $1, last_updated = $2 WHERE id = $3;` const getLatestReindexRequest = ` -SELECT request_id, status, created_at, last_updated +SELECT id, status, created_at, last_updated FROM reindex_requests -WHERE request_id = $1 +WHERE id = $1 ORDER BY last_updated DESC LIMIT 1;` @@ -280,7 +283,7 @@ WHERE request_id = $1 ORDER BY updated_at DESC;` const deleteReindexRequest = ` -DELETE FROM reindex_requests WHERE request_id = $1;` +DELETE FROM reindex_requests WHERE id = $1;` const deleteReindexRequestDetail = ` DELETE FROM reindex_request_detailed WHERE id = $1;` diff --git a/components/ingest-service/storage/db_test.go b/components/ingest-service/storage/db_test.go index 0ab525c6db..5650a062aa 100644 --- a/components/ingest-service/storage/db_test.go +++ b/components/ingest-service/storage/db_test.go @@ -23,8 +23,8 @@ func TestInsertReindexRequestSuccess(t *testing.T) { createdAt := time.Now() - query := `INSERT INTO reindex_requests(request_id, status, created_at, last_updated) VALUES ($1, $2, $3, $4);` - mock.ExpectExec(query).WithArgs(sqlmock.AnyArg(), "running", createdAt, createdAt).WillReturnResult(sqlmock.NewResult(1, 1)) + query := `INSERT INTO reindex_requests(status, created_at, last_updated) VALUES ($1, $2, $3) RETURNING id;` + mock.ExpectQuery(query).WithArgs("running", createdAt, createdAt).WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(1)) requestID, err := db.InsertReindexRequest("running", createdAt) assert.NoError(t, err) @@ -42,11 +42,11 @@ func TestInsertReindexRequestFailure(t *testing.T) { createdAt := time.Now() - query := `INSERT INTO reindex_requests(request_id, status, created_at, last_updated) VALUES ($1, $2, $3, $4);` - mock.ExpectExec(query).WithArgs(sqlmock.AnyArg(), "running", createdAt, createdAt).WillReturnError(fmt.Errorf("insert error")) + query := `INSERT INTO reindex_requests(status, created_at, last_updated) VALUES ($1, $2, $3) RETURNING id;` + mock.ExpectQuery(query).WithArgs("running", createdAt, createdAt).WillReturnError(fmt.Errorf("insert error")) _, err = db.InsertReindexRequest("running", createdAt) - assert.Equal(t, "insert error", err.Error()) + assert.Equal(t, "failed to insert reindex request: insert error", err.Error()) } func TestUpdateReindexRequestSuccess(t *testing.T) { @@ -60,7 +60,7 @@ func TestUpdateReindexRequestSuccess(t *testing.T) { updatedAt := time.Now() - query := `UPDATE reindex_requests SET status = $1, last_updated = $2 WHERE request_id = $3;` + query := `UPDATE reindex_requests SET status = $1, last_updated = $2 WHERE id = $3;` mock.ExpectExec(query).WithArgs("completed", updatedAt, 1).WillReturnResult(sqlmock.NewResult(1, 1)) err = db.UpdateReindexRequest(1, "completed", updatedAt) @@ -78,7 +78,7 @@ func TestUpdateReindexRequestFailure(t *testing.T) { updatedAt := time.Now() - query := `UPDATE reindex_requests SET status = $1, last_updated = $2 WHERE request_id = $3;` + query := `UPDATE reindex_requests SET status = $1, last_updated = $2 WHERE id = $3;` mock.ExpectExec(query).WithArgs("completed", updatedAt, 1).WillReturnError(fmt.Errorf("update error")) err = db.UpdateReindexRequest(1, "completed", updatedAt) @@ -172,7 +172,7 @@ func TestDeleteReindexRequestSuccess(t *testing.T) { DbMap: &gorp.DbMap{Db: dbConn, Dialect: gorp.PostgresDialect{}}, } - query := `DELETE FROM reindex_requests WHERE request_id = $1;` + query := `DELETE FROM reindex_requests WHERE id = $1;` mock.ExpectExec(query).WithArgs(1).WillReturnResult(sqlmock.NewResult(1, 1)) err = db.DeleteReindexRequest(1) @@ -188,7 +188,7 @@ func TestDeleteReindexRequestFailure(t *testing.T) { DbMap: &gorp.DbMap{Db: dbConn, Dialect: gorp.PostgresDialect{}}, } - query := `DELETE FROM reindex_requests WHERE request_id = $1;` + query := `DELETE FROM reindex_requests WHERE id = $1;` mock.ExpectExec(query).WithArgs(1).WillReturnError(fmt.Errorf("delete error")) err = db.DeleteReindexRequest(1) @@ -242,8 +242,8 @@ func TestGetReindexStatusSuccess(t *testing.T) { heartbeat := time.Now() // Mock the reindex_requests table query - requestQuery := `SELECT request_id, status, created_at, last_updated FROM reindex_requests WHERE request_id = $1 ORDER BY last_updated DESC LIMIT 1;` - requestColumns := []string{"request_id", "status", "created_at", "last_updated"} + requestQuery := `SELECT id, status, created_at, last_updated FROM reindex_requests WHERE id = $1 ORDER BY last_updated DESC LIMIT 1;` + requestColumns := []string{"id", "status", "created_at", "last_updated"} mock.ExpectQuery(requestQuery).WithArgs(requestID). WillReturnRows(sqlmock.NewRows(requestColumns).AddRow(requestID, "completed", createdAt, updatedAt)) @@ -281,7 +281,7 @@ func TestGetReindexStatusFailure(t *testing.T) { requestID := 1 // Mock the reindex_requests table query to return an error - requestQuery := `SELECT request_id, status, created_at, last_updated FROM reindex_requests WHERE request_id = $1 ORDER BY last_updated DESC LIMIT 1;` + requestQuery := `SELECT id, status, created_at, last_updated FROM reindex_requests WHERE id = $1 ORDER BY last_updated DESC LIMIT 1;` mock.ExpectQuery(requestQuery).WithArgs(requestID).WillReturnError(fmt.Errorf("query error")) // Call the function @@ -306,8 +306,8 @@ func TestGetReindexStatusNoDetails(t *testing.T) { updatedAt := time.Now() // Mock the reindex_requests table query - requestQuery := `SELECT request_id, status, created_at, last_updated FROM reindex_requests WHERE request_id = $1 ORDER BY last_updated DESC LIMIT 1;` - requestColumns := []string{"request_id", "status", "created_at", "last_updated"} + requestQuery := `SELECT id, status, created_at, last_updated FROM reindex_requests WHERE id = $1 ORDER BY last_updated DESC LIMIT 1;` + requestColumns := []string{"id", "status", "created_at", "last_updated"} mock.ExpectQuery(requestQuery).WithArgs(requestID). WillReturnRows(sqlmock.NewRows(requestColumns).AddRow(requestID, "completed", createdAt, updatedAt)) diff --git a/components/ingest-service/storage/schema/sql/01_tables_creation.up.sql b/components/ingest-service/storage/schema/sql/01_tables_creation.up.sql index 9110c2ea62..8eb5a0febb 100644 --- a/components/ingest-service/storage/schema/sql/01_tables_creation.up.sql +++ b/components/ingest-service/storage/schema/sql/01_tables_creation.up.sql @@ -1,12 +1,12 @@ CREATE TABLE IF NOT EXISTS reindex_requests ( - request_id INT PRIMARY KEY, + id SERIAL PRIMARY KEY, status VARCHAR(50) CHECK (status IN ('running', 'failed', 'completed')), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS reindex_request_detailed ( - id INT PRIMARY KEY, + id SERIAL PRIMARY KEY, request_id INT NOT NULL, index TEXT NOT NULL, from_version TEXT NOT NULL, @@ -18,6 +18,5 @@ CREATE TABLE IF NOT EXISTS reindex_request_detailed ( alias_list TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (request_id) REFERENCES reindex_requests(request_id) ON DELETE CASCADE + FOREIGN KEY (request_id) REFERENCES reindex_requests(id) ON DELETE CASCADE ); -