Skip to content

Commit

Permalink
SSE htmx example (#435)
Browse files Browse the repository at this point in the history
* SSE htmx example
  • Loading branch information
m110 authored Jun 12, 2024
1 parent a91dc03 commit 57df621
Show file tree
Hide file tree
Showing 18 changed files with 2,097 additions and 49 deletions.
11 changes: 11 additions & 0 deletions _examples/real-world-examples/server-sent-events-htmx/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
FROM golang:1.22 AS builder

COPY . /src
WORKDIR /src/

RUN CGO_ENABLED=0 go build -ldflags="-s -w" -trimpath -o /main .

FROM alpine
RUN apk add --no-cache ca-certificates
COPY --from=builder /main /main
CMD ["/main"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
version: '3.7'
services:
server:
build:
context: docker
volumes:
- ./:/src
- go_pkg:/go/pkg
- go_cache:/go-cache
working_dir: /src
ports:
- '8080:8080'
environment:
- PORT=8080
- DATABASE_URL=postgres://postgres:postgres@postgres:5432/postgres?sslmode=disable
- PUBSUB_PROJECT_ID=local
- PUBSUB_EMULATOR_HOST=pubsub:8681
restart: unless-stopped
networks:
- sse

postgres:
image: postgres:15
restart: unless-stopped
environment:
- POSTGRES_PASSWORD=postgres
ports:
- 5432:5432
networks:
- sse

pubsub:
image: messagebird/gcloud-pubsub-emulator:latest
restart: unless-stopped
ports:
- '8681:8681'
networks:
- sse

networks:
sse:

volumes:
go_pkg:
go_cache:
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
FROM golang:1.22

RUN go install github.com/cespare/reflex@latest
RUN go install github.com/a-h/templ/cmd/templ@latest

COPY reflex.conf /

ENTRYPOINT ["/go/bin/reflex", "-c", "/reflex.conf"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-r '(\.go$|go\.mod$)' -s go run .
-r '\.templ$' templ generate
171 changes: 171 additions & 0 deletions _examples/real-world-examples/server-sent-events-htmx/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package main

import (
"context"
"fmt"
"time"

"cloud.google.com/go/pubsub"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-googlecloud/pkg/googlecloud"
"github.com/ThreeDotsLabs/watermill-http/v2/pkg/http"
"github.com/ThreeDotsLabs/watermill/components/cqrs"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
)

type PostViewed struct {
PostID int `json:"post_id"`
}

type PostReactionAdded struct {
PostID int `json:"post_id"`
ReactionID string `json:"reaction_id"`
}

type PostStatsUpdated struct {
PostID int `json:"post_id"`
ViewsUpdated bool `json:"views_updated"`
ReactionUpdated *string `json:"reaction_updated"`
}

type Routers struct {
EventsRouter *message.Router
SSERouter http.SSERouter
EventBus *cqrs.EventBus
}

func NewRouters(cfg config, repo *Repository) (Routers, error) {
logger := watermill.NewStdLogger(false, false)

publisher, err := googlecloud.NewPublisher(
googlecloud.PublisherConfig{
ProjectID: cfg.PubSubProjectID,
},
logger,
)
if err != nil {
return Routers{}, err
}

eventBus, err := cqrs.NewEventBusWithConfig(
publisher,
cqrs.EventBusConfig{
GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
return params.EventName, nil
},
Marshaler: cqrs.JSONMarshaler{},
Logger: logger,
},
)
if err != nil {
return Routers{}, err
}

eventsRouter, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
return Routers{}, err
}

eventsRouter.AddMiddleware(middleware.Recoverer)

eventProcessor, err := cqrs.NewEventProcessorWithConfig(
eventsRouter,
cqrs.EventProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.EventProcessorGenerateSubscribeTopicParams) (string, error) {
return params.EventName, nil
},
SubscriberConstructor: func(params cqrs.EventProcessorSubscriberConstructorParams) (message.Subscriber, error) {
return googlecloud.NewSubscriber(
googlecloud.SubscriberConfig{
ProjectID: cfg.PubSubProjectID,
GenerateSubscriptionName: func(topic string) string {
return fmt.Sprintf("%v_%v", topic, params.HandlerName)
},
},
logger,
)
},
Marshaler: cqrs.JSONMarshaler{},
Logger: logger,
},
)
if err != nil {
return Routers{}, err
}

