From e7be442eec611fab78260ccf55b0a6ca6ec4e458 Mon Sep 17 00:00:00 2001
From: Karl-Aksel Puulmann
Date: Thu, 18 Sep 2014 16:24:53 -0700
Subject: [PATCH] Batch inserts

The basic strategy is to batch consecutive inserts together per
namespace. A batch gets flushed whenever:

- An update or delete is done to the same namespace as the queued
  inserts.
- After streaming (up to) 1000 ops from the oplog, more than 5 seconds
  have passed since the last flush.
- More inserts than a threshold (BATCH_SIZE) have accumulated in the
  namespace.
- The program is exiting or streaming stops.
---
 Gemfile.lock                 | 16 ++++-----
 lib/mosql/streamer.rb        | 63 ++++++++++++++++++++++++++++++++----
 lib/mosql/version.rb         |  2 +-
 mosql.gemspec                |  2 +-
 test/functional/streamer.rb  |  1 +
 test/functional/transform.rb |  1 +
 6 files changed, 68 insertions(+), 17 deletions(-)

diff --git a/Gemfile.lock b/Gemfile.lock
index 5a56283..137b701 100644
--- a/Gemfile.lock
+++ b/Gemfile.lock
@@ -1,12 +1,12 @@
 PATH
   remote: .
   specs:
-    mosql (0.4.0)
+    mosql (0.5.0)
       bson_ext
       json
       log4r
       mongo
-      mongoriver (= 0.4)
+      mongoriver (= 0.4.2)
       pg
       rake
       sequel
@@ -14,18 +14,18 @@ PATH
 GEM
   remote: https://rubygems.org/
   specs:
-    bson (1.10.2)
-    bson_ext (1.10.2)
-      bson (~> 1.10.2)
+    bson (1.11.1)
+    bson_ext (1.11.1)
+      bson (~> 1.11.1)
     json (1.8.1)
     log4r (1.1.10)
     metaclass (0.0.4)
     minitest (3.0.0)
     mocha (1.0.0)
       metaclass (~> 0.0.1)
-    mongo (1.10.2)
-      bson (= 1.10.2)
-    mongoriver (0.4.0)
+    mongo (1.11.1)
+      bson (= 1.11.1)
+    mongoriver (0.4.2)
       bson_ext
       log4r
       mongo (>= 1.7)
diff --git a/lib/mosql/streamer.rb b/lib/mosql/streamer.rb
index 7c03810..d5a39f5 100644
--- a/lib/mosql/streamer.rb
+++ b/lib/mosql/streamer.rb
@@ -2,7 +2,9 @@ module MoSQL
   class Streamer
     include MoSQL::Logging

-    BATCH = 1000
+    BATCH_SIZE = 1000
+    # How long to wait before flushing queued inserts to postgres
+    INSERT_BATCH_TIMELIMIT = 5.0

     attr_reader :options, :tailer

@@ -16,7 +18,9 @@ def initialize(opts)
       instance_variable_set(:"@#{parm.to_s}", opts[parm])
     end

-    @done = false
+    # Hash from namespace -> list of inserts that still need to be made
+    @batch_insert_lists = Hash.new { |hash, key| hash[key] = [] }
+    @done = false
   end

   def stop
@@ -141,13 +145,13 @@ def import_collection(ns, collection, filter)
     start    = Time.now
     sql_time = 0
-    collection.find(filter, :batch_size => BATCH) do |cursor|
+    collection.find(filter, :batch_size => BATCH_SIZE) do |cursor|
       with_retries do
         cursor.each do |obj|
           batch << @schema.transform(ns, obj)
           count += 1

-          if batch.length >= BATCH
+          if batch.length >= BATCH_SIZE
             sql_time += track_time do
               bulk_upsert(table, ns, batch)
             end
@@ -170,14 +174,31 @@ def optail
     if tail_from.is_a? Time
       tail_from = tailer.most_recent_position(tail_from)
     end
+
+    last_batch_insert = Time.now
     tailer.tail(:from => tail_from)
     until @done
       tailer.stream(1000) do |op|
         handle_op(op)
       end
+      time = Time.now
+      if time - last_batch_insert >= INSERT_BATCH_TIMELIMIT
+        last_batch_insert = time
+        do_batch_inserts
+      end
     end
+
+    log.info("Finishing, doing last batch inserts.")
+    do_batch_inserts
   end

+  # Handle $set, $inc and other operators in updates by querying
+  # mongo and setting the value to whatever mongo holds at the time.
+  # Note that this somewhat weakens consistency, as postgres will be
+  # "ahead" of everything else whenever the tailer is behind.
+  #
+  # If no such object is found, try to delete according to the primary
+  # keys, which must be present in the selector itself (not behind $set etc.).
   def sync_object(ns, selector)
     obj = collection_for_ns(ns).find_one(selector)
     if obj
@@ -196,6 +217,33 @@ def sync_object(ns, selector)
     end
   end

+  # Queue this op to be batch inserted into its namespace the next
+  # time a non-insert happens on that namespace.
+  def queue_to_batch_insert(op, namespace)
+    @batch_insert_lists[namespace] << @schema.transform(namespace, op['o'])
+    if @batch_insert_lists[namespace].length >= BATCH_SIZE
+      do_batch_inserts(namespace)
+    end
+  end
+
+  # Do a batch insert for the given namespace, writing the queued rows
+  # to postgres. If no namespace is given, all namespaces are flushed.
+  def do_batch_inserts(namespace=nil)
+    if namespace.nil?
+      @batch_insert_lists.keys.each do |ns|
+        do_batch_inserts(ns)
+      end
+    else
+      to_batch = @batch_insert_lists[namespace]
+      @batch_insert_lists[namespace] = []
+      return if to_batch.empty?
+
+      table = @sql.table_for_ns(namespace)
+      log.debug("Batch inserting #{to_batch.length} items to #{table} from #{namespace}.")
+      bulk_upsert(table, namespace, to_batch)
+    end
+  end
+
   def handle_op(op)
     log.debug("processing op: #{op.inspect}")
     unless op['ns'] && op['op']
@@ -227,9 +275,7 @@ def handle_op(op)
       if collection_name == 'system.indexes'
         log.info("Skipping index update: #{op.inspect}")
       else
-        unsafe_handle_exceptions(ns, op['o']) do
-          @sql.upsert_ns(ns, op['o'])
-        end
+        queue_to_batch_insert(op, ns)
       end
     when 'u'
       selector = op['o2']

         log.debug("resync #{ns}: #{selector['_id']} (update was: #{update.inspect})")
         sync_object(ns, selector)
       else
+        do_batch_inserts(ns)

         # The update operation replaces the existing object, but
         # preserves its _id field, so grab the _id off of the
@@ -259,6 +306,8 @@ def handle_op(op)
         end
       end
     when 'd'
+      do_batch_inserts(ns)
+
       if options[:ignore_delete]
         log.debug("Ignoring delete op on #{ns} as instructed.")
       else
diff --git a/lib/mosql/version.rb b/lib/mosql/version.rb
index 96f803b..daa119c 100644
--- a/lib/mosql/version.rb
+++ b/lib/mosql/version.rb
@@ -1,3 +1,3 @@
 module MoSQL
-  VERSION = "0.4.0"
+  VERSION = "0.5.0"
 end
diff --git a/mosql.gemspec b/mosql.gemspec
index 8e83e6c..0fbc7fb 100644
--- a/mosql.gemspec
+++ b/mosql.gemspec
@@ -18,7 +18,7 @@ Gem::Specification.new do |gem|
   %w[sequel pg mongo bson_ext rake log4r json ].each { |dep| gem.add_runtime_dependency(dep) }

-  gem.add_runtime_dependency "mongoriver", "0.4"
+  gem.add_runtime_dependency "mongoriver", "0.4.2"

   gem.add_development_dependency "minitest"
   gem.add_development_dependency "mocha"
diff --git a/test/functional/streamer.rb b/test/functional/streamer.rb
index b4f0122..068845b 100644
--- a/test/functional/streamer.rb
+++ b/test/functional/streamer.rb
@@ -365,6 +365,7 @@ def build_streamer
       "ns" => "db.has_timestamp",
       "o" => mongo['db']['has_timestamp'].find_one({_id: id})
     })
+    @streamer.do_batch_inserts
     got = @sequel[:has_timestamp].where(:_id => id.to_s).select.first[:ts]
     assert_equal(ts.to_i, got.to_i)
     assert_equal(ts.tv_usec, got.tv_usec)
diff --git a/test/functional/transform.rb b/test/functional/transform.rb
index 234b97a..7cf0bcb 100644
--- a/test/functional/transform.rb
+++ b/test/functional/transform.rb
@@ -114,6 +114,7 @@ class MoSQL::Test::Functional::TransformTest < MoSQL::Test::Functional
       "ns" => "test.test_transform",
       "o" => collection.find_one(_id: id)
     })
+    streamer.do_batch_inserts
     got = @sequel[:test_transform].where(_id: id).to_a
     assert_equal(sql, got.first[:value],
       "was able to transform a #{typ} field while streaming")
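
For reference, below is a minimal, self-contained sketch of the flush
rules described in the commit message, with the MoSQL/Sequel plumbing
stubbed out. InsertBatcher and the write_rows callback are hypothetical
names invented for this illustration only; in the patch itself the
logic lives in Streamer#queue_to_batch_insert and
Streamer#do_batch_inserts.

  # Illustrative sketch only -- not part of the patch.
  class InsertBatcher
    BATCH_SIZE = 1000 # flush a namespace once this many inserts are queued
    TIME_LIMIT = 5.0  # seconds between forced flushes of all namespaces

    def initialize(&write_rows)
      @write_rows = write_rows # stands in for a bulk upsert into postgres
      @queues     = Hash.new { |hash, ns| hash[ns] = [] }
      @last_flush = Time.now
    end

    # Queue an insert; flush the namespace once the batch fills up.
    def insert(ns, row)
      @queues[ns] << row
      flush(ns) if @queues[ns].length >= BATCH_SIZE
    end

    # An update or delete must observe earlier inserts, so flush first.
    def before_update_or_delete(ns)
      flush(ns)
    end

    # Called from the streaming loop after each block of ops; the
    # shutdown path calls flush directly.
    def tick
      flush if Time.now - @last_flush >= TIME_LIMIT
    end

    # Flush one namespace, or every namespace when ns is nil.
    def flush(ns = nil)
      if ns.nil?
        @queues.keys.each { |key| flush(key) }
        @last_flush = Time.now
      else
        rows        = @queues[ns]
        @queues[ns] = []
        @write_rows.call(ns, rows) unless rows.empty?
      end
    end
  end

  # Example: print batches instead of writing to postgres.
  batcher = InsertBatcher.new { |ns, rows| puts "#{ns}: #{rows.length} rows" }
  batcher.insert('db.users', { _id: 1 })
  batcher.before_update_or_delete('db.users') # => prints "db.users: 1 rows"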