An LFE Wrapper for the Erlang RabbitMQ (AMQP) Client
- Introduction
- Installation
- Documentation
- Supported RabbitMQ Modules
- Usage
- Notices
- Copyright Notice
- Getting Started
- The LFE AMQP Client Library
- Programming Model
- AMQP Commands
- Including Header Files
- Connecting
- Connecting to a Broker
- Connecting To A Broker with AMQP URIs
- Channels, Exchanges, Queues, and Messages
- Creating Channels
- Managing Exchanges and Queues
- Sending Messages
- Receiving Messages
- Subscribing to Queues
- Subscribing Internals
- Closing Channels and the Connection
- Examples
- Complete Example
- Flow Control
- Delivery Flow Control
- Blocked Connections
- Handling Returned Messages
- License
Introduction ↟
The kanin library is a wrapper for various modules in the Erlang AMQP client library. kanin was created in order to not only provide a less verbose AMQP client library for LFE hackers, but one that was also more Lispy.
Installation ↟
To pull in kanin as part of your project, just add it to your rebar.config
deps:
{deps, [
...
{kanin, {git, "[email protected]:lfex/kanin.git", {branch, "master"}}}
]}.
And then do:
$ rebar3 compile
Documentation ↟
Below, in the "Usage" section, you will find information about using Kanin with RabbitMQ in LFE projects.
Also, you may be interested in the Kanin tutorials, which have been translated into LFE from the official RabbitMQ docs for Python and Erlang.
Supported Modules ↟
The following amqp_*
modules have been included in kanin:
amqp_channel
->kanin-chan
amqp_connection
->kanin-conn
amqp_uri
->kanin-uri
If your favorite amqp_*
module is not among those, feel free to submit a
new ticket requesting the addition of your desired module(s) or submit a
pull request with the desired inclusion added.
Usage ↟
Copyright Notice ↟
The following content was copied from the Erlang Client User Guide on the RabbitMQ site. The original copyright was in 2014, held by Pivotal Software, Inc.
The LFE AMQP Client Library ↟
The AMQP client provides an Erlang interface to compliant AMQP brokers. The client follows the AMQP execution model and implements the wire level marshaling required to encode and decode AMQP commands in a protocol conformant fashion. AMQP is a connection orientated protocol and multiplexes parallel interactions via multiple channels within a connection.
This user guide assumes that the reader is familiar with basic concepts of AMQP and understands exchanges, queues and bindings. This information is contained in the protocol documentation on the AMQP website. For details and exact definitions, please see the AMQP specification document.
The basic usage of the client follows these broad steps:
- Establish a connection to a broker
- Create a new channel within the open connection
- Execute AMQP commands with a channel such as sending and receiving messages, creating exchanges and queue or defining routing rules between exchanges and queues
- When no longer required, close the channel and the connection
Programming Model ↟
Once a connection has been established, and a channel has been opened, an
LFE application will typically use the kanin-chan:call/{2,3}
and
kanin-chan:cast/{2,3}
functions to achieve most of the things it needs to
do.
The underlying Erlang AMQP client library is made up of two layers:
- A high level logical layer that follows the AMQP execution model, and
- A low level driver layer that is responsible for providing a physical transport to a broker.
There are two drivers in the client library:
-
The network driver establishes a TCP connection to a protocol compliant AMQP broker and encodes each command according to the specification. To use this driver, start a connection using
kanin-conn:start/1
with the parameter set to an#amqp_params_network
record. -
The direct driver uses native Erlang messaging instead of sending AMQP encoded commands over TCP. This approach avoids the overhead of marshaling commands onto and off the wire. However, the direct driver can only be used in conjunction with the RabbitMQ broker and the client code must be deployed into the same Erlang cluster. To use the direct driver, start a connection using
kanin-conn:start/1
with the parameter set to an#amqp_params_direct
record.
At run-time, the Erlang client library re-uses a subset of the functionality from the RabbitMQ broker. In order to keep the a client deployment independent of RabbitMQ, the Erlang client build process produces an archive containing all of the common modules. This archive is then put onto the load path of the client application.
For more detailed information on the API, please refer to the reference documentation.
Furthermore, the test suite that is part of the source distribution of the client library contains many complete examples of how to program against the API.
AMQP Commands ↟
The general mechanism of interacting with the broker is to send and receive AMQP
commands that are defined in the protocol documentation. During build process,
the machine-readable version of the AMQP specification is used to auto-generate
Erlang records for each command. The code generation process also defines
sensible default values for each command. Using default values allows the
programmer to write terser code - it is only necessary to override a field if
you require non-default behaviour. The definition of each command can be
consulted in the include/rabbit-framing.lfe
header file. For example,
when using the (make-exchange.declare ...)
record-creating macro,
specifying the following:
(make-exchange.declare exchange #"my_exchange")
is equivalent to this:
(make-exchange.declare
exchange #"my_exchange"
ticket 0
type #"direct"
passive 'false
durable 'false
auto_delete 'false
internal 'false
nowait 'false
arguments '())
Including Header Files ↟
The LFE client uses a number of record definitions which you will encounter in this guide. These records fall into two broad categories:
- Auto-generated AMQP command definitions from the machine readable version of the specification
- Definitions of data structures that are commonly used throughout the client
To gain access to these records, you need to include the amqp-client.lfe
file in every module that uses the Erlang client:
(include-lib "kanin/include/amqp-client.lfe")
Connecting to a Broker ↟
The kanin-conn
module is used to start a connection to the broker. It
requires either a direction connection options record or a network connection
options record. These may be generated either by creating the records directly
or providing connection information in a URI and having those parsed to
generate the record for you.
Example optoins from record-creation:
lfe> (make-amqp_params_direct)
#(amqp_params_direct none none #"/" nonode@nohost none ())
and
lfe> (make-amqp_params_network host "localhost")
#(amqp_params_network
#"guest"
#"guest"
#"/"
"localhost"
undefined 0 0 0 infinity none
(#Fun<amqp_auth_mechanisms.plain.3> #Fun<amqp_auth_mechanisms.amqplain.3>)
() ())
Example options from URI-parsing:
lfe> (kanin-uri:parse "amqp://dave:secret@")
#(ok #(amqp_params_direct #"dave" #"secret" #"/" nonode@nohost none ()))
and
lfe> (kanin-uri:parse "amqp://alice:secret@host:10000/vhost")
#(ok
#(amqp_params_network
#"alice"
#"secret"
#"vhost"
"host"
10000 0 0 10 infinity none
(#Fun<amqp_uri.11.121287672> #Fun<amqp_uri.11.121287672>)
() ()))
(For more information in this, see the section below: "Connecting To A Broker with AMQP URIs".)
To use these options to create an actual connection, use the kanin-conn
module:
lfe> (set `#(ok ,net-opts) (kanin-uri:parse "amqp://localhost"))
lfe> (set `#(ok ,conn) (kanin-conn:start net-opts))
That's in the REPL; in an application using kanin
, you'd want to use
something like a (let ...)
statement.
The ksnin-conn:start
function returns #(ok ,conn)
, where
conn
is the pid of a process that maintains a permanent
connection to the broker.
In case of an error, the above call returns #(error ,error)
.
The example above has just "localhost"
as a parameter. However, there will
often be many more than that.
An AMQP broker contains objects organised into groups called virtual hosts. The concept of virtual hosts gives an administrator the facility to partition a broker resource into separate domains and restrict access to the objects contained within these groups. AMQP connections require client authentication and the authorisation to access specific virtual hosts.
The (make-amqp_params_network)
record macro sets the following default
values:
Parameter | Default Value |
---|---|
username | guest |
password | guest |
virtual_host | / |
host | localhost |
post | 5672 |
channel_max | 0 |
frame_max | 0 |
heartbeat | 0 |
ssl_options | none |
auth_mechanisms | (list #'amqp_auth_mechanisms:plain/3 #'amqp_auth_mechanisms:amqplain/3) |
client_properties | '() |
These values are only the defaults that will work with an out of the box broker running on the same host. If the broker or the environment has been configured differently, these values can be overridden to match the actual deployment scenario.
SSL options can also be specified globally using the ssl_options
environment
key for the amqp-client
application. They will be merged with the SSL
parameters from the URI (the latter will take precedence).
If a client wishes to run inside the same Erlang cluster as the RabbitMQ broker,
it can start a direct connection that optimises away the AMQP codec. To start a
direct connection, use kanin-conn:start/1
with the parameter set to an
(make-amqp_params_direct)
record.
Providing a username and password is optional, since the direct client is considered trusted anyway. If a username and password are provided then they will be checked and made available to authentication backends. If a username is supplied, but no password, then the user is considered trusted and logged in unconditionally. If neither username nor password are provided then the connection will be considered to be from a "dummy" user which can connect to any virtual host and issue any AMQP command.
The (make-amqp_params_direct)
record macro sets the following default
values:
Parameter | Default Value |
---|---|
username | #"guest" |
password | #"guest" |
virtual_host | #"/" |
node | (node) |
client_properties | '() |
Connecting To A Broker with AMQP URIs ↟
Instead of working the (make-amqp_params_*)
records directly, AMQP
URIs may be used. The
(kanin-uri:parse/1)
function is provided for this purpose. It parses an URI
and returns the equivalent amqp_params_*
record. Diverging from the spec,
if the hostname is omitted, the connection is assumed to be direct and an
amqp_params_direct
record is returned. In addition to the standard host,
port, user, password and vhost parameters, extra parameters may be specified via
the query string (e.g. "?heartbeat=5"
).
AMQP URIs are defined with the following ABNF rules:
amqp_URI = "amqp://" amqp_authority [ "/" vhost ] [ "?" query ]
amqp_authority = [ amqp_userinfo "@" ] host [ ":" port ]
amqp_userinfo = username [ ":" password ]
username = *( unreserved / pct-encoded / sub-delims )
password = *( unreserved / pct-encoded / sub-delims )
vhost = segment
Here are some examples:
Parameter | Username | Password | Host | Port | Vhost |
---|---|---|---|---|---|
amqp://alice:secret@host:10000/vhost | "alice" | "secret" | "host" | 10000 | "vhost" |
amqp://bo%62:%73ecret@h%6fst:10000/%76host | "bob" | "secret" | "host" | 10000 | "vhost" |
amqp:// | |||||
amqp://:@/ | "" | "" | "" | ||
amqp://carol@ | "carol" | ||||
amqp://dave:secret@ | "dave" | "secret" | |||
amqp://host | "host" | ||||
amqp://:10000 | 10000 | ||||
amqp:///vhost | "vhost" | ||||
amqp://host/ | "host" | "" | |||
amqp://host/%2f | "host" | "/" | |||
amqp://[::1] | "[::1]" |
Using a connection URI, you can creat either direct or network connection
options, and then use those to connect via the kanin-conn:start
function (as
demonstrated above).
Creating Channels ↟
Once a connection to the broker has been established, the kanin-conn
module
can be used to create channels:
lfe> (set `#(ok ,chan) (kanin-conn:open-channel conn))
#(ok <0.114.0>)
This function takes the pid of the connection process and returns
#(ok, chan)
, where chan
is a pid that encapsulates a server side channel.
Managing Exchanges and Queues ↟
Once a channel has been established, the kanin-chan
module can be used to
manage the fundamental objects within AMQP, namely exchanges and queues. The
following function creates an exchange called my-exchange
, which by default,
is the direct exchange:
lfe> (set declare (make-exchange.declare exchange #"my-exchange"))
#(exchange.declare 0 #"my-exchange" #"direct" false false false false false ())
lfe> (kanin-chan:call chan declare)
#(exchange.declare_ok)
Similarly, a queue called my-queue
is created by this code:
lfe> (set declare (make-queue.declare queue #"my-queue"))
#(queue.declare 0 #"my-queue" false false false false false ())
lfe> (kanin-chan:call chan declare)
#(queue.declare_ok #"my-queue" 0 0)
In many scenarios, a client is not interested in the actual name of the queue
it wishes to receive messages from. In this case, it is possible to let the
broker generate a random name for a queue. To do this, send a
queue.declare
command and leave the queue attribute undefined:
lfe> (kanin-chan:call chan (make-queue.declare))
#(queue.declare_ok #"amq.gen-nQEU2b7bjcGEi39TsYxXcg" 0 0)
The server will auto-generate a queue name and return this name as part of the acknowledgement.
To create a routing rule from an exchange to a queue, the queue.bind
command is used:
lfe> (set binding (make-queue.bind queue #"my-queue"
exchange #"my-exchange"
routing_key #"my-key"))
#(queue.bind 0 #"my-queue" #"my-exchange" #"my-key" false ())
lfe> (kanin-chan:call chan binding)
#(queue.bind_ok)
When this routing rule is no longer required, this route can be deleted using
the queue.unbind
command:
lfe> (set binding (make-queue.unbind queue #"my-queue"
exchange #"my-exchange"
routing_key #"my-key"))
#(queue.unbind 0 #"my-queue" #"my-exchange" #"my-key" ())
lfe> (kanin-chan:call chan binding)
#(queue.unbind_ok)
An exchange can be deleted by the exchange.delete
command:
lfe> (set delete (make-exchange.delete exchange #"my-exchange"))
#(exchange.delete 0 #"my-exchange" false false)
lfe> (kanin-chan:call chan delete)
#(exchange.delete_ok)
Similarly, a queue is deleted using the queue.delete
command:
lfe> (set delete (make-queue.delete queue #"my-queue"))
#(queue.delete 0 #"my-queue" false false false)
lfe> (kanin-chan:call chan delete)
#(queue.delete_ok 0)
Note that we used kanin-chan:call/2
in the examples above, since we sent
AMQP synchronous methods. It is generally advisable to use kanin- chan:call/{2,3}
for synchronous methods, rather than kanin-chan:cast/{2,3}
,
even though both functions work with both sync and async method types. The
difference between the two functions is that kanin-chan:call/{2,3}
blocks
the calling process until the reply comes back from the server (for sync
methods) or the method has been sent on the wire (for async methods), whereas
kanin-chan:cast/{2,3}
returns ok
immediately. Thus, only by using kanin- chan:call/{2,3}
do we have direct feedback when the broker has acknowledged
our command.
Sending Messages ↟
To send a message to an exchange with a particular routing key, the
basic.publish
command in conjunction with the #amqp_msg{} record is used:
lfe> (set msg (make-amqp_msg payload #"foobar"))
#(amqp_msg
#(P_basic ...)
#"foobar")
lfe> (set pub (make-basic.publish exchange #"my-exchange" routing_key #"my-key"))
#(basic.publish 0 #"my-exchange" #"my-key" false false)
lfe> (kanin-chan:cast chan pub msg)
ok
By default, the properties field of the amqp_msg
record contains a minimal
implementation of the P_basic
properties structure. If an application
needs to override any of the defaults, for example, to send persistent
messages, the amqp_msg
needs to be constructed accordingly:
lfe> (set payload #"foobaz")
#"foobaz"
lfe> (set pub (make-basic.publish exchange #"my-exchange" routing_key #"my-key"))
#(basic.publish 0 #"my-exchange" #"my-key" false false)
Set persistent delivery mode property:
lfe> (set basic-properties (make-P_basic delivery_mode 2))
#(P_basic ... 2 ...)
Continue with message creation and send:
lfe> (set msg (make-amqp_msg props basic-properties payload #"foobar"))
#(amqp_msg
#(P_basic ... 2 ...)
#"foobar")
lfe> (kanin-chan:cast chan pub msg)
ok
The full list of message headers is explained in the AMQP protocol documentation.
Remember that the AMQP basic.publish
command is asynchronous. This means
that the server will not send a response to it, unless the message is not
deliverable. In this case, the message will be returned to the client. This
operation is described in the "Handling Returned Messages" section below.
Receiving Messages ↟
The simplest way to receive a message is to poll an existing queue. This is
achieved using the basic.get
command:
lfe> (set get-cmd (make-basic.get queue #"my-queue" no_ack 'true))
#(basic.get 0 #"my-queue" true)
lfe> (set `#(,(= (match-basic.get_ok) ok) ,content) (kanin-chan:call chan get-cmd))
#(#(basic.get_ok ...)
#(amqp_msg
#(P_basic ...)
#"foobar"))
lfe> ok
#(basic.get_ok ...)
lfe> content
#(amqp_msg
#(P_basic ...)
#"foobar")
The payload that is returned is an Erlang binary, and it is up to the application to decode it, as the structure of this content is opaque to the AMQP protocol.
If the queue was empty when the basic.get
command was invoked, then the
channel will return an basic.get_empty
message, as illustrated here:
lfe> (set (= (match-basic.get_empty) content)
(kanin-chan:call chan get-cmd))
#(basic.get_empty #"")
lfe> content
#(basic.get_empty #"")
Note that the previous example sets the no_ack
flag on the basic.get
command. This tells the broker that the receiver will not send an
acknowledgement of the message. In doing so, the broker can absolve itself of
the responsibility for delivery - once it believes it has delivered a message,
then it is free to assume that consuming application has taken responsibility
for it. In general, a lot of applications will not want these semantics,
rather, they will want to explicitly acknowledge the receipt of a message.
This is done with the basic.ack
command, where the no_ack
field is turned
off by default:
lfe> (set get-cmd (make-basic.get queue #"my-queue"))
#(basic.get 0 #"my-queue" false)
lfe> (set `#(,(match-basic.get_ok delivery_tag msg-tag) ,content)
(kanin-chan:call chan get-cmd))
#(...)
lfe> msg-tag
7
After the application has done what it needs to do with the response, it can acknowledge the message:
lfe> (kanin-chan:cast chan (make-basic.ack delivery_tag msg-tag))
ok
Notice that we sent the basic.ack
command using kanin-chan:cast/2
instead of kanin-chan:call/2
. This is because the broker will not send a
response to an acknowledgement, i.e. it is a fire and forget command.
Receiving messages by polling a queue is not as as efficient as subscribing a consumer to a queue, so consideration should be taken when receiving large volumes of messages.
Subscribing to Queues ↟
As indicated in the "Receiving Messages" section, subscribing to a queue can
be a more efficient means of consuming messages than the polling mechanism. To
subscribe to a queue, the basic.consume
command is used in one of two
forms:
lfe> (set sub (make-basic.consume queue #"my-queue"))
#(basic.consume 0 #"my-queue" #"" false false false false ())
lfe> (set (match-basic.consume_ok consumer_tag cnsm-tag)
(kanin-chan:subscribe chan sub consumer-pid))
or
(set sub (make-basic.consume queue #"my-queue"))
#(basic.consume 0 #"my-queue" #"" false false false false ())
lfe> (set (match-basic.consume_ok consumer_tag cnsm-tag)
(kanin-chan:call chan sub))
#(basic.consume_ok #"amq.ctag-QDTEY6V7duBFu_k86wayzg")
lfe> cnsm-tag
#"amq.ctag-QDTEY6V7duBFu_k86wayzg"
The consumer-pid
argument is the pid of a process to which the client
library will deliver messages. This can be an arbitrary Erlang process,
including the process that initiated the subscription. The basic.consume_ok
notification contains a tag that identifies the subscription. This is used at
a later point in time to cancel the subscription. This notification is sent
both to the process that created the subscription (as the return value to
kanin-chan:subscribe/3
) and as a message to the consumer process.
When a consumer process is subscribed to a queue, it will receive messages in its mailbox. An example receive loop looks like this:
(defun loop (chan)
(receive
;; This is the first message received
((match-basic.consume_ok)
(loop chan))
;; This is received when the subscription is cancelled
((match-basic.cancel_ok)
'ok)
;; A delivery
(`#(,(match-basic.deliver delivery_tag tag) ,content)
;; do somehting with the message payload here ...
(lfe_io:format "Payload: ~p~n" `(,content))
(kanin-chan:cast chan (make-basic.ack delivery_tag tag))
(loop chan))))
In this simple example, the process consumes the subscription notification and
then proceeds to wait for delivery messages to arrive in its mailbox. When
messages are received from the mailbox, the loop does something useful with
the message and sends a receipt acknowledge back to the broker. If the
subscription is cancelled, either by the consumer itself or some other
process, a cancellation notification will be sent to the consumer process. In
this scenario, the receive loop just exits. If the application does not wish
to explicitly acknowledge message receipts, it should set the no_ack
flag on
the subscription request.
To run the loop, spawn the function, subscribe the pid, and publish some messages:
lfe> (set consumer-pid (spawn (lambda () (loop chan))))
<0.141.0>
lfe> (kanin-chan:subscribe chan sub consumer-pid)
#(basic.consume_ok #"amq.ctag-wGk8K-6_YH-ovKODkDZQKA")
lfe> (kanin-chan:cast chan pub msg)
ok
Payload: #(amqp_msg
#(P_basic ...)
#"foobar")
To cancel a subscription, use the tag that the broker passed back with the
basic.consume_ok
acknowledgement:
lfe> (kanin-chan:call chan (make-basic.cancel consumer_tag tag))
Subscribing Internals ↟
The channel uses a module implementing the amqp_gen_consumer
behaviour to
determine its behaviour with regard to subscribing related events.
Effectively, this modules handles client-side consumer registration and
routing of deliveries to the appropriate consumers.
For instance, the default consumer module, amqp_selective_consumer
, keeps
track of which processes are subscribed to which queues and routes deliveries
appropriately; in addition, if the channel gives it a delivery for an unknown
consumer, it will pass it to a default consumer, should one be registered.
By contrast, amqp_direct_consumer
simply forwards all the messages it
receives from the channel to its only registered consumer.
The consumer module for a channel is chosen when the channel is opened by
setting the second parameter to kanin-conn:open-channel/2
. The consumer
module implements the amqp_gen_consumer
behaviour and thus implements
functions to handle receiving basic.consume
, basic.consume_ok
,
basic.cancel
, basic.cancel_ok
methods as well as publishes.
See the API documentation for details.
Closing Channels and the Connection ↟
When a channel is no longer required, a client should close it. This is
achieved using kanin-chan:close/1
:
lfe> (kanin-chan:close chan)
ok
To close the connection, kanin-conn:close/1
is used:
lfe> (kanin-conn:close conn)
ok
Both the channel.close
and connection.close
commands take the arguments
reply_code
(an integer) and reply_text
(binary text), which can be set by
the client depending on the reason why the channel or connection is being
closed. In general, however, the reply_code
is set to 200 to indicate a
normal shutdown. The reply_text
attribute is just an arbitrary string, that
the server may or may not log. If a client wants to set to a different reply
code and/or text, it can use the overloaded functions kanin-chan:close/3
and
kanin-conn:close/3
respectively.
Complete Example ↟
This shows a complete example:
(include-lib "ltest/include/ltest-macros.lfe")
(include-lib "kanin/include/amqp-client.lfe")
(defun example ()
(let* (;; Get connection options
(`#(ok ,opts) (kanin-uri:parse "amqp://localhost"))
;; Start a network connection
(`#(ok ,conn) (kanin-conn:start opts))
;; Open a channel on the connection
(`#(ok ,chan) (kanin-conn:open-channel conn))
(example-exchange #"example-exchange")
(example-queue #"example-queue")
(example-key #"example-key"))
;; Declare an exchange
(kanin-chan:call
chan
(make-exchange.declare
exchange example-exchange))
;; Declare a queue
(kanin-chan:call
chan
(make-queue.declare
queue example-queue))
;; Bind an exchange and queue with a routing key
(kanin-chan:call
chan
(make-queue.bind
queue example-queue
exchange example-exchange
routing_key example-key))
;; Publish a message
(let ((msg (make-amqp_msg payload #"foobar"))
(pub (make-basic.publish
exchange example-exchange
routing_key example-key)))
(kanin-chan:cast chan pub msg))
;; Get the message back from the queue
(let* ((get-cmd (make-basic.get queue example-queue))
(`#(,(match-basic.get_ok delivery_tag tag)
,(match-amqp_msg payload msg-payload))
(kanin-chan:call chan get-cmd)))
;; Do something with the message payload
(io:format "~nGot message: ~p~n" `(,msg-payload))
;; Ack the message
(kanin-chan:cast chan (make-basic.ack delivery_tag tag)))
;; Close the channel
(kanin-chan:close chan)
;; Close the connection
(kanin-conn:close conn)))
To run from the LFE REPL, simply paste the above, and then run (example)
.
In the example above, we created an exchange and queue, and bound the the two. We then created a message and published it to the exchange. The message was then dequeued and acknowledged.
Delivery Flow Control ↟
By default, there is no flow control within a channel other than normal TCP
back-pressure. A consumer can set the size of the prefetch buffer that the
broker will maintain for outstanding unacknowledged messages on a single
channel. This is achieved using the basic.qos
command:
lfe> (kanin-chan:call
chan
(make-basic.qos prefetch_count prefetch))
Applications typically should set the prefetch count, which means the processing speed of the consumer will exert back-pressure on the flow of messages in that channel.
Blocked Connections ↟
When an AMQP broker is running low on resources, for example by hitting a memory high watermark, it may choose to stop reading from publishers' network sockets.
RabbitMQ introduces a mechanism to allow clients to be told this has taken
place - invoke kanin-conn:register-blocked-handler/2
giving the pid of a
process to which (make-connection.blocked)
and (make-connection.unblocked)
messages may be sent.
Handling Returned Messages ↟
The broker will return undeliverable messages back to the originating client.
These are messages published either with the immediate or mandatory flags set.
In order for the application to get notified of a return, it must register a
callback process that can process (make-basic.return)
commands. Here is an
example of unrouteable message:
(kanin-chan:register-return-handler chan (self))
(kanin-chan:call
chan
(make-exchange.declare exchange some-name))
(let ((pub (make-basic.publish
exchange some-name
routing_key some-key
mandatory 'true)))
(kanin-chan:call chan pub (make-amqp_msg payload #"some payload"))
(receive
(`#(,(match-basic.return reply_text #"unroutable" exchange some-name)
,content)
;; Do something with the returned message
...)))
(make-basic.return )))))
License ↟
Apache License, Version 2.0
Copyright © 2014-2017, BilloSystems, Ltd. Co.
Copyright © 2015-2017, Ricardo Lanziano
Copyright © 2015-2017, Duncan McGreggor