Skip to content

Commit

Permalink
Client created, using mongodb as read store
Browse files Browse the repository at this point in the history
  • Loading branch information
Adrián committed Jun 28, 2019
1 parent 2c35d86 commit c127b16
Show file tree
Hide file tree
Showing 624 changed files with 120,818 additions and 1,427 deletions.
46 changes: 41 additions & 5 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

69 changes: 69 additions & 0 deletions cmd/go-event/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package main

import (
"context"
"encoding/json"
"github.com/adriancarayol/go-event/config/mongo"
"github.com/adriancarayol/go-event/config/rabbit"
"github.com/adriancarayol/go-event/pkg/dao"
"go.mongodb.org/mongo-driver/bson"
"log"
"time"
)

type Data struct {
Username string `json:"username"`
Email string `json:"email"`
Password string `json:"password"`
}

func main() {
rabbit.Init()
mongo.Init()

amqpConnection := rabbit.Get()
ch, err := amqpConnection.Channel()

if err != nil {
log.Fatalln("Fail creating channel")
}

channel, err := ch.Consume("user_events", "", false, false, false, false, nil)

forever := make(chan bool)

collection := mongo.Get().Database("testing").Collection("numbers")

go func() {
ctx, _ := context.WithTimeout(context.Background(), 30*time.Second)

for d := range channel {
log.Printf("Received a message: %s", d.Body)
var event dao.Event
var data Data

err = json.Unmarshal(d.Body, &event)

if err != nil {
log.Printf("Error: %s", err)
}

err = json.Unmarshal([]byte(event.Data), &data)

if err != nil {
log.Printf("Error: %s", err)
}

res, err := collection.InsertOne(ctx, bson.M{"id": event.AggregateID.String(), "password": data.Password, "username": data.Username, "email": data.Email})

if err != nil {
log.Printf("Error: %s", err)
}

log.Printf("Inserted into mongo: %s", res)
}
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
6 changes: 6 additions & 0 deletions cmd/go-event/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package main

import (
"github.com/adriancarayol/go-event/config/db"
"github.com/adriancarayol/go-event/config/rabbit"
"github.com/adriancarayol/go-event/pkg/interfaces/registry"
"github.com/adriancarayol/go-event/pkg/usecases"
"log"
)

func main() {
db.Init()
rabbit.Init()

container, err := registry.NewContainer()
if err != nil {
Expand All @@ -21,4 +23,8 @@ func main() {
if err != nil {
log.Fatalf("Error registering user: %s", err)
}

if err := db.GetDB().Close(); err != nil {
log.Fatal("Error closing DB: %s", err)
}
}
41 changes: 41 additions & 0 deletions config/mongo/mongo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package mongo

import (
"context"
"fmt"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
"log"
"time"
)

var db *mongo.Client

func Init() {
client, err := mongo.NewClient(options.Client().ApplyURI("mongodb://localhost:27017"))

if err != nil {
log.Fatalf("Error creating mongodb: %s", err)
}

// Connect the mongo client to the MongoDB server
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
err = client.Connect(ctx)

// Ping MongoDB
ctx, _ = context.WithTimeout(context.Background(), 10*time.Second)

if err = client.Ping(ctx, readpref.Primary()); err != nil {
fmt.Println("could not ping to mongo db service: %v\n", err)
return
}

fmt.Println("connected to nosql database:")

db = client
}

func Get() *mongo.Client {
return db
}
41 changes: 41 additions & 0 deletions config/rabbit/rabbit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package rabbit

import (
"github.com/streadway/amqp"
"log"
)

var amqpConnection *amqp.Connection

func configureQueues() {
ch, err := amqpConnection.Channel()
if err != nil {
log.Fatalf("Error creating channel: %s", err)
}

_, err = ch.QueueDeclare("user_events", true, true, false, false, nil)

if err != nil {
log.Fatalf("Error creating queue: %s", err)
}

if err := ch.Close(); err != nil {
log.Fatalf("Error closing rabbitmq channel: %s", err)
}
}

func Init() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")

if err != nil {
log.Fatalf("Error connecting with RabbitMQ: %s", err)
}

amqpConnection = conn

configureQueues()
}

func Get() *amqp.Connection {
return amqpConnection
}
Binary file added main
Binary file not shown.
14 changes: 14 additions & 0 deletions pkg/dao/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package dao

import (
"github.com/google/uuid"
)

type Event struct {
ID uuid.UUID
AggregateID uuid.UUID
AggregateType string
Type string
Version uint64
Data string
}
13 changes: 0 additions & 13 deletions pkg/dao/user.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,11 @@
package dao

import (
"github.com/adriancarayol/go-event/pkg/domain/model"
"github.com/google/uuid"
)

type User struct {
ID uuid.UUID
Email string
Username string
}

func toUser(users []*model.User) []*User {
res := make([]*User, len(users))
for i, user := range users {
res[i] = &User{
ID: user.GetID(),
Email: user.GetEmail(),
Username: user.GetUsername(),
}
}
return res
}
9 changes: 9 additions & 0 deletions pkg/domain/eventbus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package domain

import (
"github.com/adriancarayol/go-event/pkg/dao"
)

type EventBus interface {
Publish(event dao.Event) error
}
10 changes: 4 additions & 6 deletions pkg/domain/service/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,12 @@ import (
)

type UserService struct {
repo repository.UserRepository
eventStore repository.EventStore
repo repository.UserRepository
}

func NewUserService(repo repository.UserRepository, eventStore repository.EventStore) *UserService {
func NewUserService(repo repository.UserRepository) *UserService {
return &UserService{
eventStore: eventStore,
repo: repo,
repo: repo,
}
}

Expand Down Expand Up @@ -44,4 +42,4 @@ func (s *UserService) CheckIfExistUsername(username string) error {
}

return nil
}
}
45 changes: 45 additions & 0 deletions pkg/interfaces/eventbus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package interfaces

import (
"encoding/json"
"github.com/adriancarayol/go-event/config/rabbit"
"github.com/adriancarayol/go-event/pkg/dao"
"github.com/streadway/amqp"
"log"
)

type eventBusRabbitMQ struct {
amqp *amqp.Connection
}

func NewEventBus() *eventBusRabbitMQ {
return &eventBusRabbitMQ{
amqp: rabbit.Get(),
}
}

func (b *eventBusRabbitMQ) Publish(event dao.Event) error {
amqpConnection := rabbit.Get()
ch, err := amqpConnection.Channel()

if err != nil {
log.Printf("Error creating rabbitmq channel: %s", err)
return err
}

payload, err := json.Marshal(event)

if err != nil {
log.Printf("Error marshal event: %s", err)
return err
}

err = ch.Publish("", "user_events", false, false, amqp.Publishing{DeliveryMode: amqp.Persistent, ContentType: "text/plain", Body: payload})

if err != nil {
log.Printf("Error publishing message in rabbitmq: %s", err)
return err
}

return nil
}
5 changes: 3 additions & 2 deletions pkg/interfaces/registry/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func (c *Container) Clean() error {
func buildUserUseCase(ctn di.Container) (interface{}, error) {
repo := interfaces.NewUserRepository()
eventStore := interfaces.NewEventStore()
userService := service.NewUserService(repo, eventStore)
return usecases.NewUserUsecase(repo, eventStore, userService), nil
eventBus := interfaces.NewEventBus()
userService := service.NewUserService(repo)
return usecases.NewUserUsecase(repo, eventStore, eventBus, userService), nil
}
Loading

0 comments on commit c127b16

Please sign in to comment.