Skip to content
Open
452 changes: 252 additions & 200 deletions server/interfaces/http/routes/source/database/create.go

Large diffs are not rendered by default.

390 changes: 213 additions & 177 deletions server/interfaces/http/routes/source/database/refresh.go

Large diffs are not rendered by default.

304 changes: 182 additions & 122 deletions server/interfaces/http/routes/source/s3/create.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package s3

import (
"bufio"
"context"
"encoding/json"
"fmt"

"github.com/factly/gopie/domain"
Expand All @@ -11,6 +14,19 @@ import (
"go.uber.org/zap"
)

// SSEEvent represents a single server-sent event emitted to the client
// during the S3 upload flow. The whole struct is JSON-encoded and placed
// in the "data:" field of an SSE frame.
type SSEEvent struct {
Type string `json:"type"` // event kind; the handler emits "status_update" and "complete" here ("error" frames are built separately)
Message string `json:"message"` // human-readable progress description shown to the client
Data any `json:"data,omitempty"` // optional payload; on completion carries the created dataset and its summary
}

// SSEData holds either data or error for SSE channel. It is the message
// type passed from the background upload goroutine to the response
// stream writer.
type SSEData struct {
Data []byte // a fully formatted SSE frame (e.g. "data: {...}\n\n") ready to write to the client
Error error // non-nil signals a stream-source failure; the writer loop logs it and stops streaming
}

// uploadRequestBody represents the request body for uploading a file from S3
// @Description Request body for uploading a file from S3
type uploadRequestBody struct {
Expand Down Expand Up @@ -71,12 +87,12 @@ func (h *httpHandler) cleanupResources(rc resourceCleanup) {
}

// @Summary Upload file from S3
// @Description Upload a file from S3 and create a new dataset
// @Description Upload a file from S3 and create a new dataset with SSE progress updates
// @Tags s3
// @Accept json
// @Produce json
// @Produce text/event-stream
// @Param body body uploadRequestBody true "Upload request parameters"
// @Success 201 {object} responses.SuccessResponse{data=map[string]interface{}{"dataset":models.Dataset,"summary":any}}
// @Success 200 {string} string "SSE stream of upload progress"
// @Failure 400 {object} responses.ErrorResponse "Invalid request body or S3 file access error"
// @Failure 404 {object} responses.ErrorResponse "Project not found"
// @Failure 500 {object} responses.ErrorResponse "Internal server error"
Expand All @@ -92,6 +108,7 @@ func (h *httpHandler) upload(ctx *fiber.Ctx) error {
"code": fiber.StatusForbidden,
})
}

// Get request body from context
body := uploadRequestBody{
IgnoreErrors: true,
Expand All @@ -114,149 +131,192 @@ func (h *httpHandler) upload(ctx *fiber.Ctx) error {
})
}

// Check if project exists
project, err := h.projectSvc.Details(body.ProjectID, orgID)
if err != nil {
if domain.IsStoreError(err) && err == domain.ErrRecordNotFound {
h.logger.Error("Project not found", zap.Error(err), zap.String("project_id", body.ProjectID))
return ctx.Status(fiber.StatusNotFound).JSON(fiber.Map{
"error": "Project not found",
"message": fmt.Sprintf("Project with ID %s not found", body.ProjectID),
"code": fiber.StatusNotFound,
})
// Create SSE channel
sseChan := make(chan SSEData, 10)

// Helper to send SSE events
sendEvent := func(eventType, message string, data any) {
eventPayload := SSEEvent{
Type: eventType,
Message: message,
Data: data,
}
h.logger.Error("Error fetching project", zap.Error(err), zap.String("project_id", body.ProjectID))
return ctx.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
"error": err.Error(),
"message": "Error validating project",
"code": fiber.StatusInternalServerError,
})
payloadBytes, _ := json.Marshal(eventPayload)
sseMessage := fmt.Sprintf("data: %s\n\n", payloadBytes)
sseChan <- SSEData{Data: []byte(sseMessage)}
}

h.logger.Info("Starting file upload", zap.String("file_path", body.FilePath), zap.String("project_id", project.ID))
// Helper to handle failures
handleFailure := func(failErr error) {
errMsg := failErr.Error()
errorPayload, _ := json.Marshal(map[string]string{"type": "error", "message": errMsg})
errorMsg := fmt.Sprintf("event: error\ndata: %s\n\n", errorPayload)
sseChan <- SSEData{Data: []byte(errorMsg)}
}

