Skip to content

Commit

Permalink
fixing consumer failing
Browse files Browse the repository at this point in the history
Signed-off-by: freedisch <[email protected]>
  • Loading branch information
Freedisch committed Jan 11, 2024
1 parent bde8348 commit becd1df
Show file tree
Hide file tree
Showing 3 changed files with 334 additions and 33 deletions.
186 changes: 156 additions & 30 deletions cmd/main.go
Original file line number Diff line number Diff line change
@@ -1,45 +1,171 @@
package main

import (
"errors"
"encoding/json"
"fmt"
"log"
"net/http"
"sync"
"time"

"github.com/kafkago/db"
"github.com/kafkago/db/models"
"github.com/labstack/echo"
"github.com/redis/go-redis/v9"
"github.com/IBM/sarama"
"github.com/gin-gonic/gin"
"gopkg.in/redis.v5"
)

var (
kafkaProducer sarama.SyncProducer
redisClient *redis.Client
mu sync.Mutex
)

func main() {
client := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
// Setup Kafka producer
kafkaConfig := sarama.NewConfig()
kafkaConfig.Producer.RequiredAcks = sarama.WaitForLocal // Only wait for the leader to ack
kafkaConfig.Producer.Compression = sarama.CompressionSnappy // Compress messages
kafkaConfig.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms
kafkaConfig.Producer.Return.Successes = true // Enable success notifications

var err error
kafkaProducer, err = sarama.NewSyncProducer([]string{"localhost:29092"}, kafkaConfig)
if err != nil {
log.Fatalf("Error creating Kafka producer: %v", err)
}

// Setup Redis client
redisClient = redis.NewClient(&redis.Options{
Addr: "redis:6379",
Password: "",
DB: 0,
DB: 0,
})

messageService := db.NewRedis[models.Message](client)
http := echo.New()
// Setup HTTP server
router := gin.Default()

http.POST("/message", func(ctx echo.Context) error {
content := ctx.Request().PostFormValue("message")
message := models.Message{Message: content}
err := messageService.Save(ctx.Request().Context(), message)
if err != nil {
return ctx.String(500, err.Error())
}
return ctx.String(201, message.UID)
})
router.POST("/pushData", pushDataHandler)
router.GET("/getData", getDataHandler)

err = router.Run(":8080")
if err != nil {
log.Fatalf("Error starting server: %v", err)
}
}

func pushDataHandler(c *gin.Context) {
var requestData map[string]interface{}

if err := c.BindJSON(&requestData); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request payload"})
return
}
log.Printf("Invalid Kafka message: %s", requestData)
// Goroutine-1: Route to Kafka
go routeToKafka(requestData)
c.JSON(http.StatusOK, gin.H{"message": "Data received and pushed to Kafka"})
}

func getDataHandler(c *gin.Context) {
// Goroutine-2: Route from Kafka to Redis
go routeFromKafkaToRedis()

// Fetch data from Redis and Kafka, and deliver the response
data, err := fetchDataFromRedis()
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Error fetching data from Redis"})
return
}

c.JSON(http.StatusOK, data)
}

func routeToKafka(data map[string]interface{}) {
mu.Lock()
defer mu.Unlock()

// Produce the data to Kafka
message := &sarama.ProducerMessage{
Topic: "messagetopic",
Value: sarama.StringEncoder(fmt.Sprintf("%v", data)),
}
log.Printf("Invalid Kafka message: %s", message.Value)
_, _, err := kafkaProducer.SendMessage(message)
if err != nil {
log.Printf("Error producing to Kafka: %v", err)
}
}

http.GET("/message/:uid", func(ctx echo.Context) error {
uid := ctx.Param("uid")
message, err := messageService.Get(ctx.Request().Context(), uid)
if errors.Is(err, redis.Nil) {
return ctx.String(404, "message not found")
} else if err != nil {
return ctx.String(500, err.Error())
func routeFromKafkaToRedis() {
// Consume messages from Kafka
consumer, err := sarama.NewConsumer([]string{"localhost:29092"}, nil)
if err != nil {
log.Printf("Error creating Kafka consumer: %v", err)
return
}
//defer consumer.Close()

partitionConsumer, err := consumer.ConsumePartition("messagetopic", 0, sarama.OffsetOldest)
if err != nil {
log.Printf("Error creating partition consumer: %v", err)
return
}
//defer partitionConsumer.Close()

for {
select {
case msg := <-partitionConsumer.Messages():
// Goroutine-2: Process Kafka message and route to Redis
go processKafkaMessage(msg)
}
return ctx.String(200, message.Message)
})
http.Logger.Fatal(http.Start(":1234"))

}
}

func processKafkaMessage(msg *sarama.ConsumerMessage) {
var data map[string]interface{}

// Assuming the message is a map
err := json.Unmarshal(msg.Value, &data)
if err != nil {
log.Printf("Error unmarshalling Kafka message: %v", err)
log.Printf("Invalid Kafka message: %s", string(msg.Value))
return
}

mu.Lock()
defer mu.Unlock()

// Store data in Redis
err = storeDataInRedis(data)
if err != nil {
log.Printf("Error storing data in Redis: %v", err)
}
}

func storeDataInRedis(data map[string]interface{}) error {
// Store data in Redis
// Assuming data has "uid" and "message" fields
uid := fmt.Sprintf("%v", data["uid"])
message := fmt.Sprintf("%v", data["message"])

err := redisClient.HSet("message:", uid, message).Err()
return err
}

func fetchDataFromRedis() ([]map[string]string, error) {
mu.Lock()
defer mu.Unlock()

// Fetch data from Redis
redisData, err := redisClient.HGetAll("message:").Result()
if err != nil {
return nil, err
}

// Convert Redis data to the desired format
var result []map[string]string
for key, value := range redisData {
result = append(result, map[string]string{"uid": key, "message": value})
}

return result, nil
}
43 changes: 40 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,61 @@ go 1.21.5
require github.com/google/s2a-go v0.1.7

require (
github.com/bytedance/sonic v1.9.1 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eapache/go-resiliency v1.4.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.14.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/labstack/gommon v0.4.2 // indirect
github.com/leodido/go-urn v1.2.4 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.11 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasttemplate v1.2.2 // indirect
golang.org/x/crypto v0.12.0 // indirect
golang.org/x/arch v0.3.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

require (
github.com/IBM/sarama v1.42.1
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/gin-gonic/gin v1.9.1
github.com/golang/protobuf v1.5.3 // indirect
github.com/labstack/echo v3.3.10+incompatible
github.com/redis/go-redis/v9 v9.4.0 // indirect
golang.org/x/net v0.14.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/text v0.12.0 // indirect
golang.org/x/text v0.13.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/grpc v1.57.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/redis.v5 v5.2.9
)
Loading

0 comments on commit becd1df

Please sign in to comment.