Skip to content

Commit 81129ae

Browse files
Merge pull request #12 from rafabene/HYPERFLEET-641
HYPERFLEET-641: Add Health() method, optimize integration tests, fix goroutine leak
2 parents 8013d44 + e110ba8 commit 81129ae

16 files changed

Lines changed: 735 additions & 303 deletions

File tree

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@ fmt:
1414
test:
1515
go test -v ./broker/... ./pkg/... -timeout 10m
1616

17-
# Run tests in the test folder
17+
# Run tests in the test folder (sequential packages: CI has 1 CPU so parallel execution causes timeouts)
1818
test-integration:
19-
go test -v ./test/integration/... -timeout 10m
19+
go test -v -p 1 ./test/integration/... -timeout 10m
2020

2121
# Run all tests
2222
test-all: test test-integration

README.md

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ The current implementation uses [Watermill](https://github.com/ThreeDotsLabs/wat
1313
- **Flexible Configuration**: YAML configuration files with environment variable overrides via Viper
1414
- **Worker Pools**: Configurable parallel message processing for subscribers
1515
- **Subscription Management**: Flexible subscription IDs for load balancing (shared subscriptions) or fanout (separate subscriptions)
16+
- **Health Checks**: Built-in `Health()` method on `Publisher` for readiness probes (per [HyperFleet Health Endpoints standard](https://github.com/openshift-hyperfleet/architecture/blob/main/hyperfleet/standards/health-endpoints.md))
1617
- **Simple API**: Clean, easy-to-use interface that hides Watermill complexity
1718

1819
## Installation
@@ -52,7 +53,7 @@ appLogger := logger.NewTestLogger()
5253
publisher, err := broker.NewPublisher(appLogger)
5354

5455
// Use JSON format for the default logger
55-
appLogger := logger.NewTestLogger(logger.FormatJSON)
56+
appLogger := logger.NewTestLogger(logger.WithFormat(logger.FormatJSON))
5657
publisher, err := broker.NewPublisher(appLogger)
5758

5859
// Use your own logger implementation with config
@@ -113,9 +114,23 @@ func main() {
113114

114115
</details>
115116

116-
Note for Google PubSub: The Google Pub/Sub publisher implementation (via Watermill/Google Cloud SDK) starts background goroutines (for batching, connection management, etc.).
117-
The app should call Close() to not leak
117+
Note for Google Pub/Sub: The Google Pub/Sub publisher implementation (via Watermill/Google Cloud SDK) starts background goroutines (for batching, connection management, etc.).
118+
The app should call `Close()` to avoid leaking these background goroutines and resources.
118119

120+
### Health Check
121+
122+
The `Publisher` interface exposes a `Health(ctx context.Context) error` method that consumers can use to verify broker connectivity, typically in `/readyz` endpoints:
123+
124+
```go
125+
if err := publisher.Health(ctx); err != nil {
126+
// Broker is unhealthy — return 503
127+
log.Printf("Broker health check failed: %v", err)
128+
}
129+
```
130+
131+
The health check implementation varies by broker:
132+
- **RabbitMQ**: Checks if the AMQP connection is open and not closed (in-memory, no network call)
133+
- **Google Pub/Sub**: Performs a lightweight `GetTopic` API call (3s timeout) on a non-existent probe topic to verify connectivity. A `NotFound` response confirms the round-trip succeeded. Requires the `pubsub.topics.get` permission, which is **not** included in `roles/pubsub.publisher`. You must grant an additional role such as `roles/pubsub.viewer`, `roles/pubsub.editor`, or a custom role containing `pubsub.topics.get`.
119134

120135
<details>
121136
<summary><strong>Subscriber Example</strong></summary>
@@ -456,9 +471,9 @@ The subscription ID concept enables two messaging patterns:
456471
- **Load Balancing**: Multiple subscribers with the same subscription ID share messages
457472
- **Fanout**: Subscribers with different subscription IDs each receive all messages
458473

459-
This is implemented consistently across brokers:
474+
This is implemented differently per broker:
460475
- **RabbitMQ**: Queue names are `{topic}-{subscriptionId}`
461-
- **Google Pub/Sub**: Subscription names are `{topic}-{subscriptionId}`
476+
- **Google Pub/Sub**: Subscription names are `{subscriptionId}`
462477

463478
### 4. Worker Pool Architecture
464479

broker/broker.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"io"
78
"strings"
89
"time"
910

11+
"cloud.google.com/go/pubsub/v2"
1012
"github.com/ThreeDotsLabs/watermill"
1113
"github.com/ThreeDotsLabs/watermill/message"
1214
"github.com/cloudevents/sdk-go/v2/event"
@@ -58,25 +60,42 @@ func NewPublisher(log logger.Logger, configMap ...map[string]string) (Publisher,
5860
}
5961

6062
var pub message.Publisher
63+
var hc healthCheckFunc
64+
var healthCloser io.Closer
6165

6266
switch cfg.Broker.Type {
6367
case "rabbitmq":
6468
pub, err = newRabbitMQPublisher(cfg, watermillLogger)
6569
if err != nil {
6670
return nil, fmt.Errorf("failed to create RabbitMQ publisher: %w", err)
6771
}
72+
hc = newRabbitMQHealthCheck(pub)
6873
case "googlepubsub":
6974
pub, err = newGooglePubSubPublisher(cfg, watermillLogger)
7075
if err != nil {
7176
return nil, fmt.Errorf("failed to create Google Pub/Sub publisher: %w", err)
7277
}
78+
79+
// Create a persistent Pub/Sub client for health checks, reused across calls.
80+
var healthClient *pubsub.Client
81+
healthClient, err = pubsub.NewClient(context.Background(), cfg.Broker.GooglePubSub.ProjectID)
82+
if err != nil {
83+
if closeErr := pub.Close(); closeErr != nil {
84+
return nil, fmt.Errorf("failed to create health check client: %w (also failed to close publisher: %v)", err, closeErr)
85+
}
86+
return nil, fmt.Errorf("failed to create health check client: %w", err)
87+
}
88+
hc = newGooglePubSubHealthCheck(healthClient, cfg.Broker.GooglePubSub.ProjectID)
89+
healthCloser = healthClient
7390
default:
7491
return nil, fmt.Errorf("unsupported broker type: %s", cfg.Broker.Type)
7592
}
7693

7794
return &publisher{
78-
pub: pub,
79-
logger: log,
95+
pub: pub,
96+
logger: log,
97+
healthCheck: hc,
98+
healthCloser: healthCloser,
8099
}, nil
81100
}
82101

broker/googlepubsub.go

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,36 @@ func parseGoogleCloudDuration(s string) (time.Duration, error) {
7777
return 0, fmt.Errorf("invalid duration format: %s", s)
7878
}
7979

80+
// healthCheckTopicName is a non-existent topic used as a connectivity probe.
81+
// GetTopic returning NotFound proves the round-trip to Pub/Sub succeeded.
82+
const healthCheckTopicName = "hyperfleet-health-probe"
83+
84+
// newGooglePubSubHealthCheck creates a health check function for a Google Pub/Sub publisher.
85+
// It reuses the provided pubsub.Client to perform a lightweight GetTopic API call
86+
// with a 3-second timeout to verify connectivity.
87+
// Requires pubsub.topics.get (not included in roles/pubsub.publisher; grant roles/pubsub.viewer or a custom role).
88+
// The caller is responsible for closing the client (via publisher.healthCloser).
89+
func newGooglePubSubHealthCheck(client *pubsub.Client, projectID string) healthCheckFunc {
90+
fullTopicName := fmt.Sprintf("projects/%s/topics/%s", projectID, healthCheckTopicName)
91+
92+
return func(ctx context.Context) error {
93+
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
94+
defer cancel()
95+
96+
_, err := client.TopicAdminClient.GetTopic(ctx, &pubsubpb.GetTopicRequest{
97+
Topic: fullTopicName,
98+
})
99+
if err != nil {
100+
// NotFound means the API is reachable — the topic just doesn't exist.
101+
if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound {
102+
return nil
103+
}
104+
return fmt.Errorf("google pub/sub health check failed: %w", err)
105+
}
106+
return nil
107+
}
108+
}
109+
80110
// newGooglePubSubPublisher creates a Google Pub/Sub publisher
81111
func newGooglePubSubPublisher(cfg *config, logger watermill.LoggerAdapter) (message.Publisher, error) {
82112
gps := cfg.Broker.GooglePubSub
@@ -161,8 +191,6 @@ func newGooglePubSubSubscriber(cfg *config, logger watermill.LoggerAdapter, subs
161191
}
162192

163193
// Configure subscription name generator to use subscription ID
164-
// The topic passed to Subscribe will be the original topic (no colon)
165-
// We append subscription ID to create unique subscription names
166194
pubsubConfig := googlepubsub.SubscriberConfig{
167195
ProjectID: gps.ProjectID,
168196
GenerateSubscriptionName: func(topic string) string {

broker/health_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package broker
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"testing"
7+
8+
"github.com/ThreeDotsLabs/watermill/message"
9+
"github.com/openshift-hyperfleet/hyperfleet-broker/pkg/logger"
10+
"github.com/stretchr/testify/assert"
11+
)
12+
13+
func TestPublisherHealthWithCustomHealthCheck(t *testing.T) {
14+
mockLogger := logger.NewMockLogger()
15+
16+
t.Run("healthy publisher returns nil", func(t *testing.T) {
17+
p := &publisher{
18+
logger: mockLogger,
19+
healthCheck: func(_ context.Context) error {
20+
return nil
21+
},
22+
}
23+
err := p.Health(context.Background())
24+
assert.NoError(t, err)
25+
})
26+
27+
t.Run("unhealthy publisher returns error", func(t *testing.T) {
28+
p := &publisher{
29+
logger: mockLogger,
30+
healthCheck: func(_ context.Context) error {
31+
return fmt.Errorf("connection lost")
32+
},
33+
}
34+
err := p.Health(context.Background())
35+
assert.Error(t, err)
36+
assert.Contains(t, err.Error(), "connection lost")
37+
})
38+
}
39+
40+
func TestNewRabbitMQHealthCheck(t *testing.T) {
41+
t.Run("returns error for non-AMQP publisher", func(t *testing.T) {
42+
// Pass a fakeWatermillPublisher (non-AMQP) to newRabbitMQHealthCheck
43+
// to trigger the AMQP type assertion failure
44+
hc := newRabbitMQHealthCheck(&fakeWatermillPublisher{})
45+
err := hc(context.Background())
46+
assert.Error(t, err)
47+
assert.Contains(t, err.Error(), "unexpected publisher type")
48+
})
49+
}
50+
51+
// fakeWatermillPublisher implements message.Publisher for testing type assertion failures
52+
type fakeWatermillPublisher struct{}
53+
54+
func (f *fakeWatermillPublisher) Publish(_ string, _ ...*message.Message) error {
55+
return nil
56+
}
57+
58+
func (f *fakeWatermillPublisher) Close() error {
59+
return nil
60+
}

broker/publisher.go

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package broker
22

33
import (
44
"context"
5+
"fmt"
6+
"io"
57

68
"github.com/ThreeDotsLabs/watermill/message"
79
"github.com/cloudevents/sdk-go/v2/event"
@@ -12,14 +14,25 @@ import (
1214
type Publisher interface {
1315
// Publish publishes a CloudEvent to the specified topic with context
1416
Publish(ctx context.Context, topic string, event *event.Event) error
17+
// Health checks if the underlying broker connection is healthy.
18+
// Returns nil if healthy, or an error describing the failure.
19+
// The provided context controls the deadline/cancellation of the check.
20+
Health(ctx context.Context) error
1521
// Close closes the underlying publisher
1622
Close() error
1723
}
1824

25+
// healthCheckFunc is a function that checks broker connectivity.
26+
// Returns nil if healthy, or an error describing the failure.
27+
// The provided context controls the deadline/cancellation of the check.
28+
type healthCheckFunc func(ctx context.Context) error
29+
1930
// publisher wraps a Watermill publisher and provides a simplified interface
2031
type publisher struct {
21-
pub message.Publisher
22-
logger logger.Logger // Caller's logger (always present - default logger if not provided)
32+
pub message.Publisher
33+
logger logger.Logger // Caller's logger (always present - default logger if not provided)
34+
healthCheck healthCheckFunc
35+
healthCloser io.Closer // optional resource to close with publisher (e.g. Pub/Sub health check client)
2336
}
2437

2538
// Publish publishes a CloudEvent to the specified topic with context
@@ -45,7 +58,17 @@ func (p *publisher) Publish(ctx context.Context, topic string, event *event.Even
4558
return nil
4659
}
4760

48-
// Close closes the underlying publisher
61+
// Health checks if the underlying broker connection is healthy.
62+
// Returns nil if healthy, or an error describing the failure.
63+
// The provided context controls the deadline/cancellation of the check.
64+
func (p *publisher) Health(ctx context.Context) error {
65+
if p == nil || p.healthCheck == nil {
66+
return fmt.Errorf("health check not configured")
67+
}
68+
return p.healthCheck(ctx)
69+
}
70+
71+
// Close closes the underlying publisher and any health check resources.
4972
func (p *publisher) Close() error {
5073
p.logger.Info(context.Background(), "Closing publisher")
5174

@@ -54,5 +77,14 @@ func (p *publisher) Close() error {
5477
p.logger.Errorf(context.Background(), "Failed to close publisher: %v", err)
5578
}
5679

80+
if p.healthCloser != nil {
81+
if closeErr := p.healthCloser.Close(); closeErr != nil {
82+
p.logger.Errorf(context.Background(), "Failed to close health check client: %v", closeErr)
83+
if err == nil {
84+
err = closeErr
85+
}
86+
}
87+
}
88+
5789
return err
5890
}

broker/rabbitmq.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package broker
22

33
import (
4+
"context"
45
"fmt"
56
"strings"
67

@@ -22,6 +23,28 @@ type rabbitMQConfig struct {
2223
PublisherConfirm bool `mapstructure:"publisher_confirm"`
2324
}
2425

26+
// newRabbitMQHealthCheck creates a health check function for a RabbitMQ publisher.
27+
// It uses the AMQP ConnectionWrapper's IsConnected() and Closed() methods
28+
// to determine if the broker connection is alive.
29+
func newRabbitMQHealthCheck(pub message.Publisher) healthCheckFunc {
30+
return func(_ context.Context) error {
31+
amqpPub, ok := pub.(*amqp.Publisher)
32+
if !ok {
33+
return fmt.Errorf("unexpected publisher type for RabbitMQ health check")
34+
}
35+
if amqpPub == nil {
36+
return fmt.Errorf("RabbitMQ publisher is nil")
37+
}
38+
if amqpPub.Closed() {
39+
return fmt.Errorf("RabbitMQ connection is closed")
40+
}
41+
if !amqpPub.IsConnected() {
42+
return fmt.Errorf("RabbitMQ connection is not established")
43+
}
44+
return nil
45+
}
46+
}
47+
2548
// newRabbitMQPublisher creates a RabbitMQ publisher
2649
func newRabbitMQPublisher(cfg *config, logger watermill.LoggerAdapter) (message.Publisher, error) {
2750
amqpConfig := amqp.NewDurablePubSubConfig(

0 commit comments

Comments
 (0)