Skip to content

Commit f8f3948

Browse files
rnpridgeonedenhill
authored andcommitted
Transactional Producer API (full EOS)
1 parent 96bc921 commit f8f3948

17 files changed

+1192
-17
lines changed

.appveyor.yml

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
environment:
22
global:
3-
LIBRDKAFKA_NUGET_VERSION: 1.3.0
3+
LIBRDKAFKA_NUGET_VERSION: 1.4.0-RC1c
44
CIBW_SKIP: cp33-* cp34-*
5-
CIBW_TEST_REQUIRES: pytest requests avro
5+
CIBW_TEST_REQUIRES: pytest pytest-timeout requests avro trivup
66
# SDK v7.0 MSVC Express 2008's SetEnv.cmd script will fail if the
77
# /E:ON and /V:ON options are not enabled in the batch script intepreter
88
# See: http://stackoverflow.com/a/13751649/163740

.gitignore

+2
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,5 @@ dl-*
2222
.pytest_cache
2323
staging
2424
tests/docker/conf/tls/*
25+
.idea
26+
.python-version

.travis.yml

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
env:
22
global:
3-
- LIBRDKAFKA_VERSION=v1.3.0
3+
- LIBRDKAFKA_VERSION=v1.4.0-RC1c
44
matrix:
55
include:
66
# Source package verification with Python 2.7
@@ -57,7 +57,7 @@ install:
5757
- tools/install-interceptors.sh
5858
- pip install -U pip && pip install virtualenv
5959
- if [[ $TRAVIS_OS_NAME == "osx" ]]; then python -m ensurepip && virtualenv /tmp/venv && source /tmp/venv/bin/activate ; fi
60-
- if [[ -z $CIBW_BEFORE_BUILD ]]; then pip install pytest-timeout flake8 ; fi
60+
- if [[ -z $CIBW_BEFORE_BUILD ]]; then pip install pytest-timeout flake8 trivup; fi
6161
- if [[ -z $CIBW_BEFORE_BUILD ]]; then rm -rf tmp-build ; tools/bootstrap-librdkafka.sh --require-ssl ${LIBRDKAFKA_VERSION} tmp-build ; fi
6262
- if [[ -n $TRAVIS_TAG && -n $CIBW_BEFORE_BUILD ]]; then pip install cibuildwheel --force-reinstall; fi
6363
- if [[ ! -z $EXTRA_PKGS ]]; then pip install $(echo $EXTRA_PKGS) ; fi
@@ -67,8 +67,8 @@ script:
6767
- if [[ -z $CIBW_BEFORE_BUILD ]]; then flake8 ; fi
6868
# Make plugins available for tests
6969
- ldd staging/libs/* || otool -L staging/libs/* || true
70-
- if [[ -z $CIBW_BEFORE_BUILD && $TRAVIS_OS_NAME == "osx" ]]; then DYLD_LIBRARY_PATH=$DYLD_LIBRARY_PATH:staging/libs py.test -v --timeout 20 --ignore=tmp-build --import-mode append ; fi
71-
- if [[ -z $CIBW_BEFORE_BUILD && $TRAVIS_OS_NAME == "linux" ]]; then LD_LIBRARY_PATH=$LD_LIBRARY_PATH:staging/libs py.test -v --timeout 20 --ignore=tmp-build --import-mode append ; fi
70+
- if [[ -z $CIBW_BEFORE_BUILD && $TRAVIS_OS_NAME == "osx" ]]; then DYLD_LIBRARY_PATH=$DYLD_LIBRARY_PATH:staging/libs py.test -v --timeout=60 --ignore=tmp-build --import-mode append ; fi
71+
- if [[ -z $CIBW_BEFORE_BUILD && $TRAVIS_OS_NAME == "linux" ]]; then LD_LIBRARY_PATH=$LD_LIBRARY_PATH:staging/libs py.test -v --timeout=60 --ignore=tmp-build --import-mode append ; fi
7272
- if [[ -n $TRAVIS_TAG && -n $CIBW_BEFORE_BUILD ]]; then cibuildwheel --output-dir wheelhouse1 && tools/fixup-wheels.sh wheelhouse1 wheelhouse ; fi
7373
- if [[ -n $TRAVIS_TAG && $TRAVIS_OS_NAME == linux && -n $CIBW_BEFORE_BUILD ]]; then tools/test-manylinux.sh ; fi
7474
- if [[ -n $TRAVIS_TAG && $TRAVIS_OS_NAME == osx && -n $CIBW_BEFORE_BUILD ]]; then tools/test-osx.sh; fi

confluent_kafka/src/Producer.c

+309-5
Original file line numberDiff line numberDiff line change
@@ -356,12 +356,12 @@ static int Producer_poll0 (Handle *self, int tmout) {
356356

357357
static PyObject *Producer_poll (Handle *self, PyObject *args,
358358
PyObject *kwargs) {
359-
double tmout;
359+
double tmout = -1.0;
360360
int r;
361361
static char *kws[] = { "timeout", NULL };
362362

363-
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "d", kws, &tmout))
364-
return NULL;
363+
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|d", kws, &tmout))
364+
return NULL;
365365

366366
r = Producer_poll0(self, (int)(tmout * 1000));
367367
if (r == -1)
@@ -398,6 +398,140 @@ static PyObject *Producer_flush (Handle *self, PyObject *args,
398398
return cfl_PyInt_FromInt(qlen);
399399
}
400400

401+
static PyObject *Producer_init_transactions (Handle *self, PyObject *args) {
402+
CallState cs;
403+
rd_kafka_error_t *error;
404+
double tmout = -1.0;
405+
406+
if (!PyArg_ParseTuple(args, "|d", &tmout))
407+
return NULL;
408+
409+
CallState_begin(self, &cs);
410+
411+
error = rd_kafka_init_transactions(self->rk, cfl_timeout_ms(tmout));
412+
413+
if (!CallState_end(self, &cs)) {
414+
if (error) /* Ignore error in favour of callstate exception */
415+
rd_kafka_error_destroy(error);
416+
return NULL;
417+
}
418+
419+
if (error) {
420+
cfl_PyErr_from_error_destroy(error);
421+
return NULL;
422+
}
423+
424+
Py_RETURN_NONE;
425+
}
426+
427+
static PyObject *Producer_begin_transaction (Handle *self) {
428+
rd_kafka_error_t *error;
429+
430+
error = rd_kafka_begin_transaction(self->rk);
431+
432+
if (error) {
433+
cfl_PyErr_from_error_destroy(error);
434+
return NULL;
435+
}
436+
437+
Py_RETURN_NONE;
438+
}
439+
440+
static PyObject *Producer_send_offsets_to_transaction(Handle *self,
441+
PyObject *args) {
442+
CallState cs;
443+
rd_kafka_error_t *error;
444+
PyObject *metadata = NULL, *offsets = NULL;
445+
rd_kafka_topic_partition_list_t *c_offsets;
446+
rd_kafka_consumer_group_metadata_t *cgmd;
447+
double tmout = -1.0;
448+
449+
if (!PyArg_ParseTuple(args, "OO|d", &offsets, &metadata, &tmout))
450+
return NULL;
451+
452+
if (!(c_offsets = py_to_c_parts(offsets)))
453+
return NULL;
454+
455+
if (!(cgmd = py_to_c_cgmd(metadata))) {
456+
rd_kafka_topic_partition_list_destroy(c_offsets);
457+
return NULL;
458+
}
459+
460+
CallState_begin(self, &cs);
461+
462+
error = rd_kafka_send_offsets_to_transaction(self->rk, c_offsets,
463+
cgmd,
464+
cfl_timeout_ms(tmout));
465+
466+
rd_kafka_consumer_group_metadata_destroy(cgmd);
467+
rd_kafka_topic_partition_list_destroy(c_offsets);
468+
469+
if (!CallState_end(self, &cs)) {
470+
if (error) /* Ignore error in favour of callstate exception */
471+
rd_kafka_error_destroy(error);
472+
return NULL;
473+
}
474+
475+
if (error) {
476+
cfl_PyErr_from_error_destroy(error);
477+
return NULL;
478+
}
479+
480+
Py_RETURN_NONE;
481+
}
482+
483+
static PyObject *Producer_commit_transaction(Handle *self, PyObject *args) {
484+
CallState cs;
485+
rd_kafka_error_t *error;
486+
double tmout = -1.0;
487+
488+
if (!PyArg_ParseTuple(args, "|d", &tmout))
489+
return NULL;
490+
491+
CallState_begin(self, &cs);
492+
493+
error = rd_kafka_commit_transaction(self->rk, cfl_timeout_ms(tmout));
494+
495+
if (!CallState_end(self, &cs)) {
496+
if (error) /* Ignore error in favour of callstate exception */
497+
rd_kafka_error_destroy(error);
498+
return NULL;
499+
}
500+
501+
if (error) {
502+
cfl_PyErr_from_error_destroy(error);
503+
return NULL;
504+
}
505+
506+
Py_RETURN_NONE;
507+
}
508+
509+
static PyObject *Producer_abort_transaction(Handle *self, PyObject *args) {
510+
CallState cs;
511+
rd_kafka_error_t *error;
512+
double tmout = -1.0;
513+
514+
if (!PyArg_ParseTuple(args, "|d", &tmout))
515+
return NULL;
516+
517+
CallState_begin(self, &cs);
518+
519+
error = rd_kafka_abort_transaction(self->rk, cfl_timeout_ms(tmout));
520+
521+
if (!CallState_end(self, &cs)) {
522+
if (error) /* Ignore error in favour of callstate exception */
523+
rd_kafka_error_destroy(error);
524+
return NULL;
525+
}
526+
527+
if (error) {
528+
cfl_PyErr_from_error_destroy(error);
529+
return NULL;
530+
}
531+
532+
Py_RETURN_NONE;
533+
}
534+
401535
static PyMethodDef Producer_methods[] = {
402536
{ "produce", (PyCFunction)Producer_produce,
403537
METH_VARARGS|METH_KEYWORDS,
@@ -466,8 +600,178 @@ static PyMethodDef Producer_methods[] = {
466600
{ "list_topics", (PyCFunction)list_topics, METH_VARARGS|METH_KEYWORDS,
467601
list_topics_doc
468602
},
469-
470-
{ NULL }
603+
{ "init_transactions", (PyCFunction)Producer_init_transactions,
604+
METH_VARARGS,
605+
".. py:function: init_transactions([timeout])\n"
606+
"\n"
607+
" Initializes transactions for the producer instance.\n"
608+
"\n"
609+
" This function ensures any transactions initiated by previous\n"
610+
" instances of the producer with the same `transactional.id` are\n"
611+
" completed.\n"
612+
" If the previous instance failed with a transaction in progress\n"
613+
" the previous transaction will be aborted.\n"
614+
" This function needs to be called before any other transactional\n"
615+
" or produce functions are called when the `transactional.id` is\n"
616+
" configured.\n"
617+
"\n"
618+
" If the last transaction had begun completion (following\n"
619+
" transaction commit) but not yet finished, this function will\n"
620+
" await the previous transaction's completion.\n"
621+
"\n"
622+
" When any previous transactions have been fenced this function\n"
623+
" will acquire the internal producer id and epoch, used in all\n"
624+
" future transactional messages issued by this producer instance.\n"
625+
"\n"
626+
" Upon successful return from this function the application has to\n"
627+
" perform at least one of the following operations within \n"
628+
" `transactional.timeout.ms` to avoid timing out the transaction\n"
629+
" on the broker:\n"
630+
" * produce() (et.al)\n"
631+
" * send_offsets_to_transaction()\n"
632+
" * commit_transaction()\n"
633+
" * abort_transaction()\n"
634+
"\n"
635+
" :param float timeout: Maximum time to block in seconds.\n"
636+
"\n"
637+
" :raises: KafkaError: Use exc.args[0].retriable() to check if the\n"
638+
" operation may be retried, else treat the\n"
639+
" error as a fatal error.\n"
640+
},
641+
{ "begin_transaction", (PyCFunction)Producer_begin_transaction,
642+
METH_NOARGS,
643+
".. py:function:: begin_transaction()\n"
644+
"\n"
645+
" Begin a new transaction.\n"
646+
"\n"
647+
" init_transactions() must have been called successfully (once)\n"
648+
" before this function is called.\n"
649+
"\n"
650+
" Any messages produced or offsets sent to a transaction, after\n"
651+
" the successful return of this function will be part of the\n"
652+
" transaction and committed or aborted atomically.\n"
653+
"\n"
654+
" Complete the transaction by calling commit_transaction() or\n"
655+
" Abort the transaction by calling abort_transaction().\n"
656+
"\n"
657+
" :raises: KafkaError: Use exc.args[0].retriable() to check if the\n"
658+
" operation may be retried, else treat the\n"
659+
" error as a fatal error.\n"
660+
},
661+
{ "send_offsets_to_transaction",
662+
(PyCFunction)Producer_send_offsets_to_transaction,
663+
METH_VARARGS,
664+
".. py:function:: send_offsets_to_transaction(positions,"
665+
" group_metadata, [timeout])\n"
666+
"\n"
667+
" Sends a list of topic partition offsets to the consumer group\n"
668+
" coordinator for group_metadata and marks the offsets as part\n"
669+
" of the current transaction.\n"
670+
" These offsets will be considered committed only if the\n"
671+
" transaction is committed successfully.\n"
672+
"\n"
673+
" The offsets should be the next message your application will\n"
674+
" consume, i.e., the last processed message's offset + 1 for each\n"
675+
" partition.\n"
676+
" Either track the offsets manually during processing or use\n"
677+
" consumer.position() (on the consumer) to get the current offsets\n"
678+
" for the partitions assigned to the consumer.\n"
679+
"\n"
680+
" Use this method at the end of a consume-transform-produce loop\n"
681+
" prior to committing the transaction with commit_transaction().\n"
682+
"\n"
683+
" Note: The consumer must disable auto commits\n"
684+
" (set `enable.auto.commit` to false on the consumer).\n"
685+
"\n"
686+
" Note: Logical and invalid offsets (e.g., OFFSET_INVALID) in\n"
687+
" offsets will be ignored. If there are no valid offsets in\n"
688+
" offsets the function will return successfully and no action\n"
689+
" will be taken.\n"
690+
"\n"
691+
" :param list(TopicPartition) offsets: current consumer/processing\n"
692+
" position(offsets) for the\n"
693+
" list of partitions.\n"
694+
" :param object group_metadata: consumer group metadata retrieved\n"
695+
" from the input consumer's\n"
696+
" get_consumer_group_metadata().\n"
697+
" :param float timeout: Amount of time to block in seconds.\n"
698+
"\n"
699+
" :raises: KafkaError: Use exc.args[0].retriable() to check if the\n"
700+
" operation may be retried, or\n"
701+
" exc.args[0].txn_abortable() if the current transaction\n"
702+
" has failed and must be aborted by calling\n"
703+
" abort_transaction() and then start a new transaction\n"
704+
" with begin_transaction().\n"
705+
" Treat any other error as a fatal error.\n"
706+
"\n"
707+
},
708+
{ "commit_transaction", (PyCFunction)Producer_commit_transaction,
709+
METH_VARARGS,
710+
".. py:function:: commit_transaction([timeout])\n"
711+
"\n"
712+
" Commmit the current transaction.\n"
713+
" Any outstanding messages will be flushed (delivered) before\n"
714+
" actually committing the transaction.\n"
715+
"\n"
716+
" If any of the outstanding messages fail permanently the current\n"
717+
" transaction will enter the abortable error state and this\n"
718+
" function will return an abortable error, in this case the\n"
719+
" application must call abort_transaction() before attempting\n"
720+
" a new transaction with begin_transaction().\n"
721+
"\n"
722+
" Note: This function will block until all outstanding messages\n"
723+
" are delivered and the transaction commit request has been\n"
724+
" successfully handled by the transaction coordinator, or until\n"
725+
" the timeout expires, which ever comes first. On timeout the\n"
726+
" application may call the function again.\n"
727+
"\n"
728+
" Note: Will automatically call flush() to ensure all queued\n"
729+
" messages are delivered before attempting to commit the\n"
730+
" transaction. Delivery reports and other callbacks may thus be\n"
731+
" triggered from this method.\n"
732+
"\n"
733+
" :param float timeout: The amount of time to block in seconds.\n"
734+
"\n"
735+
" :raises: KafkaError: Use exc.args[0].retriable() to check if the\n"
736+
" operation may be retried, or\n"
737+
" exc.args[0].txn_abortable() if the current transaction\n"
738+
" has failed and must be aborted by calling\n"
739+
" abort_transaction() and then start a new transaction\n"
740+
" with begin_transaction().\n"
741+
" Treat any other error as a fatal error.\n"
742+
"\n"
743+
},
744+
{ "abort_transaction", (PyCFunction)Producer_abort_transaction,
745+
METH_VARARGS,
746+
".. py:function:: abort_transaction([timeout])\n"
747+
"\n"
748+
" Aborts the current transaction.\n"
749+
" This function should also be used to recover from non-fatal\n"
750+
" abortable transaction errors, when KafkaError.txn_abortable()\n"
751+
" is True.\n"
752+
" \n"
753+
" Any outstanding messages will be purged and fail with\n"
754+
" _PURGE_INFLIGHT or _PURGE_QUEUE.\n"
755+
" \n"
756+
" Note: This function will block until all outstanding messages\n"
757+
" are purged and the transaction abort request has been\n"
758+
" successfully handled by the transaction coordinator, or until\n"
759+
" the timeout expires, which ever comes first. On timeout the\n"
760+
" application may call the function again.\n"
761+
" \n"
762+
" Note: Will automatically call purge() and flush() to ensure\n"
763+
" all queued and in-flight messages are purged before attempting\n"
764+
" to abort the transaction.\n"
765+
"\n"
766+
" :param float timeout: The maximum amount of time to block\n"
767+
" waiting for transaction to abort in seconds.\n"
768+
"\n"
769+
" :raises: KafkaError: Use exc.args[0].retriable() to check if the\n"
770+
" operation may be retried.\n"
771+
" Treat any other error as a fatal error.\n"
772+
"\n"
773+
},
774+
{ NULL }
471775
};
472776

473777

0 commit comments

Comments
 (0)