Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
452 changes: 252 additions & 200 deletions server/interfaces/http/routes/source/database/create.go

Large diffs are not rendered by default.

390 changes: 213 additions & 177 deletions server/interfaces/http/routes/source/database/refresh.go

Large diffs are not rendered by default.

304 changes: 182 additions & 122 deletions server/interfaces/http/routes/source/s3/create.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package s3

import (
"bufio"
"context"
"encoding/json"
"fmt"

"github.com/factly/gopie/domain"
Expand All @@ -11,6 +14,19 @@ import (
"go.uber.org/zap"
)

// SSEEvent represents a server-sent event for S3 upload progress.
// It is JSON-encoded and sent to the client as the payload of a
// "data:" frame on the text/event-stream response.
type SSEEvent struct {
	Type    string `json:"type"`           // event kind: "status_update", "complete", or "error"
	Message string `json:"message"`        // human-readable progress description
	Data    any    `json:"data,omitempty"` // optional payload; on "complete" carries the created dataset and summary
}

// SSEData holds either data or error for SSE channel.
// Exactly one field is expected to be set per message: the stream
// writer forwards Data bytes to the client and terminates the
// stream as soon as it sees a non-nil Error.
type SSEData struct {
	Data  []byte // pre-formatted SSE frame ("data: ...\n\n"), written to the client verbatim
	Error error  // non-nil stops the body stream writer
}

// uploadRequestBody represents the request body for uploading a file from S3
// @Description Request body for uploading a file from S3
type uploadRequestBody struct {
Expand Down Expand Up @@ -71,12 +87,12 @@ func (h *httpHandler) cleanupResources(rc resourceCleanup) {
}

// @Summary Upload file from S3
// @Description Upload a file from S3 and create a new dataset
// @Description Upload a file from S3 and create a new dataset with SSE progress updates
// @Tags s3
// @Accept json
// @Produce json
// @Produce text/event-stream
// @Param body body uploadRequestBody true "Upload request parameters"
// @Success 201 {object} responses.SuccessResponse{data=map[string]interface{}{"dataset":models.Dataset,"summary":any}}
// @Success 200 {string} string "SSE stream of upload progress"
// @Failure 400 {object} responses.ErrorResponse "Invalid request body or S3 file access error"
// @Failure 404 {object} responses.ErrorResponse "Project not found"
// @Failure 500 {object} responses.ErrorResponse "Internal server error"
Expand All @@ -92,6 +108,7 @@ func (h *httpHandler) upload(ctx *fiber.Ctx) error {
"code": fiber.StatusForbidden,
})
}

// Get request body from context
body := uploadRequestBody{
IgnoreErrors: true,
Expand All @@ -114,149 +131,192 @@ func (h *httpHandler) upload(ctx *fiber.Ctx) error {
})
}

// Check if project exists
project, err := h.projectSvc.Details(body.ProjectID, orgID)
if err != nil {
if domain.IsStoreError(err) && err == domain.ErrRecordNotFound {
h.logger.Error("Project not found", zap.Error(err), zap.String("project_id", body.ProjectID))
return ctx.Status(fiber.StatusNotFound).JSON(fiber.Map{
"error": "Project not found",
"message": fmt.Sprintf("Project with ID %s not found", body.ProjectID),
"code": fiber.StatusNotFound,
})
// Create SSE channel
sseChan := make(chan SSEData, 10)

// Helper to send SSE events
sendEvent := func(eventType, message string, data any) {
eventPayload := SSEEvent{
Type: eventType,
Message: message,
Data: data,
}
h.logger.Error("Error fetching project", zap.Error(err), zap.String("project_id", body.ProjectID))
return ctx.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
"error": err.Error(),
"message": "Error validating project",
"code": fiber.StatusInternalServerError,
})
payloadBytes, _ := json.Marshal(eventPayload)
sseMessage := fmt.Sprintf("data: %s\n\n", payloadBytes)
sseChan <- SSEData{Data: []byte(sseMessage)}
}

h.logger.Info("Starting file upload", zap.String("file_path", body.FilePath), zap.String("project_id", project.ID))
// Helper to handle failures
handleFailure := func(failErr error) {
errMsg := failErr.Error()
errorPayload, _ := json.Marshal(map[string]string{"type": "error", "message": errMsg})
errorMsg := fmt.Sprintf("event: error\ndata: %s\n\n", errorPayload)
sseChan <- SSEData{Data: []byte(errorMsg)}
}