// Upload file to OLAP service
res, err := h.olapSvc.IngestS3File(nil, ctx.Context(), body.FilePath, "", body.AlterColumnNames, body.IgnoreErrors)
if err != nil {
h.logger.Error("Error uploading file to OLAP service", zap.Error(err), zap.String("file_path", body.FilePath))
// Start async upload process
go func() {
defer close(sseChan)
ctxBg := context.Background()

// For S3 upload failures, return a more specific error
return ctx.Status(fiber.StatusBadRequest).JSON(fiber.Map{
"error": err.Error(),
"message": "Failed to upload file from S3. Please check if the file exists and you have proper access.",
"code": fiber.StatusBadRequest,
})
}
sendEvent("status_update", "Validating project...", nil)

// Initialize cleanup resource object
cleanup := resourceCleanup{
tableName: res.TableName,
}
// Check if project exists
project, err := h.projectSvc.Details(body.ProjectID, orgID)
if err != nil {
if domain.IsStoreError(err) && err == domain.ErrRecordNotFound {
h.logger.Error("Project not found", zap.Error(err), zap.String("project_id", body.ProjectID))
handleFailure(fmt.Errorf("project with ID %s not found", body.ProjectID))
return
}
h.logger.Error("Error fetching project", zap.Error(err), zap.String("project_id", body.ProjectID))
handleFailure(fmt.Errorf("error validating project: %w", err))
return
}

count, columns, err := h.getMetrics(res.TableName)
if err != nil {
h.logger.Error("Error fetching dataset metrics", zap.Error(err), zap.String("table_name", res.TableName))
h.cleanupResources(cleanup)
return ctx.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
"error": err.Error(),
"message": "Error fetching dataset metrics",
"code": fiber.StatusInternalServerError,
})
}
h.logger.Info("Starting file upload", zap.String("file_path", body.FilePath), zap.String("project_id", project.ID))

dataset, err := h.datasetSvc.Create(&models.CreateDatasetParams{
Name: res.TableName,
Description: body.Description,
ProjectID: project.ID,
Columns: columns,
FilePath: res.FilePath,
RowCount: count,
Size: res.Size,
Alias: body.Alias,
CreatedBy: userID,
UpdatedBy: userID,
Source: "file",
OrgID: orgID,
CustomPrompt: body.CustomPrompt,
})
if err != nil {
h.logger.Error("Error creating dataset record", zap.Error(err))
h.cleanupResources(cleanup)
return ctx.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
"error": err.Error(),
"message": "Error creating dataset record",
"code": fiber.StatusInternalServerError,
})
}
sendEvent("status_update", "Ingesting file from S3...", nil)

// Upload file to OLAP service
res, err := h.olapSvc.IngestS3File(nil, ctxBg, body.FilePath, "", body.AlterColumnNames, body.IgnoreErrors)
if err != nil {
h.logger.Error("Error uploading file to OLAP service", zap.Error(err), zap.String("file_path", body.FilePath))
handleFailure(fmt.Errorf("failed to upload file from S3: %w", err))
return
}

// Update cleanup object to include dataset info
cleanup.hasDataset = true
cleanup.datasetID = dataset.ID
cleanup.orgID = dataset.OrgID
// Initialize cleanup resource object
cleanup := resourceCleanup{
tableName: res.TableName,
}

datasetSummary, err := h.olapSvc.GetDatasetSummary(res.TableName)
if err != nil {
h.logger.Error("Error fetching dataset summary", zap.Error(err))
h.cleanupResources(cleanup)
return ctx.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
"error": err.Error(),
"message": "Error fetching dataset summary",
"code": fiber.StatusInternalServerError,
sendEvent("status_update", "Fetching dataset metrics...", nil)

count, columns, err := h.getMetrics(res.TableName)
if err != nil {
h.logger.Error("Error fetching dataset metrics", zap.Error(err), zap.String("table_name", res.TableName))
h.cleanupResources(cleanup)
handleFailure(fmt.Errorf("error fetching dataset metrics: %w", err))
return
}

sendEvent("status_update", "Creating dataset record...", nil)

dataset, err := h.datasetSvc.Create(&models.CreateDatasetParams{
Name: res.TableName,
Description: body.Description,
ProjectID: project.ID,
Columns: columns,
FilePath: res.FilePath,
RowCount: count,
Size: res.Size,
Alias: body.Alias,
CreatedBy: userID,
UpdatedBy: userID,
Source: "file",
OrgID: orgID,
CustomPrompt: body.CustomPrompt,
})
}
if err != nil {
h.logger.Error("Error creating dataset record", zap.Error(err))
h.cleanupResources(cleanup)
handleFailure(fmt.Errorf("error creating dataset record: %w", err))
return
}

