From 98a1713a8637b168f0e4d0a77f8a37a388c876db Mon Sep 17 00:00:00 2001
From: Christoph Wagner
Date: Tue, 19 Jun 2018 15:35:46 +0200
Subject: [PATCH] use activerecord-imports "import"-method

See https://github.com/sunitparekh/data-anonymization/issues/66
---
Reviewer notes (commentary only, not part of the commit message):

* The line structure of this patch was restored from a whitespace-collapsed
  copy. Hunk line counts were checked against the @@ headers and the
  diffstat, but leading indentation was reconstructed from Ruby nesting.
  Verify context lines against lib/strategy/*.rb, or apply with
  `git apply --ignore-whitespace`.
* process_table wraps the whole iteration in a single bulk_process_records
  call, so every collected record is held in
  Thread.current[:bulk_process_records] until the one import at the end.
* bulk_store runs outside the per-record begin/rescue blocks, so an
  exception raised by `import` is not passed to @errors.log_error.
* collect_for_bulk_process appends to Thread.current[:bulk_process_records]
  without initialising it; it relies on being called inside
  bulk_process_records.

 lib/strategy/base.rb      | 79 ++++++++++++++++++++++++++++-----------
 lib/strategy/blacklist.rb |  9 ++++-
 lib/strategy/whitelist.rb |  7 +++-
 3 files changed, 70 insertions(+), 25 deletions(-)

diff --git a/lib/strategy/base.rb b/lib/strategy/base.rb
index d2d2891..cf3ec28 100644
--- a/lib/strategy/base.rb
+++ b/lib/strategy/base.rb
@@ -13,6 +13,7 @@ def initialize source_database, destination_database, name, user_strategies
         @destination_database = destination_database
         @fields_missing_strategy = DataAnon::Core::FieldsMissingStrategy.new name
         @errors = DataAnon::Core::TableErrors.new(@name)
+        @bulk_process = defined?(::ActiveRecord::Import)
         @primary_keys = []
       end
 
@@ -20,6 +21,33 @@ def self.whitelist?
         false
       end
 
+      def bulk_process?
+        @bulk_process
+      end
+
+      def bulk_process flag
+        @bulk_process = flag
+      end
+
+      def collect_for_bulk_process record
+        Thread.current[:bulk_process_records] << record
+      end
+
+      def bulk_process_records
+        if bulk_process?
+          Thread.current[:bulk_process_records] = []
+          yield
+          bulk_store Thread.current[:bulk_process_records]
+        else
+          yield
+        end
+      end
+
+      def bulk_store(records)
+        columns = dest_table.column_names
+        dest_table.import columns, records, validate: false, on_duplicate_key_update: columns, timestamps: false
+      end
+
       def process_fields &block
         self.instance_eval &block
         self
@@ -114,14 +142,16 @@ def process
 
       def process_table progress
         index = 0
-        source_table_limited.each do |record|
-          index += 1
-          begin
-            process_record_if index, record
-          rescue => exception
-            @errors.log_error record, exception
+        bulk_process_records do
+          source_table_limited.each do |record|
+            index += 1
+            begin
+              process_record_if index, record
+            rescue => exception
+              @errors.log_error record, exception
+            end
+            progress.show index
           end
-          progress.show index
         end
       end
 
@@ -129,14 +159,18 @@ def process_table_in_batches progress
         logger.info "Processing table #{@name} records in batch size of #{@batch_size}"
 
         index = 0
-        source_table_limited.find_each(:batch_size => @batch_size) do |record|
-          index += 1
-          begin
-            process_record_if index, record
-          rescue => exception
-            @errors.log_error record, exception
+        source_table_limited.find_in_batches(:batch_size => @batch_size) do |records|
+          bulk_process_records do
+            records.each do |record|
+              index += 1
+              begin
+                process_record_if index, record
+              rescue => exception
+                @errors.log_error record, exception
+              end
+              progress.show index
+            end
           end
-          progress.show index
         end
       end
 
@@ -154,13 +188,15 @@ def process_table_in_threads progress
           end
 
           thr = Thread.new {
-            records.each do |record|
-              begin
-                process_record_if index, record
-                index += 1
-              rescue => exception
-                puts exception.inspect
-                @errors.log_error record, exception
+            bulk_process_records do
+              records.each do |record|
+                begin
+                  process_record_if index, record
+                  index += 1
+                rescue => exception
+                  puts exception.inspect
+                  @errors.log_error record, exception
+                end
               end
             end
           }
@@ -199,7 +235,6 @@ def progress_bar_class progress_bar
         @progress_bar = progress_bar
       end
 
-
     end
   end
 end
diff --git a/lib/strategy/blacklist.rb b/lib/strategy/blacklist.rb
index a64ef57..2ebcab3 100644
--- a/lib/strategy/blacklist.rb
+++ b/lib/strategy/blacklist.rb
@@ -12,7 +12,14 @@ def process_record index, record
             updates[database_field_name] = strategy.anonymize(field)
           end
         end
-        record.update_columns(updates) if updates.any?
+        if updates.any?
+          if bulk_process?
+            record.assign_attributes(updates)
+            collect_for_bulk_process(record)
+          else
+            record.update_columns(updates)
+          end
+        end
       end
 
     end
diff --git a/lib/strategy/whitelist.rb b/lib/strategy/whitelist.rb
index 422ee91..2c421e3 100644
--- a/lib/strategy/whitelist.rb
+++ b/lib/strategy/whitelist.rb
@@ -19,10 +19,13 @@ def process_record(index, record)
         @primary_keys.each do |key|
           dest_record[key] = record[key]
         end
-        dest_record.save!
+        if bulk_process?
+          collect_for_bulk_process(dest_record)
+        else
+          dest_record.save!
+        end
       end
 
-
     end
   end
 end