Simple framework for sequentially executing a series of reusable code components.

armchairtheorist/simple_pipeline

SimplePipeline

SimplePipeline is a simple framework for sequentially executing a series of code components. Each component takes a payload, modifies it, and passes it down the pipe to the next component. A design goal is to keep the framework simple while making it as flexible as possible.

Installation

Install the gem from RubyGems:

gem install simple_pipeline

If you use Bundler, add it to your Gemfile and run bundle install:

gem 'simple_pipeline'

I have only tested this gem on Ruby 2.3.0, but I know of no reason why it wouldn't work on earlier Ruby versions as well.

Usage

pipeline = SimplePipeline.new

pipeline.add Pipe1.new
pipeline.add Pipe2.new
pipeline.add Pipe3.new
        
payload = {:some_key => some_value}

pipeline.process payload

SimplePipeline will call the process method on each of the pipes in the order that they were added to the pipeline. As long as the pipes don't maintain state, you can call SimplePipeline#process as many times as you want, with a different payload object on each run if needed.
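To make the call order concrete, here is a minimal sketch with two hypothetical pipes (Upcase and Exclaim) that mutate a Hash payload in place. The run_in_order helper is only a stand-in so the snippet runs without the gem installed; it is not the gem's implementation. With the gem, the equivalent would be SimplePipeline.new followed by pipeline.add for each pipe.

```ruby
# Hypothetical pipes: each one modifies the shared payload Hash in place.
class Upcase
  def process(payload)
    payload[:text] = payload[:text].upcase
  end
end

class Exclaim
  def process(payload)
    payload[:text] = "#{payload[:text]}!"
  end
end

# Stand-in for the pipeline: calls process on each pipe in the order given.
def run_in_order(pipes, payload)
  pipes.each { |pipe| pipe.process(payload) }
  payload
end

pipes = [Upcase.new, Exclaim.new]

# The pipes hold no state, so the same list can be reused with new payloads.
puts run_in_order(pipes, { :text => "hello" })[:text]  # prints HELLO!
puts run_in_order(pipes, { :text => "again" })[:text]  # prints AGAIN!
```

Reversing the order of the two pipes would give the same text here, but in general the result depends on the order in which pipes were added.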

Pipes

A pipe can be any Ruby object that has a process method that accepts a single argument (the payload object). For example:

class Pipe1
  def process(payload)
    # Do something with the payload object
  end
end

If your object does not have a process method, you can still use it as a pipe by naming the method to call with the :process_method option when adding the pipe to the pipeline. The specified method must likewise accept a single payload argument.

class AlternatePipe
  def execute(input)
    # Do something with the input
  end

  def invoke(param1, param2)
    # Do something else
  end
end

pipeline.add AlternatePipe.new, :process_method => :execute # => OK

pipeline.add AlternatePipe.new, :process_method => :invoke # => raises ArgumentError (invoke takes two arguments)
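The single-argument requirement can be checked ahead of time with plain Ruby reflection. The sketch below is an assumption about how such a check might look, not the gem's actual code; accepts_single_payload? is a hypothetical helper, and it is deliberately strict (it rejects methods with optional or splat parameters, which a real check might allow).

```ruby
class AlternatePipe
  def execute(input)
    # Do something with the input
  end

  def invoke(param1, param2)
    # Do something else
  end
end

# Hypothetical helper: true only if the object responds to the method and
# the method takes exactly one required argument (Method#arity == 1).
def accepts_single_payload?(pipe, method_name)
  return false unless pipe.respond_to?(method_name)
  pipe.method(method_name).arity == 1
end

pipe = AlternatePipe.new
puts accepts_single_payload?(pipe, :execute)  # prints true
puts accepts_single_payload?(pipe, :invoke)   # prints false
puts accepts_single_payload?(pipe, :missing)  # prints false
```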

Payload

The payload can be an Array, Hash, or any other Ruby object. Individual pipes have the responsibility to know what to do with the payload that is passed into the process method.

SimplePipeline supports an easy way to validate the contents of the current payload before a pipe executes. Include the SimplePipeline::Validation mixin and specify validation rules as lambdas; the rules are checked before the pipe's process method runs. If any validation rule fails (i.e. the lambda returns false or raises an error), a SimplePipeline::Validation::Error will be raised.

class ValidatedPipe
  include SimplePipeline::Validation

  validate ->(x) { x[:a] }            # x[:a] must exist
  validate ->(x) { x[:b] == 1 }       # x[:b] must be equal to 1
  validate ->(x) { x[:c].nil? }       # x[:c] must not exist
  validate ->(x) { x[:d][:e] < 5 }    # You can even do complex things like this

  def process(payload)
    # Do something
  end
end

pipeline = SimplePipeline.new
pipeline.add ValidatedPipe.new

payload = {:a => some_value}

# Raises a SimplePipeline::Validation::Error because the rules on :b and :d are not satisfied
pipeline.process payload
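To see which rules the payload above would trip, the same four lambdas can be evaluated by hand. This is a sketch in plain Ruby that does not use the mixin; failed_rules is a hypothetical helper, and treating nil like false is an assumption drawn from the "must exist" rule.

```ruby
RULES = [
  ->(x) { x[:a] },            # x[:a] must exist
  ->(x) { x[:b] == 1 },       # x[:b] must be equal to 1
  ->(x) { x[:c].nil? },       # x[:c] must not exist
  ->(x) { x[:d][:e] < 5 }     # nested lookup; raises if x[:d] is nil
]

# Hypothetical helper: returns the indexes of the rules that fail, where a
# failure is a falsy return value or a raised StandardError.
def failed_rules(rules, payload)
  rules.each_index.select do |i|
    begin
      !rules[i].call(payload)
    rescue StandardError
      true
    end
  end
end

p failed_rules(RULES, { :a => 1 })                                # prints [1, 3]
p failed_rules(RULES, { :a => 1, :b => 1, :d => { :e => 2 } })    # prints []
```

The last rule fails on the first payload because x[:d] is nil, so the nested lookup raises NoMethodError rather than returning false.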

Timeout

You can use the SimplePipeline::Timeout mixin to enforce a timeout value (in seconds) for a pipe. If the execution of the process method exceeds the specified timeout value, a Timeout::Error will be raised.

class TimeoutPipe
  include SimplePipeline::Timeout

  # Set the timeout value to be 3 seconds
  set_timeout 3

  def process(payload)
    # Do something
  end
end

pipeline = SimplePipeline.new
pipeline.add TimeoutPipe.new

payload = {:some_key => some_value}

# Raises a Timeout::Error if execution of process on the TimeoutPipe instance takes longer than 3 seconds
pipeline.process payload
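The behaviour on timeout can be reproduced with Ruby's standard library alone. The sketch below assumes the Timeout::Error mentioned above is the standard library's; SlowPipe and process_with_timeout are hypothetical names, and nothing here is taken from the gem's source.

```ruby
require 'timeout'

# Hypothetical pipe that takes about one second to finish.
class SlowPipe
  def process(payload)
    sleep 1
    payload[:done] = true
  end
end

# Hypothetical helper: runs the pipe under a time limit given in seconds.
def process_with_timeout(pipe, payload, seconds)
  Timeout.timeout(seconds) { pipe.process(payload) }
end

payload = {}
begin
  process_with_timeout(SlowPipe.new, payload, 0.1)
rescue Timeout::Error
  # The pipe was interrupted before it could set :done.
  puts "timed out, done flag set? #{payload.key?(:done)}"  # prints timed out, done flag set? false
end
```

A pipe interrupted this way may leave the payload partially modified, so pipes with a timeout are easier to reason about when they write their results in a single final step.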

You can also set the timeout value on a per-instance basis. This overrides the timeout value set in the class definition.

pipe1 = TimeoutPipe.new
pipe1.set_timeout 10 # seconds

pipe2 = TimeoutPipe.new
pipe2.set_timeout 60 # seconds

pipeline.add pipe1
pipeline.add pipe2

If you don't want to use the SimplePipeline::Timeout mixin for your pipe, you can still set a timeout by passing a :timeout option when adding the pipe. The option value takes precedence over any timeout value set through the mixin.

# Timeout value set to 10 seconds, even though SomePipe doesn't include SimplePipeline::Timeout
pipeline.add SomePipe.new, :timeout => 10 

# Timeout value set to 10 seconds, even though TimeoutPipe defaults to a timeout of 3 seconds
pipeline.add TimeoutPipe.new, :timeout => 10 
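Taken together, the three ways of setting a timeout form a precedence order: the :timeout option first, then the per-instance value, then the class-level value. The sketch below only illustrates that order; effective_timeout and its keyword names are hypothetical and not part of the gem.

```ruby
# Hypothetical helper: the first non-nil value wins, in precedence order.
def effective_timeout(option: nil, instance: nil, class_default: nil)
  [option, instance, class_default].compact.first
end

p effective_timeout(:class_default => 3)                                   # prints 3
p effective_timeout(:instance => 10, :class_default => 3)                  # prints 10
p effective_timeout(:option => 10, :instance => 60, :class_default => 3)   # prints 10
p effective_timeout                                                        # prints nil
```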

Exception Handling

By default, execution of the entire pipeline will halt if any of the pipes raises a StandardError. However, this can be overridden using the :continue_on_error? option.

# Pipeline continues executing if any kind of StandardError is encountered
pipeline.add pipe, :continue_on_error? => true

# Pipeline continues executing if NameError or subclass (e.g. NoMethodError) is encountered
pipeline.add pipe, :continue_on_error? => NameError

# Pipeline continues executing on either ArgumentError or NameError (or subclass)
pipeline.add pipe, :continue_on_error? => [ArgumentError, NameError]

# Pipeline continues executing if any kind of Exception is encountered - not recommended
pipeline.add pipe, :continue_on_error? => Exception
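The four forms of the option differ only in which error classes they match. The sketch below shows one way such matching could work using is_a?, which also covers subclasses; continue_on? is a hypothetical helper and an assumption about the semantics described above, not the gem's code.

```ruby
# Hypothetical helper: does this error match the :continue_on_error? setting?
def continue_on?(error, setting)
  case setting
  when true  then error.is_a?(StandardError)
  when Class then error.is_a?(setting)
  when Array then setting.any? { |klass| error.is_a?(klass) }
  else false
  end
end

puts continue_on?(NoMethodError.new, NameError)                # prints true
puts continue_on?(TypeError.new, [ArgumentError, NameError])   # prints false
puts continue_on?(ArgumentError.new, true)                     # prints true
puts continue_on?(NotImplementedError.new, true)               # prints false
puts continue_on?(NotImplementedError.new, Exception)          # prints true
```

NotImplementedError descends from ScriptError rather than StandardError, which is why only the Exception setting matches it. That breadth is also why matching on Exception is not recommended.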

After the pipeline finishes executing, you can call SimplePipeline#errors to get an Array of the errors that were caught during execution. This Array is cleared each time the pipeline is run again.

pipeline.errors # => Array of errors caught during last run
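Once you have the Array, a short summary is often enough for logging. The sketch below assumes the Array holds ordinary exception objects; SAMPLE_ERRORS stands in for the result of pipeline.errors, and summarize is a hypothetical helper.

```ruby
# Stand-in for pipeline.errors, assuming it contains exception objects.
SAMPLE_ERRORS = [ArgumentError.new("bad input"), RuntimeError.new("boom")]

# Hypothetical helper: one "Class: message" line per caught error.
def summarize(errors)
  errors.map { |e| "#{e.class}: #{e.message}" }
end

puts summarize(SAMPLE_ERRORS)
# prints:
# ArgumentError: bad input
# RuntimeError: boom
```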
