
FAQ:UDFs

Andy Coates edited this page Dec 13, 2019 · 10 revisions

Can user defined functions talk to external systems or block for long periods of time?

This is a common question raised on the Community Slack channel and through other media. Users want to call out to external services from their own UDFs, UDAFs or UDTFs, or potentially perform some other task that may take a long time or introduce a deliberate delay, say ensuring messages aren't processed for 30 minutes based on the event time in the message.

The question is, can this work with ksqlDB?

The short answer is that this is not recommended.

The slightly longer answer is that it's not recommended because it will, first and foremost, kill the throughput and multiply the processing latency of your system, and that's just the happy path. If the delay becomes high enough, e.g. if the remote system becomes unavailable for some time, then you risk destabilizing your processing.

UDFs in ksqlDB are expected to perform relatively quick computations. They are invoked inline with the flow of processing rows of data. Under the hood, KSQL uses Kafka Streams, which in turn uses Kafka Consumer Groups. Consumer Groups are Kafka's way of spreading the work across multiple client application instances and threads; in this case, KSQL servers. Consumer Groups are designed to detect group members that are not making progress or have failed. When such a member is detected, it is ejected from the group and the work assigned to it is given to another member. The challenge with long running UDFs is that they can make it appear as though KSQL is not making any progress.

A slow running UDF can cause the data being processed to be assigned to a different server or thread. The new thread will then invoke the UDF and, potentially, block, causing the data to be assigned to the next thread, and then the next, until, potentially, all threads processing the query are blocked inside your UDF. This may be acceptable if the remote system your UDF is calling out to is completely unavailable, but it is less ideal if the remote system has only a partial outage, say 1-in-10 servers are running really slowly, or if your UDF is trying to delay processing by some amount of event-time: one event that needs delaying may block all threads, stopping other events that don't need delaying from being processed.
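To make the risk concrete, here is a rough back-of-the-envelope sketch in Java. It assumes the group considers a member stalled when one batch of rows takes longer than some allowed gap between polls. The class name, the batch size of 1000 rows and the 5 minute limit are illustrative assumptions, not values taken from ksqlDB, so check your own configuration before relying on them.

```java
import java.time.Duration;

// Back-of-the-envelope check: can one batch of rows finish before the member
// is considered stalled? All names and numbers here are illustrative assumptions.
final class PollBudget {

    // Worst-case time to process one batch when every row invokes a UDF
    // that takes perRowLatency to return.
    static Duration batchTime(int rowsPerBatch, Duration perRowLatency) {
        return perRowLatency.multipliedBy(rowsPerBatch);
    }

    // True if the batch takes longer than the allowed gap between polls,
    // i.e. the member would appear not to be making progress.
    static boolean exceedsPollInterval(int rowsPerBatch,
                                       Duration perRowLatency,
                                       Duration maxPollInterval) {
        return batchTime(rowsPerBatch, perRowLatency).compareTo(maxPollInterval) > 0;
    }

    public static void main(String[] args) {
        int rowsPerBatch = 1000;                          // assumed batch size
        Duration maxPollInterval = Duration.ofMinutes(5); // assumed limit

        // A healthy remote call of 50 ms per row: 50 s per batch, within the limit.
        System.out.println(exceedsPollInterval(rowsPerBatch, Duration.ofMillis(50), maxPollInterval));

        // A degraded remote call of 2 s per row: about 33 minutes per batch, over the limit.
        System.out.println(exceedsPollInterval(rowsPerBatch, Duration.ofSeconds(2), maxPollInterval));
    }
}
```

Even a modest per-row slowdown is multiplied by the batch size, which is why a partial outage of the remote system can be enough to trigger reassignment.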

OK, so hopefully that's helped give a high level overview of the problem. How about a little help? There are some patterns that may help.

  1. It may be possible to tune the Consumer Group settings so that your use case works in a way that is acceptable to you. You can use the SET command to set any consumer property. At the time of writing, session.timeout.ms is probably a good one to start with; max.poll.interval.ms, which bounds the time a consumer may take between polls, is also worth checking.
  2. Could your UDF output some special value if the remote service is unavailable? Could rows with this special value be post-processed in some way to be recovered?
  3. If you switch to writing a Kafka Streams app, you could choose to store the message in a state-store if the remote service was unavailable, or the message needed delaying for some other reason.
  4. Failing that, you're likely looking at writing a custom application, in any of the many languages for which there is a Kafka client.
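As an illustration of the second pattern, the following is a minimal Java sketch of a lookup that bounds how long it waits for the remote service and returns a special value instead of blocking. The class name `BoundedLookup`, the sentinel `__UNAVAILABLE__` and the `remoteCall` function are all hypothetical stand-ins. The sketch is plain Java so that it runs without dependencies; in a real ksqlDB UDF the `lookup` method would live in a class annotated with `@UdfDescription` and be annotated with `@Udf`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Sketch of a time-bounded remote lookup that falls back to a sentinel value.
final class BoundedLookup {

    // Hypothetical marker that downstream processing can filter on.
    static final String UNAVAILABLE = "__UNAVAILABLE__";

    private final Function<String, String> remoteCall; // stand-in for a real client
    private final long timeoutMillis;

    BoundedLookup(Function<String, String> remoteCall, long timeoutMillis) {
        this.remoteCall = remoteCall;
        this.timeoutMillis = timeoutMillis;
    }

    // Returns the remote result, or UNAVAILABLE if the call fails or does not
    // complete within timeoutMillis.
    String lookup(String key) {
        CompletableFuture<String> future =
                CompletableFuture.supplyAsync(() -> remoteCall.apply(key));
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // cancel() only marks the future as cancelled; it does not interrupt
            // the underlying call, so a real client needs its own request timeout.
            future.cancel(true);
            return UNAVAILABLE;
        } catch (ExecutionException e) {
            return UNAVAILABLE; // the remote call threw
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return UNAVAILABLE;
        }
    }
}
```

Rows carrying the sentinel can then be routed by a downstream query to a separate stream for later recovery. The timeout should be kept small relative to the consumer settings discussed in the first pattern, since every row still waits up to that long in the worst case.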