Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/kafka-and-rabbitmq #36

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .devcontainer/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
FROM mcr.microsoft.com/devcontainers/go:1-1.23-bookworm@sha256:255137cbcf1d460a2daa57747d6525f0e26e4a3e6a25bb733bcb1c4412a0ca34

# Install MongoDB command line tools
ARG MONGO_TOOLS_VERSION=8.0
RUN . /etc/os-release \
&& curl -sSL "https://www.mongodb.org/static/pgp/server-${MONGO_TOOLS_VERSION}.asc" | gpg --dearmor > /usr/share/keyrings/mongodb-archive-keyring.gpg \
&& echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/mongodb-archive-keyring.gpg] http://repo.mongodb.org/apt/debian bookworm/mongodb-org/${MONGO_TOOLS_VERSION} main" | tee /etc/apt/sources.list.d/mongodb-org-${MONGO_TOOLS_VERSION}.list \
&& apt-get update && export DEBIAN_FRONTEND=noninteractive \
&& apt-get install -y mongodb-mongosh \
&& if [ "$(dpkg --print-architecture)" = "amd64" ]; then apt-get install -y mongodb-database-tools; fi \
&& apt-get clean -y && rm -rf /var/lib/apt/lists/*
12 changes: 12 additions & 0 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"name": "Go & MongoDb & Kafka",
"dockerComposeFile": "docker-compose.yml",
"workspaceFolder": "/workspaces",
"service": "app",
"features": {
"ghcr.io/itsmechlark/features/rabbitmq-server:1": {
"version": "latest"
}
},
"postCreateCommand": "bash scripts/install-dependencies.sh"
}
53 changes: 53 additions & 0 deletions .devcontainer/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
version: '3'
services:
app:
build:
context: .
dockerfile: Dockerfile
volumes:
- ..:/workspaces:cached
command: sleep infinity
#network_mode: service:mongodb
networks:
- backend
mongodb:
image: mongo:latest
container_name: mongodb_container
ports:
- "27017:27017"
networks:
- backend
volumes:
- mongodb-data:/data/db
broker:
image: apache/kafka-native:latest
container_name: broker
ports:
- 9092:9092
networks:
- backend
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_NUM_PARTITIONS: 3
queue:
image: rabbitmq:3-management
container_name: queue
ports:
- 5672:5672
- 15672:15672 # management ui
networks:
- backend
volumes:
mongodb-data:
networks:
backend:
109 changes: 109 additions & 0 deletions clients/kafka/kafka_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package kafka_client

import (
"context"
"os"
"stockbackend/types"
"strconv"
"time"

"encoding/json"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"go.uber.org/zap"
)

var (
KafkaProducer *kafka.Producer
KafkaAdminClient *kafka.AdminClient
)

func SendMessage(event types.StockbackendEvent) {
topic := os.Getenv("KAFKA_TOPIC")
message, err := json.Marshal(event)
if err != nil {
panic(err)
}

zap.L().Sugar().Infof("Sending message to kafka: %s", message)
err = KafkaProducer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte(message),
}, nil)
if err != nil {
zap.L().Error("Error sending message to kafka: ", zap.Any("error", err.Error()))
}
}

func init() {
logger, err := zap.NewProductionConfig().Build()
if err != nil {
panic("oh noes!")
}

zap.ReplaceGlobals(logger)

zap.L().Info("KAFKA_BOOTSTRAPSERVERS: ", zap.String("uri", os.Getenv("KAFKA_BOOTSTRAPSERVERS")))

NewProducer, err := kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": os.Getenv("KAFKA_BOOTSTRAPSERVERS"),
"client.id": "myProducer",
"acks": "all",
})
if err != nil {
zap.L().Error("Kafka Producer initialization failed: ", zap.Any("error", err.Error()))
}
KafkaProducer = NewProducer

NewAdminClient, err := kafka.NewAdminClient(&kafka.ConfigMap{
"bootstrap.servers": os.Getenv("KAFKA_BOOTSTRAPSERVERS"),
})
if err != nil {
zap.L().Error("Kafka Producer initialization failed: ", zap.Any("error", err.Error()))
}
KafkaAdminClient = NewAdminClient

// Delivery report handler for produced messages
go func() {
for e := range KafkaProducer.Events() {
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
zap.L().Error("Kafka Delivery failed: ", zap.Any("error", ev.TopicPartition.Error.Error()))
} else {
zap.L().Sugar().Infof("Delivered message to %s", *ev.TopicPartition.Topic)
}
}
}
}()

KafkaProducer.Flush(50)

topic := os.Getenv("KAFKA_TOPIC")
numParts, err := strconv.Atoi(os.Getenv("KAFKA_TOPIC_PARTITIONS"))

Check failure on line 83 in clients/kafka/kafka_client.go

View workflow job for this annotation

GitHub Actions / Static Code Analysis

this value of err is never used (SA4006)

Check failure on line 83 in clients/kafka/kafka_client.go

View workflow job for this annotation

GitHub Actions / Lint Code

SA4006: this value of `err` is never used (staticcheck)
replicationFactor, err := strconv.Atoi(os.Getenv("KAFKA_TOPIC_REPL_FACTOR"))

Check failure on line 84 in clients/kafka/kafka_client.go

View workflow job for this annotation

GitHub Actions / Static Code Analysis

this value of err is never used (SA4006)

Check failure on line 84 in clients/kafka/kafka_client.go

View workflow job for this annotation

GitHub Actions / Lint Code

SA4006: this value of `err` is never used (staticcheck)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create topics on cluster.
// Set Admin options to wait for the operation to finish (or at most 60s)
maxDur, err := time.ParseDuration("60s")
if err != nil {
panic("ParseDuration(60s)")
}
results, err := KafkaAdminClient.CreateTopics(
ctx,
// Multiple topics can be created simultaneously
// by providing more TopicSpecification structs here.
[]kafka.TopicSpecification{{
Topic: topic,
NumPartitions: numParts,
ReplicationFactor: replicationFactor}},
// Admin options
kafka.SetAdminOperationTimeout(maxDur))
if err != nil {
zap.L().Error("Failed to create topic: ", zap.Any("error", err.Error()))
}

zap.L().Sugar().Infof("Connected to Kafka %s", results)
}
1 change: 0 additions & 1 deletion clients/mongo/mongo_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ var (

func init() {
zap.L().Info("MONGO_URI: ", zap.String("uri", os.Getenv("MONGO_URI")))
zap.L().Info("CLOUDINARY_URL", zap.String("uri", os.Getenv("CLOUDINARY_URL")))

serverAPI := options.ServerAPI(options.ServerAPIVersion1)
mongoURI := os.Getenv("MONGO_URI")
Expand Down
111 changes: 111 additions & 0 deletions clients/rabbitmq/rabbitmq_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package rabbitmq_client

import (
"fmt"
"os"
"stockbackend/types"

"encoding/json"

"github.com/streadway/amqp"
"go.uber.org/zap"
)

var (
Connection *amqp.Connection
Channel *amqp.Channel
Queue amqp.Queue
)

// GetEnv retrieves the environment variable with a default value if not set.
func GetEnv(key, defaultValue string) string {
value := os.Getenv(key)
if value == "" {
return defaultValue
}
return value
}

func Close() {
Channel.Close()
Connection.Close()
}

func SendMessage(event types.StockbackendEvent) {
message, err := json.Marshal(event)
if err != nil {
panic(err)
}

zap.L().Sugar().Infof("Sending message to rabbitmq: %s", message)

// 4. Publish a message to the queue
err = Channel.Publish(
"", // Exchange (empty means default)
Queue.Name, // Routing key (queue name in this case)
false, // Mandatory
false, // Immediate
amqp.Publishing{
ContentType: "application/json",
Body: []byte(message),
})

if err != nil {
zap.L().Error("Error publishing message to rabbitmq: ", zap.Any("error", err.Error()))
return
}

zap.L().Info("Successfully sent message to rabbitmq.")
}

func init() {
logger, err := zap.NewProductionConfig().Build()
if err != nil {
panic("oh noes!")
}

zap.ReplaceGlobals(logger)

// 1. Connect to RabbitMQ server
rabbitServer := GetEnv("RABBITMQ_SERVER", "localhost")
rabbitPort := GetEnv("RABBITMQ_PORT", "5672")
rabbitUser := GetEnv("RABBITMQ_USER", "guest")
rabbitPass := GetEnv("RABBITMQ_PASS", "guest")

zap.L().Sugar().Infof("RabbitMQ Server: %s", rabbitServer)
zap.L().Sugar().Infof("RabbitMQ Port: %s", rabbitPort)
zap.L().Sugar().Infof("RabbitMQ User: %s", rabbitUser)
zap.L().Sugar().Debugf("RabbitMQ Pass: %s", rabbitPass)

newConn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%s/", rabbitUser, rabbitPass, rabbitServer, rabbitPort))
if err != nil {
zap.L().Error("RabbitMQ initialization failed: ", zap.Any("error", err.Error()))
}
Connection = newConn

// 2. Create a channel
ch, err := Connection.Channel()
if err != nil {
zap.L().Error("RabbitMQ - Failed to open a channel: ", zap.Any("error", err.Error()))
}

Channel = ch

// 3. Declare a queue to ensure it exists before publishing messages
queueName := "stockbackend"
q, err := ch.QueueDeclare(
queueName, // Name of the queue
true, // Durable
false, // Delete when unused
false, // Exclusive
false, // No-wait
nil, // Arguments
)
if err != nil {
zap.L().Error("RabbitMQ - Failed to declare a queue: ", zap.Any("error", err.Error()))
}

Queue = q

zap.L().Info("Connected to RabbitMQ.")
}
7 changes: 6 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ go 1.23.0
require (
github.com/PuerkitoBio/goquery v1.10.0
github.com/cloudinary/cloudinary-go/v2 v2.9.0
github.com/confluentinc/confluent-kafka-go/v2 v2.5.4
github.com/getsentry/sentry-go v0.29.0
github.com/gin-gonic/gin v1.10.0
github.com/google/uuid v1.6.0
github.com/streadway/amqp v1.1.0
github.com/xuri/excelize/v2 v2.8.1
go.mongodb.org/mongo-driver v1.17.1
go.uber.org/zap v1.27.0
Expand All @@ -32,8 +34,9 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/gorilla/schema v1.4.1 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.7 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
Expand All @@ -43,6 +46,7 @@ require (
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/richardlehane/mscfb v1.0.4 // indirect
github.com/richardlehane/msoleps v1.0.3 // indirect
github.com/rogpeppe/go-internal v1.8.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.12 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
Expand All @@ -58,5 +62,6 @@ require (
golang.org/x/sys v0.25.0 // indirect
golang.org/x/text v0.18.0 // indirect
google.golang.org/protobuf v1.34.1 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading
Loading