Skip to content

Commit

Permalink
fixed failing events
Browse files Browse the repository at this point in the history
Signed-off-by: freedisch <[email protected]>
  • Loading branch information
Freedisch committed Jan 11, 2024
1 parent becd1df commit bbd1022
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 136 deletions.
336 changes: 200 additions & 136 deletions cmd/main.go
Original file line number Diff line number Diff line change
@@ -1,171 +1,235 @@
// main.go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"net/http"
	"sync"
	"time"

	"github.com/IBM/sarama"
	"github.com/gin-gonic/gin"
	"github.com/go-redis/redis/v8"
	"github.com/google/uuid"
	"github.com/gorilla/mux"
	// NOTE(review): gopkg.in/redis.v5 collides with go-redis/redis/v8 (both
	// bind the name "redis") and appears to be a stale pre-refactor import —
	// confirm and drop it in a follow-up.
	"gopkg.in/redis.v5"
)

var (
kafkaProducer sarama.SyncProducer
redisClient *redis.Client
mu sync.Mutex
kafkaProducer sarama.AsyncProducer
kafkaConsumer sarama.Consumer
redisClient *redis.Client
wg sync.WaitGroup
)

func main() {
// Setup Kafka producer
kafkaConfig := sarama.NewConfig()
kafkaConfig.Producer.RequiredAcks = sarama.WaitForLocal // Only wait for the leader to ack
kafkaConfig.Producer.Compression = sarama.CompressionSnappy // Compress messages
kafkaConfig.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms
kafkaConfig.Producer.Return.Successes = true // Enable success notifications

var err error
kafkaProducer, err = sarama.NewSyncProducer([]string{"localhost:29092"}, kafkaConfig)
if err != nil {
log.Fatalf("Error creating Kafka producer: %v", err)
}

// Setup Redis client
redisClient = redis.NewClient(&redis.Options{
Addr: "redis:6379",
Password: "",
DB: 0,
})

// Setup HTTP server
router := gin.Default()

router.POST("/pushData", pushDataHandler)
router.GET("/getData", getDataHandler)

err = router.Run(":8080")
if err != nil {
log.Fatalf("Error starting server: %v", err)
}
initializeKafkaProducer()
initializeKafkaConsumer()
initializeRedisClient()

router := mux.NewRouter()

router.HandleFunc("/pushdata", pushDataHandler).Methods("POST")
router.HandleFunc("/getdata", getDataHandler).Methods("GET")

port := 8080
fmt.Printf("Server is running on port %d...\n", port)
http.ListenAndServe(fmt.Sprintf(":%d", port), router)

// Close resources when the server shuts down
defer func() {
kafkaProducer.Close()
kafkaConsumer.Close()
redisClient.Close()
}()
}

func pushDataHandler(c *gin.Context) {
var requestData map[string]interface{}

if err := c.BindJSON(&requestData); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request payload"})
return
}
log.Printf("Invalid Kafka message: %s", requestData)
// Goroutine-1: Route to Kafka
go routeToKafka(requestData)
c.JSON(http.StatusOK, gin.H{"message": "Data received and pushed to Kafka"})
func generateUID() string {
// Implement a function to generate a UID (e.g., using a UUID library)
// Example: Using the `github.com/google/uuid` package
uid, err := uuid.NewRandom()
if err != nil {
panic(err)
}
return uid.String()
}
func initializeKafkaProducer() {
// Initialize Kafka producer
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true

p, err := sarama.NewAsyncProducer([]string{"localhost:29092"}, config)
if err != nil {
panic(err)
}
kafkaProducer = p

// Start a goroutine to handle successful and failed deliveries
wg.Add(1)
go handleKafkaProducerEvents()
}

func getDataHandler(c *gin.Context) {
// Goroutine-2: Route from Kafka to Redis
go routeFromKafkaToRedis()
func initializeKafkaConsumer() {
// Initialize Kafka consumer
config := sarama.NewConfig()
config.Consumer.Return.Errors = true

// Fetch data from Redis and Kafka, and deliver the response
data, err := fetchDataFromRedis()
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Error fetching data from Redis"})
return
}
c, err := sarama.NewConsumer([]string{"localhost:29092"}, config)
if err != nil {
panic(err)
}
kafkaConsumer = c

c.JSON(http.StatusOK, data)
// Start a goroutine to consume messages from Kafka
wg.Add(1)
go consumeFromKafka()
}

func routeToKafka(data map[string]interface{}) {
mu.Lock()
defer mu.Unlock()

// Produce the data to Kafka
message := &sarama.ProducerMessage{
Topic: "messagetopic",
Value: sarama.StringEncoder(fmt.Sprintf("%v", data)),
}
log.Printf("Invalid Kafka message: %s", message.Value)
_, _, err := kafkaProducer.SendMessage(message)
if err != nil {
log.Printf("Error producing to Kafka: %v", err)
}
func initializeRedisClient() {
// Initialize Redis client
redisClient = redis.NewClient(&redis.Options{
Addr: "localhost:6379",
// Add other Redis configuration options if needed
})
}

func routeFromKafkaToRedis() {
// Consume messages from Kafka
consumer, err := sarama.NewConsumer([]string{"localhost:29092"}, nil)
if err != nil {
log.Printf("Error creating Kafka consumer: %v", err)
return
}
//defer consumer.Close()

partitionConsumer, err := consumer.ConsumePartition("messagetopic", 0, sarama.OffsetOldest)
if err != nil {
log.Printf("Error creating partition consumer: %v", err)
return
}
//defer partitionConsumer.Close()

for {
select {
case msg := <-partitionConsumer.Messages():
// Goroutine-2: Process Kafka message and route to Redis
go processKafkaMessage(msg)
}
}
func pushDataHandler(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, "Error reading request body", http.StatusBadRequest)
return
}
uid := generateUID()
// Produce message to Kafka topic in a separate goroutine
wg.Add(1)
go produceToKafka(uid, body)

fmt.Fprint(w, "Data received successfully")
}

