Failed to send messages of 1MB or greater #16

Open
EnalLS opened this issue Oct 28, 2021 · 10 comments

Comments

@EnalLS

EnalLS commented Oct 28, 2021

Hi,
I’m trying to send messages of different sizes from my producer, after changing the default max message bytes configuration. All messages smaller than 1MB are sent, and then I get an error:
when sending messages of 1MB or greater I keep getting an Unknown Error (code: -1) that appears to come from the librdkafka DLL used by the RDKafka.jl package. It reproduces every time.

Messages of this type and size work with the same Kafka cluster from other, non-Julia clients, so the issue appears to come from the RDKafka.jl package.
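For reference, the point where the failures start lines up with librdkafka's default client-side limit: the `message.max.bytes` property defaults to 1,000,000 bytes, while the 1MB test string is 1,048,576 bytes. This plain-Julia check (no Kafka needed) lists which of the payload sizes used in this issue exceed that default. It only describes what happens if the default is still in effect, i.e. if a changed setting never reaches the client.

```julia
# librdkafka's default for the client property message.max.bytes, in bytes
const DEFAULT_MESSAGE_MAX_BYTES = 1_000_000

# Payload sizes used in this issue, in bytes
const SIZES = [
    ("10B", 10),
    ("100B", 100),
    ("1KB", 1024),
    ("1MB", 1_048_576),
    ("5MB", 5_242_880),
    ("20MB", 20_971_520),
]

# True when the payload alone is already over the default limit
# (the real produce request also carries the key and protocol overhead).
exceeds_default(nbytes::Integer) = nbytes > DEFAULT_MESSAGE_MAX_BYTES

for (label, nbytes) in SIZES
    status = exceeds_default(nbytes) ? "over the default" : "within the default"
    println(rpad(label, 6), rpad(string(nbytes), 10), status)
end
```

If the 1KB message succeeds and the 1MB one fails, a first thing to check is whether the larger `message.max.bytes` value really reaches librdkafka.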

Producer code:

using RDKafka
using Random

# KAFKA_BROKER_ADDRESS and KAFKA_TOPIC are defined elsewhere (not shown)
p = KafkaProducer(KAFKA_BROKER_ADDRESS)
partition = 0
# Tried both
p.client.conf["message.max.bytes"] = 36423360
p.client.conf["max.message.bytes"] = 36423360

function send_data(message, nameOfdata)
    println("sending $nameOfdata")
    produce(p, KAFKA_TOPIC, partition, nameOfdata, message)
end

# Prepare random data with different sizes
rand_string_10B = randstring(10);
rand_string_100B = randstring(100);
rand_string_1KB = randstring(1024);
rand_string_1MB = randstring(1048576);
rand_string_5MB = randstring(5242880);    # 5MB payload, labelled "15MB String" below
rand_string_20MB = randstring(20971520);

for i in 1:10
    send_data(rand_string_10B, "10B String")
    send_data(rand_string_100B, "100B String")
    send_data(rand_string_1KB, "1KB String")
    send_data(rand_string_1MB, "1MB String")
    send_data(rand_string_5MB, "15MB String")
    send_data(rand_string_20MB, "20MB String")
end

Producer output:

sending 10B String
sending 100B String
sending 1KB String
sending 1MB String
-1
ERROR: LoadError: Produce request failed with error code: -1 : Unknown e
Stacktrace:
[1] error(s::String)
@ Base .\error.jl:33
[2] produce(rkt::Ptr{Nothing}, partition::Int64, key::Vector{UInt8}, pa
@ RDKafka C:\Users\EnalMulla.julia\packages\RDKafka\GlW4o\src\wrappe
[3] produce(kt::RDKafka.KafkaTopic, partition::Int64, key::String, payl
@ RDKafka C:\Users\EnalMulla.julia\packages\RDKafka\GlW4o\src\produc
[4] produce(p::KafkaProducer, topic::String, partition::Int64, key::Str
@ RDKafka C:\Users\EnalMulla.julia\packages\RDKafka\GlW4o\src\produc
[5] prepare_data_and_send_it(message::String, nameOfdata::String)
@ Main c:\S\repos\dummyRepo\JuliaKafka\producer\producerJuliaServer.j
[6] top-level scope
@ c:\S\repos\dummyRepo\JuliaKafka\producer\producerJuliaServer.jl:38
in expression starting at c:\S\repos\dummyRepo\JuliaKafka\producer\produ

Consumer code:

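using RDKafka

# KAFKA_BROKER_ADDRESS and KAFKA_TOPIC are defined elsewhere (not shown)
c = KafkaConsumer(KAFKA_BROKER_ADDRESS, "my-consumer-group")
parlist = [(KAFKA_TOPIC, 0)]
subscribe(c, parlist)
timeout_ms = 1000
while true
    # poll returns nothing if no message arrives within timeout_ms
    msg = poll(String, String, c, timeout_ms)
    println(msg)
    # @show(msg);
end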

Consumer output:

nothing
nothing
nothing
Message(10B String: eYcYGpFuqe)
Message(100B String: o4dyZXBcCxsewjpOQosoxS6aiuWy2lrcofluVJJt79QXtW2f58DzRy1mZEnPc2HAReQHoW8BWc4blxtlHzQd0AwKFnUCPvcNLPCb)
Message(1KB String: nCcOw31WrRMiJUtTIPaQr4ZDGrVpfFXydPvGiscqyyJqqteutcXV6AL2wCrzhO31j2Bu6RKgVp2XD1yPVIMM59PxzXr7CipaHogkycYDDwV2IO7nCfRfad75DUxEE16QVB7XjXctu2EXiBnTAKdekPTFQovIzJvBE5iE4BXhLCfdhex2Z3vUxo5S92B0V2DhPNKTJIzIPjEenKjbPFNaaJOFOYusBmq7gkbgeZmImDCCug3YGOqKfFOH1xNUjmDW6wA0ONaRZj9CbSNMaXCgj2ykyFsoaXfTPvpCOsLFhfjLzJXRJK81Qyu6mPtxSIv19pvBOGgeIvps50RkgFvu1t3mK4L13VI4T5oMUVVYm8SVeZt5NlQV9Oc1L8FCZxAS9R63683g1z8o5razbogqRfJ37YyKJzPgCgmKNtYyH8L9FJH98YOLQ3QptBX6mEkv9579m4JppOHM12Uxw2pAeojYL5PnDHek5wHvX4rkTO8AZumRqRg70UkskceKXpn9CP6XHCIDqBmFGBSzB5OMuFZvcJS9eq0scGijmtheIqTdTLJ0LtELRtbONzw4X5Sl59f4IzkJSf03sK1XwOnKUFjr3GWc3YF8rYcL3EXgAfoWc5AqCoOupu6o9J5wQVTehmzgF0o2GjVNuSpk1gijAvs8t6cdoq1PayAPFGgb8fhtg0jQGBQeSSTAlPDpn3dC3COLiJlhpVbsfIDPf1dIyffKDa2igpyPwvwSFfG7cndH2890f7McCxOXCQRQw1EVuM5dPGmmUIHCIMILpwgGoFFV9SOyuuugQC9watQ0aY5cI0te61GyH2EyfKt6D8t3zNAgSfDBR2Y1AjpBRM0JVOB7E3pd2vtznP1OzsMZybGSSbZuIYnlaInGxtPCogoivdpI3LsdHtYRoW3BNW9hAe5S1eia4ivwca6dV7SUV3q5vNZtHf5khPdYpVsRt6LxfGDeIO1mRrdmSMwUBYZrP8vpKDuOdeu4uahT0SzJiVMGxJPRBDMRzb5HnRGgEzxG)
nothing
nothing
nothing

Thanks,
Enal.

@dfdx
Owner

dfdx commented Oct 28, 2021

p.client.conf["message.max.bytes"] = 36423360
p.client.conf["max.message.bytes"] = 36423360

Ouch, this is not supposed to work 😨 conf is passed to the KafkaClient constructor and used to instantiate the C-level client object. conf is then saved as a field of the client, but changing it afterwards has no effect on the client's behavior.

Try the following instead:

conf = Dict("message.max.bytes" => 36423360, "max.message.bytes" => 36423360)
p = KafkaProducer(KAFKA_BROKER_ADDRESS, conf)
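To illustrate why assigning to `p.client.conf` after construction changes nothing, here is a minimal plain-Julia sketch of the pattern described above: settings are read once when the native handle is created, and the Dict is merely kept as a field. `SketchClient` and `NativeHandle` are hypothetical stand-ins, not RDKafka.jl types; the real client hands `conf` to librdkafka through C calls.

```julia
# Hypothetical stand-in for the native (C-level) handle: it copies the
# settings it needs once, when it is created.
struct NativeHandle
    message_max_bytes::Int
end

# Hypothetical client that keeps the Julia-side conf Dict as a field,
# next to the handle that was built from it.
struct SketchClient
    conf::Dict{String, Any}
    handle::NativeHandle
end

# Settings are read here, at construction time only
# (1_000_000 mirrors librdkafka's default message.max.bytes).
function SketchClient(conf::Dict{String, Any})
    max_bytes = get(conf, "message.max.bytes", 1_000_000)
    return SketchClient(conf, NativeHandle(max_bytes))
end

# Mutating the stored Dict later does not touch the handle.
late = SketchClient(Dict{String, Any}())
late.conf["message.max.bytes"] = 36423360
println(late.handle.message_max_bytes)    # 1000000

# Passing the setting at construction time does.
early = SketchClient(Dict{String, Any}("message.max.bytes" => 36423360))
println(early.handle.message_max_bytes)   # 36423360
```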

@EnalLS
Author

EnalLS commented Oct 31, 2021

The given example does not work: it raises a type-related error.
So we had to add explicit types to the dictionary constructor to avoid the error:
conf = Dict{String, Any}("message.max.bytes" => 364233060, "max.message.bytes" => 364233060)
When creating the configuration as above, we don’t get an error, and we find the expected values in the configuration object.
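A likely reason for the type error, assuming that `KafkaProducer(bootstrap_servers, conf)` writes the broker address into the same `conf` Dict under `"bootstrap.servers"` (an assumption about RDKafka.jl internals, not confirmed in this thread): a Dict literal that contains only integers is inferred as a Dict with `Int` values, and a String cannot be stored in it. This plain-Julia sketch reproduces that behavior without Kafka; `try_set_servers!` and the address `"localhost:9092"` are hypothetical.

```julia
# A Dict literal with only Int values is inferred as Dict{String, Int}.
conf_int = Dict("message.max.bytes" => 36423360, "max.message.bytes" => 36423360)

# An explicitly typed Dict can hold values of any type.
conf_any = Dict{String, Any}("message.max.bytes" => 36423360, "max.message.bytes" => 36423360)

# Try to add a String value, as a constructor might do with the broker address.
function try_set_servers!(conf::AbstractDict, servers::AbstractString)
    try
        conf["bootstrap.servers"] = servers
        return true
    catch err
        # There is no convert(Int, ::String), so setindex! throws a MethodError
        err isa MethodError || rethrow()
        return false
    end
end

println(valtype(conf_int))
println(try_set_servers!(conf_int, "localhost:9092"))   # false
println(try_set_servers!(conf_any, "localhost:9092"))   # true
```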
However, we still have a few issues.
When I run the code, the producer output is:

julia> include("producerJuliaServer.jl")
1
sending 10B String
sending 100B String
sending 1KB String
sending 1MB String
sending 15MB String
sending 20MB String
2
sending 10B String
sending 100B String
sending 1KB String
sending 1MB String
sending 15MB String
sending 20MB String
3
sending 10B String
sending 100B String
sending 1KB String
sending 1MB String
sending 15MB String
sending 20MB String
4
sending 10B String
sending 100B String
sending 1KB String
sending 1MB String
sending 15MB String
sending 20MB String
5
sending 10B String
sending 100B String
sending 1KB String
sending 1MB String
sending 15MB String
sending 20MB String
6
sending 10B String
sending 100B String
sending 1KB String
sending 1MB String
sending 15MB String
sending 20MB String
7
sending 10B String
sending 100B String
sending 1KB String
sending 1MB String
sending 15MB String
sending 20MB String
8
sending 10B String
sending 100B String
sending 1KB String
sending 1MB String
sending 15MB String
sending 20MB String
9
sending 10B String
sending 100B String
sending 1KB String
sending 1MB String
sending 15MB String
sending 20MB String
10
sending 10B String
sending 100B String
sending 1KB String
sending 1MB String
sending 15MB String
sending 20MB String
Producer Done!
julia>
Please submit a bug report with steps to reproduce this fault, and any error messages that follow (in their entirety). Thanks.
Exception: EXCEPTION_ACCESS_VIOLATION at 0x63471770 -- crc32c at C:\Users\EnalMulla.julia\artifacts\be727c9eb35aad630606a7ebef96d85f1ccf6825\bin\librdkafka.dll (unknown line)
in expression starting at none:0

On the consumer side, no message is received.

I also tried sending a message of the same size several times (20MB, 15MB, 1MB, and even 1KB); after 2 (sometimes 3) messages had been sent, the terminal crashed.
Producer code:

for i in 1:1
    println(i)
    # prepare_data_and_send_it(rand_string_10B,"10B String")
    # prepare_data_and_send_it(rand_string_100B,"100B String")
    prepare_data_and_send_it(rand_string_1KB,"1KB String")
    # prepare_data_and_send_it(rand_string_1MB,"1MB String")
    # prepare_data_and_send_it(rand_string_5MB,"15MB String")
    # prepare_data_and_send_it(rand_string_20MB,"20MB String")
end

I ran it twice in a row; after the second run, the producer output was:

julia> include("producerJuliaServer.jl")
1
sending 1KB String
Producer Done!
julia> include("producerJuliaServer.jl")
1
sending 1KB String
Producer Done!
julia>
Please submit a bug report with steps to reproduce this fault, and any error messages that follow (in their entirety). Thanks.
Exception: EXCEPTION_ACCESS_VIOLATION at 0x63574eea -- mtx_lock at C:\Users\EnalMulla.julia\artifacts\be727c9eb35aad630606a7ebef96d85f1ccf6825\bin\librdkafka.dll (unknown line)
in expression starting at none:0
mtx_lock at C:\Users\EnalMulla.julia\artifacts\be727c9eb35aad630606a7ebef96d85f1ccf6825\bin\librdkafka.dll (unknown line)
rd_kafka_q_serve at C:\Users\EnalMulla.julia\artifacts\be727c9eb35aad630606a7ebef96d85f1ccf6825\bin\librdkafka.dll (unknown line)
rd_kafka_poll at C:\Users\EnalMulla.julia\artifacts\be727c9eb35aad630606a7ebef96d85f1ccf6825\bin\librdkafka.dll (unknown line)
kafka_poll at C:\Users\EnalMulla.julia\packages\RDKafka\GlW4o\src\wrapper.jl:114 [inlined]
#3 at C:\Users\EnalMulla.julia\packages\RDKafka\GlW4o\src\client.jl:52 [inlined]
macro expansion at .\asyncevent.jl:252 [inlined]
#583 at .\task.jl:411
unknown function (ip: 0000000061126423)
jl_apply at /cygdrive/c/buildbot/worker/package_win64/build/src\julia.h:1703 [inlined]
start_task at /cygdrive/c/buildbot/worker/package_win64/build/src\task.c:839
Allocations: 885083 (Pool: 884745; Big: 338); GC: 3

Consumer output:

listening to messages...
got message
finished writing message
got message
finished writing message

No matter what the message size is, after sending it 2-3 times we get an error.

Some of this behavior looks like issue #11, so we modified our code to create the producer only once per session, but we still get exceptions.
They seem to come from thread- or memory-related issues inside librdkafka.dll; we got the following exceptions:
Exception: EXCEPTION_ACCESS_VIOLATION at 0x63471770 -- crc32c at C:\Users\EnalMulla.julia\artifacts\be727c9eb35aad630606a7ebef96d85f1ccf6825\bin\librdkafka.dll (unknown line) in expression starting at REPL[3]:1
Exception: EXCEPTION_ACCESS_VIOLATION at 0x63574eea -- mtx_lock at C:\Users\EnalMulla.julia\artifacts\be727c9eb35aad630606a7ebef96d85f1ccf6825\bin\librdkafka.dll (unknown line)
in expression starting at none:0

Thanks,
Enal.
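The create-once approach mentioned above can be sketched in plain Julia as follows. `PRODUCER_CACHE` and `get_producer` are hypothetical names, and `make_producer` stands for whatever builds the real producer (for example `() -> KafkaProducer(KAFKA_BROKER_ADDRESS, conf)`). The `@isdefined` guard keeps the cache, and the producer in it, when the script is re-`include`d in the same session, so a second run does not construct a second producer. This is a sketch of the pattern only, not a confirmed fix for the crashes reported here.

```julia
# Create the cache only once per session, even if this file is include()d again.
if !@isdefined(PRODUCER_CACHE)
    PRODUCER_CACHE = Dict{Symbol, Any}()
end

# Return the cached producer; make_producer() is called only on the first call.
get_producer(make_producer::Function) = get!(make_producer, PRODUCER_CACHE, :producer)
```

Every send would then go through `get_producer(...)` instead of a `p` that is rebuilt on each `include`.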

@dfdx
Owner

dfdx commented Nov 1, 2021

librdkafka should be thread-safe by itself, but we may corrupt memory e.g. while destroying the allocated resources. Can you post a minimal reproducible example (the producer side should be enough)? It's hard to see what may go wrong without the actual code.

@EnalLS
Author

EnalLS commented Nov 1, 2021

Producer code:

using RDKafka
using Random
import RDKafka.produce
import RDKafka.KafkaProducer

KAFKA_BROKER_ADDRESS = "*********************************"
KAFKA_TOPIC = "test"

partition = 0

conf = Dict{String, Any}("message.max.bytes" => 364233060, "max.message.bytes" => 364233060)
p = KafkaProducer(KAFKA_BROKER_ADDRESS, conf)

rand_string_10B = randstring(10);
rand_string_100B = randstring(100);
rand_string_1KB = randstring(1024);
rand_string_1MB = randstring(1048576);
rand_string_5MB = randstring(5242880);    # 5MB payload, labelled "15MB String" below
rand_string_20MB = randstring(20971520);
rand_arr_20MB = rand(UInt8, 1048576);     # unused; note that this is 1MB, not 20MB

function prepare_data_and_send_it(message, nameOfdata)
    println("sending $nameOfdata")
    # Prefix the payload with its label so it can be identified on the consumer side
    message = string("### $nameOfdata ###", message)
    produce(p, KAFKA_TOPIC, nameOfdata, message)
end

function produce_much_stuff(iterationsNum)
    for i in 1:iterationsNum
        println(i)
        prepare_data_and_send_it(rand_string_10B, "10B String")
        prepare_data_and_send_it(rand_string_100B, "100B String")
        prepare_data_and_send_it(rand_string_1KB, "1KB String")
        prepare_data_and_send_it(rand_string_1MB, "1MB String")
        prepare_data_and_send_it(rand_string_5MB, "15MB String")
        prepare_data_and_send_it(rand_string_20MB, "20MB String")
    end
end

# produce_much_stuff is called separately, e.g. produce_much_stuff(10) at the REPL
print("Producer Done!")

@dfdx
Owner

dfdx commented Nov 1, 2021

I cannot reproduce the crash with a freshly installed RDKafka.jl on Linux. Messages larger than 1MB are indeed not delivered, but that might be related to other settings (e.g. see this question), so I haven't looked into it yet.

Could you please verify what exactly fails for you: sending several messages (even small ones) in a row, or sending large messages? Also, is it correct that you get the access violation error after the producer has finished its job (i.e. "Producer Done!" has already been printed)?

@EnalLS
Author

EnalLS commented Nov 2, 2021

What fails for us is sending several messages in a row, usually when the messages are 1MB or larger.
We do get the "Producer Done!" print most of the time, but then we get the error, which seems to be printed by a different thread.
In addition, it looks like the Kafka connection sometimes remains open for 2-3 minutes after the producer has finished: we often get "broker not available" messages after running the producer code a few times.

@dfdx
Owner

dfdx commented Nov 2, 2021

If you keep sending messages endlessly (so the producer is never "done"), do you see the same error?

@EnalLS
Author

EnalLS commented Nov 8, 2021

Yes, I tried this:

# iterationsNum is unused in this variant; it is kept so that produce_much_stuff(10) still works
function produce_much_stuff(iterationsNum)
    while true
        # for i in 1:iterationsNum
        # println(i)
        # prepare_data_and_send_it(rand_string_10B,"10B String")
        # prepare_data_and_send_it(rand_string_100B,"100B String")
        # prepare_data_and_send_it(rand_string_1KB,"1KB String")
        # prepare_data_and_send_it(rand_string_1MB,"1MB String")
        # prepare_data_and_send_it(rand_string_5MB,"15MB String")
        prepare_data_and_send_it(rand_string_20MB,"20MB String")
    end
end

the output was:

produce_much_stuff(10)
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
-1
ERROR: Produce request failed with error code: -1 : Unknown error
Stacktrace:
[1] error(s::String)
@ Base .\error.jl:33
[2] produce(rkt::Ptr{Nothing}, partition::Int64, key::Vector{UInt8}, payload::Vector{UInt8})
@ RDKafka C:\Users\EnalMulla.julia\packages\RDKafka\GlW4o\src\wrapper.jl:142
[3] produce(kt::RDKafka.KafkaTopic, partition::Int64, key::String, payload::String)
@ RDKafka C:\Users\EnalMulla.julia\packages\RDKafka\GlW4o\src\producer.jl:28
[4] produce(p::KafkaProducer, topic::String, partition::Int64, key::String, payload::String)
@ RDKafka C:\Users\EnalMulla.julia\packages\RDKafka\GlW4o\src\producer.jl:36
[5] produce
@ C:\Users\EnalMulla.julia\packages\RDKafka\GlW4o\src\producer.jl:42 [inlined]
[6] prepare_data_and_send_it(message::String, nameOfdata::String)
@ Main C:\S\repos\dummyRepo\JuliaKafka\producer\producerJuliaServer.jl:34
[7] produce_much_stuff(iterationsNum::Int64)
@ Main C:\S\repos\dummyRepo\JuliaKafka\producer\producerJuliaServer.jl:46
[8] top-level scope
@ REPL[3]:1
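In the endless-loop run above, "sending 20MB String" is printed 52 times before the -1 error, i.e. 51 produce calls succeeded. One possible reading, not confirmed anywhere in this thread, is that librdkafka's local producer queue filled up: produce only enqueues a message, so if messages are enqueued faster than they are delivered, they accumulate in memory. Assuming librdkafka's default `queue.buffering.max.kbytes` of 1048576 (1 GiB) and ignoring per-message overhead, the arithmetic below gives how many of these messages fit at once; `ASSUMED_QUEUE_MAX_BYTES` is a name introduced only for this sketch.

```julia
# Assumption: librdkafka's default queue.buffering.max.kbytes = 1048576 KiB (1 GiB)
const ASSUMED_QUEUE_MAX_BYTES = 1_048_576 * 1024

# Payload and key as built by prepare_data_and_send_it for the 20MB case
label = "20MB String"
payload_bytes = sizeof("### $label ###") + 20_971_520
key_bytes = sizeof(label)

# Approximate bytes per queued message (librdkafka's own overhead is ignored)
per_message = payload_bytes + key_bytes

# Number of such messages that fit in the queue if none has been delivered yet
fits = fld(ASSUMED_QUEUE_MAX_BYTES, per_message)
println("bytes per message: $per_message, messages that fit: $fits")
```

Under these assumptions 51 messages fit and the 52nd would not, which matches the output; checking whether the error reported by librdkafka is a queue-full condition would confirm or rule this out.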

@EnalLS
Author

EnalLS commented Nov 8, 2021

Another output:

julia> produce_much_stuff(10)
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
sending 20MB String
Please submit a bug report with steps to reproduce this fault, and any error messages that follow (in their entirety). Thanks.
Exception: EXCEPTION_ACCESS_VIOLATION at 0xff1770 -- crc32c at C:\Users\EnalMulla.julia\artifacts\be727c9eb35aad630606a7ebef96d85f1ccf6825\bin\librdkafka.dll (unknown line)
in expression starting at REPL[3]:1

In this case, no message is received on the consumer side.

@dfdx
Owner

dfdx commented Nov 8, 2021

Does the script work fine with messages smaller than 1MB? Large messages may be blocked by many settings on both the client and the server side, so one direction is to inspect those settings thoroughly. On the other hand, if you see the same error with smaller messages, or smaller messages are not delivered to the consumer, the problem might be a difference between the Windows and Linux builds of librdkafka, because on Linux I cannot reproduce the described behavior.
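As a starting point for that inspection, this sketch collects the client-side size limits in one place. The helper `large_message_confs` is hypothetical (not part of RDKafka.jl); the keys are librdkafka configuration properties, and the `+ 512` headroom follows librdkafka's documented requirement that `receive.message.max.bytes` be at least `fetch.max.bytes` + 512. Note that `max.message.bytes` is a topic-level setting on the Kafka side; librdkafka's client-side counterpart is `message.max.bytes`. Broker and topic limits appear only as comments, since they are configured on the cluster.

```julia
# Hypothetical helper (not part of RDKafka.jl): client-side librdkafka
# settings that all allow messages of up to `limit` bytes.
function large_message_confs(limit::Integer)
    producer_conf = Dict{String, Any}(
        # Largest produce request librdkafka will build (default 1000000)
        "message.max.bytes" => limit,
    )
    consumer_conf = Dict{String, Any}(
        # Maximum amount of data the broker may return for one fetch request
        "fetch.max.bytes" => limit,
        # Initial per-partition fetch size
        "max.partition.fetch.bytes" => limit,
        # Must be at least fetch.max.bytes + 512 (protocol overhead)
        "receive.message.max.bytes" => limit + 512,
    )
    return producer_conf, consumer_conf
end

# Kafka-side limits, set on the cluster rather than through RDKafka.jl:
#   broker: message.max.bytes, replica.fetch.max.bytes
#   topic:  max.message.bytes

producer_conf, consumer_conf = large_message_confs(36_423_360)
```

The producer dict can be passed as `KafkaProducer(KAFKA_BROKER_ADDRESS, producer_conf)`, as suggested earlier in this thread; passing `consumer_conf` to `KafkaConsumer` assumes its constructor accepts a conf Dict in the same way.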

Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants