Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use activerecord-import's "import" method #67

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 57 additions & 22 deletions lib/strategy/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,41 @@ def initialize source_database, destination_database, name, user_strategies
@destination_database = destination_database
@fields_missing_strategy = DataAnon::Core::FieldsMissingStrategy.new name
@errors = DataAnon::Core::TableErrors.new(@name)
@bulk_process = defined?(::ActiveRecord::Import)
@primary_keys = []
end

# Class-level predicate telling whether this strategy is a whitelist
# strategy. This implementation always answers false.
class << self
  def whitelist?
    false
  end
end

# Tells whether processed records are collected and written in bulk
# instead of being saved one at a time.
#
# @return [Boolean] true when bulk processing is enabled, false otherwise
def bulk_process?
  # The flag is seeded from `defined?(...)`, which yields a String or nil,
  # and may later be overwritten with any value; coerce it so the predicate
  # always answers with a real boolean.
  !!@bulk_process
end

# Overrides the bulk-processing flag that `bulk_process?` reports.
#
# @param flag the new value of the flag (truthy enables, falsy disables)
# @return the value that was assigned
def bulk_process(flag)
  @bulk_process = flag
end

# Queues a record for the bulk write performed at the end of the
# surrounding `bulk_process_records` block.
#
# The queue is the array kept in Thread.current[:bulk_process_records];
# it must already have been set up by `bulk_process_records`.
#
# @param record the record to append to the queue
# @return [Array] the queue, including the record just added
def collect_for_bulk_process(record)
  pending = Thread.current[:bulk_process_records]
  pending << record
end

# Runs the given block and, when bulk processing is enabled, writes every
# record the block queued via `collect_for_bulk_process` with one
# `bulk_store` call.
#
# When bulk processing is disabled the block is simply executed and its
# value returned.
#
# If the block raises, nothing is stored and the exception propagates.
#
# @yield the work that processes (and possibly queues) records
# @return the block's value when bulk processing is off; otherwise the
#   result of `bulk_store`, or nil when nothing was queued
def bulk_process_records
  return yield unless bulk_process?

  # Remember whatever was in the slot so it can be put back afterwards;
  # otherwise the last batch would stay referenced for the life of the thread.
  previous = Thread.current[:bulk_process_records]
  records = Thread.current[:bulk_process_records] = []
  begin
    yield
    # Nothing queued means nothing to write; skip the pointless import call.
    bulk_store(records) unless records.empty?
  ensure
    Thread.current[:bulk_process_records] = previous
  end
end

# Writes the given records to the destination table with a single
# `import` call covering every column of that table.
#
# Validations and timestamp handling are switched off, and all columns are
# listed for update when a row with a duplicate key is encountered.
#
# @param records [Array] the records to import
# @return whatever `dest_table.import` returns
def bulk_store(records)
  column_list = dest_table.column_names
  dest_table.import(
    column_list,
    records,
    validate: false,
    on_duplicate_key_update: column_list,
    timestamps: false
  )
end

def process_fields &block
self.instance_eval &block
self
Expand Down Expand Up @@ -114,29 +142,35 @@ def process
def process_table progress
index = 0

source_table_limited.each do |record|
index += 1
begin
process_record_if index, record
rescue => exception
@errors.log_error record, exception
bulk_process_records do
source_table_limited.each do |record|
index += 1
begin
process_record_if index, record
rescue => exception
@errors.log_error record, exception
end
progress.show index
end
progress.show index
end
end

def process_table_in_batches progress
logger.info "Processing table #{@name} records in batch size of #{@batch_size}"
index = 0

source_table_limited.find_each(:batch_size => @batch_size) do |record|
index += 1
begin
process_record_if index, record
rescue => exception
@errors.log_error record, exception
source_table_limited.find_in_batches(:batch_size => @batch_size) do |records|
bulk_process_records do
records.each do |record|
index += 1
begin
process_record_if index, record
rescue => exception
@errors.log_error record, exception
end
progress.show index
end
end
progress.show index
end
end

Expand All @@ -154,13 +188,15 @@ def process_table_in_threads progress
end

thr = Thread.new {
records.each do |record|
begin
process_record_if index, record
index += 1
rescue => exception
puts exception.inspect
@errors.log_error record, exception
bulk_process_records do
records.each do |record|
begin
process_record_if index, record
index += 1
rescue => exception
puts exception.inspect
@errors.log_error record, exception
end
end
end
}
Expand Down Expand Up @@ -199,7 +235,6 @@ def progress_bar_class progress_bar
@progress_bar = progress_bar
end


end
end
end
9 changes: 8 additions & 1 deletion lib/strategy/blacklist.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,14 @@ def process_record index, record
updates[database_field_name] = strategy.anonymize(field)
end
end
record.update_columns(updates) if updates.any?
if updates.any?
if bulk_process?
record.assign_attributes(updates)
collect_for_bulk_process(record)
else
record.update_columns(updates)
end
end
end

end
Expand Down
7 changes: 5 additions & 2 deletions lib/strategy/whitelist.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ def process_record(index, record)
@primary_keys.each do |key|
dest_record[key] = record[key]
end
dest_record.save!
if bulk_process?
collect_for_bulk_process(dest_record)
else
dest_record.save!
end
end


end
end
end