func processKafkaMessage(msg *sarama.ConsumerMessage) {
var data map[string]interface{}

// Assuming the message is a map
err := json.Unmarshal(msg.Value, &data)
if err != nil {
log.Printf("Error unmarshalling Kafka message: %v", err)
log.Printf("Invalid Kafka message: %s", string(msg.Value))
return
}

mu.Lock()
defer mu.Unlock()

// Store data in Redis
err = storeDataInRedis(data)
if err != nil {
log.Printf("Error storing data in Redis: %v", err)
}
func getDataHandler(w http.ResponseWriter, r *http.Request) {
// Retrieve data from Redis and send it to Kafka in a separate goroutine
wg.Add(1)
go retrieveDataAndSendToKafka(w)
}

func storeDataInRedis(data map[string]interface{}) error {
// Store data in Redis
// Assuming data has "uid" and "message" fields
uid := fmt.Sprintf("%v", data["uid"])
message := fmt.Sprintf("%v", data["message"])
func retrieveDataAndSendToKafka(w http.ResponseWriter) {
defer wg.Done()

// Retrieve all data from Redis
ctx := context.Background()
keys, err := redisClient.Keys(ctx, "*").Result()
if err != nil {
http.Error(w, "Error retrieving data from Redis", http.StatusInternalServerError)
return
}

// Send each data to Kafka
for _, key := range keys {
data, err := redisClient.Get(ctx, key).Result()
if err != nil {
fmt.Printf("Error retrieving data with key %s from Redis: %v\n", key, err)
continue
}

// Send the retrieved data to Kafka
wg.Add(1)
go produceToKafka(key, []byte(data))
}

// Deliver the response
fmt.Fprint(w, "All data retrieved and sent to Kafka successfully")
}

func produceToKafka(uid string, data []byte) {
defer wg.Done()

err := redisClient.HSet("message:", uid, message).Err()
return err
// Produce message to Kafka topic
message := &sarama.ProducerMessage{
Topic: "testTopic",
Key: sarama.StringEncoder(uid),
Value: sarama.StringEncoder(data),
}

// Asynchronously send the message to Kafka
kafkaProducer.Input() <- message
}

func fetchDataFromRedis() ([]map[string]string, error) {
mu.Lock()
defer mu.Unlock()
func handleKafkaProducerEvents() {
defer wg.Done()

for {
select {
case success := <-kafkaProducer.Successes():
fmt.Printf("Produced message to Kafka: %v\n", success)
case err := <-kafkaProducer.Errors():
fmt.Printf("Error producing message to Kafka: %v\n", err.Err)
}
}
}

// Fetch data from Redis
redisData, err := redisClient.HGetAll("message:").Result()
if err != nil {
return nil, err
}
func consumeFromKafka() {
defer wg.Done()

// Convert Redis data to the desired format
var result []map[string]string
for key, value := range redisData {
result = append(result, map[string]string{"uid": key, "message": value})
}
partitionConsumer, err := kafkaConsumer.ConsumePartition("testTopic", 0, sarama.OffsetOldest)
if err != nil {
panic(err)
}

defer func() {
if err := partitionConsumer.Close(); err != nil {
panic(err)
}
}()

for {
// Consume messages from Kafka topic
msg := <-partitionConsumer.Messages()

// Process the received data
fmt.Printf("Received message from Kafka: %s\n", string(msg.Value))

// Route the data to Redis in a separate goroutine
wg.Add(1)
go routeToRedis(string(msg.Key),msg.Value)
}
}

func routeToRedis(uid string, data []byte) {
defer wg.Done()

// Route data to Redis
ctx := context.Background()
err := redisClient.Set(ctx, uid, string(data), 0).Err()
if err != nil {
fmt.Printf("Error routing data to Redis: %v\n", err)
}
}

return result, nil
func retrieveAllDataAndSendToKafka(w http.ResponseWriter) {
defer wg.Done()

// Retrieve all data from Redis
ctx := context.Background()
keys, err := redisClient.Keys(ctx, "*").Result()
if err != nil {
http.Error(w, "Error retrieving data from Redis", http.StatusInternalServerError)
return
}

// Send each data to Kafka
for _, key := range keys {
data, err := redisClient.Get(ctx, key).Result()
if err != nil {
fmt.Printf("Error retrieving data with key %s from Redis: %v\n", key, err)
continue
}

// Send the retrieved data to Kafka
wg.Add(1)
go produceToKafka(key, []byte(data))
}

// Deliver the response
fmt.Fprint(w, "All data retrieved and sent to Kafka successfully")
}
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/gin-gonic/gin v1.9.1
github.com/go-redis/redis/v8 v8.11.5
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/uuid v1.5.0
github.com/gorilla/mux v1.8.1
github.com/labstack/echo v3.3.10+incompatible
github.com/redis/go-redis/v9 v9.4.0 // indirect
golang.org/x/net v0.17.0 // indirect
Expand Down
Loading

0 comments on commit bbd1022

Please sign in to comment.