err = eventProcessor.AddHandlers(
cqrs.NewEventHandler(
"UpdateViews",
func(ctx context.Context, event *PostViewed) error {
err = repo.UpdatePost(ctx, event.PostID, func(post *Post) {
post.Views++
})
if err != nil {
return err
}

statsUpdated := PostStatsUpdated{
PostID: event.PostID,
ViewsUpdated: true,
}

return eventBus.Publish(ctx, statsUpdated)
},
),
cqrs.NewEventHandler(
"UpdateReactions",
func(ctx context.Context, event *PostReactionAdded) error {
err := repo.UpdatePost(ctx, event.PostID, func(post *Post) {
post.Reactions[event.ReactionID]++
})
if err != nil {
return err
}

statsUpdated := PostStatsUpdated{
PostID: event.PostID,
ReactionUpdated: &event.ReactionID,
}

return eventBus.Publish(ctx, statsUpdated)
},
),
)
if err != nil {
return Routers{}, err
}

sseSubscriber, err := googlecloud.NewSubscriber(
googlecloud.SubscriberConfig{
ProjectID: cfg.PubSubProjectID,
GenerateSubscriptionName: func(topic string) string {
return fmt.Sprintf("%v_%v", topic, watermill.NewShortUUID())
},
SubscriptionConfig: pubsub.SubscriptionConfig{
ExpirationPolicy: time.Hour * 24,
},
},
logger,
)
if err != nil {
return Routers{}, err
}

sseRouter, err := http.NewSSERouter(
http.SSERouterConfig{
UpstreamSubscriber: sseSubscriber,
Marshaler: http.StringSSEMarshaler{},
},
logger,
)
if err != nil {
return Routers{}, err
}

return Routers{
EventsRouter: eventsRouter,
SSERouter: sseRouter,
EventBus: eventBus,
}, nil
}
66 changes: 66 additions & 0 deletions _examples/real-world-examples/server-sent-events-htmx/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
module main

go 1.22.0

require (
cloud.google.com/go/pubsub v1.36.1
github.com/ThreeDotsLabs/watermill v1.3.5
github.com/ThreeDotsLabs/watermill-googlecloud v1.2.0
github.com/ThreeDotsLabs/watermill-http/v2 v2.2.0
github.com/a-h/templ v0.2.663
github.com/kelseyhightower/envconfig v1.4.0
github.com/labstack/echo/v4 v4.11.4
github.com/lib/pq v1.10.9
)

require (
cloud.google.com/go v0.112.1 // indirect
cloud.google.com/go/compute v1.24.0 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v1.1.7 // indirect
github.com/cenkalti/backoff/v3 v3.2.2 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-chi/chi v4.0.2+incompatible // indirect
github.com/go-chi/render v1.0.1 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.3 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/labstack/gommon v0.4.2 // indirect
github.com/lithammer/shortuuid/v3 v3.0.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/sony/gobreaker v0.5.0 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasttemplate v1.2.2 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
go.opentelemetry.io/otel v1.24.0 // indirect
go.opentelemetry.io/otel/metric v1.24.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/net v0.22.0 // indirect
golang.org/x/oauth2 v0.18.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
google.golang.org/api v0.170.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240314234333-6e1732d8331c // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240311132316-a219d84964c2 // indirect
google.golang.org/grpc v1.62.1 // indirect
google.golang.org/protobuf v1.33.0 // indirect
)
Loading

0 comments on commit 57df621

Please sign in to comment.