Skip to content

Commit

Permalink
add more log in reader to debug
Browse files Browse the repository at this point in the history
  • Loading branch information
vinllen committed Jul 24, 2020
1 parent e452ad6 commit c2d2452
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 3 deletions.
1 change: 1 addition & 0 deletions src/mongoshake/collector/docsyncer/doc_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func NewDocumentSplitter(src string, ns utils.NS) *DocumentSplitter {
return nil
}


// get total count
count, err := ds.conn.Session.DB(ds.ns.Database).C(ds.ns.Collection).Count()
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions src/mongoshake/collector/reader/event_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/vinllen/mgo/bson"
LOG "github.com/vinllen/log4go"
"github.com/vinllen/go-diskqueue"
"fmt"
)

const (
Expand Down Expand Up @@ -57,6 +58,10 @@ func NewEventReader(src string, replset string) *EventReader {
}
}

func (er *EventReader) String() string {
return fmt.Sprintf("EventReader[src:%s replset:%s]", er.src, er.replset)
}

func (er *EventReader) Name() string {
return utils.VarIncrSyncMongoFetchMethodChangeStream
}
Expand Down Expand Up @@ -136,6 +141,8 @@ func (er *EventReader) EnsureNetwork() error {
return nil
}

LOG.Info("%s ensure network", er.String())

if er.client != nil {
er.client.Close() // close old client
}
Expand Down
14 changes: 11 additions & 3 deletions src/mongoshake/collector/reader/oplog_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type OplogReader struct {
// source mongo address url
src string
replset string

// mongo oplog reader
conn *utils.MongoConn
oplogsIterator *mgo.Iter
Expand Down Expand Up @@ -64,6 +65,10 @@ func NewOplogReader(src string, replset string) *OplogReader {
}
}

func (or *OplogReader) String() string {
return fmt.Sprintf("oplogReader[src:%s replset:%s]", or.src, or.replset)
}

func (or *OplogReader) Name() string {
return utils.VarIncrSyncMongoFetchMethodOplog
}
Expand Down Expand Up @@ -148,7 +153,7 @@ func (or *OplogReader) fetcher() {
LOG.Error("oplog collection capped may happen: %v", err)
or.oplogChan <- &retOplog{nil, CollectionCappedError}
} else {
or.oplogChan <- &retOplog{nil, fmt.Errorf("get next oplog failed. release oplogsIterator, %s", err.Error())}
or.oplogChan <- &retOplog{nil, fmt.Errorf("get next oplog failed, release oplogsIterator, %s", err.Error())}
}
// wait a moment
time.Sleep(1 * time.Second)
Expand All @@ -169,6 +174,8 @@ func (or *OplogReader) EnsureNetwork() (err error) {
if or.oplogsIterator != nil {
return nil
}
LOG.Info("%s ensure network", or.String())

if or.conn == nil || (or.conn != nil && !or.conn.IsGood()) {
if or.conn != nil {
or.conn.Close()
Expand All @@ -178,6 +185,9 @@ func (or *OplogReader) EnsureNetwork() (err error) {
err = fmt.Errorf("oplog_reader reconnect mongo instance [%s] error. %s", or.src, err.Error())
return err
}

or.conn.Session.SetBatch(BatchSize)
or.conn.Session.SetPrefetch(PrefetchPercent)
}

var queryTs bson.MongoTimestamp
Expand Down Expand Up @@ -210,8 +220,6 @@ func (or *OplogReader) EnsureNetwork() (err error) {
or.firstRead = false

// rebuild syncerGroup condition statement with current checkpoint timestamp
or.conn.Session.SetBatch(BatchSize)
or.conn.Session.SetPrefetch(PrefetchPercent)
or.oplogsIterator = or.conn.Session.DB(localDB).C(utils.OplogNS).
Find(or.query).LogReplay().Tail(time.Second * tailTimeout) // this timeout is useless
return
Expand Down

0 comments on commit c2d2452

Please sign in to comment.