From 6f1ee490592f1e4478c95d6bbc66a04a472e602e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mi=C5=82osz=20Sm=C3=B3=C5=82ka?= Date: Thu, 7 Feb 2019 11:19:00 +0100 Subject: [PATCH] Replace Travis with GitLab CI (#35) --- .ci/docker-compose.override.yml | 21 +++ .ci/wait-for-it.sh | 178 ++++++++++++++++++ .ci/wait-for-services.sh | 6 + .gitignore | 1 + .gitlab-ci.yml | 44 +++++ .travis-librdkafka.sh | 19 -- .travis.yml | 14 -- README.md | 2 +- docker-compose.yml | 21 ++- message/infrastructure/amqp/pubsub_test.go | 22 ++- .../googlecloud/pubsub_bench_test.go | 2 +- .../googlecloud/pubsub_stress_test.go | 2 +- .../infrastructure/googlecloud/pubsub_test.go | 2 +- .../infrastructure/kafka/pubsub_bench_test.go | 4 +- message/infrastructure/kafka/pubsub_test.go | 16 +- message/infrastructure/nats/pubsub_test.go | 21 ++- 16 files changed, 313 insertions(+), 62 deletions(-) create mode 100644 .ci/docker-compose.override.yml create mode 100755 .ci/wait-for-it.sh create mode 100755 .ci/wait-for-services.sh create mode 100644 .gitlab-ci.yml delete mode 100755 .travis-librdkafka.sh delete mode 100644 .travis.yml diff --git a/.ci/docker-compose.override.yml b/.ci/docker-compose.override.yml new file mode 100644 index 000000000..299c03339 --- /dev/null +++ b/.ci/docker-compose.override.yml @@ -0,0 +1,21 @@ +version: '3' +services: + watermill: + image: golang:1.11 + command: /bin/true + volumes: + - .:/app + - .mod-cache:/go/pkg/mod + working_dir: /app + environment: + PUBSUB_EMULATOR_HOST: googlecloud:8085 + WATERMILL_TEST_NATS_URL: nats://nats-streaming:4222 + WATERMILL_TEST_AMQP_URI: amqp://guest:guest@rabbitmq:5672 + WATERMILL_TEST_KAFKA_BROKERS: kafka:9092 + + kafka: + environment: + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 + + googlecloud: + entrypoint: gcloud --quiet beta emulators pubsub start --host-port=googlecloud:8085 --verbosity=debug --log-http diff --git a/.ci/wait-for-it.sh b/.ci/wait-for-it.sh new file mode 100755 index 000000000..071c2bee3 --- /dev/null +++ b/.ci/wait-for-it.sh @@ -0,0 +1,178 @@ +#!/usr/bin/env bash +# Use this script to test if a given TCP host/port are available + +WAITFORIT_cmdname=${0##*/} + +echoerr() { if [[ $WAITFORIT_QUIET -ne 1 ]]; then echo "$@" 1>&2; fi } + +usage() +{ + cat << USAGE >&2 +Usage: + $WAITFORIT_cmdname host:port [-s] [-t timeout] [-- command args] + -h HOST | --host=HOST Host or IP under test + -p PORT | --port=PORT TCP port under test + Alternatively, you specify the host and port as host:port + -s | --strict Only execute subcommand if the test succeeds + -q | --quiet Don't output any status messages + -t TIMEOUT | --timeout=TIMEOUT + Timeout in seconds, zero for no timeout + -- COMMAND ARGS Execute command with args after the test finishes +USAGE + exit 1 +} + +wait_for() +{ + if [[ $WAITFORIT_TIMEOUT -gt 0 ]]; then + echoerr "$WAITFORIT_cmdname: waiting $WAITFORIT_TIMEOUT seconds for $WAITFORIT_HOST:$WAITFORIT_PORT" + else + echoerr "$WAITFORIT_cmdname: waiting for $WAITFORIT_HOST:$WAITFORIT_PORT without a timeout" + fi + WAITFORIT_start_ts=$(date +%s) + while : + do + if [[ $WAITFORIT_ISBUSY -eq 1 ]]; then + nc -z $WAITFORIT_HOST $WAITFORIT_PORT + WAITFORIT_result=$? + else + (echo > /dev/tcp/$WAITFORIT_HOST/$WAITFORIT_PORT) >/dev/null 2>&1 + WAITFORIT_result=$? + fi + if [[ $WAITFORIT_result -eq 0 ]]; then + WAITFORIT_end_ts=$(date +%s) + echoerr "$WAITFORIT_cmdname: $WAITFORIT_HOST:$WAITFORIT_PORT is available after $((WAITFORIT_end_ts - WAITFORIT_start_ts)) seconds" + break + fi + sleep 1 + done + return $WAITFORIT_result +} + +wait_for_wrapper() +{ + # In order to support SIGINT during timeout: http://unix.stackexchange.com/a/57692 + if [[ $WAITFORIT_QUIET -eq 1 ]]; then + timeout $WAITFORIT_BUSYTIMEFLAG $WAITFORIT_TIMEOUT $0 --quiet --child --host=$WAITFORIT_HOST --port=$WAITFORIT_PORT --timeout=$WAITFORIT_TIMEOUT & + else + timeout $WAITFORIT_BUSYTIMEFLAG $WAITFORIT_TIMEOUT $0 --child --host=$WAITFORIT_HOST --port=$WAITFORIT_PORT --timeout=$WAITFORIT_TIMEOUT & + fi + WAITFORIT_PID=$! + trap "kill -INT -$WAITFORIT_PID" INT + wait $WAITFORIT_PID + WAITFORIT_RESULT=$? + if [[ $WAITFORIT_RESULT -ne 0 ]]; then + echoerr "$WAITFORIT_cmdname: timeout occurred after waiting $WAITFORIT_TIMEOUT seconds for $WAITFORIT_HOST:$WAITFORIT_PORT" + fi + return $WAITFORIT_RESULT +} + +# process arguments +while [[ $# -gt 0 ]] +do + case "$1" in + *:* ) + WAITFORIT_hostport=(${1//:/ }) + WAITFORIT_HOST=${WAITFORIT_hostport[0]} + WAITFORIT_PORT=${WAITFORIT_hostport[1]} + shift 1 + ;; + --child) + WAITFORIT_CHILD=1 + shift 1 + ;; + -q | --quiet) + WAITFORIT_QUIET=1 + shift 1 + ;; + -s | --strict) + WAITFORIT_STRICT=1 + shift 1 + ;; + -h) + WAITFORIT_HOST="$2" + if [[ $WAITFORIT_HOST == "" ]]; then break; fi + shift 2 + ;; + --host=*) + WAITFORIT_HOST="${1#*=}" + shift 1 + ;; + -p) + WAITFORIT_PORT="$2" + if [[ $WAITFORIT_PORT == "" ]]; then break; fi + shift 2 + ;; + --port=*) + WAITFORIT_PORT="${1#*=}" + shift 1 + ;; + -t) + WAITFORIT_TIMEOUT="$2" + if [[ $WAITFORIT_TIMEOUT == "" ]]; then break; fi + shift 2 + ;; + --timeout=*) + WAITFORIT_TIMEOUT="${1#*=}" + shift 1 + ;; + --) + shift + WAITFORIT_CLI=("$@") + break + ;; + --help) + usage + ;; + *) + echoerr "Unknown argument: $1" + usage + ;; + esac +done + +if [[ "$WAITFORIT_HOST" == "" || "$WAITFORIT_PORT" == "" ]]; then + echoerr "Error: you need to provide a host and port to test." + usage +fi + +WAITFORIT_TIMEOUT=${WAITFORIT_TIMEOUT:-15} +WAITFORIT_STRICT=${WAITFORIT_STRICT:-0} +WAITFORIT_CHILD=${WAITFORIT_CHILD:-0} +WAITFORIT_QUIET=${WAITFORIT_QUIET:-0} + +# check to see if timeout is from busybox? +WAITFORIT_TIMEOUT_PATH=$(type -p timeout) +WAITFORIT_TIMEOUT_PATH=$(realpath $WAITFORIT_TIMEOUT_PATH 2>/dev/null || readlink -f $WAITFORIT_TIMEOUT_PATH) +if [[ $WAITFORIT_TIMEOUT_PATH =~ "busybox" ]]; then + WAITFORIT_ISBUSY=1 + WAITFORIT_BUSYTIMEFLAG="-t" + +else + WAITFORIT_ISBUSY=0 + WAITFORIT_BUSYTIMEFLAG="" +fi + +if [[ $WAITFORIT_CHILD -gt 0 ]]; then + wait_for + WAITFORIT_RESULT=$? + exit $WAITFORIT_RESULT +else + if [[ $WAITFORIT_TIMEOUT -gt 0 ]]; then + wait_for_wrapper + WAITFORIT_RESULT=$? + else + wait_for + WAITFORIT_RESULT=$? + fi +fi + +if [[ $WAITFORIT_CLI != "" ]]; then + if [[ $WAITFORIT_RESULT -ne 0 && $WAITFORIT_STRICT -eq 1 ]]; then + echoerr "$WAITFORIT_cmdname: strict mode, refusing to execute subprocess" + exit $WAITFORIT_RESULT + fi + exec "${WAITFORIT_CLI[@]}" +else + exit $WAITFORIT_RESULT +fi diff --git a/.ci/wait-for-services.sh b/.ci/wait-for-services.sh new file mode 100755 index 000000000..8ba95fa22 --- /dev/null +++ b/.ci/wait-for-services.sh @@ -0,0 +1,6 @@ +#!/bin/bash +set -e + +for service in zookeeper:2181 rabbitmq:5672 googlecloud:8085 nats-streaming:4222 kafka:9092; do + "$(dirname "$0")/wait-for-it.sh" -t 60 "$service" +done diff --git a/.gitignore b/.gitignore index 01ce7ec33..06030b4cb 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ docs/public/ docs/content/src-link *.out *.log +.mod-cache diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml new file mode 100644 index 000000000..bf06a7a98 --- /dev/null +++ b/.gitlab-ci.yml @@ -0,0 +1,44 @@ +variables: + MOD_CACHE_DIR: .mod-cache +stages: + - build + - test + +build: + stage: build + image: golang:1.11 + before_script: + - mkdir -p $GOPATH/pkg/mod + - "[ -d $MOD_CACHE_DIR ] && cp -r $MOD_CACHE_DIR/. $GOPATH/pkg/mod" + script: + - go build ./... + - cp -r $GOPATH/pkg/mod/. $MOD_CACHE_DIR + cache: + key: $CI_PROJECT_ID + paths: + - $MOD_CACHE_DIR + only: + - branches + +test: + stage: test + image: docker:stable + services: + - docker:stable-dind + cache: + key: $CI_PROJECT_ID + paths: + - $MOD_CACHE_DIR + policy: pull + before_script: + - apk add py-pip + - pip install docker-compose + - cp -v ./.ci/docker-compose.override.yml . + script: + - docker-compose pull -q + - docker-compose up -d + - docker-compose run watermill ./.ci/wait-for-services.sh + - "[[ $CI_COMMIT_REF_NAME != master ]] && export FLAGS=-short" + - docker-compose run watermill go test ./... $FLAGS + only: + - branches diff --git a/.travis-librdkafka.sh b/.travis-librdkafka.sh deleted file mode 100755 index b0906d34c..000000000 --- a/.travis-librdkafka.sh +++ /dev/null @@ -1,19 +0,0 @@ -#!/bin/bash -set -e - -readonly version="$1" -readonly build_dir=/tmp/librdkafka - -if [ -z "$version" ]; then - echo "Usage: $0 " - exit 1 -fi - -mkdir -p "$build_dir" -cd "$build_dir" - -wget -qO- "https://github.com/edenhill/librdkafka/archive/v${version}.tar.gz" | tar xz --strip-components 1 - -./configure --prefix=/usr -make -j "$(getconf _NPROCESSORS_ONLN)" -make install diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index a008939f3..000000000 --- a/.travis.yml +++ /dev/null @@ -1,14 +0,0 @@ -language: go -go: - - 1.11.x -jobs: - include: - - stage: "Build" - name: "Build" - script: go build ./... - -before_script: - - sudo ./.travis-librdkafka.sh 0.11.6 - -env: - - GO111MODULE=on diff --git a/README.md b/README.md index 5eefc8c8b..82bb7a9c2 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ # Watermill -[![Build Status](https://travis-ci.org/ThreeDotsLabs/watermill.svg?branch=master)](https://travis-ci.org/ThreeDotsLabs/watermill) +[![Build Status](https://gitlab.com/threedotslabs/watermill/badges/master/build.svg)](https://gitlab.com/threedotslabs/watermill/pipelines) [![Go Report Card](https://goreportcard.com/badge/github.com/ThreeDotsLabs/watermill)](https://goreportcard.com/report/github.com/ThreeDotsLabs/watermill) [![codecov](https://codecov.io/gh/ThreeDotsLabs/watermill/branch/master/graph/badge.svg)](https://codecov.io/gh/ThreeDotsLabs/watermill) diff --git a/docker-compose.yml b/docker-compose.yml index a346b41c9..d32a26336 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,11 +1,12 @@ # for Watermill development pourpses, -# For Watermill based application docker plase check check https://watermill.io/docs/getting-started/ +# For Watermill based application docker please check https://watermill.io/docs/getting-started/ version: '3' services: zookeeper: image: confluentinc/cp-zookeeper:5.0.1 - network_mode: host + ports: + - 2181:2181 restart: on-failure environment: ZOOKEEPER_CLIENT_PORT: 2181 @@ -13,13 +14,14 @@ services: kafka: image: confluentinc/cp-kafka:5.0.1 - network_mode: host + ports: + - 9092:9092 restart: on-failure depends_on: - zookeeper environment: KAFKA_BROKER_ID: 1 - KAFKA_ZOOKEEPER_CONNECT: localhost:2181 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_NUM_PARTITIONS: 8 @@ -27,17 +29,20 @@ services: nats-streaming: image: nats-streaming:0.11.2 - network_mode: host + ports: + - 4222:4222 restart: on-failure - entrypoint: ["/nats-streaming-server", "-m=8222", "--max_channels=0"] + entrypoint: ["/nats-streaming-server", "--max_channels=0"] googlecloud: image: google/cloud-sdk:228.0.0 entrypoint: gcloud --quiet beta emulators pubsub start --host-port=localhost:8085 --verbosity=debug --log-http - network_mode: host + ports: + - 8085:8085 restart: on-failure rabbitmq: image: rabbitmq:3.7-management restart: on-failure - network_mode: host + ports: + - 5672:5672 diff --git a/message/infrastructure/amqp/pubsub_test.go b/message/infrastructure/amqp/pubsub_test.go index 9e4a77d0d..b4c8b947b 100644 --- a/message/infrastructure/amqp/pubsub_test.go +++ b/message/infrastructure/amqp/pubsub_test.go @@ -1,6 +1,7 @@ package amqp_test import ( + "os" "testing" "github.com/ThreeDotsLabs/watermill/message/infrastructure/amqp" @@ -11,12 +12,19 @@ import ( "github.com/ThreeDotsLabs/watermill/message/infrastructure" ) -var amqpURI = "amqp://guest:guest@localhost:5672/" +func amqpURI() string { + uri := os.Getenv("WATERMILL_TEST_AMQP_URI") + if uri != "" { + return uri + } + + return "amqp://guest:guest@localhost:5672/" +} func createPubSub(t *testing.T) infrastructure.PubSub { publisher, err := amqp.NewPublisher( amqp.NewDurablePubSubConfig( - amqpURI, + amqpURI(), nil, ), watermill.NewStdLogger(true, true), @@ -25,7 +33,7 @@ func createPubSub(t *testing.T) infrastructure.PubSub { subscriber, err := amqp.NewSubscriber( amqp.NewDurablePubSubConfig( - amqpURI, + amqpURI(), amqp.GenerateQueueNameTopicNameWithSuffix("test"), ), watermill.NewStdLogger(true, true), @@ -38,7 +46,7 @@ func createPubSub(t *testing.T) infrastructure.PubSub { func createPubSubWithConsumerGroup(t *testing.T, consumerGroup string) infrastructure.PubSub { publisher, err := amqp.NewPublisher( amqp.NewDurablePubSubConfig( - amqpURI, + amqpURI(), nil, ), watermill.NewStdLogger(true, true), @@ -47,7 +55,7 @@ func createPubSubWithConsumerGroup(t *testing.T, consumerGroup string) infrastru subscriber, err := amqp.NewSubscriber( amqp.NewDurablePubSubConfig( - amqpURI, + amqpURI(), amqp.GenerateQueueNameTopicNameWithSuffix(consumerGroup), ), watermill.NewStdLogger(true, true), @@ -74,7 +82,7 @@ func TestPublishSubscribe_pubsub(t *testing.T) { func createQueuePubSub(t *testing.T) infrastructure.PubSub { config := amqp.NewDurableQueueConfig( - amqpURI, + amqpURI(), ) publisher, err := amqp.NewPublisher( @@ -109,7 +117,7 @@ func TestPublishSubscribe_queue(t *testing.T) { func TestPublishSubscribe_transactional_publish(t *testing.T) { config := amqp.NewDurablePubSubConfig( - amqpURI, + amqpURI(), amqp.GenerateQueueNameTopicNameWithSuffix("test"), ) config.Publish.Transactional = true diff --git a/message/infrastructure/googlecloud/pubsub_bench_test.go b/message/infrastructure/googlecloud/pubsub_bench_test.go index 32d1aab06..2d2cc2d7c 100644 --- a/message/infrastructure/googlecloud/pubsub_bench_test.go +++ b/message/infrastructure/googlecloud/pubsub_bench_test.go @@ -10,7 +10,7 @@ import ( "github.com/ThreeDotsLabs/watermill/message/infrastructure/googlecloud" ) -// Run `docker-compose up` and set PUBSUB_EMULATOR_HOST=localhost:8085 for this to work +// Run `docker-compose up` and set PUBSUB_EMULATOR_HOST=googlecloud:8085 for this to work func BenchmarkSubscriber(b *testing.B) { infrastructure.BenchSubscriber(b, func(n int) message.PubSub { diff --git a/message/infrastructure/googlecloud/pubsub_stress_test.go b/message/infrastructure/googlecloud/pubsub_stress_test.go index 03c82ba05..c020d9987 100644 --- a/message/infrastructure/googlecloud/pubsub_stress_test.go +++ b/message/infrastructure/googlecloud/pubsub_stress_test.go @@ -8,7 +8,7 @@ import ( "github.com/ThreeDotsLabs/watermill/message/infrastructure" ) -// Run `docker-compose up` and set PUBSUB_EMULATOR_HOST=localhost:8085 for this to work +// Run `docker-compose up` and set PUBSUB_EMULATOR_HOST=googlecloud:8085 for this to work func TestPublishSubscribe_stress(t *testing.T) { infrastructure.TestPubSubStressTest( diff --git a/message/infrastructure/googlecloud/pubsub_test.go b/message/infrastructure/googlecloud/pubsub_test.go index ac6973e69..8b70fcaf3 100644 --- a/message/infrastructure/googlecloud/pubsub_test.go +++ b/message/infrastructure/googlecloud/pubsub_test.go @@ -17,7 +17,7 @@ import ( "github.com/ThreeDotsLabs/watermill/message/infrastructure/googlecloud" ) -// Run `docker-compose up` and set PUBSUB_EMULATOR_HOST=localhost:8085 for this to work +// Run `docker-compose up` and set PUBSUB_EMULATOR_HOST=googlecloud:8085 for this to work func newPubSub(t *testing.T, marshaler googlecloud.MarshalerUnmarshaler, subscriptionName googlecloud.SubscriptionNameFn) message.PubSub { ctx := context.Background() diff --git a/message/infrastructure/kafka/pubsub_bench_test.go b/message/infrastructure/kafka/pubsub_bench_test.go index 9b84e955d..eb3efc0b6 100644 --- a/message/infrastructure/kafka/pubsub_bench_test.go +++ b/message/infrastructure/kafka/pubsub_bench_test.go @@ -15,7 +15,7 @@ func BenchmarkSubscriber(b *testing.B) { infrastructure.BenchSubscriber(b, func(n int) message.PubSub { logger := watermill.NopLogger{} - publisher, err := kafka.NewPublisher(brokers, kafka.DefaultMarshaler{}, nil, logger) + publisher, err := kafka.NewPublisher(kafkaBrokers(), kafka.DefaultMarshaler{}, nil, logger) if err != nil { panic(err) } @@ -25,7 +25,7 @@ func BenchmarkSubscriber(b *testing.B) { subscriber, err := kafka.NewSubscriber( kafka.SubscriberConfig{ - Brokers: brokers, + Brokers: kafkaBrokers(), ConsumerGroup: "test", }, saramaConfig, diff --git a/message/infrastructure/kafka/pubsub_test.go b/message/infrastructure/kafka/pubsub_test.go index 4486fd07a..d9028d347 100644 --- a/message/infrastructure/kafka/pubsub_test.go +++ b/message/infrastructure/kafka/pubsub_test.go @@ -1,6 +1,8 @@ package kafka_test import ( + "os" + "strings" "testing" "time" @@ -13,12 +15,18 @@ import ( "github.com/stretchr/testify/require" ) -var brokers = []string{"localhost:9092"} +func kafkaBrokers() []string { + brokers := os.Getenv("WATERMILL_TEST_KAFKA_BROKERS") + if brokers != "" { + return strings.Split(brokers, ",") + } + return []string{"localhost:9092"} +} func newPubSub(t *testing.T, marshaler kafka.MarshalerUnmarshaler, consumerGroup string) message.PubSub { logger := watermill.NewStdLogger(true, true) - publisher, err := kafka.NewPublisher(brokers, marshaler, nil, logger) + publisher, err := kafka.NewPublisher(kafkaBrokers(), marshaler, nil, logger) require.NoError(t, err) saramaConfig := kafka.DefaultSaramaSubscriberConfig() @@ -32,7 +40,7 @@ func newPubSub(t *testing.T, marshaler kafka.MarshalerUnmarshaler, consumerGroup subscriber, err := kafka.NewSubscriber( kafka.SubscriberConfig{ - Brokers: brokers, + Brokers: kafkaBrokers(), ConsumerGroup: consumerGroup, InitializeTopicDetails: &sarama.TopicDetail{ NumPartitions: 8, @@ -74,7 +82,7 @@ func createNoGroupSubscriberConstructor(t *testing.T) message.Subscriber { sub, err := kafka.NewSubscriber( kafka.SubscriberConfig{ - Brokers: brokers, + Brokers: kafkaBrokers(), ConsumerGroup: "", }, saramaConfig, diff --git a/message/infrastructure/nats/pubsub_test.go b/message/infrastructure/nats/pubsub_test.go index 4d7fd4656..d0a81eca9 100644 --- a/message/infrastructure/nats/pubsub_test.go +++ b/message/infrastructure/nats/pubsub_test.go @@ -1,24 +1,34 @@ package nats_test import ( + "os" "testing" "time" "github.com/ThreeDotsLabs/watermill" - "github.com/stretchr/testify/require" - - "github.com/ThreeDotsLabs/watermill/message/infrastructure" - "github.com/ThreeDotsLabs/watermill/message" + "github.com/ThreeDotsLabs/watermill/message/infrastructure" "github.com/ThreeDotsLabs/watermill/message/infrastructure/nats" + "github.com/nats-io/go-nats-streaming" + "github.com/satori/go.uuid" + "github.com/stretchr/testify/require" ) func newPubSub(t *testing.T, clientID string, queueName string) message.PubSub { logger := watermill.NewStdLogger(true, true) + + natsURL := os.Getenv("WATERMILL_TEST_NATS_URL") + if natsURL == "" { + natsURL = "nats://localhost:4222" + } + pub, err := nats.NewStreamingPublisher(nats.StreamingPublisherConfig{ ClusterID: "test-cluster", ClientID: clientID + "_pub", Marshaler: nats.GobMarshaler{}, + StanOptions: []stan.Option{ + stan.NatsURL(natsURL), + }, }, logger) require.NoError(t, err) @@ -30,6 +40,9 @@ func newPubSub(t *testing.T, clientID string, queueName string) message.PubSub { SubscribersCount: 1, AckWaitTimeout: time.Second, // AckTiemout < 5 required for continueAfterErrors Unmarshaler: nats.GobMarshaler{}, + StanOptions: []stan.Option{ + stan.NatsURL(natsURL), + }, }, logger) require.NoError(t, err)