Skip to content

Commit

Permalink
Merge pull request #31 from bzwei/doc
Browse files Browse the repository at this point in the history
Update document to prepare for a public release of the gem
  • Loading branch information
chessbyte authored Oct 8, 2018
2 parents 8749909 + 0f613fa commit 99bdc38
Show file tree
Hide file tree
Showing 10 changed files with 211 additions and 87 deletions.
2 changes: 2 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
= 0.1.0 - 4-Oct-2018
* Initial release
16 changes: 10 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@ Or install it yourself as:

### Initialize a client

It is not recommended to directly create an actual client through `new` operation. Follow the example by specifying a protocol. This allows to easily switch the underlying messaging system from one type to another. Currently `:Stomp` and `:Kafka` are implemented.

```
ManageIQ::Messaging.logger = Logger.new(STDOUT)
client = ManageIQ::Messaging::Client.open(
:protocol => 'Stomp',
:host => 'localhost',
:port => 61616,
:password => 'smartvm',
Expand All @@ -46,10 +49,11 @@ Or install it yourself as:
client.close
```

Alternately, you can pass a block to `.open` without the need to explicitly close the client
Alternatively, you can pass a block to `.open` without the need to explicitly close the client.

```
ManageIQ::Messaging::Client.open(
:protocol => 'Stomp',
:host => 'localhost',
:port => 61616,
:password => 'smartvm',
Expand All @@ -76,15 +80,15 @@ This is the one-to-one publish/subscribe pattern. Multiple subscribers can subsc
}
)
client.subscribe_messages(:service => 'ems_operation', :affinity => 'ems_amazon1', :limit => 10) do |messages|
client.subscribe_messages(:service => 'ems_operation', :affinity => 'ems_amazon1') do |messages|
messages.each do |msg|
# do stuff with msg.message and msg.payload
client.ack(msg.ack_ref) # ack is required, but you can do it before or after "do stuff"
client.ack(msg.ack_ref)
end
end
# You can create a second client instance and call subscribe_messages with
# the same options. Then, both clients will take turns to consume the messages.
# the same options. Then both clients will take turns to consume the messages.
```

For better sending performance, you can publish a collection of messages together
Expand All @@ -95,7 +99,7 @@ For better sending performance, you can publish a collection of messages togethe
client.publish_messages([msg1, msg2])
```

Provide a block if you want `#publish_message` to wait on a response from the subscriber.
Provide a block if you want `#publish_message` to wait on a response from the subscriber. This feature may not be supported by every underlying messaging system.

```
client.publish_message(
Expand Down Expand Up @@ -150,7 +154,7 @@ This is the one-to-many publish/subscribe pattern. Multiple subscribers can subs
end
```

By default, events are delivered to live subscribers only. `subscribe_topic`'s `persist_ref` is not required. If a subscriber wants to receive the events it missed when it is offline, it should always create with same same `client_ref` and subscribe to the topic with the same `persist_ref`.
By default, events are delivered to live subscribers only. Some messaging systems support persistence with options.

## Development

Expand Down
2 changes: 1 addition & 1 deletion examples/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def run
puts "produced 5 messages"

puts "consumer"
client.subscribe_messages(:service => 'ems_operation', :affinity => 'ems_amazon1', :limit => 10) do |messages|
client.subscribe_messages(:service => 'ems_operation', :affinity => 'ems_amazon1') do |messages|
messages.each do |msg|
do_stuff(msg)
client.ack(msg.ack_ref)
Expand Down
182 changes: 117 additions & 65 deletions lib/manageiq/messaging/client.rb
Original file line number Diff line number Diff line change
@@ -1,13 +1,42 @@
module ManageIQ
module Messaging
# The abstract client class. It defines methods needed to publish or subscribe messages.
# It is not recommended to directly create a solid subclass instance. The proper way is
# to call class method +Client.open+ with desired protocol. For example:
#
# client = ManageIQ::Messaging::Client.open(
# :protocol => 'Stomp',
# :host => 'localhost',
# :port => 61616,
# :password => 'smartvm',
# :username => 'admin',
# :client_ref => 'generic_1',
# :encoding => 'json'
# )
#
# To close the connection one needs to explicitly call +client.close+.
# Alternatively if a block is given for the +open+ method, the connection will be closed
# automatically before existing the block. For example:
#
# ManageIQ::Messaging::Client.open(
# :protocol => 'Stomp'
# :host => 'localhost',
# :port => 61616,
# :password => 'smartvm',
# :username => 'admin',
# :client_ref => 'generic_1'
# ) do |client|
# # do stuff with the client
# end
# end
class Client
# Open or create a connection to the message broker
# @param options [Hash] the connection options
# @return [Client, nil] the client object if no block is given
# The optional block supply {|client| block }. The client will
# be automatically closed when the block terminates
# Open or create a connection to the message broker.
# Expected +options+ keys are:
# * :protocol (Implemented: 'Stomp', 'Kafka'. Default 'Stomp')
# * :encoding ('yaml' or 'json'. Default 'yaml')
# Other connection options are underlying messaging system specific.
#
# Avaiable type:
# Returns a +Client+ instance if no block is given.
def self.open(options)
protocol = options[:protocol] || :Stomp
client = Object.const_get("ManageIQ::Messaging::#{protocol}::Client").new(options)
Expand All @@ -21,70 +50,76 @@ def self.open(options)
nil
end

