From fa7c37474d1a1edc9e30740737989fe22fb3fdff Mon Sep 17 00:00:00 2001 From: Misha Merkushin Date: Mon, 1 Nov 2021 17:55:28 +0300 Subject: [PATCH] feat: implement async events --- .gitignore | 2 +- README.md | 15 +++++++ downstream.gemspec | 5 +++ lib/downstream.rb | 7 ++- lib/downstream/config.rb | 1 + lib/downstream/event.rb | 41 ++++++++++++++++- .../pubsub_adapters/stateless/pubsub.rb | 4 +- .../pubsub_adapters/stateless/subscriber.rb | 44 ++++++++++++++++--- .../stateless/subscriber_job.rb | 9 ++++ spec/internal/config/database.yml | 3 ++ spec/internal/db/schema.rb | 2 + spec/lib/stateless/subscriber_job_spec.rb | 31 +++++++++++++ spec/lib/subscriptions_spec.rb | 24 ++++++++++ spec/spec_helper.rb | 23 ++++++++++ 14 files changed, 200 insertions(+), 11 deletions(-) create mode 100644 lib/downstream/pubsub_adapters/stateless/subscriber_job.rb create mode 100644 spec/internal/config/database.yml create mode 100644 spec/internal/db/schema.rb create mode 100644 spec/lib/stateless/subscriber_job_spec.rb diff --git a/.gitignore b/.gitignore index d626d3c..49912f1 100644 --- a/.gitignore +++ b/.gitignore @@ -6,7 +6,7 @@ /gemfiles/ /pkg/ /spec/reports/ -/tmp/ +tmp/ .pry_history .rspec_status Gemfile.lock diff --git a/README.md b/README.md index 6f2dd4f..0b17a5e 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,7 @@ Downstream supports various adapters for event handling. It can be configured in ```ruby Downstream.configure do |config| config.pubsub = :stateless # it's a default adapter + config.async_queue = :high_priority # nil by default end ``` @@ -112,6 +113,20 @@ Downstream.subscribed(subscriber, to: ProfileCreated) do end ``` +If you want to handle events in a background job, you can pass the `async: true` option: + +```ruby +store.subscribe OnProfileCreated::DoThat, async: true +``` + +By default, a job will be enqueued into `async_queue` name from the Downstream config. You can define your own queue name for a specific subscriber: + +```ruby +store.subscribe OnProfileCreated::DoThat, async: {queue: :low_priority} +``` + +**NOTE:** all subscribers are synchronous by default + ## Testing You can test subscribers as normal Ruby objects. diff --git a/downstream.gemspec b/downstream.gemspec index d1f8474..41e71ac 100644 --- a/downstream.gemspec +++ b/downstream.gemspec @@ -27,12 +27,17 @@ Gem::Specification.new do |spec| spec.require_paths = ["lib"] spec.required_ruby_version = ">= 2.5" + spec.add_dependency "after_commit_everywhere", "~> 1.0" + spec.add_dependency "globalid", "~> 0.5" spec.add_dependency "rails", ">= 6" spec.add_development_dependency "appraisal", "~> 2.2" spec.add_development_dependency "bundler", ">= 1.16" + spec.add_development_dependency "combustion", "~> 1.3" spec.add_development_dependency "debug", "~> 1.3" spec.add_development_dependency "rake", "~> 13.0" spec.add_development_dependency "rspec", "~> 3.0" + spec.add_development_dependency "rspec-rails", "~> 5.0" + spec.add_development_dependency "sqlite3", "~> 1.4" spec.add_development_dependency "standard", "~> 1.3" end diff --git a/lib/downstream.rb b/lib/downstream.rb index 1492d96..62f644b 100644 --- a/lib/downstream.rb +++ b/lib/downstream.rb @@ -1,7 +1,10 @@ # frozen_string_literal: true require "active_support" +require "active_job" require "active_model" +require "globalid" +require "after_commit_everywhere" require "downstream/config" require "downstream/event" @@ -20,13 +23,13 @@ def configure yield config end - def subscribe(subscriber = nil, to: nil, &block) + def subscribe(subscriber = nil, to: nil, async: false, &block) subscriber ||= block if block raise ArgumentError, "Subsriber must be present" if subscriber.nil? identifier = construct_identifier(subscriber, to) - pubsub.subscribe(identifier, subscriber) + pubsub.subscribe(identifier, subscriber, async: async) end # temporary subscriptions diff --git a/lib/downstream/config.rb b/lib/downstream/config.rb index dff6003..70d53ad 100644 --- a/lib/downstream/config.rb +++ b/lib/downstream/config.rb @@ -4,6 +4,7 @@ module Downstream class Config + attr_accessor :async_queue attr_writer :namespace def namespace diff --git a/lib/downstream/event.rb b/lib/downstream/event.rb index 787a4b8..fef2a2e 100644 --- a/lib/downstream/event.rb +++ b/lib/downstream/event.rb @@ -1,10 +1,24 @@ # frozen_string_literal: true +GlobalID::Locator.use :downstream do |gid| + params = gid.params.each_with_object({}) do |(key, value), memo| + memo[key.to_sym] = if value.is_a?(String) && value.start_with?("gid://") + GlobalID::Locator.locate(value) + else + value + end + end + + gid.model_name.constantize + .new(event_id: gid.model_id, **params) +end + module Downstream class Event extend ActiveModel::Naming + include GlobalID::Identification - RESERVED_ATTRIBUTES = %i[event_id type].freeze + RESERVED_ATTRIBUTES = %i[id event_id type].freeze class << self attr_writer :identifier @@ -57,6 +71,8 @@ def lookup_ancestors attr_reader :event_id, :data, :errors + alias_method :id, :event_id + def initialize(event_id: nil, **params) @event_id = event_id || SecureRandom.hex(10) validate_attributes!(params) @@ -77,6 +93,20 @@ def to_h } end + def to_global_id + new_data = data.each_with_object({}) do |(key, value), memo| + memo[key] = if value.respond_to?(:to_global_id) + value.to_global_id + else + value + end + end + + super(new_data.merge!(app: :downstream)) + end + + alias_method :to_gid, :to_global_id + def inspect "#{self.class.name}<#{type}##{event_id}>, data: #{data}" end @@ -85,6 +115,15 @@ def read_attribute_for_validation(attr) data.fetch(attr) end + def ==(other) + super || + other.instance_of?(self.class) && + !event_id.nil? && + other.event_id == event_id + end + + alias_method :eql?, :== + private def validate_attributes!(params) diff --git a/lib/downstream/pubsub_adapters/stateless/pubsub.rb b/lib/downstream/pubsub_adapters/stateless/pubsub.rb index c54712e..b2ba9dd 100644 --- a/lib/downstream/pubsub_adapters/stateless/pubsub.rb +++ b/lib/downstream/pubsub_adapters/stateless/pubsub.rb @@ -4,10 +4,10 @@ module Downstream module Stateless class Pubsub < AbstractPubsub - def subscribe(identifier, callable) + def subscribe(identifier, callable, async: false) ActiveSupport::Notifications.subscribe( identifier, - Subscriber.new(callable) + Subscriber.new(callable, async: async) ) end diff --git a/lib/downstream/pubsub_adapters/stateless/subscriber.rb b/lib/downstream/pubsub_adapters/stateless/subscriber.rb index 2773cd7..4693482 100644 --- a/lib/downstream/pubsub_adapters/stateless/subscriber.rb +++ b/lib/downstream/pubsub_adapters/stateless/subscriber.rb @@ -1,19 +1,53 @@ +require_relative "subscriber_job" + module Downstream module Stateless class Subscriber - attr_reader :callable + include AfterCommitEverywhere + + attr_reader :callable, :async - def initialize(callable) + def initialize(callable, async: false) @callable = callable + @async = async + end + + def async? + !!async end - def call(name, event) - if (callable.respond_to?(:arity) && callable.arity == 2) || callable.method(:call).arity == 2 - callable.call(name, event) + def call(_name, event) + if async? + if callable.is_a?(Proc) || callable.name.nil? + raise ArgumentError, "Anonymous subscribers (blocks/procs/lambdas or anonymous modules) cannot be asynchronous" + end + + raise ArgumentError, "Async subscriber must be a module/class, not instance" unless callable.is_a?(Module) + + after_commit do + SubscriberJob.then do |job| + if (queue_name = async_queue_name) + job.set(queue: queue_name) + else + job + end + end.perform_later(event, callable.name) + end else callable.call(event) end end + + private + + def async_queue_name + return @async_queue_name if defined?(@async_queue_name) + + name = async[:queue] if async.is_a?(Hash) + name ||= Downstream.config.async_queue + + @async_queue_name = name + end end end end diff --git a/lib/downstream/pubsub_adapters/stateless/subscriber_job.rb b/lib/downstream/pubsub_adapters/stateless/subscriber_job.rb new file mode 100644 index 0000000..5d651f2 --- /dev/null +++ b/lib/downstream/pubsub_adapters/stateless/subscriber_job.rb @@ -0,0 +1,9 @@ +module Downstream + module Stateless + class SubscriberJob < ActiveJob::Base + def perform(event, callable) + callable.constantize.call(event) + end + end + end +end diff --git a/spec/internal/config/database.yml b/spec/internal/config/database.yml new file mode 100644 index 0000000..87341fb --- /dev/null +++ b/spec/internal/config/database.yml @@ -0,0 +1,3 @@ +test: + adapter: sqlite3 + database: tmp/test.sqlite3 diff --git a/spec/internal/db/schema.rb b/spec/internal/db/schema.rb new file mode 100644 index 0000000..77ae0a2 --- /dev/null +++ b/spec/internal/db/schema.rb @@ -0,0 +1,2 @@ +ActiveRecord::Schema.define do +end diff --git a/spec/lib/stateless/subscriber_job_spec.rb b/spec/lib/stateless/subscriber_job_spec.rb new file mode 100644 index 0000000..0a7de9e --- /dev/null +++ b/spec/lib/stateless/subscriber_job_spec.rb @@ -0,0 +1,31 @@ +# frozen_string_literal: true + +require "spec_helper" + +module TestSubscriberJob + module Subscriber + class << self + def events + @events ||= [] + end + + def call(event) + events << event + end + end + end +end + +describe Downstream::Stateless::SubscriberJob do + let(:event_class) { Downstream::TestEvent } + let(:event) { event_class.new(user_id: 1) } + let(:callable) { "TestSubscriberJob::Subscriber" } + + subject { described_class.perform_now(event, callable) } + + it "handles an event" do + subject + expect(TestSubscriberJob::Subscriber.events.size).to eq 1 + expect(TestSubscriberJob::Subscriber.events.first).to eq event + end +end diff --git a/spec/lib/subscriptions_spec.rb b/spec/lib/subscriptions_spec.rb index 98c2cbd..39ab5e8 100644 --- a/spec/lib/subscriptions_spec.rb +++ b/spec/lib/subscriptions_spec.rb @@ -2,6 +2,20 @@ require "spec_helper" +module TestSubscriptions + module AsyncCallable + class << self + def events + @events ||= [] + end + + def call(event) + events << event + end + end + end +end + describe "sync #subscribe" do let(:event_class) { Downstream::TestEvent } @@ -85,4 +99,14 @@ def call(event) expect(events_seen.size).to eq 0 end + + it "subscribes async" do + Downstream.subscribe(TestSubscriptions::AsyncCallable, to: event_class, async: true) + + event = event_class.new(user_id: 0) + + expect { Downstream.publish(event) }.to have_enqueued_job(Downstream::Stateless::SubscriberJob) + .with(event, "TestSubscriptions::AsyncCallable") + expect(TestSubscriptions::AsyncCallable.events).to be_empty + end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 38019d6..6302726 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -3,8 +3,21 @@ require "bundler/setup" require "debug" require "rspec" + +require "combustion" +Combustion.initialize! :active_record, :action_controller, :active_job do + config.logger = Logger.new(nil) + config.log_level = :fatal + config.active_job.queue_adapter = :test +end + +require "rspec/rails" require "downstream" +Downstream.configure do |config| + config.pubsub = :stateless +end + require_relative "support/test_events" RSpec.configure do |config| @@ -14,4 +27,14 @@ config.expect_with :rspec do |c| c.syntax = :expect end + + config.filter_run_when_matching :focus + config.example_status_persistence_file_path = "tmp/rspec_examples.txt" + config.run_all_when_everything_filtered = true + + config.after(:each) do + # Clear ActiveJob jobs + ActiveJob::Base.queue_adapter.enqueued_jobs.clear + ActiveJob::Base.queue_adapter.performed_jobs.clear + end end