I wrote a utility that collects MongoDB inserts/updates and sends them as a bulk insert/update
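When you perform a large number of inserts/updates against MongoDB, for example in batch processing, calling #save on each document one at a time through Mongoid is slow.
So I wrote a utility that collects multiple #save calls and sends them to MongoDB as a single bulk insert/update.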
Usage
- In the model class, include Mongoid::Document and Utils::BulkWriteOperationSupport.
- Call Utils::BulkWriteOperationSupport.begin_transaction, then call #save on your models.
  - At this point no insert/update is sent to MongoDB; the models are accumulated in a buffer.
- When you call Utils::BulkWriteOperationSupport.end_transaction, the buffered data is persisted in one go with #bulk_write.
    class TestModel
      # Include both Mongoid::Document and Utils::BulkWriteOperationSupport
      include Mongoid::Document
      include Utils::BulkWriteOperationSupport

      store_in collection: 'test_model'

      field :name, type: String
    end

    # (omitted)

    puts TestModel.count # => 0

    Utils::BulkWriteOperationSupport.begin_transaction

    # After calling #begin_transaction, create/modify models and call #save
    a = TestModel.new
    a.name = 'a'
    b = TestModel.new
    b.name = 'b'

    a.save
    b.save

    # Nothing is persisted until #end_transaction runs
    puts TestModel.count # => 0

    # Calling #end_transaction persists the buffered data in one go via #bulk_write
    Utils::BulkWriteOperationSupport.end_transaction

    puts TestModel.count # => 2
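Because #begin_transaction and #end_transaction have to be called as a pair, a block-style wrapper can make it harder to forget the second call. The following is only a sketch: `with_bulk_write` is a hypothetical helper that is not part of the utility, and `FakeBulkSupport` is a stand-in for Utils::BulkWriteOperationSupport so that the example runs without MongoDB.

```ruby
# Stand-in for Utils::BulkWriteOperationSupport (assumption: only the
# begin/end bookkeeping is modeled; nothing is written to MongoDB).
module FakeBulkSupport
  KEY = :fake_bulk_support_buffer

  def self.begin_transaction
    Thread.current[KEY] = []
  end

  def self.end_transaction
    buffer = Thread.current[KEY]
    Thread.current[KEY] = nil
    buffer
  end

  def self.in_transaction?
    !Thread.current[KEY].nil?
  end

  def self.buffer
    Thread.current[KEY]
  end
end

# Hypothetical helper: runs the block between begin_transaction and
# end_transaction. The ensure clause ends the transaction even when the
# block raises, so the buffer never stays active by accident.
def with_bulk_write(support)
  support.begin_transaction
  yield
ensure
  support.end_transaction
end

# Usage: the buffer is active only inside the block.
with_bulk_write(FakeBulkSupport) do
  FakeBulkSupport.buffer << :model_a
  FakeBulkSupport.buffer << :model_b
end
```

With the real module you would pass Utils::BulkWriteOperationSupport instead. Note one design consequence: since the flush happens in `ensure`, models buffered before an exception are still written. If that is not what you want, the helper would need a way to discard the buffer, which the utility does not provide as written.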
Utility code
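The module overrides Document#save so that, between #begin_transaction and #end_transaction, a saved model is only marked in thread-local storage as a target for persistence. When #end_transaction is called, the marked models are persisted together with #bulk_write.
- It cuts corners in various places, such as referential integrity checks, so modify it as needed.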
    # Wrapped in Utils so that it matches the usage example
    # (Utils::BulkWriteOperationSupport).
    module Utils
      module BulkWriteOperationSupport
        # Key under which the current transaction is stored in Thread.current
        KEY = BulkWriteOperationSupport.name

        # Overrides Document#save: buffer the model while a transaction is
        # active, otherwise fall through to the normal save.
        # Note that options are ignored while buffering.
        def save(options = {})
          if BulkWriteOperationSupport.in_transaction?
            BulkWriteOperationSupport.transaction << self
          else
            super
          end
        end

        def self.in_transaction?
          !transaction.nil?
        end

        def self.begin_transaction
          Thread.current[KEY] = Transaction.new
        end

        # Flushes the buffered models and clears the transaction
        def self.end_transaction
          return unless in_transaction?
          transaction.execute
          Thread.current[KEY] = nil
        end

        def self.transaction
          Thread.current[KEY]
        end

        def create_insert_operation
          { :insert_one => as_document }
        end

        def create_update_operation
          { :update_one => {
            :filter => { :_id => id },
            :update => { '$set' => collect_changed_values }
          } }
        end

        private

        # Builds { attribute => new value } from changes ({ attribute => [old, new] })
        def collect_changed_values
          changes.each_with_object({}) do |change, r|
            r[change[0].to_sym] = change[1][1]
          end
        end

        class Transaction
          def initialize
            @targets = {} # model class => { object_id => model }
          end

          # Registers a model; saving the same object twice keeps one entry
          def <<(model)
            targets_of( model.class )[model.object_id] = model
          end

          def execute
            until @targets.empty?
              model_class = @targets.keys.first
              execute_bulk_write_operations(model_class)
            end
          end

          def size
            @targets.values.reduce(0) {|a, e| a + e.length }
          end

          private

          def targets_of( model_class )
            @targets[model_class] ||= {}
          end

          def execute_bulk_write_operations(model_class)
            return unless @targets.include?(model_class)
            # Write the belongs_to parents first
            execute_parent_object_bulk_write_operations_if_exists(model_class)
            client = model_class.mongo_client[model_class.collection_name]
            operations = create_operations(@targets[model_class].values)
            client.bulk_write(operations) unless operations.empty?
            @targets.delete model_class
          end

          def execute_parent_object_bulk_write_operations_if_exists(model_class)
            parents = model_class.reflect_on_all_associations(:belongs_to)
            parents.each do |m|
              klass = m.klass
              execute_bulk_write_operations(klass)
            end
          end

          # New records become insert_one; changed existing records become update_one
          def create_operations(targets)
            targets.each_with_object([]) do |model, array|
              if model.new_record?
                model.new_record = false
                array << model.create_insert_operation
              else
                array << model.create_update_operation if model.changed?
              end
            end
          end
        end
      end
    end
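To make the update path easier to follow, here is a small sketch of how a `changes` hash turns into an `update_one` operation. It uses a plain Hash shaped like Mongoid's Document#changes (attribute name => [old value, new value]) instead of a real model, and `build_update_operation` is a hypothetical name used only for this illustration.

```ruby
# Builds an update_one operation from an id and a changes hash.
# `changes` is assumed to look like Mongoid's Document#changes:
#   { 'attribute' => [old_value, new_value] }
def build_update_operation(id, changes)
  # Keep only the new value of each changed attribute
  new_values = changes.each_with_object({}) do |(name, (_old, new_value)), set|
    set[name.to_sym] = new_value
  end

  {
    update_one: {
      filter: { _id: id },
      update: { '$set' => new_values }
    }
  }
end

# Example input: name changed from 'a' to 'b', age set for the first time
op = build_update_operation(1, { 'name' => ['a', 'b'], 'age' => [nil, 30] })
```

Only the changed attributes end up under `$set`, so an update sends just the difference rather than the whole document.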