# Publish to a message to a queue. The message will be delivered to only one
# subscriber.
# @param options [Hash] the message attributes. Expected keys are:
# :service (service and affinity are used to determine the queue name)
# :affinity (optional)
# :class_name (optional)
# :message (e.g. method_name or message type)
# :payload (user defined structure, following are some examples)
# :instance_id
# :args
# :miq_callback
# :sender (optional, identify the sender)
# <other queue options TBA>
#
# Optionally a call back block {|response| block} can be provided to wait on
# the consumer to send an acknowledgment.
# Publish a message to a queue. The message will be delivered to only one subscriber.
# Expected keys in +options+ are:
# * :service (service and affinity are used to determine the queue name)
# * :affinity (optional)
# * :class_name (optional)
# * :message (e.g. method name or message type)
# * :payload (message body, a string or an user object that can be serialized)
# * :sender (optional, identify the sender)
# Other options are underlying messaging system specific.
#
# Optionally a call back block can be provided to wait on the consumer to send
# an acknowledgment. Not every underlying messaging system supports callback.
# Example:
#
# client.publish_message(
# :service => 'ems_operation',
# :affinity => 'ems_amazon1',
# :message => 'power_on',
# :payload => {
# :ems_ref => 'u987',
# :id => '123'
# }
# ) do |result|
# ansible_install_pkg(vm1) if result == 'running'
# end
def publish_message(options, &block)
assert_options(options, [:message, :service])

publish_message_impl(options, &block)
end

# Publish multiple messages to a queue.
# An aggregate version of `#publish_message `but for better performance
# All messages are sent in a batch
# An aggregate version of +#publish_message+ but for better performance.
# All messages are sent in a batch. Every element in +messages+ array is
# an +options+ hash.
#
# @param messages [Array] a collection of options for `#publish_message`
def publish_messages(messages)
publish_messages_impl(messages)
end

# Subscribe to receive messages from a queue
#
# @param options [Hash] attributes to configure how to receive messages.
# Available keys are:
# :service (service and affinity are used to determine the queue)
# :affinity (optional)
# :limit (optional, receives up to limit messages into the buffer)
#
# A callback block {|messages| block} needs to be provided to consume the
# messages. Example
# subscribe_message(options) do |messages|
# messages.collect do |msg|
# # from msg you get
# Subscribe to receive messages from a queue.
# Expected keys in +options+ are:
# * :service (service and affinity are used to determine the queue)
# * :affinity (optional)
# Other options are underlying messaging system specific.
#
# A callback block is needed to consume the messages:
#
# client.subscribe_message(options) do |messages|
# messages.each do |msg|
# # msg is a type of ManageIQ::Messaging::ReceivedMessage
# # attributes in msg
# msg.sender
# msg.message
# msg.payload
# msg.ack_ref (used to ack the message)
# msg.ack_ref #used to ack the message
#
# client.ack(msg.ack_ref)
# # process
# result # a result sent back to sender if expected
# # process the message
# end
# end
#
# @note The subscriber MUST ack each message independently in the callback
# block. It can decide when to ack according to whether a message can
# be retried. Ack the message in the beginning of processing if the
# message is not re-triable; otherwise ack it after the message is done.
# Any un-acked message will be redelivered to next subscriber AFTER the
# current subscriber disconnects normally or abnormally (e.g. crashed).
# Make sure a message is properly acked whatever strategy you take.
# Some messaging systems require the subscriber to ack each message in the
# callback block. The code in the block can decide when to ack according
# to whether a message can be retried. Ack the message in the beginning of
# processing if the message is not re-triable; otherwise ack it after the
# message is done. Any un-acked message will be redelivered to next subscriber
# AFTER the current subscriber disconnects normally or abnormally (e.g. crashed).
#
# To ack a message call `ack(msg.ack_ref)`
# To ack a message call +ack+(+msg.ack_ref+)
def subscribe_messages(options, &block)
raise "A block is required" unless block_given?
assert_options(options, [:service])
Expand All @@ -93,11 +128,23 @@ def subscribe_messages(options, &block)
end

# Subscribe to receive from a queue and run each message as a background job.
# @param options [Hash] attributes to configure how to receive messages
# :service (service and affinity are used to determine the queue)
# :affinity (optional)
# Expected keys in +options+ are:
# * :service (service and affinity are used to determine the queue)
# * :affinity (optional)
# Other options are underlying messaging system specific.
#
# This subscriber consumes messages sent through +publish_message+ with required
# +options+ keys, for example:
#
# This subscriber works only if the incoming message includes the class_name option
# client.publish_message(
# :service => 'generic',
# :class_name => 'MiqTask',
# :message => 'update_attributes', # method name, for instance method :instance_id is required
# :payload => {
# :instance_id => 2, # database id of class instance stored in rails DB
# :args => [{:status => 'Timeout'}] # argument list expected by the method
# }
# )
#
# Background job assumes each job is not re-triable. It will ack as soon as a request
# is received
Expand All @@ -108,12 +155,12 @@ def subscribe_background_job(options)
end

