Skip to content

Commit

Permalink
use activerecord-imports "import"-method
Browse files Browse the repository at this point in the history
See #66
  • Loading branch information
aiomaster committed Oct 16, 2019
1 parent 0e9d330 commit 98a6d67
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 25 deletions.
79 changes: 57 additions & 22 deletions lib/strategy/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,41 @@ def initialize source_database, destination_database, name, user_strategies
@destination_database = destination_database
@fields_missing_strategy = DataAnon::Core::FieldsMissingStrategy.new name
@errors = DataAnon::Core::TableErrors.new(@name)
@bulk_process = defined?(::ActiveRecord::Import)
@primary_keys = []
end

def self.whitelist?
false
end

def bulk_process?
@bulk_process
end

def bulk_process flag
@bulk_process = flag
end

def collect_for_bulk_process record
Thread.current[:bulk_process_records] << record
end

def bulk_process_records
if bulk_process?
Thread.current[:bulk_process_records] = []
yield
bulk_store Thread.current[:bulk_process_records]
else
yield
end
end

def bulk_store(records)
columns = dest_table.column_names
dest_table.import columns, records, validate: false, on_duplicate_key_update: columns, timestamps: false
end

def process_fields &block
self.instance_eval &block
self
Expand Down Expand Up @@ -114,29 +142,35 @@ def process
def process_table progress
index = 0

source_table_limited.each do |record|
index += 1
begin
process_record_if index, record
rescue => exception
@errors.log_error record, exception
bulk_process_records do
source_table_limited.each do |record|
index += 1
begin
process_record_if index, record
rescue => exception
@errors.log_error record, exception
end
progress.show index
end
progress.show index
end
end

def process_table_in_batches progress
logger.info "Processing table #{@name} records in batch size of #{@batch_size}"
index = 0

source_table_limited.find_each(:batch_size => @batch_size) do |record|
index += 1
begin
process_record_if index, record
rescue => exception
@errors.log_error record, exception
source_table_limited.find_in_batches(:batch_size => @batch_size) do |records|
bulk_process_records do
records.each do |record|
index += 1
begin
process_record_if index, record
rescue => exception
@errors.log_error record, exception
end
progress.show index
end
end
progress.show index
end
end

Expand All @@ -154,13 +188,15 @@ def process_table_in_threads progress
end

thr = Thread.new {
records.each do |record|
begin
process_record_if index, record
index += 1
rescue => exception
puts exception.inspect
@errors.log_error record, exception
bulk_process_records do
records.each do |record|
begin
process_record_if index, record
index += 1
rescue => exception
puts exception.inspect
@errors.log_error record, exception
end
end
end
}
Expand Down Expand Up @@ -199,7 +235,6 @@ def progress_bar_class progress_bar
@progress_bar = progress_bar
end


end
end
end
9 changes: 8 additions & 1 deletion lib/strategy/blacklist.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,14 @@ def process_record index, record
updates[database_field_name] = strategy.anonymize(field)
end
end
record.update_columns(updates) if updates.any?
if updates.any?
if bulk_process?
record.assign_attributes(updates)
collect_for_bulk_process(record)
else
record.update_columns(updates)
end
end
end

end
Expand Down
7 changes: 5 additions & 2 deletions lib/strategy/whitelist.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ def process_record(index, record)
@primary_keys.each do |key|
dest_record[key] = record[key]
end
dest_record.save!
if bulk_process?
collect_for_bulk_process(dest_record)
else
dest_record.save!
end
end


end
end
end

0 comments on commit 98a6d67

Please sign in to comment.