Segmentation fault trying to capture SIGINT #12

attdona opened this issue May 11, 2021 · 5 comments

Comments

attdona commented May 11, 2021

In my consumer I'm not able to capture SIGINT and exit in a controlled way: instead of an InterruptException, I get a segmentation fault.

Julia Version 1.6.0
Commit f9720dc2eb (2021-03-24 12:55 UTC)
Platform Info:
  OS: Linux (x86_64-pc-linux-gnu)
  CPU: Intel(R) Xeon(R) CPU E3-1225 v5 @ 3.30GHz
  WORD_SIZE: 64
  LIBM: libopenlibm
  LLVM: libLLVM-11.0.1 (ORCJIT, skylake)

Below is a minimal working example, consume.jl, that reproduces the problem:

using RDKafka

# let Ctrl-C raise an InterruptException instead of killing the process
Base.exit_on_sigint(false)

function main()

    try
        c = KafkaConsumer("localhost:9092", "my-consumer-group")
        parlist = [("quickstart-events", 0)]
        subscribe(c, parlist)
        timeout_ms = 1000
        while true
            msg = poll(String, String, c, timeout_ms)
            @show(msg)
        end
    catch e
        # expecting to land here with an InterruptException on Ctrl-C
        @info e
    end
end

main()

result:

$ julia consume.jl
$ Ctrl-C
signal (11): Segmentation fault
in expression starting at /home/adona/dev/juliadev/rdkafka/src/consume.jl:21
pthread_mutex_lock at /lib/x86_64-linux-gnu/libpthread.so.0 (unknown line)
mtx_lock at /home/adona/.julia/artifacts/4bf773d4657d4a20fea405841bd2c744c3faf2a8/lib/librdkafka.so (unknown line)
rd_kafka_q_serve at /home/adona/.julia/artifacts/4bf773d4657d4a20fea405841bd2c744c3faf2a8/lib/librdkafka.so (unknown line)
rd_kafka_poll at /home/adona/.julia/artifacts/4bf773d4657d4a20fea405841bd2c744c3faf2a8/lib/librdkafka.so (unknown line)
kafka_poll at /home/adona/.julia/packages/RDKafka/GlW4o/src/wrapper.jl:114 [inlined]
#3 at /home/adona/.julia/packages/RDKafka/GlW4o/src/client.jl:52 [inlined]
macro expansion at ./asyncevent.jl:252 [inlined]
#583 at ./task.jl:406
unknown function (ip: 0x7f4949c0d1ac)
_jl_invoke at /buildworker/worker/package_linux64/build/src/gf.c:2237 [inlined]
jl_apply_generic at /buildworker/worker/package_linux64/build/src/gf.c:2419
jl_apply at /buildworker/worker/package_linux64/build/src/julia.h:1703 [inlined]
start_task at /buildworker/worker/package_linux64/build/src/task.c:839
unknown function (ip: (nil))
Allocations: 2788065 (Pool: 2786867; Big: 1198); GC: 4
Segmentation fault (core dumped)

dfdx commented May 13, 2021

Thanks for reporting this! The problem is not 100% reproducible, but I think it comes from our finalizers: before a Julia object is garbage-collected, we destroy the corresponding C object. But librdkafka seems to also catch SIGINT before us and destroy the resources on its own, leaving us with invalid handles. Unfortunately, I can't see any API for checking whether a handle is still valid that we could use in the finalizers.
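
Schematically, the pattern in question is something like this (just a minimal sketch with hypothetical names, not the actual RDKafka.jl source):

mutable struct ClientHandle
    ptr::Ptr{Cvoid}   # the rd_kafka_t* handle returned by librdkafka
    function ClientHandle(ptr::Ptr{Cvoid})
        h = new(ptr)
        # The finalizer destroys the C object when the Julia wrapper is collected.
        # If librdkafka has already torn the handle down in its own SIGINT handling,
        # this call operates on freed memory and can segfault.
        finalizer(h) do obj
            ccall((:rd_kafka_destroy, "librdkafka"), Cvoid, (Ptr{Cvoid},), obj.ptr)
        end
        return h
    end
end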

Is there a reason you want to terminate the process with SIGINT instead of, say, checking a termination flag in the consumption loop?
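
For illustration, a flag-driven loop might look roughly like this (just a sketch reusing the poll/subscribe calls from the example above; the flag and whatever flips it are up to the application):

using RDKafka

# Stop flag checked on every iteration; another task, a control topic or a
# management endpoint would flip it when it is time to shut down.
running = Threads.Atomic{Bool}(true)

consumer = KafkaConsumer("localhost:9092", "my-consumer-group")
subscribe(consumer, [("quickstart-events", 0)])

while running[]
    msg = poll(String, String, consumer, 1000)
    @show msg
end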

Relevant docs: https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#termination

attdona commented May 14, 2021

SIGINT handling was my first approach because my application is a background process that is managed by a supervisor, in my case supervisord.

The default mechanism a supervisor uses to stop a process is to send it a signal.

Apart from that, I think my overall approach has to change because, from what I understand, librdkafka manages its own set of threads and an internal I/O event loop that runs alongside Julia's libuv event loop.

The presence of two event loops, with Julia's internal housekeeping of threads and async tasks on one side and the librdkafka threads outside Julia's control on the other, is a likely source of issues.

dfdx commented May 14, 2021

According to the documentation:

librdkafka uses multiple threads internally to fully utilize modern hardware. The API is completely thread-safe and the calling application may call any of the API functions from any of its own threads at any time.

I don't think there's an additional (asynchronous) event loop in librdkafka, just plain old synchronous threads: one main thread and one thread per broker. In a typical scenario you start a consumer and run it for a long time without interruptions, so I don't see any source of conflicts between threads or asynchronous tasks here. The segfault instead of a clean exit on SIGINT is an unfortunate detail which I hope we will find a way to deal with, but if it's caused by supervisord killing the service, the result seems to be the same: the program is stopped and the resources are released. Do you see any other reasons to worry about multithreading in librdkafka?

attdona commented May 15, 2021

Thanks! Looking at the sources, there is no event loop in librdkafka; I probably got confused by misinterpreting some test case.

Based on this, I don't see any other reason to worry about multithreading in librdkafka.

attdona commented May 17, 2021

I have a patch that seems to work correctly for this issue.

I've removed all finalizer calls because, as far as I know, it is hard to say in what order finalizers get called, and this may conflict with the order of API calls expected by librdkafka:

https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#high-level-kafkaconsumer

In the case of the consumer, this implies that the user has to explicitly close the consumer:

using RDKafka

Base.exit_on_sigint(false)

consumer = KafkaConsumer("localhost:9092", "my-consumer-group")
try
    parlist = [("my_partition", 0)]
    subscribe(consumer, parlist)
    timeout_ms = 3000
    while true
        msg = poll(String, String, consumer, timeout_ms)
        @show(msg)
    end
catch e
    # just trace InterruptException
    @info e
finally
    # always close the consumer so librdkafka resources are released
    close(consumer)
end

Another source of problems was the kafka_poll timer inside the KafkaClient function.
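
For context, the asyncevent.jl and client.jl frames in the stack trace above point at a periodic Timer callback that keeps calling into librdkafka; the sketch below shows the general shape of that pattern and why a tick firing after the handle has been released is dangerous (hypothetical names and a commented-out ccall, not the actual RDKafka.jl source):

# `handle` stands in for the rd_kafka_t* pointer that librdkafka may already have
# destroyed by the time the timer fires.
handle = Ref{Ptr{Cvoid}}(C_NULL)

poll_timer = Timer(0.0; interval = 1.0) do _
    # In the real package each tick reaches rd_kafka_poll via kafka_poll
    # (wrapper.jl:114 in the trace). Calling it on a handle that was destroyed
    # elsewhere dereferences freed memory and segfaults.
    handle[] == C_NULL && return
    # ccall((:rd_kafka_poll, "librdkafka"), Cint, (Ptr{Cvoid}, Cint), handle[], Cint(0))
end

# Closing the timer before the handle is destroyed (or dropping the implicit timer
# entirely, as in the patch) avoids the dangling call.
close(poll_timer)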

I'm sending a Pull Request ...

Attilio