# Publish a message as a topic. All subscribers will receive a copy of the message.
# @param options [Hash] the message attributes. Expected keys are:
# :service (service is used to determine the topic address)
# :event (event name)
# :payload (user defined structure that describes the event)
# :sender (optional, identify the sender)
# <other queue options TBA>
# Expected keys in +options+ are:
# * :service (service is used to determine the topic address)
# * :event (event name)
# * :payload (message body, a string or an user object that can be serialized)
# * :sender (optional, identify the sender)
# Other options are underlying messaging system specific.
#
def publish_topic(options)
assert_options(options, [:event, :service])
Expand All @@ -122,14 +169,19 @@ def publish_topic(options)
end

# Subscribe to receive topic type messages.
# @param options [Hash] attributes to configure how to receive messages
# :service (service is used to determine the topic address)
# :persist_ref (optional, client needs to be have client_ref to use this feature)
# Expected keys in +options+ are:
# * :service (service is used to determine the topic address)
# Other options are underlying messaging system specific.
#
# Persisted event: In order to consume events missed during the period when the client is
# offline, the subscriber needs to be reconnect always with the same client_ref and persist_ref
# Some messaging systems allow subscribers to consume events missed during the period when
# the client is offline when they reconnect. Additional options are needed to turn on
# this feature.
#
# A callback {|sender, event, payload| block } needs to be provided to consume the topic
# A callback block is needed to consume the topic:
#
# client.subcribe_topic(:service => 'provider_events') do |sender, event, payload|
# # sender, event, and payload are from publish_topic
# end
#
def subscribe_topic(options, &block)
raise "A block is required" unless block_given?
Expand Down
2 changes: 2 additions & 0 deletions lib/manageiq/messaging/common.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module ManageIQ
module Messaging
module Common
private

def encode_body(headers, body)
return body if body.kind_of?(String)
headers[:encoding] = encoding
Expand Down
41 changes: 35 additions & 6 deletions lib/manageiq/messaging/kafka/client.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,41 @@
module ManageIQ
module Messaging
module Kafka
# Messaging client implementation with Kafka being the underlying supporting system.
# Do not directly instantiate an instance from this class. Use
# +ManageIQ::Messaging::Client.open+ method.
#
# Kafka specific connection options accepted by +open+ method:
# * :client_ref (A reference string to identify the client)
# * :hosts (Array of Kafka cluster hosts, or)
# * :host (Single host name)
# * :port (host port number)
# * :ssl_ca_cert (security options)
# * :ssl_client_cert
# * :ssl_client_cert_key
# * :sasl_gssapi_principal
# * :sasl_gssapi_keytab
# * :sasl_plain_username
# * :sasl_plain_password
# * :sasl_scram_username
# * :sasl_scram_password
# * :sasl_scram_mechanism
#
# Kafka specific +publish_message+ options:
# * :group_name (Used as Kafka partition_key)
#
# Kafka specific +subscribe_topic+ options:
# * :persist_ref (Used as Kafka group_id)
#
# Without +:persist_ref+ every topic subscriber receives a copy of each message
# only when they are active. If multiple topic subscribers join with the same
# +:persist_ref+, each message is consumed by only one of the subscribers. This
# allows a load balancing among the subscribers. Also any messages sent when
# all members of the +:persist_ref+ group are offline will be persisted and delivered
# when any member in the group is back online. Each message is still copied and
# delivered to other subscribers that belongs to other +:persist_ref+ groups or no group.
#
# +subscribe_background_job+ is currently not implemented.
class Client < ManageIQ::Messaging::Client
require 'kafka'
require 'manageiq/messaging/kafka/common'
Expand Down Expand Up @@ -36,12 +71,6 @@ def close

attr_reader :kafka_client

# @options options :host
# @options options :hosts (array)
# @options options :port
# @options options :client_ref (optional)
# @options options :encoding (default to 'yaml')
# @options options :ssl_ca_cert, :ssl_client_cert, :ssl_client_cert_key, :sasl_gssapi_principal, :sasl_gssapi_keytab, :sasl_plain_username, :sasl_plain_password, :sasl_scram_username, :sasl_scram_password, :sasl_scram_mechanism
def initialize(options)
hosts = Array(options[:hosts] || options[:host])
hosts.collect! { |host| "#{host}:#{options[:port]}" }
Expand Down
2 changes: 2 additions & 0 deletions lib/manageiq/messaging/kafka/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module ManageIQ
module Messaging
module Kafka
module Queue
private

def publish_message_impl(options)
raise ArgumentError, "Kafka messaging implementation does not take a block" if block_given?
raw_publish(true, *queue_for_publish(options))
Expand Down
Loading

0 comments on commit 99bdc38

Please sign in to comment.