48
48
49
49
import uuid
50
50
51
- from confluent_kafka import Producer , Consumer
51
+ from confluent_kafka import Producer , Consumer , KafkaError , KafkaException
52
52
53
53
54
+ def error_cb (err ):
55
+ """ The error callback is used for generic client errors. These
56
+ errors are generally to be considered informational as the client will
57
+ automatically try to recover from all errors, and no extra action
58
+ is typically required by the application.
59
+ For this example however, we terminate the application if the client
60
+ is unable to connect to any broker (_ALL_BROKERS_DOWN) and on
61
+ authentication errors (_AUTHENTICATION). """
62
+
63
+ print ("Client error: {}" .format (err ))
64
+ if err .code () == KafkaError ._ALL_BROKERS_DOWN or \
65
+ err .code () == KafkaError ._AUTHENTICATION :
66
+ # Any exception raised from this callback will be re-raised from the
67
+ # triggering flush() or poll() call.
68
+ raise KafkaException (err )
69
+
70
+
71
+ # Create producer
54
72
p = Producer ({
55
73
'bootstrap.servers' : '<ccloud bootstrap servers>' ,
56
74
'sasl.mechanism' : 'PLAIN' ,
57
75
'security.protocol' : 'SASL_SSL' ,
58
76
'sasl.username' : '<ccloud key>' ,
59
- 'sasl.password' : '<ccloud secret>'
77
+ 'sasl.password' : '<ccloud secret>' ,
78
+ 'error_cb' : error_cb ,
60
79
})
61
80
62
81
63
82
def acked (err , msg ):
64
83
"""Delivery report callback called (from flush()) on successful or failed delivery of the message."""
65
84
if err is not None :
66
- print ("failed to deliver message: {}" .format (err .str ()))
85
+ print ('Failed to deliver message: {}' .format (err .str ()))
67
86
else :
68
- print ("produced to: {} [{}] @ {}" .format (msg .topic (), msg .partition (), msg .offset ()))
87
+ print ('Produced to: {} [{}] @ {}' .format (msg .topic (), msg .partition (), msg .offset ()))
88
+
69
89
90
+ for n in range (0 , 10 ):
91
+ # Produce message: this is an asynchronous operation.
92
+ # Upon successful or permanently failed delivery to the broker the
93
+ # callback will be called to propagate the produce result.
94
+ # The delivery callback is triggered from poll() or flush().
95
+ # For long running
96
+ # produce loops it is recommended to call poll() to serve these
97
+ # delivery report callbacks.
98
+ p .produce ('python-test-topic' , value = 'python test value nr {}' .format (n ),
99
+ callback = acked )
70
100
71
- p .produce ('python-test-topic' , value = 'python test value' , callback = acked )
101
+ # Trigger delivery report callbacks from previous produce calls.
102
+ p .poll (0 )
72
103
73
104
# flush() is typically called when the producer is done sending messages to wait
74
105
# for outstanding messages to be transmitted to the broker and delivery report
75
106
# callbacks to get called. For continous producing you should call p.poll(0)
76
107
# after each produce() call to trigger delivery report callbacks.
77
108
p .flush (10 )
78
109
110
+
111
+ # Create consumer
79
112
c = Consumer ({
80
113
'bootstrap.servers' : '<ccloud bootstrap servers>' ,
81
114
'sasl.mechanism' : 'PLAIN' ,
82
115
'security.protocol' : 'SASL_SSL' ,
83
116
'sasl.username' : '<ccloud key>' ,
84
117
'sasl.password' : '<ccloud secret>' ,
85
118
'group.id' : str (uuid .uuid1 ()), # this will create a new consumer group on each invocation.
86
- 'auto.offset.reset' : 'earliest'
119
+ 'auto.offset.reset' : 'earliest' ,
120
+ 'error_cb' : error_cb ,
87
121
})
88
122
89
123
c .subscribe (['python-test-topic' ])
@@ -98,10 +132,10 @@ def acked(err, msg):
98
132
continue
99
133
if msg .error ():
100
134
# Errors are typically temporary, print error and continue.
101
- print (" Consumer error: {}" .format (msg .error ()))
135
+ print (' Consumer error: {}' .format (msg .error ()))
102
136
continue
103
137
104
- print ('consumed : {}' .format (msg .value ()))
138
+ print ('Consumed : {}' .format (msg .value ()))
105
139
106
140
except KeyboardInterrupt :
107
141
pass
0 commit comments