// Upload file to OLAP service
res, err := h.olapSvc.IngestS3File(nil, ctx.Context(), body.FilePath, "", body.AlterColumnNames, body.IgnoreErrors)
if err != nil {
h.logger.Error("Error uploading file to OLAP service", zap.Error(err), zap.String("file_path", body.FilePath))
// Start async upload process
go func() {
defer close(sseChan)
ctxBg := context.Background()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Use request context instead of context.Background().

Using context.Background() loses the request's cancellation signal, deadlines, and tracing context. Pass ctx.Context() (or a derived context) to preserve these capabilities and enable cancellation on client disconnect.

-		ctxBg := context.Background()
+		reqCtx := ctx.Context()

Then use reqCtx for IngestS3File and other context-aware calls.

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In server/interfaces/http/routes/source/s3/create.go around line 160, replace
the use of context.Background() with the incoming request context (e.g., reqCtx
:= ctx.Context() on the Fiber handler, or a context derived from it) so the
request's cancellation, deadlines, and tracing are preserved; then pass reqCtx
to IngestS3File and any other context-aware calls instead of ctxBg.


// For S3 upload failures, return a more specific error
return ctx.Status(fiber.StatusBadRequest).JSON(fiber.Map{
"error": err.Error(),
"message": "Failed to upload file from S3. Please check if the file exists and you have proper access.",
"code": fiber.StatusBadRequest,
})
}
sendEvent("status_update", "Validating project...", nil)

// Initialize cleanup resource object
cleanup := resourceCleanup{
tableName: res.TableName,
}
// Check if project exists
project, err := h.projectSvc.Details(body.ProjectID, orgID)
if err != nil {
if domain.IsStoreError(err) && err == domain.ErrRecordNotFound {
h.logger.Error("Project not found", zap.Error(err), zap.String("project_id", body.ProjectID))
handleFailure(fmt.Errorf("project with ID %s not found", body.ProjectID))
return
}
h.logger.Error("Error fetching project", zap.Error(err), zap.String("project_id", body.ProjectID))
handleFailure(fmt.Errorf("error validating project: %w", err))
return
}

count, columns, err := h.getMetrics(res.TableName)
if err != nil {
h.logger.Error("Error fetching dataset metrics", zap.Error(err), zap.String("table_name", res.TableName))
h.cleanupResources(cleanup)
return ctx.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
"error": err.Error(),
"message": "Error fetching dataset metrics",
"code": fiber.StatusInternalServerError,
})
}
h.logger.Info("Starting file upload", zap.String("file_path", body.FilePath), zap.String("project_id", project.ID))

dataset, err := h.datasetSvc.Create(&models.CreateDatasetParams{
Name: res.TableName,
Description: body.Description,
ProjectID: project.ID,
Columns: columns,
FilePath: res.FilePath,
RowCount: count,
Size: res.Size,
Alias: body.Alias,
CreatedBy: userID,
UpdatedBy: userID,
Source: "file",
OrgID: orgID,
CustomPrompt: body.CustomPrompt,
})
if err != nil {
h.logger.Error("Error creating dataset record", zap.Error(err))
h.cleanupResources(cleanup)
return ctx.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
"error": err.Error(),
"message": "Error creating dataset record",
"code": fiber.StatusInternalServerError,
})
}
sendEvent("status_update", "Ingesting file from S3...", nil)

// Upload file to OLAP service
res, err := h.olapSvc.IngestS3File(nil, ctxBg, body.FilePath, "", body.AlterColumnNames, body.IgnoreErrors)
if err != nil {
h.logger.Error("Error uploading file to OLAP service", zap.Error(err), zap.String("file_path", body.FilePath))
handleFailure(fmt.Errorf("failed to upload file from S3: %w", err))
return
}

// Update cleanup object to include dataset info
cleanup.hasDataset = true
cleanup.datasetID = dataset.ID
cleanup.orgID = dataset.OrgID
// Initialize cleanup resource object
cleanup := resourceCleanup{
tableName: res.TableName,
}

datasetSummary, err := h.olapSvc.GetDatasetSummary(res.TableName)
if err != nil {
h.logger.Error("Error fetching dataset summary", zap.Error(err))
h.cleanupResources(cleanup)
return ctx.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
"error": err.Error(),
"message": "Error fetching dataset summary",
"code": fiber.StatusInternalServerError,
sendEvent("status_update", "Fetching dataset metrics...", nil)

count, columns, err := h.getMetrics(res.TableName)
if err != nil {
h.logger.Error("Error fetching dataset metrics", zap.Error(err), zap.String("table_name", res.TableName))
h.cleanupResources(cleanup)
handleFailure(fmt.Errorf("error fetching dataset metrics: %w", err))
return
}

sendEvent("status_update", "Creating dataset record...", nil)

dataset, err := h.datasetSvc.Create(&models.CreateDatasetParams{
Name: res.TableName,
Description: body.Description,
ProjectID: project.ID,
Columns: columns,
FilePath: res.FilePath,
RowCount: count,
Size: res.Size,
Alias: body.Alias,
CreatedBy: userID,
UpdatedBy: userID,
Source: "file",
OrgID: orgID,
CustomPrompt: body.CustomPrompt,
})
}
if err != nil {
h.logger.Error("Error creating dataset record", zap.Error(err))
h.cleanupResources(cleanup)
handleFailure(fmt.Errorf("error creating dataset record: %w", err))
return
}

// Update cleanup object to include dataset info
cleanup.hasDataset = true
cleanup.datasetID = dataset.ID
cleanup.orgID = dataset.OrgID

if datasetSummary != nil {
summaryMap := make(map[string]int)
for i := range *datasetSummary {
summaryMap[(*datasetSummary)[i].ColumnName] = i
sendEvent("status_update", "Generating dataset summary...", nil)

datasetSummary, err := h.olapSvc.GetDatasetSummary(res.TableName)
if err != nil {
h.logger.Error("Error fetching dataset summary", zap.Error(err))
h.cleanupResources(cleanup)
handleFailure(fmt.Errorf("error fetching dataset summary: %w", err))
return
}

for colName, desc := range body.ColumnDescriptions {
if desc != "" {
if idx, exists := summaryMap[colName]; exists {
(*datasetSummary)[idx].Description = desc
if datasetSummary != nil {
summaryMap := make(map[string]int)
for i := range *datasetSummary {
summaryMap[(*datasetSummary)[i].ColumnName] = i
}

for colName, desc := range body.ColumnDescriptions {
if desc != "" {
if idx, exists := summaryMap[colName]; exists {
(*datasetSummary)[idx].Description = desc
}
}
}
}
}

summary, err := h.datasetSvc.CreateDatasetSummary(res.TableName, datasetSummary)
if err != nil {
h.logger.Error("Error creating dataset summary", zap.Error(err))
h.cleanupResources(cleanup)
return ctx.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
"error": err.Error(),
"message": "Error creating dataset summary",
"code": fiber.StatusInternalServerError,
})
}
sendEvent("status_update", "Saving dataset summary...", nil)

// Update cleanup object to include summary info
cleanup.hasSummary = true
summary, err := h.datasetSvc.CreateDatasetSummary(res.TableName, datasetSummary)
if err != nil {
h.logger.Error("Error creating dataset summary", zap.Error(err))
h.cleanupResources(cleanup)
handleFailure(fmt.Errorf("error creating dataset summary: %w", err))
return
}

err = h.aiAgentSvc.UploadSchema(&models.SchemaParams{
DatasetID: dataset.ID,
ProjectID: project.ID,
})
if err != nil {
h.logger.Error("Error uploading schema to AI agent", zap.Error(err))
h.cleanupResources(cleanup)
return ctx.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
"error": err.Error(),
"message": "Error uploading schema to AI agent",
"code": fiber.StatusInternalServerError,
// Update cleanup object to include summary info
cleanup.hasSummary = true

sendEvent("status_update", "Uploading schema to AI agent...", nil)

err = h.aiAgentSvc.UploadSchema(&models.SchemaParams{
DatasetID: dataset.ID,
ProjectID: project.ID,
})
}
if err != nil {
h.logger.Error("Error uploading schema to AI agent", zap.Error(err))
h.cleanupResources(cleanup)
handleFailure(fmt.Errorf("error uploading schema to AI agent: %w", err))
return
}

h.logger.Info("File upload completed successfully",
zap.String("dataset_id", dataset.ID),
zap.String("project_id", project.ID))
h.logger.Info("File upload completed successfully",
zap.String("dataset_id", dataset.ID),
zap.String("project_id", project.ID))

// Return success response
return ctx.Status(fiber.StatusCreated).JSON(map[string]any{
"data": map[string]any{
// Send completion event with dataset and summary
sendEvent("complete", "Dataset created successfully", map[string]any{
"dataset": dataset,
"summary": summary,
},
})
}()

// Set SSE headers
ctx.Set("Content-Type", "text/event-stream")
ctx.Set("Cache-Control", "no-cache")
ctx.Set("Connection", "keep-alive")
ctx.Set("Transfer-Encoding", "chunked")

// Stream SSE events to client
ctx.Response().SetBodyStreamWriter(func(w *bufio.Writer) {
for sse := range sseChan {
if sse.Error != nil {
h.logger.Error("Error received from stream source", zap.Error(sse.Error))
return
}

if _, err := w.Write(sse.Data); err != nil {
h.logger.Error("Error writing to client stream", zap.Error(err))
return
}

if err := w.Flush(); err != nil {
h.logger.Error("Error flushing client stream", zap.Error(err))
return
}
}
})

return nil
}
Loading