// Update cleanup object to include dataset info
cleanup.hasDataset = true
cleanup.datasetID = dataset.ID
cleanup.orgID = dataset.OrgID

if datasetSummary != nil {
summaryMap := make(map[string]int)
for i := range *datasetSummary {
summaryMap[(*datasetSummary)[i].ColumnName] = i
sendEvent("status_update", "Generating dataset summary...", nil)

datasetSummary, err := h.olapSvc.GetDatasetSummary(res.TableName)
if err != nil {
h.logger.Error("Error fetching dataset summary", zap.Error(err))
h.cleanupResources(cleanup)
handleFailure(fmt.Errorf("error fetching dataset summary: %w", err))
return
}

for colName, desc := range body.ColumnDescriptions {
if desc != "" {
if idx, exists := summaryMap[colName]; exists {
(*datasetSummary)[idx].Description = desc
if datasetSummary != nil {
summaryMap := make(map[string]int)
for i := range *datasetSummary {
summaryMap[(*datasetSummary)[i].ColumnName] = i
}

for colName, desc := range body.ColumnDescriptions {
if desc != "" {
if idx, exists := summaryMap[colName]; exists {
(*datasetSummary)[idx].Description = desc
}
}
}
}
}

summary, err := h.datasetSvc.CreateDatasetSummary(res.TableName, datasetSummary)
if err != nil {
h.logger.Error("Error creating dataset summary", zap.Error(err))
h.cleanupResources(cleanup)
return ctx.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
"error": err.Error(),
"message": "Error creating dataset summary",
"code": fiber.StatusInternalServerError,
})
}
sendEvent("status_update", "Saving dataset summary...", nil)

// Update cleanup object to include summary info
cleanup.hasSummary = true
summary, err := h.datasetSvc.CreateDatasetSummary(res.TableName, datasetSummary)
if err != nil {
h.logger.Error("Error creating dataset summary", zap.Error(err))
h.cleanupResources(cleanup)
handleFailure(fmt.Errorf("error creating dataset summary: %w", err))
return
}

err = h.aiAgentSvc.UploadSchema(&models.SchemaParams{
DatasetID: dataset.ID,
ProjectID: project.ID,
})
if err != nil {
h.logger.Error("Error uploading schema to AI agent", zap.Error(err))
h.cleanupResources(cleanup)
return ctx.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
"error": err.Error(),
"message": "Error uploading schema to AI agent",
"code": fiber.StatusInternalServerError,
// Update cleanup object to include summary info
cleanup.hasSummary = true

sendEvent("status_update", "Uploading schema to AI agent...", nil)

err = h.aiAgentSvc.UploadSchema(&models.SchemaParams{
DatasetID: dataset.ID,
ProjectID: project.ID,
})
}
if err != nil {
h.logger.Error("Error uploading schema to AI agent", zap.Error(err))
h.cleanupResources(cleanup)
handleFailure(fmt.Errorf("error uploading schema to AI agent: %w", err))
return
}

h.logger.Info("File upload completed successfully",
zap.String("dataset_id", dataset.ID),
zap.String("project_id", project.ID))
h.logger.Info("File upload completed successfully",
zap.String("dataset_id", dataset.ID),
zap.String("project_id", project.ID))

// Return success response
return ctx.Status(fiber.StatusCreated).JSON(map[string]any{
"data": map[string]any{
// Send completion event with dataset and summary
sendEvent("complete", "Dataset created successfully", map[string]any{
"dataset": dataset,
"summary": summary,
},
})
}()

// Set SSE headers
ctx.Set("Content-Type", "text/event-stream")
ctx.Set("Cache-Control", "no-cache")
ctx.Set("Connection", "keep-alive")
ctx.Set("Transfer-Encoding", "chunked")

// Stream SSE events to client
ctx.Response().SetBodyStreamWriter(func(w *bufio.Writer) {
for sse := range sseChan {
if sse.Error != nil {
h.logger.Error("Error received from stream source", zap.Error(sse.Error))
return
}

if _, err := w.Write(sse.Data); err != nil {
h.logger.Error("Error writing to client stream", zap.Error(err))
return
}

if err := w.Flush(); err != nil {
h.logger.Error("Error flushing client stream", zap.Error(err))
return
}
}
})

return nil
}
Loading