
How to flush or poll a producer? #23

Open
edward-bestx opened this issue Oct 8, 2024 · 25 comments

@edward-bestx

Usually one has to either poll or flush a producer after sending 1 or more messages with a call to produce.

I had a look through the source code and I don't seem to be able to find a Julia interface to either a producer.poll or producer.flush function.

(I would assume that in Julia style this would be either poll(producer, ...) or flush(producer, ...).)

Is this functionality missing from the Julia library?

@dfdx
Owner

dfdx commented Oct 9, 2024

Yes, flush() is missing, while poll() is only implemented for the consumer. I don't really have time to add this functionality at the moment, but you can try something like this:

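using RDKafka

# Evaluate the definition inside the RDKafka module, so that `librdkafka`
# (imported there from librdkafka_jll) resolves when the method is called.
@eval RDKafka function kafka_flush(rk::Ptr{Cvoid}, timeout::Integer)
    return ccall((:rd_kafka_flush, librdkafka), Cint,
                 (Ptr{Cvoid}, Cint),
                 rk, timeout)
end

producer = ...
RDKafka.kafka_flush(producer.client.rk, 100)   # timeout in milliseconds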

This is based on the API in rdkafka.h and similar to other wrappers in wrapper.jl. If you feel like trying it, you can implement poll() in a similar way, and, perhaps, wrap it all into high-level poll(producer, ...) and flush(producer, ...).
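Following that suggestion, a minimal sketch of the poll() counterpart plus two convenience wrappers might look like the block below. It assumes, as described here, that RDKafka exposes the library as RDKafka.librdkafka and the producer's rd_kafka_t pointer as producer.client.rk. The names kafka_poll, kafka_flush, poll_producer and flush_producer are hypothetical and are defined in Main, not in the package.

```julia
using RDKafka

# Assumption: these helpers live in Main, so the library is referenced by its
# qualified name RDKafka.librdkafka (imported there from librdkafka_jll).

# int rd_kafka_poll(rd_kafka_t *rk, int timeout_ms): serves queued callbacks
# (delivery reports, error callbacks, ...) and returns the number of events served.
function kafka_poll(rk::Ptr{Cvoid}, timeout_ms::Integer)
    return ccall((:rd_kafka_poll, RDKafka.librdkafka), Cint,
                 (Ptr{Cvoid}, Cint),
                 rk, timeout_ms)
end

# rd_kafka_resp_err_t rd_kafka_flush(rd_kafka_t *rk, int timeout_ms): waits until
# outstanding messages are delivered or the timeout expires; 0 means no error.
function kafka_flush(rk::Ptr{Cvoid}, timeout_ms::Integer)
    return ccall((:rd_kafka_flush, RDKafka.librdkafka), Cint,
                 (Ptr{Cvoid}, Cint),
                 rk, timeout_ms)
end

# Hypothetical high-level wrappers that take a producer object.
poll_producer(p, timeout_ms::Integer=0) = kafka_poll(p.client.rk, timeout_ms)

function flush_producer(p, timeout_ms::Integer=1000)
    err = kafka_flush(p.client.rk, timeout_ms)
    err == 0 || @warn "rd_kafka_flush returned error code $err (e.g. a timeout)"
    return err
end
```

Calling poll_producer(producer) after produce calls serves delivery reports, and flush_producer(producer) before shutdown waits for outstanding messages. Inside the package, these would more naturally become poll(producer, ...) and flush(producer, ...) methods.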

If you try it and get an error saying there's no function rd_kafka_flush, then we may need to update the version of librdkafka. Let me know if that is the case.

@edward-bestx
Author

Thanks for the quick response. I think I can help with this. Let me see if I can get something set up. I may need some help from you to understand the CI/CD process, and yes, it would probably be wise to move librdkafka to the latest version.

@edward-bestx
Author

Ok, I seem to have got something working. I don't understand in great detail how the CI/CD works yet, so I may need some assistance there. Also, I couldn't see how to update the librdkafka version. Can you point me in the right direction? It probably makes sense to:

  1. Get it to build with the latest C/C++ library.
  2. Add the flush implementations. I can probably do this myself, it should be relatively straightforward. I'll start looking at this now.

@dfdx
Owner

dfdx commented Oct 9, 2024

Sounds great, thank you!

Binary dependencies are built separately in RDKafka.jl.deps. To update the version, we need to change the URL and hash to point to the new librdkafka home.

As for CI/CD, I think it's incomplete and terribly outdated 😞 I remember I mostly tested it locally due to the need to install and run Kafka. Now I realize we can do it all in a Docker container (using GitHub's docker action). I have a half-baked Dockerfile for Julia + Kafka here and can try to set up proper CI/CD using it during the weekend, or you can jump in and try it yourself.

@edward-bestx
Author

Ok that's great thanks, it would be great to see if I can reproduce your local testing steps. Is this documented somewhere? If not, can you help me to understand how to test it locally, and then I will add some documentation for this.

I already have local instances of a Kafka Broker/Controller ready to go.

@edward-bestx
Author

I'm not sure what steps to take with the dependencies repository.

I am not sure about changing the Julia version to 1.10. It is the latest LTS release, so I assume the version should be moved past 1.0?

How do I "use" the dependency repo? I don't understand what to do next - are you able to point me in the right direction? Thanks!

@dfdx
Owner

dfdx commented Oct 9, 2024

Oh, ah... We're playing archaeologists here. RDKafka.jl.deps was supposed to work on Travis CI, but Travis stopped free support for open-source projects years ago, and the Julia community moved to GitHub Actions. Let me take a closer look at how this is supposed to be done nowadays.

Meanwhile, could you please re-point your PR to my repository? Changes to your repository will not affect binaries for RDKafka.jl.

Regarding the tests, I don't see a lot of them either :( I think at this stage it's ok to just try the changes locally and submit a PR. I will try to update CI/CD for RDKafka.jl and RDKafka.jl.deps, and then we will be able to add some real tests.

@edward-bestx
Author

Ok, I'll have a think about how to set up some more detailed automated testing, but this might have to be a secondary priority.

I've just set up a new PR pointing to your repo. I don't know whether the changes I have suggested will build or work as expected.

Is there a way to test any of this locally?

@dfdx
Owner

dfdx commented Oct 9, 2024

Oops, I misled you. After reading the BinaryBuilder docs and refreshing my memory, I realized that RDKafka.jl.deps is not needed anymore: binary artifacts are now published in JuliaPackaging/Yggdrasil. It already contains an entry for librdkafka, so if you want to update the binaries to the latest v2.5.3, you need to make a PR to that repo. On the other hand, the current version is 2.3.0, which is most likely sufficient for your goal; in that case nothing needs to be done.
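To double-check which librdkafka build is actually loaded, the integer returned by rd_kafka_version() can be decoded. rdkafka.h documents it as hex 0xMMmmrrxx (major, minor, revision, pre-release id, where 0xff marks a final release). A small sketch; decode_rdkafka_version is a hypothetical helper name:

```julia
# Decode librdkafka's packed version integer 0xMMmmrrxx into a readable string.
function decode_rdkafka_version(v::Integer)
    major      = (v >> 24) & 0xff
    minor      = (v >> 16) & 0xff
    revision   = (v >> 8) & 0xff
    prerelease = v & 0xff            # 0xff means a final release
    s = "$major.$minor.$revision"
    return prerelease == 0xff ? s : "$s (pre-release $prerelease)"
end

println(decode_rdkafka_version(33751295))   # 33751295 == 0x020300ff, prints 2.3.0
```

With the package loaded, decode_rdkafka_version(RDKafka.rd_kafka_version()) should report the version in use; the value 33751295 seen in this thread decodes to 2.3.0.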

@edward-bestx
Author

Ok, that's probably going to be good enough, I can perhaps submit a PR for that at a later date. If I do one right away it's unlikely to be accepted anyway as all I would be able to do is search for the version number and increment it. That may or may not be sufficient, I don't know.

Let's circle back - so to test any changes I make to this repository - how should I go about doing that? If I can test locally, presumably I need to build this repository locally somehow and then run a Julia script to test any new functions which I add?

Does that sound sensible?

@dfdx
Owner

dfdx commented Oct 9, 2024

Yes, absolutely. Right now I'm adding a Dockerfile that installs Julia, installs and starts Kafka in the background, creates a topic "quickstart-events" and puts a few strings into it. If you can write a test (in test/runtests.jl) that uses this setup, I will ensure it's launched on CI/CD.

@dfdx
Owner

dfdx commented Oct 9, 2024

(You can of course create other topics, and write to and read from them in the tests. The Kafka broker will be available at localhost:9092.)

@dfdx
Owner

dfdx commented Oct 9, 2024

I've updated CI/CD and added a sample test. To test your changes, you can:

  1. Make a PR to this repository, and CI/CD will run automatically.
  2. Run Kafka and tests for RDKafka.jl in Docker:
docker build . -t rdkafka-jl:latest
docker run rdkafka-jl:latest
  3. Manually run Apache Kafka on localhost:9092 and invoke tests from Julia REPL:
] test

@edward-bestx
Author

Hi, sorry, it took a couple of days to get back to this. I'm having a problem with the Docker ENTRYPOINT:

$ docker run rdkafka-jl:latest
docker: Error response from daemon: failed to create task for container: failed to create shim task: OCI runtime create failed: runc create failed: unable to start container process: exec: "${RDKAFKA_TEST_DIR}/scripts/run-tests.sh": stat ${RDKAFKA_TEST_DIR}/scripts/run-tests.sh: no such file or directory: unknown.

It seems as if the environment variable is being interpreted as a literal string. I looked at the Dockerfile and thought this should work as expected.

Did you manage to run it at your end?

@dfdx
Owner

dfdx commented Oct 11, 2024

Oops, I lost one commit while renaming the default branch 😨 Fixed it in main, please pull and try again.

@edward-bestx
Author

Ok, great, thanks. I do seem to see some issues with the example test; for example, here's some output which doesn't seem quite right:

%3|1728657536.777|FAIL|rdkafka#producer-1| [thrd:bad:9092/bootstrap]: bad:9092/bootstrap: Failed to resolve 'bad:9092': Temporary failure in name resolution (after 50038ms in state CONNECT, 1 identical error(s) suppressed)
     Testing RDKafka tests passed

But regardless, this is good progress. I can probably try to add some of the missing functions at this point. Let me see how far I can get with it and I'll let you know again if I get stuck.

@dfdx
Owner

dfdx commented Oct 11, 2024

It's the intended behavior that comes from this test:

@testset "Verify error callback is called" begin
    conf = Dict()
    conf["bootstrap.servers"] = "bad"
    ch = Channel(1)
    RDKafka.KafkaProducer(conf; err_cb=(err, reason) -> begin
        push!(ch, err)
    end)
    @test take!(ch) == -193
end

Here we set a non-existent address for the bootstrap server and check that the error callback is invoked. librdkafka correctly returns code -193 (RD_KAFKA_RESP_ERR__RESOLVE, a host resolution failure) and writes a message about the bad address to the console.
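One caveat of this pattern: take!(ch) blocks until the callback fires, so if the error callback were never invoked, the test would hang instead of failing. A small sketch of a timeout guard follows. take_with_timeout is a hypothetical helper built on Base's timedwait, and the librdkafka callback is simulated with @async so the example runs without Kafka.

```julia
# Hypothetical helper: wait at most `timeout_s` seconds for a value on `ch`,
# raising an error instead of blocking forever like a bare take!(ch).
function take_with_timeout(ch::Channel, timeout_s::Real)
    status = timedwait(() -> isready(ch), timeout_s; pollint=0.05)
    status === :ok || error("no value arrived on the channel within $(timeout_s) s")
    return take!(ch)
end

# Simulate librdkafka invoking the error callback from another task.
ch = Channel{Int}(1)
err_cb = (err, reason) -> push!(ch, err)
@async err_cb(-193, "Failed to resolve 'bad:9092'")

println(take_with_timeout(ch, 5))   # -193
```

In the real test, `@test take_with_timeout(ch, 60) == -193` could replace `@test take!(ch) == -193`; the 60-second limit is an arbitrary choice.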

@edward-bestx
Author

edward-bestx commented Oct 11, 2024

Ok that makes sense.

Perhaps you can help me understand something actually. Let's look at a very simple example function call:

function rd_kafka_version()
    ccall((:rd_kafka_version, RDKafka.librdkafka), Cint, ())
end

Can you help me to understand how Julia knows what and where this function is?

Actually, before addressing that, there's something else I need to ask:

If I want to test the local code from a Julia REPL, how can I do that?

For example, if I do:

$ cd RDKafka.jl
$ julia
julia> using RDKafka
julia> RDKafka.rd_kafka_version()
33751295

This (I think) is using a version of RDKafka.jl which is installed system wide. In other words, it is not using the code from the directory ~/RDKafka.jl which is the location where I have cloned this repository.

Does my question make sense? Sorry there's two questions in one here, but I didn't want to forget the reason why I am asking the second question.

@edward-bestx
Author

edward-bestx commented Oct 11, 2024

Ok I found a way to do it - let me know if I'm doing something completely crazy here...

  1. Purge system environment:
$ julia
julia> ]
(@v1.10) pkg> rm --all
(@v1.10) pkg> CTRL+C
julia>
  2. Add the librdkafka_jll package:
julia> import Pkg; Pkg.add("librdkafka_jll")
  3. Load the local file RDKafka.jl (the working directory is src):
julia> include("RDKafka.jl")
julia> RDKafka.test_function()
"test_function"

test_function is just something which I've added to test if changes I make locally are picked up correctly.

@dfdx
Owner

dfdx commented Oct 11, 2024

If I want to test the local code from a Julia REPL, how can I do that?

You need to activate your environment. In the Julia REPL, type the following (including the leading ], which switches REPL to the pkg mode):

] activate .

Usually, I also put code that auto-activates the project in the current directory into ~/.julia/config/startup.jl. Something like this:

import Pkg

project_root = pwd()

# find project root if we are in a nested dir
# (stop at the filesystem root, whose dirname is itself)
while !isfile(joinpath(project_root, "Project.toml")) && dirname(project_root) != project_root
    project_root = dirname(project_root)
end

# activate this project (unless we are not in any project)
if isfile(joinpath(project_root, "Project.toml"))
    Pkg.activate(project_root)
end

Can you help me to understand how Julia knows what and where this function is?

ccall has the following format:

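ccall((:rd_kafka_version, RDKafka.librdkafka), Cint, ())

  • :rd_kafka_version: the function name
  • RDKafka.librdkafka: the library name
  • Cint: the return type
  • (): the tuple of argument types (empty here)
  • any remaining arguments: the argument values, if any (none here)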

RDKafka.librdkafka points to the path of a .so, .dll or .dylib file with your binary dependencies, and that file contains the function rd_kafka_version. Note that RDKafka doesn't define the librdkafka variable itself. Instead, this variable is imported from the autogenerated librdkafka_jll package that we discussed earlier.
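The same ccall shape can be tried on a C function that is always present, which makes it easy to experiment in the REPL. This sketch assumes Linux or macOS, where omitting the library makes Julia look the symbol up in the already-loaded process; strlen is used purely as an illustration.

```julia
# func-name: :strlen, return-type: Csize_t, arg-types: (Cstring,), arg-value: "hello".
# No library is given, so the symbol is looked up in the current process
# (assumption: Linux/macOS, where libc is already loaded).
n = ccall(:strlen, Csize_t, (Cstring,), "hello")
println(n)   # 5

# The RDKafka call has the same structure, with the library named explicitly:
# ccall((:rd_kafka_version, RDKafka.librdkafka), Cint, ())
```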

@dfdx
Owner

dfdx commented Oct 11, 2024

You did it in a more creative way :) But you don't have to remove all packages to work on a package locally.

@edward-bestx
Author

You need to activate your environment. In the Julia REPL, type the following (including the leading ], which switches REPL to the pkg mode):

] activate .

Just to check, which directory are you running this from?

  • RDKafka.jl
  • RDKafka.jl/src

Think I've figured it out. Are you doing this?

$ pwd
~/RDKafka.jl
$ julia
julia> ]
(@v1.10) pkg> activate .
(RDKafka) pkg> ^C
julia> import Pkg; Pkg.add("librdkafka_jll")
julia> include("src/RDKafka.jl")
julia> RDKafka.test_function()
"test_function"

If I've got that slightly wrong please do correct me. Sorry for what probably seem like slightly stupid questions, I'm still getting used to Julia. Once I understand the "standard workflow" I should be able to make progress a lot quicker. Thanks!

@dfdx
Owner

dfdx commented Oct 11, 2024

No problem at all! There's no single "standard workflow", but it's easy to set up something that works for you.

I forgot one detail. Let's say, you have just cloned RDKafka.jl to the home directory. The files are available, but the dependencies are not downloaded. To (resolve and) download the dependencies, you need to "instantiate" the package:

$ cd ~/RDKafka.jl
$ julia
julia> ] activate .      # activate project from the current directory
julia> ] instantiate    # instantiate the package dependencies

instantiate will install librdkafka_jll, so you don't need to Pkg.add() it separately.

From here you have a choice:

  1. Include .jl files into the current Julia session.
julia> include("src/RDKafka.jl")
# or 
julia> include("src/core.jl")     # my personal trick: make a single core.jl file that includes all other files;
                                  # this way I can run functions as if they were defined in the Main module
  2. Use the package:
julia> using RDKafka             # assuming the package is activated, this imports the version from your current working dir

Most people prefer the second option and use Revise to automatically reload the package whenever they edit its code.
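The same steps can also be scripted with the Pkg API instead of the pkg> prompt. The sketch below assumes it is run from the root of the RDKafka.jl clone and that Revise, if used, is installed in the default environment. pathof then shows which copy of RDKafka was actually loaded, which directly answers the "system-wide or local?" question.

```julia
import Pkg

# Assumption: this script is run from the root of the RDKafka.jl clone.
Pkg.activate(".")    # same as `] activate .`
Pkg.instantiate()    # same as `] instantiate`: downloads librdkafka_jll and other deps

# Revise has to be loaded before the package it should track. It is assumed to be
# installed in the default (@v1.10) environment, which stays on the load path.
try
    using Revise
catch e
    @warn "Revise is not available; restart the session to pick up code changes" exception = e
end

using RDKafka

# pathof shows which copy of the package was loaded; it should point into the
# clone (<clone>/src/RDKafka.jl) rather than into ~/.julia/packages.
println(pathof(RDKafka))
```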


Note that there's one more way to develop Julia packages using ] dev and ] free commands. I don't use this approach for historical reasons, but you may find documentation for it here.

@edward-bestx
Author

edward-bestx commented Oct 11, 2024

instantiate will install librdkafka_jll, so you don't need to Pkg.add() it separately.

Ah - of course. I'd forgotten about the instantiate command.

(Edit: Resolved previous issue)

@edward-bestx
Author

julia> using RDKafka             # assuming the package is activated, this imports the version from your current working dir

Most people prefer the second option and use Revise to automatically reload the changes from the package whenever you edit its code.

Yes - I also prefer this option. Thanks! I was trying to figure out how to get it to use the one from the working directory!
