Skip to content

Commit

Permalink
Feature: Add RabbitMQ and Kafka Messaging for comparison
Browse files Browse the repository at this point in the history
  • Loading branch information
selamanse committed Oct 31, 2024
1 parent 2bd3b24 commit f1017fe
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 27 deletions.
8 changes: 8 additions & 0 deletions .devcontainer/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ services:
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_NUM_PARTITIONS: 3
queue:
image: rabbitmq:3-management
container_name: queue
ports:
- 5672:5672
- 15672:15672 # management ui
networks:
- backend
volumes:
mongodb-data:
networks:
Expand Down
2 changes: 1 addition & 1 deletion clients/kafka/kafka_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func init() {
if ev.TopicPartition.Error != nil {
zap.L().Error("Kafka Delivery failed: ", zap.Any("error", ev.TopicPartition.Error.Error()))
} else {
zap.L().Sugar().Infof("Delivered message to %s", ev.TopicPartition.Topic)
zap.L().Sugar().Infof("Delivered message to %s", *ev.TopicPartition.Topic)
}
}
}
Expand Down
111 changes: 111 additions & 0 deletions clients/rabbitmq/rabbitmq_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package rabbitmq_client

import (
"fmt"
"os"
"stockbackend/types"

"encoding/json"

"github.com/streadway/amqp"
"go.uber.org/zap"
)

var (
Connection *amqp.Connection
Channel *amqp.Channel
Queue amqp.Queue
)

// GetEnv retrieves the environment variable with a default value if not set.
func GetEnv(key, defaultValue string) string {
value := os.Getenv(key)
if value == "" {
return defaultValue
}
return value
}

func Close() {
Channel.Close()
Connection.Close()
}

func SendMessage(event types.StockbackendEvent) {
message, err := json.Marshal(event)
if err != nil {
panic(err)
}

zap.L().Sugar().Infof("Sending message to rabbitmq: %s", message)

// 4. Publish a message to the queue
err = Channel.Publish(
"", // Exchange (empty means default)
Queue.Name, // Routing key (queue name in this case)
false, // Mandatory
false, // Immediate
amqp.Publishing{
ContentType: "application/json",
Body: []byte(message),
})

if err != nil {
zap.L().Error("Error publishing message to rabbitmq: ", zap.Any("error", err.Error()))
return
}

zap.L().Info("Successfully sent message to rabbitmq.")
}

