Skip to content

Commit

Permalink
feat: implement async events
Browse files Browse the repository at this point in the history
  • Loading branch information
bibendi committed Nov 1, 2021
1 parent b6a2430 commit fa7c374
Show file tree
Hide file tree
Showing 14 changed files with 200 additions and 11 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
/gemfiles/
/pkg/
/spec/reports/
/tmp/
tmp/
.pry_history
.rspec_status
Gemfile.lock
Expand Down
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ Downstream supports various adapters for event handling. It can be configured in
```ruby
Downstream.configure do |config|
config.pubsub = :stateless # it's a default adapter
config.async_queue = :high_priority # nil by default
end
```

Expand Down Expand Up @@ -112,6 +113,20 @@ Downstream.subscribed(subscriber, to: ProfileCreated) do
end
```

If you want to handle events in a background job, you can pass the `async: true` option:

```ruby
store.subscribe OnProfileCreated::DoThat, async: true
```

By default, a job will be enqueued into `async_queue` name from the Downstream config. You can define your own queue name for a specific subscriber:

```ruby
store.subscribe OnProfileCreated::DoThat, async: {queue: :low_priority}
```

**NOTE:** all subscribers are synchronous by default

## Testing

You can test subscribers as normal Ruby objects.
Expand Down
5 changes: 5 additions & 0 deletions downstream.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,17 @@ Gem::Specification.new do |spec|
spec.require_paths = ["lib"]
spec.required_ruby_version = ">= 2.5"

spec.add_dependency "after_commit_everywhere", "~> 1.0"
spec.add_dependency "globalid", "~> 0.5"
spec.add_dependency "rails", ">= 6"

spec.add_development_dependency "appraisal", "~> 2.2"
spec.add_development_dependency "bundler", ">= 1.16"
spec.add_development_dependency "combustion", "~> 1.3"
spec.add_development_dependency "debug", "~> 1.3"
spec.add_development_dependency "rake", "~> 13.0"
spec.add_development_dependency "rspec", "~> 3.0"
spec.add_development_dependency "rspec-rails", "~> 5.0"
spec.add_development_dependency "sqlite3", "~> 1.4"
spec.add_development_dependency "standard", "~> 1.3"
end
7 changes: 5 additions & 2 deletions lib/downstream.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
# frozen_string_literal: true

require "active_support"
require "active_job"
require "active_model"
require "globalid"
require "after_commit_everywhere"

require "downstream/config"
require "downstream/event"
Expand All @@ -20,13 +23,13 @@ def configure
yield config
end

def subscribe(subscriber = nil, to: nil, &block)
def subscribe(subscriber = nil, to: nil, async: false, &block)
subscriber ||= block if block
raise ArgumentError, "Subsriber must be present" if subscriber.nil?

identifier = construct_identifier(subscriber, to)

pubsub.subscribe(identifier, subscriber)
pubsub.subscribe(identifier, subscriber, async: async)
end

# temporary subscriptions
Expand Down
1 change: 1 addition & 0 deletions lib/downstream/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

module Downstream
class Config
attr_accessor :async_queue
attr_writer :namespace

def namespace
Expand Down
41 changes: 40 additions & 1 deletion lib/downstream/event.rb
Original file line number Diff line number Diff line change
@@ -1,10 +1,24 @@
# frozen_string_literal: true

GlobalID::Locator.use :downstream do |gid|
params = gid.params.each_with_object({}) do |(key, value), memo|
memo[key.to_sym] = if value.is_a?(String) && value.start_with?("gid://")
GlobalID::Locator.locate(value)
else
value
end
end

gid.model_name.constantize
.new(event_id: gid.model_id, **params)
end

module Downstream
class Event
extend ActiveModel::Naming
include GlobalID::Identification

RESERVED_ATTRIBUTES = %i[event_id type].freeze
RESERVED_ATTRIBUTES = %i[id event_id type].freeze

class << self
attr_writer :identifier
Expand Down Expand Up @@ -57,6 +71,8 @@ def lookup_ancestors

attr_reader :event_id, :data, :errors

alias_method :id, :event_id

def initialize(event_id: nil, **params)
@event_id = event_id || SecureRandom.hex(10)
validate_attributes!(params)
Expand All @@ -77,6 +93,20 @@ def to_h
}
end

def to_global_id
new_data = data.each_with_object({}) do |(key, value), memo|
memo[key] = if value.respond_to?(:to_global_id)
value.to_global_id
else
value
end
end

super(new_data.merge!(app: :downstream))
end

alias_method :to_gid, :to_global_id

def inspect
"#{self.class.name}<#{type}##{event_id}>, data: #{data}"
end
Expand All @@ -85,6 +115,15 @@ def read_attribute_for_validation(attr)
data.fetch(attr)
end

def ==(other)
super ||
other.instance_of?(self.class) &&
!event_id.nil? &&
other.event_id == event_id
end

alias_method :eql?, :==

private

def validate_attributes!(params)
Expand Down
4 changes: 2 additions & 2 deletions lib/downstream/pubsub_adapters/stateless/pubsub.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
module Downstream
module Stateless
class Pubsub < AbstractPubsub
def subscribe(identifier, callable)
def subscribe(identifier, callable, async: false)
ActiveSupport::Notifications.subscribe(
identifier,
Subscriber.new(callable)
Subscriber.new(callable, async: async)
)
end

Expand Down
44 changes: 39 additions & 5 deletions lib/downstream/pubsub_adapters/stateless/subscriber.rb
Original file line number Diff line number Diff line change
@@ -1,19 +1,53 @@
require_relative "subscriber_job"

module Downstream
module Stateless
class Subscriber
attr_reader :callable
include AfterCommitEverywhere

attr_reader :callable, :async

def initialize(callable)
def initialize(callable, async: false)
@callable = callable
@async = async
end

def async?
!!async
end

def call(name, event)
if (callable.respond_to?(:arity) && callable.arity == 2) || callable.method(:call).arity == 2
callable.call(name, event)
def call(_name, event)
if async?
if callable.is_a?(Proc) || callable.name.nil?
raise ArgumentError, "Anonymous subscribers (blocks/procs/lambdas or anonymous modules) cannot be asynchronous"
end

raise ArgumentError, "Async subscriber must be a module/class, not instance" unless callable.is_a?(Module)

after_commit do
SubscriberJob.then do |job|
if (queue_name = async_queue_name)
job.set(queue: queue_name)
else
job
end
end.perform_later(event, callable.name)
end
else
callable.call(event)
end
end

private

def async_queue_name
return @async_queue_name if defined?(@async_queue_name)

name = async[:queue] if async.is_a?(Hash)
name ||= Downstream.config.async_queue

@async_queue_name = name
end
end
end
end
9 changes: 9 additions & 0 deletions lib/downstream/pubsub_adapters/stateless/subscriber_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
module Downstream
module Stateless
class SubscriberJob < ActiveJob::Base
def perform(event, callable)
callable.constantize.call(event)
end
end
end
end
3 changes: 3 additions & 0 deletions spec/internal/config/database.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
test:
adapter: sqlite3
database: tmp/test.sqlite3
2 changes: 2 additions & 0 deletions spec/internal/db/schema.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ActiveRecord::Schema.define do
end
31 changes: 31 additions & 0 deletions spec/lib/stateless/subscriber_job_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# frozen_string_literal: true

require "spec_helper"

module TestSubscriberJob
module Subscriber
class << self
def events
@events ||= []
end

def call(event)
events << event
end
end
end
end

describe Downstream::Stateless::SubscriberJob do
let(:event_class) { Downstream::TestEvent }
let(:event) { event_class.new(user_id: 1) }
let(:callable) { "TestSubscriberJob::Subscriber" }

subject { described_class.perform_now(event, callable) }

it "handles an event" do
subject
expect(TestSubscriberJob::Subscriber.events.size).to eq 1
expect(TestSubscriberJob::Subscriber.events.first).to eq event
end
end
24 changes: 24 additions & 0 deletions spec/lib/subscriptions_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,20 @@

require "spec_helper"

module TestSubscriptions
module AsyncCallable
class << self
def events
@events ||= []
end

def call(event)
events << event
end
end
end
end

describe "sync #subscribe" do
let(:event_class) { Downstream::TestEvent }

Expand Down Expand Up @@ -85,4 +99,14 @@ def call(event)

expect(events_seen.size).to eq 0
end

it "subscribes async" do
Downstream.subscribe(TestSubscriptions::AsyncCallable, to: event_class, async: true)

event = event_class.new(user_id: 0)

expect { Downstream.publish(event) }.to have_enqueued_job(Downstream::Stateless::SubscriberJob)
.with(event, "TestSubscriptions::AsyncCallable")
expect(TestSubscriptions::AsyncCallable.events).to be_empty
end
end
23 changes: 23 additions & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,21 @@
require "bundler/setup"
require "debug"
require "rspec"

require "combustion"
Combustion.initialize! :active_record, :action_controller, :active_job do
config.logger = Logger.new(nil)
config.log_level = :fatal
config.active_job.queue_adapter = :test
end

require "rspec/rails"
require "downstream"

Downstream.configure do |config|
config.pubsub = :stateless
end

require_relative "support/test_events"

RSpec.configure do |config|
Expand All @@ -14,4 +27,14 @@
config.expect_with :rspec do |c|
c.syntax = :expect
end

config.filter_run_when_matching :focus
config.example_status_persistence_file_path = "tmp/rspec_examples.txt"
config.run_all_when_everything_filtered = true

config.after(:each) do
# Clear ActiveJob jobs
ActiveJob::Base.queue_adapter.enqueued_jobs.clear
ActiveJob::Base.queue_adapter.performed_jobs.clear
end
end

0 comments on commit fa7c374

Please sign in to comment.