Adding PyFlink example

gunnarmorling committed Dec 6, 2023
1 parent b9ac689 commit aea7248
Showing 8 changed files with 140 additions and 2 deletions.
README.md: 2 additions & 1 deletion
@@ -28,7 +28,8 @@ This repository contains examples of use cases that utilize Decodable streaming
|[Oracle CDC](oracle_cdc/)| We configure an Oracle AWS RDS instance with LogMiner. We then stand up a Debezium server to read change events into AWS Kinesis |
|[DynamoDB CDC](dynamodb_cdc/)| We configure a DynamoDB table to send change data to Kinesis. Then we read those changes into Decodable for transformation or replication. |
|[Logical Decoding Message Examples](postgres-logical-decoding)| We show how to retrieve logical decoding messages from the Postgres WAL |
-|[ GitHub Webhooks](github-webhooks)| We show how to process GitHub Webhook events using the Decodable REST source connector |
+|[GitHub Webhooks](github-webhooks)| We show how to process GitHub Webhook events using the Decodable REST source connector |
+|[PyFlink](pyflink)| We run a basic PyFlink job on Kubernetes |

## License

github-webhooks/README.md: 1 addition & 1 deletion
@@ -1,7 +1,7 @@
# Processing GitHub Webhooks With Decodable

This repository shows how to process GitHub Webhooks with Decodable.
-It accompanies the blog post <todo>.
+It accompanies this [blog post](https://www.decodable.co/blog/processing-github-webhooks-with-decodable).

## Prerequisites

pyflink/Dockerfile: 4 additions
@@ -0,0 +1,4 @@
FROM decodable-examples/pyflink-base:latest

RUN wget -P /opt/flink/usrlib https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.0.2-1.18/flink-sql-connector-kafka-3.0.2-1.18.jar
ADD --chown=flink:flink pyflink_hello_world.py /opt/flink/usrlib/pyflink_hello_world.py
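
For reference, a minimal sketch of building this job image, assuming the decodable-examples/pyflink-base image defined in the Dockerfile.pyflink-base shown next has already been built locally; the job image tag matches the one referenced in pyflink-hello-world.yaml below:

# run from the pyflink/ directory; requires the pyflink-base image to exist locally
docker build -t decodable-examples/pyflink-hello-world:latest .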
pyflink/Dockerfile.pyflink-base: 12 additions
@@ -0,0 +1,12 @@
FROM flink:1.18.0

RUN rm -rf $JAVA_HOME

# The Flink base image ships only a JRE; PyFlink requires a full JDK, so install
# Eclipse Temurin JDK 11 for the current architecture, verifying the download checksum
RUN set -eux; \
    ARCH="$(dpkg --print-architecture)"; \
    case "${ARCH}" in \
      aarch64|arm64) ESUM='8c3146035b99c55ab26a2982f4b9abd2bf600582361cf9c732539f713d271faf'; BINARY_URL='https://github.com/adoptium/temurin11-binaries/releases/download/jdk-11.0.21%2B9/OpenJDK11U-jdk_aarch64_linux_hotspot_11.0.21_9.tar.gz' ;; \
      amd64|i386:x86-64) ESUM='60ea98daa09834fdd3162ca91ddc8d92a155ab3121204f6f643176ee0c2d0d5e'; BINARY_URL='https://github.com/adoptium/temurin11-binaries/releases/download/jdk-11.0.21%2B9/OpenJDK11U-jdk_x64_linux_hotspot_11.0.21_9.tar.gz' ;; \
      armhf|arm) ESUM='a64b005b84b173e294078fec34660ed3429d8c60726a5fb5c140e13b9e0c79fa'; BINARY_URL='https://github.com/adoptium/temurin11-binaries/releases/download/jdk-11.0.21%2B9/OpenJDK11U-jdk_arm_linux_hotspot_11.0.21_9.tar.gz' ;; \
      ppc64el|powerpc:common64) ESUM='262ff98d6d88a7c7cc522cb4ec4129491a0eb04f5b17dcca0da57cfcdcf3830d'; BINARY_URL='https://github.com/adoptium/temurin11-binaries/releases/download/jdk-11.0.21%2B9/OpenJDK11U-jdk_ppc64le_linux_hotspot_11.0.21_9.tar.gz' ;; \
      s390x|s390:64-bit) ESUM='bc67f79fb82c4131d9dcea32649c540a16aa380a9726306b9a67c5ec9690c492'; BINARY_URL='https://github.com/adoptium/temurin11-binaries/releases/download/jdk-11.0.21%2B9/OpenJDK11U-jdk_s390x_linux_hotspot_11.0.21_9.tar.gz' ;; \
      *) echo "Unsupported arch: ${ARCH}"; exit 1 ;; \
    esac; \
    wget --progress=dot:giga -O /tmp/openjdk.tar.gz ${BINARY_URL}; \
    echo "${ESUM} */tmp/openjdk.tar.gz" | sha256sum -c -; \
    mkdir -p "$JAVA_HOME"; \
    tar --extract --file /tmp/openjdk.tar.gz --directory "$JAVA_HOME" --strip-components 1 --no-same-owner; \
    rm -f /tmp/openjdk.tar.gz ${JAVA_HOME}/lib/src.zip; \
    find "$JAVA_HOME/lib" -name '*.so' -exec dirname '{}' ';' | sort -u > /etc/ld.so.conf.d/docker-openjdk.conf; \
    ldconfig; \
    java -Xshare:dump

# install python3 and pip3
RUN apt-get update -y && \
    apt-get install -y python3 python3-pip python3-dev && \
    rm -rf /var/lib/apt/lists/*

# install PyFlink
RUN pip3 install apache-flink==1.18.0
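
A sketch of building this base image and smoke-testing the PyFlink installation; the tag matches the FROM line of the job Dockerfile above:

docker build -t decodable-examples/pyflink-base:latest -f Dockerfile.pyflink-base .
# quick check that the Python Flink API is importable
docker run --rm decodable-examples/pyflink-base:latest python3 -c "import pyflink; print('PyFlink import OK')"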
pyflink/README.md: 4 additions
@@ -0,0 +1,4 @@
# Getting Started With PyFlink On Kubernetes

This example project shows how to run PyFlink jobs on Kubernetes.
It accompanies this [blog post](TODO).
pyflink/kind-config.yaml: 6 additions
@@ -0,0 +1,6 @@
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
- role: worker
- role: worker
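
A sketch of creating the cluster from this config and making the job image available to it; the cluster name is illustrative, and the FlinkDeployment below additionally requires cert-manager and the Flink Kubernetes Operator to be installed in the cluster (for example via the operator's Helm chart):

kind create cluster --name pyflink-demo --config kind-config.yaml
# kind nodes cannot pull local images, so load the job image into the cluster
kind load docker-image decodable-examples/pyflink-hello-world:latest --name pyflink-demo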
pyflink/pyflink-hello-world.yaml: 42 additions
@@ -0,0 +1,42 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: pyflink-hello-world
spec:
  image: decodable-examples/pyflink-hello-world:latest
  flinkVersion: v1_18
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/opt/flink-python-1.18.0.jar # Note, this jarURI is actually a placeholder
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/bin/python3", "-py", "/opt/flink/usrlib/pyflink_hello_world.py"]
    parallelism: 1
    upgradeMode: stateless
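
A sketch of deploying and inspecting the job; the pyflink-hello-world Deployment and the pyflink-hello-world-rest Service are created by the Flink Kubernetes Operator, so those resource names are assumptions based on its naming conventions:

kubectl apply -f pyflink-hello-world.yaml
kubectl get flinkdeployment pyflink-hello-world
kubectl logs -f deploy/pyflink-hello-world
# expose the Flink web UI on http://localhost:8081
kubectl port-forward svc/pyflink-hello-world-rest 8081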
pyflink/pyflink_hello_world.py: 69 additions
@@ -0,0 +1,69 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import logging
import sys
import os

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

def pyflink_hello_world():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    t_env = StreamTableEnvironment.create(stream_execution_environment=env)

    # Make the Kafka SQL connector available to the job; the jar sits next to
    # this script in /opt/flink/usrlib (see the Dockerfile)
    kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
                             'flink-sql-connector-kafka-3.0.2-1.18.jar')

    t_env.get_config()\
        .get_configuration()\
        .set_string("pipeline.jars", "file://{}".format(kafka_jar))

    # Source table: the datagen connector produces random orders rows
    t_env.execute_sql("""
        CREATE TABLE orders (
            order_number BIGINT,
            price DECIMAL(32,2),
            buyer ROW<first_name STRING, last_name STRING>,
            order_time TIMESTAMP(3)
        ) WITH (
            'connector' = 'datagen',
            'rows-per-second' = '4'
        )""")

    # Sink table: writes the rows as JSON to the 'orders' Kafka topic
    t_env.execute_sql("""
        CREATE TABLE orders_sink (
            order_number BIGINT,
            price DECIMAL(32,2),
            buyer ROW<first_name STRING, last_name STRING>,
            order_time TIMESTAMP(3)
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'orders',
            'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092',
            'properties.group.id' = 'orders-sink',
            'format' = 'json'
        )""")

    # Submit the streaming INSERT job
    t_env.execute_sql("""
        INSERT INTO orders_sink SELECT * FROM orders""")


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    pyflink_hello_world()
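
To verify the job is producing records, one could consume the orders topic; this sketch assumes a Strimzi-managed Kafka cluster named my-cluster in the kafka namespace (matching the bootstrap address in the sink DDL above), and the consumer image tag is illustrative:

kubectl -n kafka run kafka-consumer -it --rm --restart=Never \
  --image=quay.io/strimzi/kafka:0.38.0-kafka-3.6.0 -- \
  bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 \
  --topic orders --from-beginning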
