From 7395ce520a5e94fce13b07ba29fcce9888a228c5 Mon Sep 17 00:00:00 2001 From: Will Owens Date: Sat, 14 Dec 2024 10:25:34 -0500 Subject: [PATCH] feat: additional advertised listners/broker addresses for kafka --- docs/modules/kafka.md | 40 ++++ modules/kafka/examples_test.go | 344 ++++++++++++++++++++++++++++++++- modules/kafka/kafka.go | 119 ++++++++++-- modules/kafka/kafka_test.go | 123 +++++++++++- 4 files changed, 603 insertions(+), 23 deletions(-) diff --git a/docs/modules/kafka.md b/docs/modules/kafka.md index d2f951b043..53e2d039b0 100644 --- a/docs/modules/kafka.md +++ b/docs/modules/kafka.md @@ -61,6 +61,15 @@ The Kafka container will be started using a custom shell script: [Init script](../../modules/kafka/kafka.go) inside_block:starterScript +That will set the advertised listeners with these values: + + +[Advertised Listeners](../../modules/kafka/kafka.go) inside_block:advertisedListeners + + +KafkaContainer provides methods to read the broker addresses for different +connectivity environments. + #### Environment variables The environment variables that are already set by default are: @@ -82,3 +91,34 @@ The `Brokers(ctx)` method returns the Kafka brokers as a string slice, containin [Get Kafka brokers](../../modules/kafka/kafka_test.go) inside_block:getBrokers + +#### BrokersByHostDockerInternal + +The `BrokersByHostDockerInternal(ctx)` method returns the Kafka brokers as a +string slice, containing the hostname `host.docker.internal` and a random port +defined by Kafka's public port (`19092/tcp`). + +This method is useful when you need to run additional containers that need to +connect to Kafka. + + +[Get Kafka brokers by host.docker.internal](../../modules/kafka/examples_test.go) inside_block:getBrokersByHostDockerInternal + + +#### BrokersByContainerName + +The `BrokersByContainerName(ctx)` method returns the Kafka brokers as a string +slice, addressed by the container's name(`Ex: charming_dijkstra:19093`). This +method is useful when you need to run additional containers that need to connect +to Kafka. + +To use this broker address you should run all the containers inside a docker +network. + + +[Start Kafka inside a docker network](../../modules/kafka/examples_test.go) inside_block:getBrokersByContainerName_Kafka + + + +[Get Kafka brokers by container name](../../modules/kafka/examples_test.go) inside_block:getBrokersByContainerName_Kcat + diff --git a/modules/kafka/examples_test.go b/modules/kafka/examples_test.go index c275924ecc..43d74a67ec 100644 --- a/modules/kafka/examples_test.go +++ b/modules/kafka/examples_test.go @@ -1,12 +1,20 @@ package kafka_test import ( + "bytes" "context" "fmt" + "io" "log" + "strings" + + "github.com/IBM/sarama" + "github.com/docker/docker/api/types/container" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/modules/kafka" + "github.com/testcontainers/testcontainers-go/network" + "github.com/testcontainers/testcontainers-go/wait" ) func ExampleRun() { @@ -19,7 +27,7 @@ func ExampleRun() { ) defer func() { if err := testcontainers.TerminateContainer(kafkaContainer); err != nil { - log.Printf("failed to terminate container: %s", err) + log.Fatalf("failed to terminate container: %s", err) } }() if err != nil { @@ -41,3 +49,337 @@ func ExampleRun() { // test-cluster // true } + +func ExampleKafkaContainer_BrokersByHostDockerInternal() { + ctx := context.Background() + + kafkaContainer, err := kafka.Run(ctx, + "confluentinc/confluent-local:7.5.0", + kafka.WithClusterID("test-cluster"), + ) + if err != nil { + log.Printf("failed to start container: %s", err) + return + } + + // Clean up the container after + defer func() { + if err := kafkaContainer.Terminate(ctx); err != nil { + log.Fatalf("failed to terminate container: %s", err) + } + }() + + { + state, err := kafkaContainer.State(ctx) + if err != nil { + log.Printf("failed to get container state: %s", err) + return + } + + fmt.Println(kafkaContainer.ClusterID) + fmt.Println(state.Running) + } + + const topic = "example-topic" + + // Produce a message from the host that will be read by a consumer in another docker container + { + brokers, err := kafkaContainer.Brokers(ctx) + if err != nil { + log.Print(err) + return + } + + config := sarama.NewConfig() + config.Producer.Return.Successes = true + producer, err := sarama.NewSyncProducer(brokers, config) + if err != nil { + log.Print(err) + return + } + + if _, _, err := producer.SendMessage(&sarama.ProducerMessage{ + Topic: topic, + Key: sarama.StringEncoder("key"), + Value: sarama.StringEncoder("example_message_value"), + }); err != nil { + log.Print(err) + return + } + + } + + // getBrokersByHostDockerInternal { + brokers, err := kafkaContainer.BrokersByHostDockerInternal(ctx) + if err != nil { + log.Print(err) + return + } + + // Run another container that can connect to the kafka container via hostname "host.docker.internal" + kcat, err := testcontainers.GenericContainer( + ctx, + testcontainers.GenericContainerRequest{ + ContainerRequest: testcontainers.ContainerRequest{ + Image: "confluentinc/cp-kafkacat", + Entrypoint: []string{"kafkacat"}, + Cmd: []string{"-b", strings.Join(brokers, ","), "-C", "-t", topic, "-c", "1"}, + WaitingFor: wait.ForExit(), + + // Add host.docker.internal to the consumer container so it can contact the kafka borkers + HostConfigModifier: func(hc *container.HostConfig) { + hc.ExtraHosts = append(hc.ExtraHosts, "host.docker.internal:host-gateway") + }, + }, + Started: true, + }, + ) + if err != nil { + log.Printf("kafkacat error: %v", err) + return + } + + lr, err := kcat.Logs(ctx) + if err != nil { + log.Printf("kafkacat logs error: %v", err) + return + } + + logs, err := io.ReadAll(lr) + if err != nil { + log.Printf("kafkacat logs read error: %v", err) + return + } + + fmt.Println("read message:", string(bytes.TrimSpace(logs))) + // } + + // Output: + // test-cluster + // true + // read message: example_message_value +} + +func ExampleKafkaContainer_BrokersByContainerName() { + ctx := context.Background() + + // getBrokersByContainerName_Kafka { + net, err := network.New(ctx) + if err != nil { + log.Printf("failed to create network: %s", err) + return + } + + kafkaContainer, err := kafka.Run(ctx, + "confluentinc/confluent-local:7.5.0", + kafka.WithClusterID("test-cluster"), + network.WithNetwork(nil, net), // Run kafka test container in a new docker network + ) + if err != nil { + log.Printf("failed to start container: %s", err) + return + } + // } + + // Clean up the container after + defer func() { + if err := kafkaContainer.Terminate(ctx); err != nil { + log.Fatalf("failed to terminate container: %s", err) + } + }() + + { + state, err := kafkaContainer.State(ctx) + if err != nil { + log.Printf("failed to get container state: %s", err) + return + } + + fmt.Println(kafkaContainer.ClusterID) + fmt.Println(state.Running) + } + + const topic = "example-topic" + + // Produce a message from the host that will be read by a consumer in another docker container + { + brokers, err := kafkaContainer.Brokers(ctx) + if err != nil { + log.Print(err) + return + } + + config := sarama.NewConfig() + config.Producer.Return.Successes = true + producer, err := sarama.NewSyncProducer(brokers, config) + if err != nil { + log.Print(err) + return + } + + if _, _, err := producer.SendMessage(&sarama.ProducerMessage{ + Topic: topic, + Key: sarama.StringEncoder("key"), + Value: sarama.StringEncoder("example_message_value"), + }); err != nil { + log.Print(err) + return + } + } + + // getBrokersByContainerName_Kcat { + brokers, err := kafkaContainer.BrokersByContainerName(ctx) + if err != nil { + log.Print(err) + return + } + + // Run another container that can connect to the kafka container via the kafka containers name + kcat, err := testcontainers.GenericContainer( + ctx, + testcontainers.GenericContainerRequest{ + ContainerRequest: testcontainers.ContainerRequest{ + Image: "confluentinc/cp-kafkacat", + Entrypoint: []string{"kafkacat"}, + Cmd: []string{"-b", strings.Join(brokers, ","), "-C", "-t", topic, "-c", "1"}, + WaitingFor: wait.ForExit(), + Networks: []string{net.Name}, // Run kafkacat in the same docker network as the testcontainer + }, + Started: true, + }, + ) + if err != nil { + log.Printf("kafkacat error: %v", err) + return + } + + lr, err := kcat.Logs(ctx) + if err != nil { + log.Printf("kafkacat logs error: %v", err) + return + } + + logs, err := io.ReadAll(lr) + if err != nil { + log.Printf("kafkacat logs read error: %v", err) + return + } + + fmt.Println("read message:", string(bytes.TrimSpace(logs))) + // } + + // Output: + // test-cluster + // true + // read message: example_message_value +} + +func ExampleKafkaContainer_BrokersByContainerId() { + ctx := context.Background() + + net, err := network.New(ctx) + if err != nil { + log.Printf("failed to create network: %s", err) + return + } + + kafkaContainer, err := kafka.Run(ctx, + "confluentinc/confluent-local:7.5.0", + kafka.WithClusterID("test-cluster"), + network.WithNetwork(nil, net), // Run kafka test container in a new docker network + ) + if err != nil { + log.Printf("failed to start container: %s", err) + return + } + + // Clean up the container after + defer func() { + if err := kafkaContainer.Terminate(ctx); err != nil { + log.Fatalf("failed to terminate container: %s", err) + } + }() + + { + state, err := kafkaContainer.State(ctx) + if err != nil { + log.Printf("failed to get container state: %s", err) + return + } + + fmt.Println(kafkaContainer.ClusterID) + fmt.Println(state.Running) + } + + const topic = "example-topic" + + // Produce a message from the host that will be read by a consumer in another docker container + { + brokers, err := kafkaContainer.Brokers(ctx) + if err != nil { + log.Print(err) + return + } + + config := sarama.NewConfig() + config.Producer.Return.Successes = true + producer, err := sarama.NewSyncProducer(brokers, config) + if err != nil { + log.Print(err) + return + } + + if _, _, err := producer.SendMessage(&sarama.ProducerMessage{ + Topic: topic, + Key: sarama.StringEncoder("key"), + Value: sarama.StringEncoder("example_message_value"), + }); err != nil { + log.Print(err) + return + } + } + + brokers, err := kafkaContainer.BrokersByContainerId(ctx) + if err != nil { + log.Print(err) + return + } + + // Run another container that can connect to the kafka container via the kafka containers ContainerID + kcat, err := testcontainers.GenericContainer( + ctx, + testcontainers.GenericContainerRequest{ + ContainerRequest: testcontainers.ContainerRequest{ + Image: "confluentinc/cp-kafkacat", + Entrypoint: []string{"kafkacat"}, + Cmd: []string{"-b", strings.Join(brokers, ","), "-C", "-t", topic, "-c", "1"}, + WaitingFor: wait.ForExit(), + Networks: []string{net.Name}, // Run kafkacat in the same docker network as the testcontainer + }, + Started: true, + }, + ) + if err != nil { + log.Printf("kafkacat error: %v", err) + return + } + + lr, err := kcat.Logs(ctx) + if err != nil { + log.Printf("kafkacat logs error: %v", err) + return + } + + logs, err := io.ReadAll(lr) + if err != nil { + log.Printf("kafkacat logs read error: %v", err) + return + } + + fmt.Println("read message:", string(bytes.TrimSpace(logs))) + + // Output: + // test-cluster + // true + // read message: example_message_value +} diff --git a/modules/kafka/kafka.go b/modules/kafka/kafka.go index 73e392e1d2..8ee70b14c8 100644 --- a/modules/kafka/kafka.go +++ b/modules/kafka/kafka.go @@ -15,20 +15,37 @@ import ( "github.com/testcontainers/testcontainers-go/wait" ) -const publicPort = nat.Port("9093/tcp") +const ( + // Mapped port for advertised listener of localhost:. + PublicLocalhostPort = nat.Port("9093/tcp") + // Mapped port for advertised listener of host.docker.internal: + PublicDockerHostPort = nat.Port("19092/tcp") + // Listening port for advertised listener of :19093. This is not mapped to a random host port + NetworkInternalContainerNamePort = 19093 + // Listening port for advertised listener of :19094. This is not mapped to a random host port + NetworkInternalContainerIdPort = 19094 + // Listening port for Broker intercommunication + BrokerToBrokerPort = 9092 + // Listening port for Contoller + ControllerPort = 9094 +) + const ( starterScript = "/usr/sbin/testcontainers_start.sh" // starterScript { - starterScriptContent = `#!/bin/bash + starterScriptContent = ` +#!/bin/bash + source /etc/confluent/docker/bash-config -export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://%s:%d,BROKER://%s:9092 +export KAFKA_ADVERTISED_LISTENERS=%s echo Starting Kafka KRaft mode sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure echo 'kafka-storage format --ignore-formatted -t "$(kafka-storage random-uuid)" -c /etc/kafka/kafka.properties' >> /etc/confluent/docker/configure echo '' > /etc/confluent/docker/ensure /etc/confluent/docker/configure -/etc/confluent/docker/launch` +/etc/confluent/docker/launch +` // } ) @@ -46,14 +63,34 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize // Run creates an instance of the Kafka container type func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustomizer) (*KafkaContainer, error) { + listeners := []string{ + fmt.Sprintf("LOCALHOST://0.0.0.0:%d", PublicLocalhostPort.Int()), + fmt.Sprintf("HOST_DOCKER_INTERNAL://0.0.0.0:%d", PublicDockerHostPort.Int()), + fmt.Sprintf("CONTAINER_NAME://0.0.0.0:%d", NetworkInternalContainerNamePort), + fmt.Sprintf("CONTAINER_ID://0.0.0.0:%d", NetworkInternalContainerIdPort), + fmt.Sprintf("BROKER://0.0.0.0:%d", BrokerToBrokerPort), + fmt.Sprintf("CONTROLLER://0.0.0.0:%d", ControllerPort), + } + + protoMap := []string{ + "LOCALHOST:PLAINTEXT", + "HOST_DOCKER_INTERNAL:PLAINTEXT", + "CONTAINER_NAME:PLAINTEXT", + "CONTAINER_ID:PLAINTEXT", + "BROKER:PLAINTEXT", + "CONTROLLER:PLAINTEXT", + } req := testcontainers.ContainerRequest{ - Image: img, - ExposedPorts: []string{string(publicPort)}, + Image: img, + ExposedPorts: []string{ + string(PublicLocalhostPort), + string(PublicDockerHostPort), + }, Env: map[string]string{ // envVars { - "KAFKA_LISTENERS": "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094", - "KAFKA_REST_BOOTSTRAP_SERVERS": "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094", - "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT", + "KAFKA_LISTENERS": strings.Join(listeners, ","), + "KAFKA_REST_BOOTSTRAP_SERVERS": strings.Join(listeners, ","), + "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": strings.Join(protoMap, ","), "KAFKA_INTER_BROKER_LISTENER_NAME": "BROKER", "KAFKA_BROKER_ID": "1", "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "1", @@ -123,7 +160,7 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom // copyStarterScript copies the starter script into the container. func copyStarterScript(ctx context.Context, c testcontainers.Container) error { - if err := wait.ForListeningPort(publicPort). + if err := wait.ForListeningPort(PublicLocalhostPort). SkipInternalCheck(). WaitUntilReady(ctx, c); err != nil { return fmt.Errorf("wait for exposed port: %w", err) @@ -141,12 +178,27 @@ func copyStarterScript(ctx context.Context, c testcontainers.Container) error { hostname := inspect.Config.Hostname - port, err := c.MappedPort(ctx, publicPort) + portLh, err := c.MappedPort(ctx, PublicLocalhostPort) if err != nil { return fmt.Errorf("mapped port: %w", err) } - scriptContent := fmt.Sprintf(starterScriptContent, host, port.Int(), hostname) + portDh, err := c.MappedPort(ctx, PublicDockerHostPort) + if err != nil { + return fmt.Errorf("mapped port: %w", err) + } + + // advertisedListeners { + advertisedListeners := []string{ + fmt.Sprintf("LOCALHOST://%s:%d", host, portLh.Int()), + fmt.Sprintf("HOST_DOCKER_INTERNAL://%s:%d", "host.docker.internal", portDh.Int()), + fmt.Sprintf("CONTAINER_NAME://%s:%d", strings.Trim(inspect.Name, "/"), NetworkInternalContainerNamePort), + fmt.Sprintf("CONTAINER_ID://%s:%d", hostname, NetworkInternalContainerIdPort), + fmt.Sprintf("BROKER://%s:%d", hostname, BrokerToBrokerPort), + } + + scriptContent := fmt.Sprintf(starterScriptContent, strings.Join(advertisedListeners, ",")) + // } if err := c.CopyToContainer(ctx, []byte(scriptContent), starterScript, 0o755); err != nil { return fmt.Errorf("copy to container: %w", err) @@ -165,13 +217,15 @@ func WithClusterID(clusterID string) testcontainers.CustomizeRequestOption { // Brokers retrieves the broker connection strings from Kafka with only one entry, // defined by the exposed public port. +// +// Example Output: localhost: func (kc *KafkaContainer) Brokers(ctx context.Context) ([]string, error) { host, err := kc.Host(ctx) if err != nil { return nil, err } - port, err := kc.MappedPort(ctx, publicPort) + port, err := kc.MappedPort(ctx, PublicLocalhostPort) if err != nil { return nil, err } @@ -179,6 +233,45 @@ func (kc *KafkaContainer) Brokers(ctx context.Context) ([]string, error) { return []string{fmt.Sprintf("%s:%d", host, port.Int())}, nil } +// BrokersByHostDockerInternal retrieves broker connection string suitable when +// running 2 containers in the default docker network +// +// Example Output: host.docker.internal: +func (kc *KafkaContainer) BrokersByHostDockerInternal(ctx context.Context) ([]string, error) { + port, err := kc.MappedPort(ctx, PublicDockerHostPort) + if err != nil { + return nil, err + } + + return []string{fmt.Sprintf("%s:%d", "host.docker.internal", port.Int())}, nil +} + +// BrokersByContainerName retrieves broker connection string suitable when +// trying to connect 2 containers running within the same docker network together +// +// Example Output: zealous_murdock:19093 +func (kc *KafkaContainer) BrokersByContainerName(ctx context.Context) ([]string, error) { + inspect, err := kc.Inspect(ctx) + if err != nil { + return nil, err + } + + return []string{fmt.Sprintf("%s:%d", strings.Trim(inspect.Name, "/"), NetworkInternalContainerNamePort)}, nil +} + +// BrokersByContainerId retrieves broker connection string suitable when +// trying to connect 2 containers running within the same docker network together +// +// Example Output: e3c69e4fc625:19094 +func (kc *KafkaContainer) BrokersByContainerId(ctx context.Context) ([]string, error) { + inspect, err := kc.Inspect(ctx) + if err != nil { + return nil, err + } + + return []string{fmt.Sprintf("%s:%d", inspect.Config.Hostname, NetworkInternalContainerIdPort)}, nil +} + // configureControllerQuorumVoters sets the quorum voters for the controller. For that, it will // check if there are any network aliases defined for the container and use the first alias in the // first network. Else, it will use localhost. diff --git a/modules/kafka/kafka_test.go b/modules/kafka/kafka_test.go index af858f849f..2638fe5a49 100644 --- a/modules/kafka/kafka_test.go +++ b/modules/kafka/kafka_test.go @@ -1,26 +1,42 @@ package kafka_test import ( + "bytes" "context" + "fmt" + "io" "strings" "testing" "github.com/IBM/sarama" + "github.com/docker/docker/api/types/container" "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/modules/kafka" + "github.com/testcontainers/testcontainers-go/network" + "github.com/testcontainers/testcontainers-go/wait" ) func TestKafka(t *testing.T) { - topic := "some-topic" + const ( + topic = "some-topic" + value = "kafka-message-value" + ) ctx := context.Background() + net, err := network.New(ctx) + require.NoError(t, err) - kafkaContainer, err := kafka.Run(ctx, "confluentinc/confluent-local:7.5.0", kafka.WithClusterID("kraftCluster")) + kafkaContainer, err := kafka.Run(ctx, "confluentinc/confluent-local:7.5.0", kafka.WithClusterID("kraftCluster"), network.WithNetwork(nil, net)) testcontainers.CleanupContainer(t, kafkaContainer) require.NoError(t, err) + // Clean up the container after the test is complete + t.Cleanup(func() { + require.NoError(t, kafkaContainer.Terminate(ctx), "failed to terminate container: %v", err) + }) + assertAdvertisedListeners(t, kafkaContainer) require.Truef(t, strings.EqualFold(kafkaContainer.ClusterID, "kraftCluster"), "expected clusterID to be %s, got %s", "kraftCluster", kafkaContainer.ClusterID) @@ -56,14 +72,67 @@ func TestKafka(t *testing.T) { _, _, err = producer.SendMessage(&sarama.ProducerMessage{ Topic: topic, Key: sarama.StringEncoder("key"), - Value: sarama.StringEncoder("value"), + Value: sarama.StringEncoder(value), }) require.NoError(t, err) <-done require.Truef(t, strings.EqualFold(string(consumer.message.Key), "key"), "expected key to be %s, got %s", "key", string(consumer.message.Key)) - require.Truef(t, strings.EqualFold(string(consumer.message.Value), "value"), "expected value to be %s, got %s", "value", string(consumer.message.Value)) + require.Truef(t, strings.EqualFold(string(consumer.message.Value), value), "expected value to be %s, got %s", value, string(consumer.message.Value)) + + assertBrokers := func( + prefix string, + getBrokers func(context.Context) ([]string, error), + hostMod func(*container.HostConfig), + ) { + t.Helper() + + brokers, err = getBrokers(ctx) + require.NoError(t, err) + + t.Log(prefix, strings.Join(brokers, ",")) + + kcat, err := testcontainers.GenericContainer( + ctx, + testcontainers.GenericContainerRequest{ + ContainerRequest: testcontainers.ContainerRequest{ + Image: "confluentinc/cp-kafkacat", + Entrypoint: []string{"kafkacat"}, + Cmd: []string{"-b", strings.Join(brokers, ","), "-C", "-t", topic, "-c", "1"}, + WaitingFor: wait.ForExit(), + HostConfigModifier: hostMod, + Networks: []string{net.Name}, + }, + Started: true, + }, + ) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, kcat.Terminate(ctx), "failed to terminate container") + }) + + l, err := kcat.Logs(ctx) + require.NoError(t, err) + + lb, err := io.ReadAll(l) + require.NoError(t, err) + + readMsg := string(bytes.TrimSpace(lb)) + require.Truef(t, strings.EqualFold(readMsg, value), "expected value to be %s, got %s", value, readMsg) + } + + t.Run("BrokersByHostDockerInternal", func(t *testing.T) { + assertBrokers("BrokersByHostDockerInternal: ", kafkaContainer.BrokersByHostDockerInternal, func(hc *container.HostConfig) { + hc.ExtraHosts = append(hc.ExtraHosts, "host.docker.internal:host-gateway") + }) + }) + t.Run("BrokersByContainerId", func(t *testing.T) { + assertBrokers("BrokersByContainerId: ", kafkaContainer.BrokersByContainerId, nil) + }) + t.Run("BrokersByContainerName", func(t *testing.T) { + assertBrokers("BrokersByContainerName: ", kafkaContainer.BrokersByContainerName, nil) + }) } func TestKafka_invalidVersion(t *testing.T) { @@ -75,17 +144,53 @@ func TestKafka_invalidVersion(t *testing.T) { } // assertAdvertisedListeners checks that the advertised listeners are set correctly: +// - The LOCALHOST:// protocol is using the host of the Kafka container +// - The HOST_DOCKER_INTERNAL:// protocol is using hostname host.docker.internal +// - The CONTAINER_NAME:// protocol is using the container name of the Kafka container +// - The CONTAINER_ID:// protocol is using the container ID of the Kafka container // - The BROKER:// protocol is using the hostname of the Kafka container -func assertAdvertisedListeners(t *testing.T, container testcontainers.Container) { +func assertAdvertisedListeners(t *testing.T, container *kafka.KafkaContainer) { t.Helper() - inspect, err := container.Inspect(context.Background()) + ctx := context.Background() + + inspect, err := container.Inspect(ctx) require.NoError(t, err) - brokerURL := "BROKER://" + inspect.Config.Hostname + ":9092" + portLh, err := container.MappedPort(ctx, kafka.PublicLocalhostPort) + require.NoError(t, err) - ctx := context.Background() + portDh, err := container.MappedPort(ctx, kafka.PublicDockerHostPort) + require.NoError(t, err) + + host, err := container.Host(ctx) + require.NoError(t, err) bs := testcontainers.RequireContainerExec(ctx, t, container, []string{"cat", "/usr/sbin/testcontainers_start.sh"}) - require.Containsf(t, bs, brokerURL, "expected advertised listeners to contain %s, got %s", brokerURL, bs) + assert := func(listener string) { + t.Helper() + require.Containsf(t, bs, listener, "expected advertised listeners to contain %s, got %s", listener, bs) + } + + mustBrokers := func(fn func(context.Context) ([]string, error)) string { + t.Helper() + brokers, err := fn(ctx) + require.NoError(t, err) + require.Len(t, brokers, 1) + return brokers[0] + } + + assert(fmt.Sprintf("LOCALHOST://%s:%d", host, portLh.Int())) + assert(fmt.Sprintf("LOCALHOST://%s", mustBrokers(container.Brokers))) //nolint:perfsprint + + assert(fmt.Sprintf("HOST_DOCKER_INTERNAL://host.docker.internal:%d", portDh.Int())) + assert(fmt.Sprintf("HOST_DOCKER_INTERNAL://%s", mustBrokers(container.BrokersByHostDockerInternal))) //nolint:perfsprint + + assert(fmt.Sprintf("CONTAINER_NAME://%s:%d", strings.Trim(inspect.Name, "/"), kafka.NetworkInternalContainerNamePort)) + assert(fmt.Sprintf("CONTAINER_NAME://%s", mustBrokers(container.BrokersByContainerName))) //nolint:perfsprint + + assert(fmt.Sprintf("CONTAINER_ID://%s:%d", inspect.Config.Hostname, kafka.NetworkInternalContainerIdPort)) + assert(fmt.Sprintf("CONTAINER_ID://%s", mustBrokers(container.BrokersByContainerId))) //nolint:perfsprint + + assert(fmt.Sprintf("BROKER://%s:%d", inspect.Config.Hostname, kafka.BrokerToBrokerPort)) }