Skip to content

Commit

Permalink
add readConcern and writeConcern in client to solve the orphan docume…
Browse files Browse the repository at this point in the history
…nt problem reading from MongoS. #392
  • Loading branch information
vinllen committed Jul 24, 2020
1 parent c2d2452 commit 1e4feca
Show file tree
Hide file tree
Showing 20 changed files with 232 additions and 178 deletions.
2 changes: 2 additions & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
* IMPROVE: add oplog_max_size and oplog_avg_size metric in restful: "repl/".
* BUGFIX: fix bug of checkpoint duplicate updating when HA switch.
* IMPROVE: increase syncer thread when fetching method is change stream.
* IMPROVE: add readConcern and writeConcern in client to solve the
orphan document problem reading from MongoS. #392

2020-06-30 Alibaba Cloud.
* version: 2.4.7
Expand Down
258 changes: 132 additions & 126 deletions src/mongoshake/collector/batcher_test.go

Large diffs are not rendered by default.

8 changes: 2 additions & 6 deletions src/mongoshake/collector/ckpt/ckpt_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"bytes"
"fmt"

"mongoshake/collector/configure"
"mongoshake/common"

LOG "github.com/vinllen/log4go"
Expand Down Expand Up @@ -67,7 +66,8 @@ type MongoCheckpoint struct {
func (ckpt *MongoCheckpoint) ensureNetwork() bool {
// make connection if we haven't already established one
if ckpt.Conn == nil {
if conn, err := utils.NewMongoConn(ckpt.URL, utils.VarMongoConnectModePrimary, true); err == nil {
if conn, err := utils.NewMongoConn(ckpt.URL, utils.VarMongoConnectModePrimary, true,
utils.ReadWriteConcernMajority, utils.ReadWriteConcernMajority); err == nil {
ckpt.Conn = conn
ckpt.QueryHandle = conn.Session.DB(ckpt.DB).C(ckpt.Table)
} else {
Expand All @@ -76,10 +76,6 @@ func (ckpt *MongoCheckpoint) ensureNetwork() bool {
}
}

// set WriteMajority while checkpoint is writing to ConfigServer
if conf.Options.IsShardCluster() {
ckpt.Conn.Session.EnsureSafe(&mgo.Safe{WMode: utils.MajorityWriteConcern})
}
return true
}

Expand Down
21 changes: 16 additions & 5 deletions src/mongoshake/collector/ckpt/ckpt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ func TestMongoCheckpoint(t *testing.T) {
conf.Options.CheckpointStorage = utils.VarCheckpointStorageDatabase

name := "ut_tet"
conn, err := utils.NewMongoConn(testUrl, utils.VarMongoConnectModePrimary, true)
conn, err := utils.NewMongoConn(testUrl, utils.VarMongoConnectModePrimary, true,
utils.ReadWriteConcernMajority, utils.ReadWriteConcernMajority)
assert.Equal(t, nil, err, "should be equal")

// drop test db
Expand All @@ -53,9 +54,11 @@ func TestMongoCheckpoint(t *testing.T) {
conf.Options.CheckpointStorageUrl = testUrl
conf.Options.CheckpointStorageCollection = "ut_ckpt_table"
conf.Options.CheckpointStorage = utils.VarCheckpointStorageDatabase
conf.Options.CheckpointStorageDb = utils.VarCheckpointStorageDbReplicaDefault

name := "ut_tet"
conn, err := utils.NewMongoConn(testUrl, utils.VarMongoConnectModePrimary, true)
conn, err := utils.NewMongoConn(testUrl, utils.VarMongoConnectModePrimary, true,
utils.ReadWriteConcernMajority, utils.ReadWriteConcernMajority)
assert.Equal(t, nil, err, "should be equal")

// drop test db
Expand Down Expand Up @@ -98,9 +101,11 @@ func TestMongoCheckpoint(t *testing.T) {
conf.Options.CheckpointStorageUrl = testUrl
conf.Options.CheckpointStorageCollection = "ut_ckpt_table"
conf.Options.CheckpointStorage = utils.VarCheckpointStorageDatabase
conf.Options.CheckpointStorageDb = utils.VarCheckpointStorageDbReplicaDefault

name := "ut_tet"
conn, err := utils.NewMongoConn(testUrl, utils.VarMongoConnectModePrimary, true)
conn, err := utils.NewMongoConn(testUrl, utils.VarMongoConnectModePrimary, true,
utils.ReadWriteConcernMajority, utils.ReadWriteConcernMajority)
assert.Equal(t, nil, err, "should be equal")

// drop test db
Expand Down Expand Up @@ -134,9 +139,12 @@ func TestMongoCheckpoint(t *testing.T) {
conf.Options.CheckpointStorageUrl = testUrl
conf.Options.CheckpointStorageCollection = "ut_ckpt_table"
conf.Options.CheckpointStorage = utils.VarCheckpointStorageDatabase
conf.Options.CheckpointStorageDb = utils.VarCheckpointStorageDbReplicaDefault
utils.FcvCheckpoint.CurrentVersion = 1

name := "ut_tet"
conn, err := utils.NewMongoConn(testUrl, utils.VarMongoConnectModePrimary, true)
conn, err := utils.NewMongoConn(testUrl, utils.VarMongoConnectModePrimary, true,
utils.ReadWriteConcernMajority, utils.ReadWriteConcernMajority)
assert.Equal(t, nil, err, "should be equal")

// drop test db
Expand Down Expand Up @@ -189,9 +197,12 @@ func TestMongoCheckpoint(t *testing.T) {
conf.Options.CheckpointStorageUrl = testUrl
conf.Options.CheckpointStorageCollection = "ut_ckpt_table"
conf.Options.CheckpointStorage = utils.VarCheckpointStorageDatabase
conf.Options.CheckpointStorageDb = utils.VarCheckpointStorageDbReplicaDefault
utils.FcvCheckpoint.CurrentVersion = 1

name := "ut_tet"
conn, err := utils.NewMongoConn(testUrl, utils.VarMongoConnectModePrimary, true)
conn, err := utils.NewMongoConn(testUrl, utils.VarMongoConnectModePrimary, true,
utils.ReadWriteConcernMajority, utils.ReadWriteConcernMajority)
assert.Equal(t, nil, err, "should be equal")

// drop test db
Expand Down
6 changes: 4 additions & 2 deletions src/mongoshake/collector/coordinator/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ func fetchIndexes(sourceList []*utils.MongoSource) (map[utils.NS][]mgo.Index, er
}

// 2. build connection
conn, err := utils.NewMongoConn(src.URL, utils.VarMongoConnectModeSecondaryPreferred, true)
conn, err := utils.NewMongoConn(src.URL, utils.VarMongoConnectModeSecondaryPreferred, true,
utils.ReadWriteConcernLocal, utils.ReadWriteConcernDefault)
if err != nil {
return nil, fmt.Errorf("source[%v %v] build connection failed: %v", src.ReplicaName, src.URL, err)
}
Expand Down Expand Up @@ -141,7 +142,8 @@ func (coordinator *ReplicationCoordinator) startDocumentReplication() error {
// create target client
toUrl := conf.Options.TunnelAddress[0]
var toConn *utils.MongoConn
if toConn, err = utils.NewMongoConn(toUrl, utils.VarMongoConnectModePrimary, true); err != nil {
if toConn, err = utils.NewMongoConn(toUrl, utils.VarMongoConnectModePrimary, true,
utils.ReadWriteConcernLocal, utils.ReadWriteConcernDefault); err != nil {
return err
}
defer toConn.Close()
Expand Down
6 changes: 4 additions & 2 deletions src/mongoshake/collector/coordinator/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,17 @@ func (coordinator *ReplicationCoordinator) sanitizeMongoDB() error {

// try to connect CheckpointStorage
checkpointStorageUrl := conf.Options.CheckpointStorageUrl
if conn, err = utils.NewMongoConn(checkpointStorageUrl, utils.VarMongoConnectModePrimary, true); conn == nil || !conn.IsGood() || err != nil {
if conn, err = utils.NewMongoConn(checkpointStorageUrl, utils.VarMongoConnectModePrimary, true,
utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault); conn == nil || !conn.IsGood() || err != nil {
LOG.Critical("Connect checkpointStorageUrl[%v] error[%v]. Please add primary node into 'mongo_urls' " +
"if 'context.storage.url' is empty", checkpointStorageUrl, err)
return err
}
conn.Close()

for i, src := range coordinator.MongoD {
if conn, err = utils.NewMongoConn(src.URL, conf.Options.MongoConnectMode, true); conn == nil || !conn.IsGood() || err != nil {
if conn, err = utils.NewMongoConn(src.URL, conf.Options.MongoConnectMode, true,
utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault); conn == nil || !conn.IsGood() || err != nil {
LOG.Critical("Connect mongo server error. %v, url : %s. See https://github.com/alibaba/MongoShake/wiki/FAQ#q-how-to-solve-the-oplog-tailer-initialize-failed-no-reachable-servers-error", err, src.URL)
return err
}
Expand Down
3 changes: 2 additions & 1 deletion src/mongoshake/collector/docsyncer/doc_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ func NewCollectionExecutor(id int, mongoUrl string, ns utils.NS, syncer *DBSynce

func (colExecutor *CollectionExecutor) Start() error {
var err error
if colExecutor.conn, err = utils.NewMongoConn(colExecutor.mongoUrl, utils.VarMongoConnectModePrimary, true); err != nil {
if colExecutor.conn, err = utils.NewMongoConn(colExecutor.mongoUrl, utils.VarMongoConnectModePrimary, true,
utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault); err != nil {
return err
}
if conf.Options.FullSyncExecutorMajorityEnable {
Expand Down
10 changes: 6 additions & 4 deletions src/mongoshake/collector/docsyncer/doc_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ func GetAllNamespace(sources []*utils.MongoSource) (map[utils.NS]struct{}, map[s
func GetDbNamespace(url string) ([]utils.NS, map[string][]string, error) {
var err error
var conn *utils.MongoConn
if conn, err = utils.NewMongoConn(url, utils.VarMongoConnectModeSecondaryPreferred, true); conn == nil || err != nil {
if conn, err = utils.NewMongoConn(url, utils.VarMongoConnectModeSecondaryPreferred, true,
utils.ReadWriteConcernLocal, utils.ReadWriteConcernDefault); conn == nil || err != nil {
return nil, nil, err
}
defer conn.Close()
Expand Down Expand Up @@ -119,13 +120,13 @@ func NewDocumentSplitter(src string, ns utils.NS) *DocumentSplitter {

// create connection
var err error
ds.conn, err = utils.NewMongoConn(ds.src, conf.Options.MongoConnectMode, true)
ds.conn, err = utils.NewMongoConn(ds.src, conf.Options.MongoConnectMode, true,
utils.ReadWriteConcernLocal, utils.ReadWriteConcernDefault)
if err != nil {
LOG.Error("splitter[%s] connection mongo[%v] failed[%v]", ds, ds.src, err)
return nil
}


// get total count
count, err := ds.conn.Session.DB(ds.ns.Database).C(ds.ns.Collection).Count()
if err != nil {
Expand Down Expand Up @@ -290,7 +291,8 @@ func (reader *DocumentReader) ensureNetwork() (err error) {
reader.conn.Close()
}
// reconnect
if reader.conn, err = utils.NewMongoConn(reader.src, conf.Options.MongoConnectMode, true); reader.conn == nil || err != nil {
if reader.conn, err = utils.NewMongoConn(reader.src, conf.Options.MongoConnectMode, true,
utils.ReadWriteConcernLocal, utils.ReadWriteConcernDefault); reader.conn == nil || err != nil {
return err
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/mongoshake/collector/docsyncer/doc_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ func TestGtDbNamespace(t *testing.T) {
fmt.Printf("TestGtDbNamespace case %d.\n", nr)
nr++

conn, err := utils.NewMongoConn(testMongoAddress, utils.VarMongoConnectModePrimary, true)
conn, err := utils.NewMongoConn(testMongoAddress, utils.VarMongoConnectModePrimary, true,
utils.ReadWriteConcernLocal, utils.ReadWriteConcernDefault)
assert.Equal(t, nil, err, "should be equal")

err = conn.Session.DB("db1").DropDatabase()
Expand Down
6 changes: 4 additions & 2 deletions src/mongoshake/collector/docsyncer/doc_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ func StartNamespaceSpecSyncForSharding(csUrl string, toConn *utils.MongoConn,

var fromConn *utils.MongoConn
var err error
if fromConn, err = utils.NewMongoConn(csUrl, utils.VarMongoConnectModePrimary, true); err != nil {
if fromConn, err = utils.NewMongoConn(csUrl, utils.VarMongoConnectModePrimary, true,
utils.ReadWriteConcernMajority, utils.ReadWriteConcernDefault); err != nil {
return err
}
defer fromConn.Close()
Expand Down Expand Up @@ -187,7 +188,8 @@ func StartIndexSync(indexMap map[utils.NS][]mgo.Index, toUrl string,

var conn *utils.MongoConn
var err error
if conn, err = utils.NewMongoConn(toUrl, utils.VarMongoConnectModePrimary, false); err != nil {
if conn, err = utils.NewMongoConn(toUrl, utils.VarMongoConnectModePrimary, false,
utils.ReadWriteConcernDefault, utils.ReadWriteConcernMajority); err != nil {
return err
}
defer conn.Close()
Expand Down
9 changes: 6 additions & 3 deletions src/mongoshake/collector/docsyncer/doc_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ func fetchAllDocument(conn *utils.MongoConn) ([]bson.D, error) {
func TestDbSync(t *testing.T) {
// test doSync

conn, err := utils.NewMongoConn(testMongoAddress, utils.VarMongoConnectModePrimary, false)
conn, err := utils.NewMongoConn(testMongoAddress, utils.VarMongoConnectModePrimary, false,
utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault)
assert.Equal(t, nil, err, "should be equal")

// init DocExecutor, ignore DBSyncer here
Expand Down Expand Up @@ -364,7 +365,8 @@ func TestStartDropDestCollection(t *testing.T) {
fmt.Printf("TestStartDropDestCollection case %d.\n", nr)
nr++

conn, err := utils.NewMongoConn(testMongoAddress, utils.VarMongoConnectModePrimary, true)
conn, err := utils.NewMongoConn(testMongoAddress, utils.VarMongoConnectModePrimary, true,
utils.ReadWriteConcernDefault, utils.ReadWriteConcernMajority)
assert.Equal(t, nil, err, "should be equal")

// drop old db
Expand Down Expand Up @@ -407,7 +409,8 @@ func TestStartDropDestCollection(t *testing.T) {
fmt.Printf("TestStartDropDestCollection case %d.\n", nr)
nr++

conn, err := utils.NewMongoConn(testMongoAddress, utils.VarMongoConnectModePrimary, true)
conn, err := utils.NewMongoConn(testMongoAddress, utils.VarMongoConnectModePrimary, true,
utils.ReadWriteConcernDefault, utils.ReadWriteConcernMajority)
assert.Equal(t, nil, err, "should be equal")

// drop old db
Expand Down
12 changes: 8 additions & 4 deletions src/mongoshake/collector/main/sanitize.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,15 +235,17 @@ func checkDefaultValue() error {
func checkConnection() error {
// check mongo_urls
for _, mongo := range conf.Options.MongoUrls {
_, err := utils.NewMongoConn(mongo, conf.Options.MongoConnectMode, true)
_, err := utils.NewMongoConn(mongo, conf.Options.MongoConnectMode, true,
utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault)
if err != nil {
return fmt.Errorf("connect source mongodb[%v] failed[%v]", mongo, err)
}
}

// check mongo_cs_url
if conf.Options.MongoCsUrl != "" {
_, err := utils.NewMongoConn(conf.Options.MongoCsUrl, utils.VarMongoConnectModeSecondaryPreferred, true)
_, err := utils.NewMongoConn(conf.Options.MongoCsUrl, utils.VarMongoConnectModeSecondaryPreferred, true,
utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault)
if err != nil {
return fmt.Errorf("connect config-server[%v] failed[%v]", conf.Options.MongoCsUrl, err)
}
Expand All @@ -252,7 +254,8 @@ func checkConnection() error {
// check tunnel address
if conf.Options.Tunnel == utils.VarTunnelDirect {
for _, mongo := range conf.Options.TunnelAddress {
_, err := utils.NewMongoConn(mongo, conf.Options.MongoConnectMode, true)
_, err := utils.NewMongoConn(mongo, conf.Options.MongoConnectMode, true,
utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault)
if err != nil {
return fmt.Errorf("connect target tunnel mongodb[%v] failed[%v]", mongo, err)
}
Expand Down Expand Up @@ -347,7 +350,8 @@ func checkConflict() error {
source = conf.Options.MongoUrls[0]
}

conn, err := utils.NewMongoConn(source, utils.VarMongoConnectModeSecondaryPreferred, true)
conn, err := utils.NewMongoConn(source, utils.VarMongoConnectModeSecondaryPreferred, true,
utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault)
if err != nil {
return fmt.Errorf("connect source[%v] failed[%v]", source, err)
}
Expand Down
3 changes: 2 additions & 1 deletion src/mongoshake/collector/reader/oplog_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,8 @@ func (or *OplogReader) EnsureNetwork() (err error) {
or.conn.Close()
}
// reconnect
if or.conn, err = utils.NewMongoConn(or.src, conf.Options.MongoConnectMode, true); or.conn == nil || err != nil {
if or.conn, err = utils.NewMongoConn(or.src, conf.Options.MongoConnectMode, true,
utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault); or.conn == nil || err != nil {
err = fmt.Errorf("oplog_reader reconnect mongo instance [%s] error. %s", or.src, err.Error())
return err
}
Expand Down
9 changes: 6 additions & 3 deletions src/mongoshake/common/db_opertion.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ func GetOldestTimestampBySession(session *mgo.Session) (bson.MongoTimestamp, err
func GetNewestTimestampByUrl(url string, fromMongoS bool) (bson.MongoTimestamp, error) {
var conn *MongoConn
var err error
if conn, err = NewMongoConn(url, VarMongoConnectModeSecondaryPreferred, true); conn == nil || err != nil {
if conn, err = NewMongoConn(url, VarMongoConnectModeSecondaryPreferred, true,
ReadWriteConcernDefault, ReadWriteConcernDefault); conn == nil || err != nil {
return 0, err
}
defer conn.Close()
Expand All @@ -141,7 +142,8 @@ func GetOldestTimestampByUrl(url string, fromMongoS bool) (bson.MongoTimestamp,

var conn *MongoConn
var err error
if conn, err = NewMongoConn(url, VarMongoConnectModeSecondaryPreferred, true); conn == nil || err != nil {
if conn, err = NewMongoConn(url, VarMongoConnectModeSecondaryPreferred, true,
ReadWriteConcernDefault, ReadWriteConcernDefault); conn == nil || err != nil {
return 0, err
}
defer conn.Close()
Expand All @@ -152,7 +154,8 @@ func GetOldestTimestampByUrl(url string, fromMongoS bool) (bson.MongoTimestamp,
func IsFromMongos(url string) (bool, error) {
var conn *MongoConn
var err error
if conn, err = NewMongoConn(url, VarMongoConnectModeSecondaryPreferred, true); conn == nil || err != nil {
if conn, err = NewMongoConn(url, VarMongoConnectModeSecondaryPreferred, true,
ReadWriteConcernDefault, ReadWriteConcernDefault); conn == nil || err != nil {
return false, err
}
return conn.IsMongos(), nil
Expand Down
17 changes: 15 additions & 2 deletions src/mongoshake/common/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@ import (
"github.com/vinllen/mgo/bson"
)

const OplogNS = "oplog.rs"
const (
OplogNS = "oplog.rs"
ReadWriteConcernDefault = ""
ReadWriteConcernLocal = "local"
ReadWriteConcernAvailable = "available" // for >= 3.6
ReadWriteConcernMajority = "majority"
ReadWriteConcernLinearizable = "linearizable"
)

type NS struct {
Database string
Expand All @@ -31,7 +38,7 @@ type MongoConn struct {
URL string
}

func NewMongoConn(url string, connectMode string, timeout bool) (*MongoConn, error) {
func NewMongoConn(url string, connectMode string, timeout bool, readConcern, writeConcern string) (*MongoConn, error) {
if connectMode == VarMongoConnectModeStandalone {
url += "?connect=direct"
}
Expand All @@ -49,6 +56,12 @@ func NewMongoConn(url string, connectMode string, timeout bool) (*MongoConn, err
} else {
session.SetSocketTimeout(0)
}
if readConcern != ReadWriteConcernDefault || writeConcern != ReadWriteConcernDefault {
session.EnsureSafe(&mgo.Safe{
RMode: readConcern,
WMode: writeConcern,
})
}

// already ping in the session
/*if err := session.Ping(); err != nil {
Expand Down
Loading

0 comments on commit 1e4feca

Please sign in to comment.