diff --git a/.gitignore b/.gitignore
index 66be0c8..b53d7f5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,3 @@
 collections.yml
 /.bundle/
-Gemfile.lock
+Gemfile.lock
\ No newline at end of file
diff --git a/lib/mosql/cli.rb b/lib/mosql/cli.rb
index 4f537ec..975e0c8 100644
--- a/lib/mosql/cli.rb
+++ b/lib/mosql/cli.rb
@@ -160,7 +160,7 @@ def run
       metadata_table = MoSQL::Tailer.create_table(@sql.db, 'mosql_tailers')

       @tailer = MoSQL::Tailer.new([@mongo], :existing, metadata_table,
-                                  :service => options[:service])
+                                  {:service => options[:service], :batch => true})

       @streamer = Streamer.new(:options => @options,
                                :tailer => @tailer,
@@ -173,7 +173,12 @@ def run
       end

       unless options[:skip_tail]
-        @streamer.optail
+        begin
+          @streamer.optail
+        rescue => e
+          puts 'Unexpected error. Attempting to save'
+          @streamer.saveAll(true)
+        end
       end
     end
   end
diff --git a/lib/mosql/schema.rb b/lib/mosql/schema.rb
index 7e0f119..2672778 100644
--- a/lib/mosql/schema.rb
+++ b/lib/mosql/schema.rb
@@ -209,8 +209,11 @@ def transform(ns, obj, schema=nil)

       # Do a deep clone, because we're potentially going to be
       # mutating embedded objects.
-      obj = BSON.deserialize(BSON.serialize(obj))
-
+      # Bikash - Sep 8, 2016
+      # failure due to large doc; changing the line below
+      # obj = BSON.deserialize(BSON.serialize(obj))
+      obj = BSON.deserialize(BSON::BSON_CODER.serialize(obj, false, false, 16*1024*1024))
+
       row = []
       schema[:columns].each do |col|

diff --git a/lib/mosql/streamer.rb b/lib/mosql/streamer.rb
index d630e96..1f3e37d 100644
--- a/lib/mosql/streamer.rb
+++ b/lib/mosql/streamer.rb
@@ -2,9 +2,11 @@ module MoSQL
   class Streamer
     include MoSQL::Logging

-    BATCH = 1000
+    BATCH_SIZE = 1000
+    # How long to wait before saving unsaved inserts to postgres
+    INSERT_BATCH_TIMELIMIT = 5.0

-    attr_reader :options, :tailer
+    attr_reader :options, :tailer, :batch_done

     NEW_KEYS = [:options, :tailer, :mongo, :sql, :schema]

@@ -16,7 +18,10 @@ def initialize(opts)
         instance_variable_set(:"@#{parm.to_s}", opts[parm])
       end

-      @done = false
+      # Hash from namespace -> inserts that need to be made
+      @batch_insert_lists = Hash.new { |hash, key| hash[key] = [] }
+      @done = false
+      @batch_done = true
     end

     def stop
@@ -42,6 +47,7 @@ def unsafe_handle_exceptions(ns, obj)
         if wrapped.result && options[:unsafe]
           log.warn("Ignoring row (#{obj.inspect}): #{e}")
         else
+          log.error("Erroring row (#{obj.inspect}): #{e}")
           log.error("Error processing #{obj.inspect} for #{ns}.")
           raise e
         end
@@ -51,7 +57,8 @@ def unsafe_handle_exceptions(ns, obj)
     def bulk_upsert(table, ns, items)
       begin
         @schema.copy_data(table.db, ns, items)
-      rescue Sequel::DatabaseError => e
+        @batch_done = true
+      rescue => e #Sequel::DatabaseError
         log.debug("Bulk insert error (#{e}), attempting invidual upserts...")
         cols = @schema.all_columns(@schema.find_ns(ns))
         items.each do |it|
@@ -61,6 +68,7 @@ def unsafe_handle_exceptions(ns, obj)
             @sql.upsert!(table, @schema.primary_sql_key_for_ns(ns), h)
           end
         end
+        @batch_done = true
       end
     end

@@ -141,13 +149,13 @@ def import_collection(ns, collection, filter)

       start = Time.now
       sql_time = 0
-      collection.find(filter, :batch_size => BATCH) do |cursor|
+      collection.find(filter, :batch_size => BATCH_SIZE) do |cursor|
         with_retries do
           cursor.each do |obj|
             batch << @schema.transform(ns, obj)
             count += 1

-            if batch.length >= BATCH
+            if batch.length >= BATCH_SIZE
               sql_time += track_time do
                 bulk_upsert(table, ns, batch)
               end
@@ -165,19 +173,48 @@ def import_collection(ns, collection, filter)
       end
     end

+    def saveAll(force = false)
+      return unless @batch_done
+      if force
+        tailer.save_state
+      else
+        tailer.maybe_save_state
+      end
+    end
+
     def optail
       tail_from = options[:tail_from]
       if tail_from.is_a? Time
         tail_from = tailer.most_recent_position(tail_from)
       end
+
+      last_batch_insert = Time.now
       tailer.tail(:from => tail_from, :filter => options[:oplog_filter])
       until @done
         tailer.stream(1000) do |op|
           handle_op(op)
         end
+        time = Time.now
+        if time - last_batch_insert >= INSERT_BATCH_TIMELIMIT
+          last_batch_insert = time
+          do_batch_inserts
+        end
+
+        saveAll
       end
+
+      log.info("Finishing, doing last batch inserts.")
+      do_batch_inserts
+      saveAll
     end

+    # Handle $set, $inc and other operators in updates. Done by querying
+    # mongo and setting the value to whatever mongo holds at the time.
+    # Note that this somewhat messes with consistency as postgres will be
+    # "ahead" of everything else if the tailer is behind.
+    #
+    # If no such object is found, try to delete according to primary keys that
+    # must be present in the selector (and not behind $set etc.).
     def sync_object(ns, selector)
       obj = collection_for_ns(ns).find_one(selector)
       if obj
@@ -189,6 +226,36 @@ def sync_object(ns, selector)
       end
     end

+    # Add this op to be batch inserted to its namespace
+    # the next time a non-insert happens
+    def queue_to_batch_insert(op, namespace)
+      @batch_insert_lists[namespace] << @schema.transform(namespace, op['o'])
+      if @batch_insert_lists[namespace].length >= BATCH_SIZE
+        do_batch_inserts(namespace)
+      end
+    end
+
+    # Do a batch insert for that namespace, putting data to postgres.
+    # If no namespace is given, all namespaces are flushed.
+    def do_batch_inserts(namespace=nil)
+      if namespace.nil?
+        @batch_insert_lists.keys.each do |ns|
+          do_batch_inserts(ns)
+        end
+      else
+        to_batch = @batch_insert_lists[namespace]
+        @batch_insert_lists[namespace] = []
+        if to_batch.empty?
+          @batch_done = true
+          return
+        end
+
+        table = @sql.table_for_ns(namespace)
+        log.debug("Batch inserting #{to_batch.length} items to #{table} from #{namespace}.")
+        bulk_upsert(table, namespace, to_batch)
+      end
+    end
+
     def handle_op(op)
       log.debug("processing op: #{op.inspect}")
       unless op['ns'] && op['op']
@@ -220,9 +287,8 @@ def handle_op(op)
         if collection_name == 'system.indexes'
           log.info("Skipping index update: #{op.inspect}")
         else
-          unsafe_handle_exceptions(ns, op['o']) do
-            @sql.upsert_ns(ns, op['o'])
-          end
+          @batch_done = false
+          queue_to_batch_insert(op, ns)
         end
       when 'u'
         selector = op['o2']
@@ -231,6 +297,7 @@ def handle_op(op)
           log.debug("resync #{ns}: #{selector['_id']} (update was: #{update.inspect})")
           sync_object(ns, selector)
         else
+          do_batch_inserts(ns)

           # The update operation replaces the existing object, but
           # preserves its _id field, so grab the _id off of the
@@ -252,6 +319,8 @@ def handle_op(op)
           end
         end
       when 'd'
+        do_batch_inserts(ns)
+
         if options[:ignore_delete]
           log.debug("Ignoring delete op on #{ns} as instructed.")
         else
diff --git a/test/functional/streamer.rb b/test/functional/streamer.rb
index 43be94b..9671e48 100644
--- a/test/functional/streamer.rb
+++ b/test/functional/streamer.rb
@@ -392,6 +392,7 @@ def build_streamer
                        "ns" => "db.has_timestamp",
                        "o" => mongo['db']['has_timestamp'].find_one({_id: id})
                      })
+    @streamer.do_batch_inserts
     got = @sequel[:has_timestamp].where(:_id => id.to_s).select.first[:ts]
     assert_equal(ts.to_i, got.to_i)
     assert_equal(ts.tv_usec, got.tv_usec)
diff --git a/test/functional/transform.rb b/test/functional/transform.rb
index 234b97a..7cf0bcb 100644
--- a/test/functional/transform.rb
+++ b/test/functional/transform.rb
@@ -114,6 +114,7 @@ class MoSQL::Test::Functional::TransformTest < MoSQL::Test::Functional
                         "ns" => "test.test_transform",
                         "o" => collection.find_one(_id: id)
                       })
+      streamer.do_batch_inserts
       got = @sequel[:test_transform].where(_id: id).to_a
       assert_equal(sql, got.first[:value], "was able to transform a #{typ} field while streaming")