Skip to content

Commit

Permalink
use activerecord-imports "import"-method
Browse files Browse the repository at this point in the history
  • Loading branch information
aiomaster committed Jul 20, 2018
1 parent 6ccfda4 commit eee4c5e
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 23 deletions.
73 changes: 52 additions & 21 deletions lib/strategy/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,36 @@ def initialize source_database, destination_database, name, user_strategies
@destination_database = destination_database
@fields_missing_strategy = DataAnon::Core::FieldsMissingStrategy.new name
@errors = DataAnon::Core::TableErrors.new(@name)
@bulk_process = defined?(::ActiveRecord::Import)
@primary_keys = []
end

def self.whitelist?
false
end

def bulk_process?
@bulk_process
end

def bulk_process flag
@bulk_process = flag
end

def collect_for_bulk_process record
Thread.current[:bulk_process_records] << record
end

def bulk_process_records
if bulk_process?
Thread.current[:bulk_process_records] = []
yield
bulk_store Thread.current[:bulk_process_records]
else
yield
end
end

def process_fields &block
self.instance_eval &block
self
Expand Down Expand Up @@ -114,29 +137,35 @@ def process
def process_table progress
index = 0

source_table_limited.each do |record|
index += 1
begin
process_record_if index, record
rescue => exception
@errors.log_error record, exception
bulk_process_records do
source_table_limited.each do |record|
index += 1
begin
process_record_if index, record
rescue => exception
@errors.log_error record, exception
end
progress.show index
end
progress.show index
end
end

def process_table_in_batches progress
logger.info "Processing table #{@name} records in batch size of #{@batch_size}"
index = 0

source_table_limited.find_each(:batch_size => @batch_size) do |record|
index += 1
begin
process_record_if index, record
rescue => exception
@errors.log_error record, exception
source_table_limited.find_in_batches(:batch_size => @batch_size) do |records|
bulk_process_records do
records.each do |record|
index += 1
begin
process_record_if index, record
rescue => exception
@errors.log_error record, exception
end
progress.show index
end
end
progress.show index
end
end

Expand All @@ -154,13 +183,15 @@ def process_table_in_threads progress
end

thr = Thread.new {
records.each do |record|
begin
process_record_if index, record
index += 1
rescue => exception
puts exception.inspect
@errors.log_error record, exception
bulk_process_records do
records.each do |record|
begin
process_record_if index, record
index += 1
rescue => exception
puts exception.inspect
@errors.log_error record, exception
end
end
end
}
Expand Down
14 changes: 13 additions & 1 deletion lib/strategy/blacklist.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,19 @@ def process_record index, record
updates[database_field_name] = strategy.anonymize(field)
end
end
record.update_columns(updates) if updates.any?
if updates.any?
if bulk_process?
record.assign_attributes(updates)
collect_for_bulk_process(record)
else
record.update_columns(updates)
end
end
end

def bulk_store(records)
columns = @fields.keys
source_table.import @primary_keys + columns, records, validate: false, on_duplicate_key_update: columns
end

end
Expand Down
10 changes: 9 additions & 1 deletion lib/strategy/whitelist.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,17 @@ def process_record(index, record)
@primary_keys.each do |key|
dest_record[key] = record[key]
end
dest_record.save!
if bulk_process?
collect_for_bulk_process(dest_record)
else
dest_record.save!
end
end

def bulk_store(records)
columns = source_table.column_names
source_table.import @primary_keys + columns, records, validate: false, on_duplicate_key_update: columns
end

end
end
Expand Down

0 comments on commit eee4c5e

Please sign in to comment.