This shows how to run on GCE the Confluent Platform components that are not yet available in CCloud. It's done using Docker containers on VMs, with GCP's new create-with-container API. You could instead run Docker on the VMs manually if you want, or maybe use Kubernetes, but that's a step beyond me…
The assumption is you have a CCloud broker endpoint and API credentials already.
- Search and replace `CCLOUD_USERNAME` with your CCloud API user
- Search and replace `CCLOUD_PASSWORD` with your CCloud API password
- Search and replace `CCLOUD_BROKER_HOST` with your CCloud broker address
- Search and replace `GCP_PROJECT_NAME` with your GCP project id
_TODO: figure out appropriate bash so as to be able to set the above as environment variables passed into the instance launch commands :D_
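Until that's scripted properly, a rough sketch of one way to do it: keep the commands in a template file and substitute the placeholders with `sed`. Here `launch-template.sh` is just a hypothetical file name, the export values are examples, and this assumes none of the values contain characters special to `sed` (such as `/` or `&`):

# Example values only — use your own
export CCLOUD_USERNAME=my-api-key
export CCLOUD_PASSWORD=my-api-secret
export CCLOUD_BROKER_HOST=pkc-xxxxx.us-east1.gcp.confluent.cloud
export GCP_PROJECT_NAME=my-gcp-project
# Substitute every placeholder and write out a runnable script
sed -e "s/CCLOUD_USERNAME/$CCLOUD_USERNAME/g" \
    -e "s/CCLOUD_PASSWORD/$CCLOUD_PASSWORD/g" \
    -e "s/CCLOUD_BROKER_HOST/$CCLOUD_BROKER_HOST/g" \
    -e "s/GCP_PROJECT_NAME/$GCP_PROJECT_NAME/g" \
    launch-template.sh > launch.sh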
Set this to a unique value each time, otherwise you’ll end up with clashes on internal Kafka topics on your cluster—which is bad, m’kay?
export DEPLOYMENT_VERSION=x03
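One way to get a fresh value automatically each time (just a sketch; any unique string will do):

# Timestamp-based deployment version, e.g. x1810121530
export DEPLOYMENT_VERSION=x$(date +%y%m%d%H%M)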
Create firewall rules so that the Kafka Connect REST API (8083), KSQL Server (8088), and Schema Registry (8081) are reachable (note that `--source-ranges=0.0.0.0/0` opens the ports to the world; tighten as appropriate):
gcloud compute --project=GCP_PROJECT_NAME firewall-rules create allow-connect-connect-rest --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:8083 --source-ranges=0.0.0.0/0 --target-tags=kafka-connect
gcloud compute --project=GCP_PROJECT_NAME firewall-rules create allow-connect-ksql-server --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:8088 --source-ranges=0.0.0.0/0 --target-tags=ksql-server
gcloud compute --project=GCP_PROJECT_NAME firewall-rules create allow-connect-schema-registry --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:8081 --source-ranges=0.0.0.0/0 --target-tags=schema-registry
gcloud beta compute instances create-with-container rmoff-schema-registry-$DEPLOYMENT_VERSION \
--zone=us-east1-b \
--tags schema-registry \
--container-image confluentinc/cp-schema-registry:5.0.0 \
--container-env=SCHEMA_REGISTRY_HOST_NAME=localhost,SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081,SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=SASL_SSL://CCLOUD_BROKER_HOST:9092,SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL=SASL_SSL,SCHEMA_REGISTRY_KAFKASTORE_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule\ required\ username=\"CCLOUD_USERNAME\"\ password=\"CCLOUD_PASSWORD\"\;,SCHEMA_REGISTRY_KAFKASTORE_SASL_MECHANISM=PLAIN,SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL=INFO \
--container-restart-policy=never
Take the external IP of the instance created, and set it as an environment variable:
export SCHEMA_REGISTRY_HOST=xx.xx.xx.xx
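If you'd rather not copy it by hand, something like this should work (the same pattern applies to the other instances below):

export SCHEMA_REGISTRY_HOST=$(gcloud compute instances describe \
rmoff-schema-registry-$DEPLOYMENT_VERSION --zone=us-east1-b \
--format='get(networkInterfaces[0].accessConfigs[0].natIP)')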
gcloud beta compute instances create-with-container rmoff-ksql-server-$DEPLOYMENT_VERSION \
--zone=us-east1-b \
--tags ksql-server \
--container-image confluentinc/cp-ksql-server:5.0.0 \
--container-env=KSQL_BOOTSTRAP_SERVERS=CCLOUD_BROKER_HOST:9092,KSQL_KSQL_SCHEMA_REGISTRY_URL=http://$SCHEMA_REGISTRY_HOST:8081,KSQL_KSQL_SERVER_UI_ENABLED=false,KSQL_APPLICATION_ID=rmoff-gcp-pipeline-demo-$DEPLOYMENT_VERSION,KSQL_KSQL_STREAMS_REPLICATION_FACTOR=3,KSQL_KSQL_SINK_REPLICAS=3,KSQL_LISTENERS=http://0.0.0.0:8088,KSQL_CACHE_MAX_BYTES_BUFFERING=0,KSQL_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=HTTPS,KSQL_SECURITY_PROTOCOL=SASL_SSL,KSQL_SASL_MECHANISM=PLAIN,KSQL_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule\ required\ username=\"CCLOUD_USERNAME\"\ password=\"CCLOUD_PASSWORD\"\; \
--container-restart-policy=never
Take the external IP of the instance created, and set it as an environment variable:
export KSQL_SERVER_HOST=xx.xx.xx.xx
gcloud beta compute instances create-with-container rmoff-connect-source-$DEPLOYMENT_VERSION \
--zone=us-east1-b \
--tags kafka-connect \
--container-image confluentinc/cp-kafka-connect:5.0.0 \
--container-env=^±^CONNECT_BOOTSTRAP_SERVERS=CCLOUD_BROKER_HOST:9092±CONNECT_REST_PORT=8083±CONNECT_GROUP_ID=compose-connect-group-source-$DEPLOYMENT_VERSION±CONNECT_CONFIG_STORAGE_TOPIC=docker-connect-configs-source-$DEPLOYMENT_VERSION±CONNECT_OFFSET_STORAGE_TOPIC=docker-connect-offsets-source-$DEPLOYMENT_VERSION±CONNECT_STATUS_STORAGE_TOPIC=docker-connect-status-source-$DEPLOYMENT_VERSION±CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter±CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter±CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter±CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter±CONNECT_REST_ADVERTISED_HOST_NAME=localhost±CONNECT_LOG4J_ROOT_LOGLEVEL=INFO±CONNECT_LOG4J_LOGGERS=org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR±CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=3±CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=3±CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=3±CONNECT_PLUGIN_PATH=/usr/share/java,/u01/connectors/,/usr/share/confluent-hub-components±CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https±CONNECT_SASL_MECHANISM=PLAIN±CONNECT_REQUEST_TIMEOUT_MS=20000±CONNECT_RETRY_BACKOFF_MS=500±CONNECT_SECURITY_PROTOCOL=SASL_SSL±CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https±CONNECT_CONSUMER_SASL_MECHANISM=PLAIN±CONNECT_CONSUMER_REQUEST_TIMEOUT_MS=20000±CONNECT_CONSUMER_RETRY_BACKOFF_MS=500±CONNECT_CONSUMER_SECURITY_PROTOCOL=SASL_SSL±CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https±CONNECT_PRODUCER_SASL_MECHANISM=PLAIN±CONNECT_PRODUCER_REQUEST_TIMEOUT_MS=20000±CONNECT_PRODUCER_RETRY_BACKOFF_MS=500±CONNECT_PRODUCER_SECURITY_PROTOCOL=SASL_SSL±CONNECT_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule\ required\ username=\"CCLOUD_USERNAME\"\ password=\"CCLOUD_PASSWORD\"\;±CONNECT_CONSUMER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule\ required\ username=\"CCLOUD_USERNAME\"\ password=\"CCLOUD_PASSWORD\"\;±CONNECT_PRODUCER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule\ required\ username=\"CCLOUD_USERNAME\"\ password=\"CCLOUD_PASSWORD\"\; \
--container-restart-policy=never \
--container-command=/bin/bash \
--container-arg=-c \
--container-arg=echo\ Installing\ unzip...\ \&\&\ curl\ -so\ unzip.deb\ http://ftp.br.debian.org/debian/pool/main/u/unzip/unzip_6.0-16\+deb8u3_amd64.deb\ \&\&\ dpkg\ -i\ unzip.deb\ \&\&\ echo\ Downloading\ connector...\ \&\&\ curl\ -so\ kafka-connect-rest.zip\ https://storage.googleapis.com/rmoff-connectors/kafka-connect-rest.zip\ \&\&\ echo\ Making\ folder\ for\ connector...\ \&\&\ mkdir\ -p\ /u01/connectors/\ \&\&\ echo\ Unzipping\ connector...\ \&\&\ unzip\ -j\ kafka-connect-rest.zip\ -d\ /u01/connectors/kafka-connect-rest\ \&\&\ echo\ Launching\ Connect...\ \&\&\ /etc/confluent/docker/run
Take the external IP of the instance created, and set it as an environment variable:
export CONNECT_HOST_SOURCE=xx.xx.xx.xx
Note that this stuffs the GCP creds into the `container-arg`; you'll need to replace them with your own.
gcloud beta compute instances create-with-container rmoff-connect-gbq-gcs-$DEPLOYMENT_VERSION \
--zone=us-east1-b \
--tags kafka-connect \
--container-image confluentinc/cp-kafka-connect:5.0.0 \
--container-env=^±^CONNECT_BOOTSTRAP_SERVERS=CCLOUD_BROKER_HOST:9092±CONNECT_REST_PORT=8083±CONNECT_GROUP_ID=compose-connect-group-gbq-gcs-$DEPLOYMENT_VERSION±CONNECT_CONFIG_STORAGE_TOPIC=docker-connect-configs-gbq-gcs-$DEPLOYMENT_VERSION±CONNECT_OFFSET_STORAGE_TOPIC=docker-connect-offsets-gbq-gcs-$DEPLOYMENT_VERSION±CONNECT_STATUS_STORAGE_TOPIC=docker-connect-status-gbq-gcs-$DEPLOYMENT_VERSION±CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter±CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter±CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter±CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter±CONNECT_REST_ADVERTISED_HOST_NAME=localhost±CONNECT_LOG4J_ROOT_LOGLEVEL=INFO±CONNECT_LOG4J_LOGGERS=org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR±CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=3±CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=3±CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=3±CONNECT_PLUGIN_PATH=/usr/share/java,/u01/connectors/,/usr/share/confluent-hub-components±CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https±CONNECT_SASL_MECHANISM=PLAIN±CONNECT_REQUEST_TIMEOUT_MS=20000±CONNECT_RETRY_BACKOFF_MS=500±CONNECT_SECURITY_PROTOCOL=SASL_SSL±CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https±CONNECT_CONSUMER_SASL_MECHANISM=PLAIN±CONNECT_CONSUMER_REQUEST_TIMEOUT_MS=20000±CONNECT_CONSUMER_RETRY_BACKOFF_MS=500±CONNECT_CONSUMER_SECURITY_PROTOCOL=SASL_SSL±CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https±CONNECT_PRODUCER_SASL_MECHANISM=PLAIN±CONNECT_PRODUCER_REQUEST_TIMEOUT_MS=20000±CONNECT_PRODUCER_RETRY_BACKOFF_MS=500±CONNECT_PRODUCER_SECURITY_PROTOCOL=SASL_SSL±CONNECT_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule\ required\ username=\"CCLOUD_USERNAME\"\ password=\"CCLOUD_PASSWORD\"\;±CONNECT_CONSUMER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule\ required\ username=\"CCLOUD_USERNAME\"\ password=\"CCLOUD_PASSWORD\"\;±CONNECT_PRODUCER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule\ required\ username=\"CCLOUD_USERNAME\"\ password=\"CCLOUD_PASSWORD\"\; \
--container-restart-policy=never \
--container-command=/bin/bash \
--container-arg=-c \
--container-arg=echo\ \'\{\ \ \ \"type\":\ \"service_account\",\ \ \ \"project_id\":\ \"GCP_PROJECT_NAME\",\ \ \ \"private_key_id\":\ \"GCP_PRIVATE_KEY_ID\",\ \ \"private_key\":\ \"-----BEGIN\ PRIVATE\ KEY-----\\nGCP_PRIVATE_KEY\\n-----END\ PRIVATE\ KEY-----\\n\",\ \ \ \"client_email\":\ \"GCP_USER@GCP_PROJECT_NAME.iam.gserviceaccount.com\",\ \ \ \"client_id\":\ \"GCP_USER_ID\",\ \ \ \"auth_uri\":\ \"https://accounts.google.com/o/oauth2/auth\",\ \ \ \"token_uri\":\ \"https://accounts.google.com/o/oauth2/token\",\ \ \ \"auth_provider_x509_cert_url\":\ \"https://www.googleapis.com/oauth2/v1/certs\",\ \ \ \"client_x509_cert_url\":\ \"https://www.googleapis.com/robot/v1/metadata/x509/GCP_USER\%40GCP_PROJECT_NAME.iam.gserviceaccount.com\"\ \}\'\ \>\ /root/creds/gcp_creds.json\ \&\&\ confluent-hub\ install\ --no-prompt\ confluentinc/kafka-connect-gcs:5.0.0\ \&\&\ confluent-hub\ install\ --no-prompt\ wepay/kafka-connect-bigquery:1.1.0\ \&\&\ /etc/confluent/docker/run
Take the external IP of the instance created, and set it as an environment variable:
export CONNECT_HOST_SINK=xx.xx.xx.xx
Wait for each service to start listening before using it:
export HOST=$CONNECT_HOST_SOURCE
export PORT=8083
export ENDPOINT=connectors
bash -c 'echo "Waiting for host to start listening on $HOST ⏳ ";while [ $(curl -s -o /dev/null -w "%{http_code}" http://$HOST:$PORT/$ENDPOINT) -eq 000 ];do curl -s -o /dev/null -w "%{http_code}" http://$HOST:$PORT/$ENDPOINT;date;sleep 5;done;nc -vz $HOST $PORT'
export HOST=$CONNECT_HOST_SINK
export PORT=8083
export ENDPOINT=connectors
bash -c 'echo "Waiting for host to start listening on $HOST ⏳ ";while [ $(curl -s -o /dev/null -w "%{http_code}" http://$HOST:$PORT/$ENDPOINT) -eq 000 ];do curl -s -o /dev/null -w "%{http_code}" http://$HOST:$PORT/$ENDPOINT;date;sleep 5;done;nc -vz $HOST $PORT'
export HOST=$SCHEMA_REGISTRY_HOST
export PORT=8081
export ENDPOINT=
bash -c 'echo "Waiting for host to start listening on $HOST ⏳ ";while [ $(curl -s -o /dev/null -w "%{http_code}" http://$HOST:$PORT/$ENDPOINT) -eq 000 ];do curl -s -o /dev/null -w "%{http_code}" http://$HOST:$PORT/$ENDPOINT;date;sleep 5;done;nc -vz $HOST $PORT'
export HOST=$KSQL_SERVER_HOST
export PORT=8088
export ENDPOINT=
bash -c 'echo "Waiting for host to start listening on $HOST ⏳ ";while [ $(curl -s -o /dev/null -w "%{http_code}" http://$HOST:$PORT/$ENDPOINT) -eq 000 ];do curl -s -o /dev/null -w "%{http_code}" http://$HOST:$PORT/$ENDPOINT;date;sleep 5;done;nc -vz $HOST $PORT'
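The same wait loop is repeated four times above; as a sketch, it could be wrapped in a function instead (the function name is my own):

wait_for_endpoint() {
  local HOST=$1 PORT=$2 ENDPOINT=$3
  echo "Waiting for host to start listening on $HOST ⏳ "
  # curl reports http_code 000 while nothing is listening yet
  while [ "$(curl -s -o /dev/null -w "%{http_code}" http://$HOST:$PORT/$ENDPOINT)" -eq 000 ]; do
    date
    sleep 5
  done
  nc -vz $HOST $PORT
}

wait_for_endpoint $CONNECT_HOST_SOURCE 8083 connectors
wait_for_endpoint $CONNECT_HOST_SINK 8083 connectors
wait_for_endpoint $SCHEMA_REGISTRY_HOST 8081
wait_for_endpoint $KSQL_SERVER_HOST 8088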
Connect to KSQL Server from CLI:
docker run --rm --interactive --tty confluentinc/cp-ksql-cli:5.0.0 http://$KSQL_SERVER_HOST:8088
Get stdout:
gcloud compute instances get-serial-port-output my-vm-name-goes-here
Docker output:
gcloud logging read "jsonPayload.container.name=/my-vm-name-goes-here AND logName=projects/my-project-name/logs/gcplogs-docker-driver" --format=json --order=asc --freshness=1h --limit=100 | jq '.[] | .jsonPayload.data '
The environment variables for the Docker images are combined into a single `--container-env` argument, and the default comma separator is overridden to `±` (the `^±^` prefix at the start of the value) since commas are needed within some of the environment values. See the gcloud docs for more details.
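As a toy illustration of the syntax (see `gcloud topic escaping` for the docs; the VM and variable names here are made up):

# The leading ^±^ tells gcloud to split --container-env on ± instead of comma,
# so individual values are free to contain commas themselves
gcloud beta compute instances create-with-container example-vm \
--container-image=alpine \
--container-env=^±^FOO=a,b,c±BAR=baz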
To write to GCS or GBQ the Docker container needs access to GCP credentials, which take the form of a JSON key file from GCP. Options considered:

- Bake into the Docker image
  - Yuck
- Put on GCS and pull into the container
  - Very insecure if public ACL
  - If not a public ACL, then how do you auth the container to pull them? VM scope would work for the host, but then you'd need to pull them from the host and pass them to the container
- VM scope / service account
  - Doesn't look like these are inherited by the container
- Embed in the container startup arguments
  - Messy, but works. The GCP Web UI console doesn't permit container argument strings > 2049 (weird huh) but it works fine passed through the CLI:
# POC for getting creds into container
gcloud beta compute --project=GCP_PROJECT_NAME instances create-with-container cli-test-18 --zone=us-east1-b \
--container-image=confluentinc/cp-kafka-connect:5.0.0 \
--container-restart-policy=never \
--container-command=/bin/bash \
--container-arg=-c \
--container-arg=echo\ \'\{\ \ \ \"type\":\ \"service_account\",\ \ \ \"project_id\":\ \"GCP_PROJECT_NAME\",\ \ \ \"private_key_id\":\ \"GCP_PRIVATE_KEY_ID\",\ \ \"private_key\":\ \"-----BEGIN\ PRIVATE\ KEY-----\\nGCP_PRIVATE_KEY\\n-----END\ PRIVATE\ KEY-----\\n\",\ \ \ \"client_email\":\ \"GCP_USER@GCP_PROJECT_NAME.iam.gserviceaccount.com\",\ \ \ \"client_id\":\ \"GCP_USER_ID\",\ \ \ \"auth_uri\":\ \"https://accounts.google.com/o/oauth2/auth\",\ \ \ \"token_uri\":\ \"https://accounts.google.com/o/oauth2/token\",\ \ \ \"auth_provider_x509_cert_url\":\ \"https://www.googleapis.com/oauth2/v1/certs\",\ \ \ \"client_x509_cert_url\":\ \"https://www.googleapis.com/robot/v1/metadata/x509/GCP_USER\%40GCP_PROJECT_NAME.iam.gserviceaccount.com\"\ \}\'\ \>\ /tmp/gcp_creds.json\ \&\&\ cat\ /tmp/gcp_creds.json\ \&\&\ sleep\ 6000