diff --git a/lib/mosql.rb b/lib/mosql.rb index a2b6fa7..64dc23a 100644 --- a/lib/mosql.rb +++ b/lib/mosql.rb @@ -3,6 +3,7 @@ require 'sequel' require 'mongoriver' require 'json' +require 'pg' require 'mosql/version' require 'mosql/log' diff --git a/lib/mosql/schema.rb b/lib/mosql/schema.rb index 7e0f119..504fe62 100644 --- a/lib/mosql/schema.rb +++ b/lib/mosql/schema.rb @@ -57,8 +57,13 @@ def parse_meta(meta) meta end + def set_gridfs(db) + @gridfs = Mongo::Grid.new(db) + end + def initialize(map) @map = {} + @gridfs = nil map.each do |dbname, db| @map[dbname] = { :meta => parse_meta(db[:meta]) } db.each do |cname, spec| @@ -172,10 +177,16 @@ def fetch_exists(obj, dotted) obj.has_key?(pieces.first) end - def fetch_special_source(obj, source, original) + def fetch_special_source(ns, obj, source, original) case source when "$timestamp" Sequel.function(:now) + when "$data" + dbname, collection = ns.split(".", 2) + if collection == 'fs.files' + file = @gridfs.get(original["_id"]) + Sequel::SQL::Blob.new(file.read) + end when /^\$exists (.+)/ # We need to look in the cloned original object, not in the version that # has had some fields deleted. @@ -218,7 +229,7 @@ def transform(ns, obj, schema=nil) type = col[:type] if source.start_with?("$") - v = fetch_special_source(obj, source, original) + v = fetch_special_source(ns, obj, source, original) else v = fetch_and_delete_dotted(obj, source) case v @@ -318,7 +329,8 @@ def quote_copy(val) when DateTime, Time val.strftime("%FT%T.%6N %z") when Sequel::SQL::Blob - "\\\\x" + [val].pack("h*") + # "\\\\x" + [val].pack("h*") # Don't know how this affects other use cases... + PG::Connection.escape_bytea(val) else val.to_s.gsub(/([\\\t\n\r])/, '\\\\\\1') end diff --git a/lib/mosql/streamer.rb b/lib/mosql/streamer.rb index d630e96..92b633c 100644 --- a/lib/mosql/streamer.rb +++ b/lib/mosql/streamer.rb @@ -115,6 +115,7 @@ def initial_import log.info("Importing for Mongo DB #{dbname}...") db = @mongo.db(dbname) + @schema.set_gridfs(db) collections = db.collections.select { |c| spec.key?(c.name) } collections.each do |collection|