-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathmy_streaming_kafka_pipeline.py
39 lines (25 loc) · 1.24 KB
/
my_streaming_kafka_pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import argparse
import logging
from apache_beam import PCollection
from apache_beam.options.pipeline_options import PipelineOptions
from typing import List
import apache_beam as beam
from mydofns.kafka_sdfn_streaming import ReadPartitionsDoFn, ProcessKafkaPartitionsDoFn
# Default topic name, used when --topic is not supplied on the command line.
TOPIC = "beam-topic"
def run_pipeline(topic: str, bootstrap_server: List[str], beam_options):
    """Build and run a Beam pipeline that logs the elements read for a topic.

    A single seed element is expanded by ``ReadPartitionsDoFn``; its output is
    fed to ``ProcessKafkaPartitionsDoFn``; every resulting element is logged
    at INFO level.

    Args:
        topic: Topic name handed to both DoFns.
        bootstrap_server: Broker address(es) handed to both DoFns.
            NOTE(review): annotated as a list, but the script entry point
            forwards the raw ``--bootstrap`` string -- confirm which form
            the DoFns expect.
        beam_options: Flags forwarded verbatim to ``PipelineOptions``.
    """
    options: PipelineOptions = PipelineOptions(beam_options)

    # Leaving the ``with`` block is what runs the pipeline.
    with beam.Pipeline(options=options) as pipeline:
        # One dummy element so the first DoFn is invoked exactly once.
        seed = pipeline | beam.Create([1])

        partitions: PCollection[int] = seed | "Check partitions" >> beam.ParDo(
            ReadPartitionsDoFn(topic, bootstrap_server)
        )
        messages: PCollection[str] = partitions | "Read from Kafka" >> beam.ParDo(
            ProcessKafkaPartitionsDoFn(topic, bootstrap_server)
        )

        messages | beam.Map(lambda record: logging.info(record))
if __name__ == "__main__":
    # Raise the root logger to INFO so the pipeline's logged elements show up.
    logging.getLogger().setLevel(logging.INFO)

    # Only --bootstrap and --topic are consumed here; any other flag is left
    # unparsed and handed to run_pipeline as Beam options.
    cli = argparse.ArgumentParser()
    cli.add_argument("--bootstrap", type=str)
    cli.add_argument("--topic", type=str, default=TOPIC)
    known_args, pipeline_args = cli.parse_known_args()

    # NOTE(review): --bootstrap is optional with no default, so None may be
    # passed through -- confirm the DoFns tolerate that.
    run_pipeline(
        topic=known_args.topic,
        bootstrap_server=known_args.bootstrap,
        beam_options=pipeline_args,
    )