// Copyright 2015 Canonical Ltd. // Licensed under the AGPLv3, see LICENCE file for details. package mongo import ( "reflect" "time" "github.com/juju/errors" "github.com/juju/mgo/v2" "github.com/juju/mgo/v2/bson" "gopkg.in/tomb.v2" ) // OplogDoc represents a document in the oplog.rs collection. // See: http://www.kchodorow.com/blog/2010/10/12/replication-internals/ // // The Object and UpdateObject fields are returned raw to allow // unmarshalling into arbitrary types. Use the UnmarshalObject and // UnmarshalUpdate methods to unmarshall these fields. type OplogDoc struct { Timestamp bson.MongoTimestamp `bson:"ts"` OperationId int64 `bson:"h"` MongoVersion int `bson:"v"` Operation string `bson:"op"` // "i" - insert, "u" - update, "d" - delete Namespace string `bson:"ns"` Object *bson.Raw `bson:"o"` UpdateObject *bson.Raw `bson:"o2"` } // UnmarshalObject unmarshals the Object field into out. The out // argument should be a pointer or a suitable map. func (d *OplogDoc) UnmarshalObject(out interface{}) error { return d.unmarshal(d.Object, out) } // UnmarshalUpdate unmarshals the UpdateObject field into out. The out // argument should be a pointer or a suitable map. func (d *OplogDoc) UnmarshalUpdate(out interface{}) error { return d.unmarshal(d.UpdateObject, out) } func (d *OplogDoc) unmarshal(raw *bson.Raw, out interface{}) error { if raw == nil { // If the field is not set, set out to the zero value for its type. v := reflect.ValueOf(out) switch v.Kind() { case reflect.Ptr: v = v.Elem() v.Set(reflect.Zero(v.Type())) case reflect.Map: // Empty the map. for _, k := range v.MapKeys() { v.SetMapIndex(k, reflect.Value{}) } default: return errors.New("output must be a pointer or map") } return nil } return raw.Unmarshal(out) } // NewMongoTimestamp returns a bson.MongoTimestamp repesentation for // the time.Time given. Note that these timestamps are not the same // the usual MongoDB time fields. These are an internal format used // only in a few places such as the replication oplog. // // See: http://docs.mongodb.org/manual/reference/bson-types/#timestamps func NewMongoTimestamp(t time.Time) bson.MongoTimestamp { unixTime := t.Unix() if unixTime < 0 { unixTime = 0 } return bson.MongoTimestamp(unixTime << 32) } // GetOplog returns the the oplog collection in the local database. func GetOplog(session *mgo.Session) *mgo.Collection { return session.DB("local").C("oplog.rs") } func isRealOplog(c *mgo.Collection) bool { return c.Database.Name == "local" && c.Name == "oplog.rs" } // OplogSession represents a connection to the oplog store, used // to create an iterator to get oplog documents (and recreate it if it // gets killed or times out). type OplogSession interface { NewIter(bson.MongoTimestamp, []int64) Iterator Close() } type oplogSession struct { session *mgo.Session collection *mgo.Collection query bson.D } // NewOplogSession defines a new OplogSession. // // Arguments: // - "collection" is the collection to use for the oplog. Typically this // would be the result of GetOpLog. // - "query" can be used to limit the returned oplog entries. A // typical filter would limit based on ns (".") // and o (object). // // The returned session should be `Close`d when it's no longer needed. func NewOplogSession(collection *mgo.Collection, query bson.D) *oplogSession { // Use a fresh session for the tailer. session := collection.Database.Session.Copy() return &oplogSession{ session: session, collection: collection.With(session), query: query, } } const oplogTailTimeout = time.Second func (s *oplogSession) NewIter(fromTimestamp bson.MongoTimestamp, excludeIds []int64) Iterator { // When recreating the iterator (required when the cursor // is invalidated) avoid reporting oplog entries that have // already been reported. sel := append(s.query, bson.DocElem{"ts", bson.D{{"$gte", fromTimestamp}}}, bson.DocElem{"h", bson.D{{"$nin", excludeIds}}}, ) query := s.collection.Find(sel) if isRealOplog(s.collection) { // Apply an optimisation that is only supported with // the real oplog. query = query.LogReplay() } // Time the tail call out every second so that requests to // stop can be honoured. return query.Tail(oplogTailTimeout) } func (s *oplogSession) Close() { s.session.Close() } // NewOplogTailer returns a new OplogTailer. // // Arguments: // - "session" determines the collection and filtering on records that // should be returned. // - "initialTs" sets the operation timestamp to start returning // results from. This can be used to avoid an expensive initial search // through the oplog when the tailer first starts. // // Remember to call Stop on the returned OplogTailer when it is no // longer needed. func NewOplogTailer( session OplogSession, initialTs time.Time, ) *OplogTailer { t := &OplogTailer{ session: session, initialTs: NewMongoTimestamp(initialTs), outCh: make(chan *OplogDoc), } t.tomb.Go(func() error { defer func() { close(t.outCh) session.Close() }() return t.loop() }) return t } // OplogTailer tails MongoDB's replication oplog. type OplogTailer struct { tomb tomb.Tomb session OplogSession initialTs bson.MongoTimestamp outCh chan *OplogDoc } // Out returns a channel that reports the oplog entries matching the // query passed to NewOplogTailer as they appear. func (t *OplogTailer) Out() <-chan *OplogDoc { return t.outCh } // Dying returns a channel that will be closed with the OplogTailer is // shutting down. func (t *OplogTailer) Dying() <-chan struct{} { return t.tomb.Dying() } // Stop shuts down the OplogTailer. It will block until shutdown is // complete. func (t *OplogTailer) Stop() error { t.tomb.Kill(nil) return t.tomb.Wait() } // Err returns the error that caused the OplogTailer to stop. If it // finished normally or hasn't stopped then nil will be returned. func (t *OplogTailer) Err() error { return t.tomb.Err() } func (t *OplogTailer) loop() error { // lastTimestamp tracks the most recent oplog timestamp reported. lastTimestamp := t.initialTs // idsForLastTimestamp records the unique operation ids that have // been reported for the most recently reported oplog // timestamp. This is used to avoid re-reporting oplog entries // when the iterator is restarted. These timestamps are unique for // a given mongod but when there's multiple replicaset members // it's possible for there to be multiple oplog entries for a // given timestamp. // // See: http://docs.mongodb.org/v2.4/reference/bson-types/#timestamps var idsForLastTimestamp []int64 newIter := func() Iterator { return t.session.NewIter(lastTimestamp, idsForLastTimestamp) } iter := newIter() defer func() { iter.Close() }() // iter may be replaced, hence closure for { if t.dying() { return tomb.ErrDying } var doc OplogDoc if iter.Next(&doc) { select { case <-t.tomb.Dying(): return tomb.ErrDying case t.outCh <- &doc: } if doc.Timestamp > lastTimestamp { lastTimestamp = doc.Timestamp idsForLastTimestamp = nil } idsForLastTimestamp = append(idsForLastTimestamp, doc.OperationId) } else { if iter.Timeout() { continue } if err := iter.Close(); err != nil && err != mgo.ErrCursor { return err } // Either there's no error or the error is an expired // cursor; Recreate the iterator. iter = newIter() } } } func (t *OplogTailer) dying() bool { select { case <-t.tomb.Dying(): return true default: return false } }