diff --git a/lib/mosql/schema.rb b/lib/mosql/schema.rb index 7e0f119..6b1bdca 100644 --- a/lib/mosql/schema.rb +++ b/lib/mosql/schema.rb @@ -12,7 +12,7 @@ def to_array(lst) col = { :source => ent.fetch(:source), :type => ent.fetch(:type), - :name => (ent.keys - [:source, :type]).first, + :name => (ent.keys - [:source, :type]).first } elsif ent.is_a?(Hash) && ent.keys.length == 1 && ent.values.first.is_a?(String) col = { @@ -217,8 +217,11 @@ def transform(ns, obj, schema=nil) source = col[:source] type = col[:type] - if source.start_with?("$") + if source == '__ignore__' + # Just ignore the field + elsif source.start_with?("$") v = fetch_special_source(obj, source, original) + row << v else v = fetch_and_delete_dotted(obj, source) case v @@ -234,8 +237,8 @@ def transform(ns, obj, schema=nil) else v = transform_primitive(v, type) end + row << v end - row << v end if schema[:meta][:extra_props] @@ -269,7 +272,7 @@ def sanitize(value) end def copy_column?(col) - col[:source] != '$timestamp' + col[:source] != '$timestamp' && col[:source] != '__ignore__' end def all_columns(schema, copy=false) @@ -320,7 +323,7 @@ def quote_copy(val) when Sequel::SQL::Blob "\\\\x" + [val].pack("h*") else - val.to_s.gsub(/([\\\t\n\r])/, '\\\\\\1') + val.to_s.encode!('UTF-8', :undef => :replace, :invalid => :replace).gsub(/([\\\t\n\r])/, '\\\\\\1') end end diff --git a/lib/mosql/streamer.rb b/lib/mosql/streamer.rb index d630e96..bd2a94e 100644 --- a/lib/mosql/streamer.rb +++ b/lib/mosql/streamer.rb @@ -52,8 +52,8 @@ def bulk_upsert(table, ns, items) begin @schema.copy_data(table.db, ns, items) rescue Sequel::DatabaseError => e - log.debug("Bulk insert error (#{e}), attempting invidual upserts...") - cols = @schema.all_columns(@schema.find_ns(ns)) + log.debug("Bulk insert error (#{e}), attempting individual upserts...") + cols = @schema.all_columns_for_copy(@schema.find_ns(ns)) items.each do |it| h = {} cols.zip(it).each { |k,v| h[k] = v }