Skip to content

Commit

Permalink
Schema refactor (#17)
Browse files Browse the repository at this point in the history
* Refactored to add support for JSON. Added experimental support for tracking offsets. Schemas are now required to be supplied.

* Modified scope to ensure that jackson is in target directory.

* Updated the Docker Compose file to the latest version.

* Updated data files with changes from offsets.

* Added documentation test.

* Bumped version to 0.10.2.0-cp1.

* Added support for throwing a contextual exception when a record cannot be parsed. The exception identifies the record and field that failed.

* Added support to write records with a timestamp.

* Added description to connectors.

* Added generators to aid in building schemas for the input files. Added a command-line utility as well.

* Added validation of character sets.

* Added support to dynamically generate schemas if `schema.generation.enabled` is set to true.

* Pulled out jfairy because it's no longer used.

* Updated the documentation.

* Modified so that SimpleDateFormat is set with the correct time zone. The incorrect time zone was causing tests to fail in other time zones such as UTC.
  • Loading branch information
jcustenborder authored May 29, 2017
1 parent 2c1ce07 commit f14906f
Show file tree
Hide file tree
Showing 78 changed files with 50,716 additions and 3,871 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
target
*.iml
24 changes: 3 additions & 21 deletions Jenkinsfile
Original file line number Diff line number Diff line change
@@ -1,23 +1,5 @@
#!groovy
node {
def mvnBuildNumber = "0.1.${env.BUILD_NUMBER}"
@Library('jenkins-pipeline') import com.github.jcustenborder.jenkins.pipeline.KafkaConnectPipeline

def mvnHome = tool 'M3'

checkout scm

if (env.BRANCH_NAME == 'master') {
stage 'versioning'
sh "${mvnHome}/bin/mvn -B versions:set -DgenerateBackupPoms=false -DnewVersion=${mvnBuildNumber}"
}

stage 'build'
sh "${mvnHome}/bin/mvn -B -P maven-central clean verify package"

junit '**/target/surefire-reports/TEST-*.xml'

if (env.BRANCH_NAME == 'master') {
stage 'publishing'
sh "${mvnHome}/bin/mvn -B -P github,maven-central deploy"
}
}
def pipe = new KafkaConnectPipeline()
pipe.execute()
303 changes: 256 additions & 47 deletions README.md

Large diffs are not rendered by default.

26 changes: 21 additions & 5 deletions bin/debug.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,29 @@
# limitations under the License.
#

: ${SUSPEND:='n'}
: ${INPUT_PATH:='/tmp/spooldir/input'}
: ${ERROR_PATH:='/tmp/spooldir/error'}
: ${FINISHED_PATH:='/tmp/spooldir/finished'}

set -e

mvn clean package
export KAFKA_JMX_OPTS="-Xdebug -agentlib:jdwp=transport=dt_socket,server=y,suspend=${SUSPEND},address=5005"
export CLASSPATH="$(find target/kafka-connect-target/usr/share/java -type f -name '*.jar' | tr '\n' ':')"

if [ ! -d "${INPUT_PATH}" ]; then
mkdir -p "${INPUT_PATH}"
fi

export CLASSPATH="$(find `pwd`/target/kafka-connect-spooldir-1.0-SNAPSHOT-package/share/java/ -type f -name '*.jar' | tr '\n' ':')"
#export KAFKA_JMX_OPTS='-Xdebug -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005'
if [ ! -d "${ERROR_PATH}" ]; then
mkdir -p "${ERROR_PATH}"
fi

mkdir -p /tmp/spooldir/input /tmp/spooldir/finished /tmp/spooldir/error
if [ ! -d "${FINISHED_PATH}" ]; then
mkdir -p "${FINISHED_PATH}"
fi

cp src/test/resources/io/confluent/kafka/connect/source/MOCK_DATA.csv /tmp/spooldir/input
cp src/test/resources/com/github/jcustenborder/kafka/connect/spooldir/csv/FieldsMatch.data "${INPUT_PATH}/FieldsMatch.csv"

$CONFLUENT_HOME/bin/connect-standalone connect/connect-avro-docker.properties config/CSVExample.properties
connect-standalone config/connect-avro-docker.properties config/CSVExample.properties
27 changes: 0 additions & 27 deletions bin/suspend.sh

This file was deleted.

86 changes: 0 additions & 86 deletions checkstyle/checkstyle.xml

This file was deleted.

26 changes: 26 additions & 0 deletions config/CSVExample.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#
# Copyright © 2016 Jeremy Custenborder ([email protected])
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

name=CsvSpoolDir
tasks.max=1
connector.class=com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector
input.file.pattern=^.*\.csv$
finished.path=/tmp/spooldir/finished
halt.on.error=false
topic=testing
key.schema={"name":"com.example.users.UserKey","type":"STRUCT","isOptional":false,"fieldSchemas":{"id":{"type":"INT64","isOptional":false}}}
value.schema={"name":"com.example.users.User","type":"STRUCT","isOptional":false,"fieldSchemas":{"id":{"type":"INT64","isOptional":false},"first_name":{"type":"STRING","isOptional":true},"last_name":{"type":"STRING","isOptional":true},"email":{"type":"STRING","isOptional":true},"gender":{"type":"STRING","isOptional":true},"ip_address":{"type":"STRING","isOptional":true},"last_login":{"name":"org.apache.kafka.connect.data.Timestamp","type":"INT64","version":1,"isOptional":true},"account_balance":{"name":"org.apache.kafka.connect.data.Decimal","type":"BYTES","version":1,"parameters":{"scale":"2"},"isOptional":true},"country":{"type":"STRING","isOptional":true},"favorite_color":{"type":"STRING","isOptional":true}}}
csv.first.row.as.header=true
32 changes: 0 additions & 32 deletions config/CsvWithSchema.properties

This file was deleted.

33 changes: 0 additions & 33 deletions config/CsvWithoutSchema.properties

This file was deleted.

25 changes: 25 additions & 0 deletions config/JsonExample.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#
# Copyright © 2016 Jeremy Custenborder ([email protected])
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

name=JsonSpoolDir
tasks.max=1
connector.class=com.github.jcustenborder.kafka.connect.spooldir.SpoolDirJsonSourceConnector
input.file.pattern=^.*\.json$
finished.path=/tmp/spooldir/finished
halt.on.error=false
topic=testing
key.schema={"name":"com.example.users.UserKey","type":"STRUCT","isOptional":false,"fieldSchemas":{"id":{"type":"INT64","isOptional":false}}}
value.schema={"name":"com.example.users.User","type":"STRUCT","isOptional":false,"fieldSchemas":{"id":{"type":"INT64","isOptional":false},"first_name":{"type":"STRING","isOptional":true},"last_name":{"type":"STRING","isOptional":true},"email":{"type":"STRING","isOptional":true},"gender":{"type":"STRING","isOptional":true},"ip_address":{"type":"STRING","isOptional":true},"last_login":{"name":"org.apache.kafka.connect.data.Timestamp","type":"INT64","version":1,"isOptional":true},"account_balance":{"name":"org.apache.kafka.connect.data.Decimal","type":"BYTES","version":1,"parameters":{"scale":"2"},"isOptional":true},"country":{"type":"STRING","isOptional":true},"favorite_color":{"type":"STRING","isOptional":true}}}
File renamed without changes.
12 changes: 7 additions & 5 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,30 @@
version: "2"
services:
zookeeper:
image: confluent/zookeeper
image: confluentinc/cp-zookeeper:3.1.2-1
ports:
- "2181:2181"
environment:
zk_id: "1"
ZOOKEEPER_CLIENT_PORT: 2181
network_mode: "host"
kafka:
image: confluent/kafka
image: confluentinc/cp-kafka:3.1.2-1
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_ZOOKEEPER_CONNECT: "confluent:2181"
KAFKA_ADVERTISED_LISTENERS: "plaintext://confluent:9092"
network_mode: "host"
schema-registry:
image: confluent/schema-registry
image: confluentinc/cp-schema-registry:3.1.2-1
depends_on:
- kafka
- zookeeper
ports:
- "8081:8081"
environment:
SR_KAFKASTORE_CONNECTION_URL: "confluent:2181"
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "confluent:2181"
SCHEMA_REGISTRY_HOST_NAME: confluent
network_mode: "host"
Loading

0 comments on commit f14906f

Please sign in to comment.