Provision a t2.medium EC2 instance, opening port 22 to the IP address from which you will configure it.

Connect as ec2-user, then:
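If you prefer the CLI to the AWS console, a minimal sketch of the SSH ingress rule (the security group ID and source address below are placeholders, not values from this setup):

# Hypothetical group ID and source IP; substitute your own
aws ec2 authorize-security-group-ingress \
  --group-id sg-0123456789abcdef0 \
  --protocol tcp --port 22 \
  --cidr 203.0.113.10/32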
sudo yum update -y
wget --no-check-certificate --no-cookies --header "Cookie: oraclelicense=accept-securebackup-cookie" http://download.oracle.com/otn-pub/java/jdk/8u141-b15/336fa29ff2bb4ef291e347e091f7f4a7/jdk-8u141-linux-x64.rpm
sudo yum install -y jdk-8u141-linux-x64.rpm
wget http://packages.confluent.io/archive/5.3/confluent-community-5.3.1-2.12.tar.gz
tar -xvf confluent-community-5.3.1-2.12.tar.gz
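The MQTT source and Elasticsearch sink connectors used below are not bundled with the community distribution; a sketch of installing them with confluent-hub (component coordinates as published on Confluent Hub, latest versions assumed):

./confluent-5.3.1/bin/confluent-hub install --no-prompt confluentinc/kafka-connect-mqtt:latest
./confluent-5.3.1/bin/confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:latest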
sudo chmod ugo+rwx /var/log
Configure the Connect worker in confluent-5.3.1/etc/kafka/connect-distributed.properties:

group.id=connect-cluster
replication.factor=3
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
bootstrap.servers=<CCLOUD_HOST>
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<CCLOUD_API_KEY>" password="<CCLOUD_API_SECRET>";
# Connect producer and consumer specific configuration
producer.ssl.endpoint.identification.algorithm=https
producer.confluent.monitoring.interceptor.ssl.endpoint.identification.algorithm=https
consumer.ssl.endpoint.identification.algorithm=https
consumer.confluent.monitoring.interceptor.ssl.endpoint.identification.algorithm=https
producer.sasl.mechanism=PLAIN
producer.confluent.monitoring.interceptor.sasl.mechanism=PLAIN
consumer.sasl.mechanism=PLAIN
consumer.confluent.monitoring.interceptor.sasl.mechanism=PLAIN
producer.security.protocol=SASL_SSL
producer.confluent.monitoring.interceptor.security.protocol=SASL_SSL
consumer.security.protocol=SASL_SSL
consumer.confluent.monitoring.interceptor.security.protocol=SASL_SSL
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<CCLOUD_API_KEY>" password="<CCLOUD_API_SECRET>";
producer.confluent.monitoring.interceptor.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<CCLOUD_API_KEY>" password="<CCLOUD_API_SECRET>";
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<CCLOUD_API_KEY>" password="<CCLOUD_API_SECRET>";
consumer.confluent.monitoring.interceptor.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<CCLOUD_API_KEY>" password="<CCLOUD_API_SECRET>";
# Confluent Schema Registry for Kafka Connect
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.schema.registry.basic.auth.user.info=<SR_API_KEY>:<SR_API_SECRET>
value.converter.schema.registry.url=<SR_URL>
# Confluent Schema Registry for Kafka Connect
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.basic.auth.credentials.source=USER_INFO
key.converter.schema.registry.basic.auth.user.info=<SR_API_KEY>:<SR_API_SECRET>
key.converter.schema.registry.url=<SR_URL>
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses
plugin.path=/home/ec2-user/confluent-5.3.1/share/java/,/home/ec2-user/confluent-5.3.1/share/confluent-hub-components
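Start the worker (paths assume the tarball was extracted into /home/ec2-user as above), then confirm the plugins loaded:

./confluent-5.3.1/bin/connect-distributed ./confluent-5.3.1/etc/kafka/connect-distributed.properties

# In another session, once the worker is up:
curl -s http://localhost:8083/connector-plugins | jq '.[].class'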
Create the MQTT source connector:

curl -X PUT -H "Content-Type:application/json" http://localhost:8083/connectors/source-mqtt-01/config \
-d '{
"connector.class" : "io.confluent.connect.mqtt.MqttSourceConnector",
"mqtt.server.uri" : "<url>",
"mqtt.password" : "<pw>",
"mqtt.username" : "<user>",
"mqtt.topics" : "owntracks/#",
"kafka.topic" : "data_mqtt",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"tasks.max" : "1",
"confluent.topic.bootstrap.servers" : "<CCLOUD_HOST>",
"confluent.topic.sasl.jaas.config" : "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<CCLOUD_API_KEY>\" password=\"<CCLOUD_API_SECRET>\";",
"confluent.topic.security.protocol": "SASL_SSL",
"confluent.topic.ssl.endpoint.identification.algorithm": "https",
"confluent.topic.sasl.mechanism": "PLAIN",
"confluent.topic.request.timeout.ms": "20000",
"confluent.topic.retry.backoff.ms": "500"
}'
Check status:
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort
source | source-mqtt-01 | RUNNING | RUNNING | io.confluent.connect.mqtt.MqttSourceConnector
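To eyeball the raw messages landing on data_mqtt, a sketch using the console consumer (ccloud.properties here is a hypothetical file holding the same bootstrap.servers/SASL settings as the worker config above):

./confluent-5.3.1/bin/kafka-console-consumer \
  --bootstrap-server <CCLOUD_HOST> \
  --consumer.config ccloud.properties \
  --topic data_mqtt --from-beginning --max-messages 5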
Create the Elasticsearch dynamic mapping template first. The _default_ mapping type used below assumes Elasticsearch 6.x (it was removed in 7.x).
Note: This is using Elastic Cloud, which requires authorisation. If you do not need authorisation then remove the -u from the curl below, and connection.username / connection.password from the Kafka Connect connectors below.
curl -XPUT \
-u elastic:<ELASTIC_PASSWORD> \
"https://<ELASTIC_HOST>/_template/kafkaconnect/" \
-H 'Content-Type: application/json' \
-d'{
"index_patterns": "*",
"mappings": { "_default_": { "dynamic_templates": [
{ "dates": { "match": "*_TS", "mapping": { "type": "date" } } },
{ "heights": { "match": "HEIGHT", "mapping": { "type": "float" } } },
{ "locations": { "match": "LOCATION", "mapping": { "type": "geo_point" } } }
]} } }'
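A quick check that the template registered:

curl -s -u elastic:<ELASTIC_PASSWORD> "https://<ELASTIC_HOST>/_template/kafkaconnect" | jq .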
Configure the Kafka Connect sink connectors. TOPIC_PREFIX is the prefix carried by the source topics (here from a Confluent Cloud ksqlDB application); the RegexRouter transform below strips it so that the Elasticsearch index is named after the bare topic.
TOPIC_PREFIX=pksqlc-4x75z
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/sink-elastic-cloud-runner_location-00/config \
-d '{
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "https://<ELASTIC_HOST>",
"connection.username": "elastic",
"connection.password":"<ELASTIC_PASSWORD>",
"type.name": "type.name=_doc",
"behavior.on.malformed.documents": "warn",
"topics": "'"${TOPIC_PREFIX}"'RUNNER_LOCATION",
"key.ignore": "true",
"schema.ignore": "true",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"transforms": "addTS,renameTopic",
"transforms.addTS.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addTS.timestamp.field": "EVENT_TS",
"transforms.renameTopic.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.renameTopic.regex": "'"${TOPIC_PREFIX}"'(.*)",
"transforms.renameTopic.replacement": "$1"
}'
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/sink-elastic-cloud-runner_status-00/config \
-d '{
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "https://<ELASTIC_HOST>",
"connection.username": "elastic",
"connection.password":"<ELASTIC_PASSWORD>",
"type.name": "type.name=_doc",
"behavior.on.malformed.documents": "warn",
"topics": "'"${TOPIC_PREFIX}"'RUNNER_STATUS",
"key.ignore": "false",
"schema.ignore": "true",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"transforms": "addTS,renameTopic",
"transforms.addTS.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addTS.timestamp.field": "EVENT_TS",
"transforms.renameTopic.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.renameTopic.regex": "'"${TOPIC_PREFIX}"'(.*)",
"transforms.renameTopic.replacement": "$1"
}'
Check status:
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort
sink | sink-elastic-cloud-runner_location-00 | RUNNING | RUNNING | io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
sink | sink-elastic-cloud-runner_status-00 | RUNNING | RUNNING | io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
source | source-mqtt-01 | RUNNING | RUNNING | io.confluent.connect.mqtt.MqttSourceConnector
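Finally, a sketch of confirming that documents are arriving in Elasticsearch (this assumes the index is the lowercased bare topic name, runner_location, since Elasticsearch index names must be lowercase):

curl -s -u elastic:<ELASTIC_PASSWORD> "https://<ELASTIC_HOST>/runner_location/_search?size=1" | jq .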