func init() {
logger, err := zap.NewProductionConfig().Build()
if err != nil {
panic("oh noes!")
}

zap.ReplaceGlobals(logger)

// 1. Connect to RabbitMQ server
rabbitServer := GetEnv("RABBITMQ_SERVER", "localhost")
rabbitPort := GetEnv("RABBITMQ_PORT", "5672")
rabbitUser := GetEnv("RABBITMQ_USER", "guest")
rabbitPass := GetEnv("RABBITMQ_PASS", "guest")

zap.L().Sugar().Infof("RabbitMQ Server: %s", rabbitServer)
zap.L().Sugar().Infof("RabbitMQ Port: %s", rabbitPort)
zap.L().Sugar().Infof("RabbitMQ User: %s", rabbitUser)
zap.L().Sugar().Debugf("RabbitMQ Pass: %s", rabbitPass)

newConn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%s/", rabbitUser, rabbitPass, rabbitServer, rabbitPort))
if err != nil {
zap.L().Error("RabbitMQ initialization failed: ", zap.Any("error", err.Error()))
}
Connection = newConn

// 2. Create a channel
ch, err := Connection.Channel()
if err != nil {
zap.L().Error("RabbitMQ - Failed to open a channel: ", zap.Any("error", err.Error()))
}

Channel = ch

// 3. Declare a queue to ensure it exists before publishing messages
queueName := "stockbackend"
q, err := ch.QueueDeclare(
queueName, // Name of the queue
true, // Durable
false, // Delete when unused
false, // Exclusive
false, // No-wait
nil, // Arguments
)
if err != nil {
zap.L().Error("RabbitMQ - Failed to declare a queue: ", zap.Any("error", err.Error()))
}

Queue = q

zap.L().Info("Connected to RabbitMQ.")
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/getsentry/sentry-go v0.29.0
github.com/gin-gonic/gin v1.10.0
github.com/google/uuid v1.6.0
github.com/streadway/amqp v1.1.0
github.com/xuri/excelize/v2 v2.8.1
go.mongodb.org/mongo-driver v1.17.1
go.uber.org/zap v1.27.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,8 @@ github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0=
github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/streadway/amqp v1.1.0 h1:py12iX8XSyI7aN/3dUT8DFIDJazNJsVJdxNVEpnQTZM=
github.com/streadway/amqp v1.1.0/go.mod h1:WYSrTEYHOXHd0nwFeUXAe2G2hRnQT+deZJJf88uS9Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
Expand Down
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os/exec"
"os/signal"
kafka_client "stockbackend/clients/kafka"
rabbitmq_client "stockbackend/clients/rabbitmq"
"stockbackend/middleware"
"stockbackend/routes"
"stockbackend/services"
Expand Down Expand Up @@ -71,6 +72,7 @@ func GracefulShutdown(server *http.Server, ticker, rankUpdater *time.Ticker) {
ticker.Stop()

// Stop the kafka producer
rabbitmq_client.Close()
kafka_client.KafkaProducer.Close()
rankUpdater.Stop()
// Create a context with a timeout for shutdown
Expand Down
2 changes: 2 additions & 0 deletions scripts/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ export KAFKA_TOPIC="stockbackend"
export KAFKA_TOPIC_PARTITIONS="1"
export KAFKA_TOPIC_REPL_FACTOR="1"

export RABBITMQ_SERVER="queue"

export SENTRY_DSN=
export SENTRY_SAMPLE_RATE=1.0
export ENVIRONMENT=development
Expand Down
62 changes: 36 additions & 26 deletions services/file_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (
"fmt"
"os"
"stockbackend/clients/http_client"
kafka_client "stockbackend/clients/kafka"
kafka_client "stockbackend/clients/kafka"
mongo_client "stockbackend/clients/mongo"
rabbitmq_client "stockbackend/clients/rabbitmq"
"stockbackend/types"
"stockbackend/utils/constants"
"stockbackend/utils/helpers"
"strings"
Expand Down Expand Up @@ -36,12 +38,6 @@ func (fs *fileService) ParseXLSXFile(ctx *gin.Context, files <-chan string, sent
span := sentry.StartSpan(sentryCtx, "[DAO] ParseXLSXFile")
defer span.Finish()

cld, err := cloudinary.NewFromURL(os.Getenv("CLOUDINARY_URL"))
if err != nil {
sentry.CaptureException(err)
span.Status = sentry.SpanStatusInternalError
return fmt.Errorf("error initializing Cloudinary: %w", err)
}
for filePath := range files {
file, err := os.Open(filePath)
if err != nil {
Expand All @@ -56,23 +52,34 @@ func (fs *fileService) ParseXLSXFile(ctx *gin.Context, files <-chan string, sent
}
defer file.Close()

// Generate a UUID for the filename
uuid := uuid.New().String()
cloudinaryFilename := uuid + ".xlsx"
dbSpan1 := sentry.StartSpan(span.Context(), "[DB] Upload XLSX File")
// Upload file to Cloudinary
uploadResult, err := cld.Upload.Upload(ctx, file, uploader.UploadParams{
PublicID: cloudinaryFilename,
Folder: "xlsx_uploads",
})
dbSpan1.Finish()
if err != nil {
zap.L().Error("Error uploading file to Cloudinary", zap.String("filePath", filePath), zap.Error(err))
sentry.CaptureException(err)
continue
}
cloudinaryUrl, useCloudinary := os.LookupEnv("CLOUDINARY_URL")
if useCloudinary {

cld, err := cloudinary.NewFromURL(cloudinaryUrl)
if err != nil {
sentry.CaptureException(err)
span.Status = sentry.SpanStatusInternalError
return fmt.Errorf("error initializing Cloudinary: %w", err)
}

// Generate a UUID for the filename
cloudinaryFilename := uuid.New().String() + ".xlsx"
dbSpan1 := sentry.StartSpan(span.Context(), "[DB] Upload XLSX File")
// Upload file to Cloudinary
uploadResult, err := cld.Upload.Upload(ctx, file, uploader.UploadParams{
PublicID: cloudinaryFilename,
Folder: "xlsx_uploads",
})
dbSpan1.Finish()
if err != nil {
zap.L().Error("Error uploading file to Cloudinary", zap.String("filePath", filePath), zap.Error(err))
sentry.CaptureException(err)
continue
}

zap.L().Info("File uploaded to Cloudinary", zap.String("filePath", filePath), zap.String("url", uploadResult.SecureURL))
zap.L().Info("File uploaded to Cloudinary", zap.String("filePath", filePath), zap.String("url", uploadResult.SecureURL))

}

// Create a new reader from the uploaded file
if _, err := file.Seek(0, 0); err != nil {
Expand Down Expand Up @@ -218,12 +225,13 @@ func (fs *fileService) ParseXLSXFile(ctx *gin.Context, files <-chan string, sent
if err != nil {
zap.L().Error("Error finding document", zap.Error(err))
sentry.CaptureException(err)
event := types.StockbackendEvent{
event := types.StockbackendEvent{
EventType: "NoMongoDbDocFound",
Data: map[string]interface{}{"queryString": queryString},
CorrelationId: uuid.New().String(),
}
kafka_client.SendMessage(event)
rabbitmq_client.SendMessage(event)
continue
}
dbSpan3.Finish()
Expand All @@ -237,7 +245,8 @@ func (fs *fileService) ParseXLSXFile(ctx *gin.Context, files <-chan string, sent
stockDetail["marketCap"] = helpers.GetMarketCapCategory(fmt.Sprintf("%v", result["marketCap"]))
stockDetail["stockRate"] = helpers.RateStock(result)

stockFScore := helpers.GenerateFScore(result)
//stockFScore := helpers.GenerateFScore(result)
stockFScore := -1
if stockFScore < 0 {
stockDetail["fScore"] = "Not Available"
} else {
Expand All @@ -250,12 +259,13 @@ func (fs *fileService) ParseXLSXFile(ctx *gin.Context, files <-chan string, sent
if err != nil || len(results) == 0 {
zap.L().Error("No company found", zap.Error(err))
sentry.CaptureException(err)
event := types.StockbackendEvent{
event := types.StockbackendEvent{
EventType: "NoCompanyFound",
Data: stockDetail,
CorrelationId: fmt.Sprintf("%s", stockDetail["url"]),
}
kafka_client.SendMessage(event)
rabbitmq_client.SendMessage(event)
continue
}
dbSpan4.Finish()
Expand Down

0 comments on commit f1017fe

Please sign in to comment.