diff --git a/go.mod b/go.mod index ec2fce562..9abb9dd77 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/spf13/cobra v1.5.0 github.com/spf13/viper v1.17.0 github.com/stretchr/testify v1.9.0 - github.com/undefinedlabs/go-mpatch v1.0.6 + github.com/undefinedlabs/go-mpatch v1.0.7 go.mongodb.org/mongo-driver v1.11.7 go.uber.org/zap v1.23.0 golang.org/x/crypto v0.16.0 @@ -30,6 +30,8 @@ require ( gopkg.in/yaml.v3 v3.0.1 ) +require github.com/stretchr/objx v0.5.2 // indirect + require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect diff --git a/go.sum b/go.sum index 10b9cb034..77e3fa764 100644 --- a/go.sum +++ b/go.sum @@ -318,6 +318,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= @@ -336,6 +338,8 @@ github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/undefinedlabs/go-mpatch v1.0.6 h1:h8q5ORH/GaOE1Se1DMhrOyljXZEhRcROO7agMqWXCOY= github.com/undefinedlabs/go-mpatch v1.0.6/go.mod h1:TyJZDQ/5AgyN7FSLiBJ8RO9u2c6wbtRvK827b6AVqY4= +github.com/undefinedlabs/go-mpatch v1.0.7 h1:943FMskd9oqfbZV0qRVKOUsXQhTLXL0bQTVbQSpzmBs= +github.com/undefinedlabs/go-mpatch v1.0.7/go.mod h1:TyJZDQ/5AgyN7FSLiBJ8RO9u2c6wbtRvK827b6AVqY4= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.1.1 h1:VOMT+81stJgXW3CpHyqHN3AXDYIMsx56mEFrB37Mb/E= diff --git a/server/packs/packs_test.go b/server/packs/packs_test.go new file mode 100644 index 000000000..42a85b642 --- /dev/null +++ b/server/packs/packs_test.go @@ -0,0 +1,314 @@ +/* + * Copyright 2024 The Yorkie Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package packs_test + +import ( + "context" + "encoding/hex" + "errors" + "fmt" + "log" + "net/http" + "os" + "testing" + + "connectrpc.com/connect" + "github.com/stretchr/testify/assert" + + "github.com/yorkie-team/yorkie/api/converter" + "github.com/yorkie-team/yorkie/api/types" + api "github.com/yorkie-team/yorkie/api/yorkie/v1" + "github.com/yorkie-team/yorkie/api/yorkie/v1/v1connect" + "github.com/yorkie-team/yorkie/client" + "github.com/yorkie-team/yorkie/pkg/document" + "github.com/yorkie-team/yorkie/pkg/document/time" + "github.com/yorkie-team/yorkie/server/backend" + "github.com/yorkie-team/yorkie/server/backend/database" + "github.com/yorkie-team/yorkie/server/backend/database/mongo" + "github.com/yorkie-team/yorkie/server/backend/housekeeping" + "github.com/yorkie-team/yorkie/server/clients" + "github.com/yorkie-team/yorkie/server/documents" + "github.com/yorkie-team/yorkie/server/packs" + "github.com/yorkie-team/yorkie/server/profiling/prometheus" + "github.com/yorkie-team/yorkie/server/rpc" + "github.com/yorkie-team/yorkie/test/helper" +) + +var ( + // ErrUpdateClientInfoFailed occurs when updating ClientInfo failed + // for testing purposes. + ErrUpdateClientInfoFailed = errors.New("updating clientinfo failed") +) + +var ( + testRPCServer *rpc.Server + testRPCAddr = fmt.Sprintf("localhost:%d", helper.RPCPort) + testClient v1connect.YorkieServiceClient + testBackend *backend.Backend + testMockDB *MockDB +) + +// MockDB represents a mock database for testing purposes +type MockDB struct { + database.Database + mockUpdateClientInfoAfterPushPull func(context.Context, *database.ClientInfo, *database.DocInfo) error +} + +// NewMockDB returns a mock database with a real database +func NewMockDB(database database.Database) *MockDB { + return &MockDB{ + Database: database, + } +} + +func (m *MockDB) UpdateClientInfoAfterPushPull( + ctx context.Context, + clientInfo *database.ClientInfo, + docInfo *database.DocInfo, +) error { + if m.mockUpdateClientInfoAfterPushPull != nil { + return m.mockUpdateClientInfoAfterPushPull(ctx, clientInfo, docInfo) + } + return m.Database.UpdateClientInfoAfterPushPull(ctx, clientInfo, docInfo) +} + +func TestMain(m *testing.M) { + met, err := prometheus.NewMetrics() + if err != nil { + log.Fatal(err) + } + + testBackend, err = backend.New( + &backend.Config{ + AdminUser: helper.AdminUser, + AdminPassword: helper.AdminPassword, + UseDefaultProject: helper.UseDefaultProject, + ClientDeactivateThreshold: helper.ClientDeactivateThreshold, + SnapshotThreshold: helper.SnapshotThreshold, + AuthWebhookCacheSize: helper.AuthWebhookSize, + ProjectInfoCacheSize: helper.ProjectInfoCacheSize, + ProjectInfoCacheTTL: helper.ProjectInfoCacheTTL.String(), + AdminTokenDuration: helper.AdminTokenDuration, + }, &mongo.Config{ + ConnectionURI: helper.MongoConnectionURI, + YorkieDatabase: helper.TestDBName(), + ConnectionTimeout: helper.MongoConnectionTimeout, + PingTimeout: helper.MongoPingTimeout, + }, &housekeeping.Config{ + Interval: helper.HousekeepingInterval.String(), + CandidatesLimitPerProject: helper.HousekeepingCandidatesLimitPerProject, + ProjectFetchSize: helper.HousekeepingProjectFetchSize, + }, met, + ) + if err != nil { + log.Fatal(err) + } + testMockDB = NewMockDB(testBackend.DB) + testBackend.DB = testMockDB + + project, err := testBackend.DB.FindProjectInfoByID( + context.Background(), + database.DefaultProjectID, + ) + if err != nil { + log.Fatal(err) + } + + testRPCServer, err = rpc.NewServer( + &rpc.Config{ + Port: helper.RPCPort, + }, testBackend, + ) + if err != nil { + log.Fatal(err) + } + + if err = testRPCServer.Start(); err != nil { + log.Fatalf("failed rpc listen: %s\n", err) + } + if err = helper.WaitForServerToStart(testRPCAddr); err != nil { + log.Fatal(err) + } + + authInterceptor := client.NewAuthInterceptor(project.PublicKey, "") + + conn := http.DefaultClient + testClient = v1connect.NewYorkieServiceClient( + conn, + "http://"+testRPCAddr, + connect.WithInterceptors(authInterceptor), + ) + + code := m.Run() + + if err := testBackend.Shutdown(); err != nil { + log.Fatal(err) + } + testRPCServer.Shutdown(true) + os.Exit(code) +} + +func triggerErrUpdateClientInfo(on bool) { + if on { + testMockDB.mockUpdateClientInfoAfterPushPull = func( + context.Context, + *database.ClientInfo, + *database.DocInfo, + ) error { + return ErrUpdateClientInfoFailed + } + } else { + testMockDB.mockUpdateClientInfoAfterPushPull = nil + } +} + +func TestPacks(t *testing.T) { + t.Run("cannot detect change duplication due to clientInfo update failure", func(t *testing.T) { + t.Skip("remove this after resolving pushpull consistency problem") + ctx := context.Background() + + projectInfo, err := testBackend.DB.FindProjectInfoByID( + ctx, + database.DefaultProjectID, + ) + assert.NoError(t, err) + project := projectInfo.ToProject() + + triggerErrUpdateClientInfo(false) + + activateResp, err := testClient.ActivateClient( + context.Background(), + connect.NewRequest(&api.ActivateClientRequest{ClientKey: helper.TestDocKey(t).String()})) + assert.NoError(t, err) + + clientID, _ := hex.DecodeString(activateResp.Msg.ClientId) + resPack, err := testClient.AttachDocument( + context.Background(), + connect.NewRequest(&api.AttachDocumentRequest{ + ClientId: activateResp.Msg.ClientId, + ChangePack: &api.ChangePack{ + DocumentKey: helper.TestDocKey(t).String(), + Checkpoint: &api.Checkpoint{ServerSeq: 0, ClientSeq: 1}, + Changes: []*api.Change{ + { + Id: &api.ChangeID{ + ClientSeq: 1, + Lamport: 1, + ActorId: clientID, + }, + }, + }, + }, + }, + )) + assert.NoError(t, err) + + actorID, err := time.ActorIDFromBytes(clientID) + assert.NoError(t, err) + + docID := types.ID(resPack.Msg.DocumentId) + docRefKey := types.DocRefKey{ + ProjectID: project.ID, + DocID: docID, + } + + // 0. Check docInfo.ServerSeq and clientInfo.Checkpoint + docInfo, err := documents.FindDocInfoByRefKey(ctx, testBackend, docRefKey) + assert.NoError(t, err) + assert.Equal(t, int64(1), docInfo.ServerSeq) + + clientInfo, err := clients.FindActiveClientInfo(ctx, testBackend.DB, types.ClientRefKey{ + ProjectID: project.ID, + ClientID: types.IDFromActorID(actorID), + }) + assert.NoError(t, err) + assert.Equal(t, int64(1), clientInfo.Checkpoint(docID).ServerSeq) + assert.Equal(t, uint32(1), clientInfo.Checkpoint(docID).ClientSeq) + + // 1. Create a ChangePack with a single Change + pack, err := converter.FromChangePack(&api.ChangePack{ + DocumentKey: helper.TestDocKey(t).String(), + Checkpoint: &api.Checkpoint{ServerSeq: 0, ClientSeq: 2}, + Changes: []*api.Change{ + { + Id: &api.ChangeID{ + ClientSeq: 2, + Lamport: 2, + ActorId: clientID, + }, + }, + }, + }) + assert.NoError(t, err) + + // 2-1. An arbitrary failure occurs while updating clientInfo + triggerErrUpdateClientInfo(true) + + _, err = packs.PushPull(ctx, testBackend, project, clientInfo, docInfo, pack, packs.PushPullOptions{ + Mode: types.SyncModePushPull, + Status: document.StatusAttached, + }) + assert.ErrorIs(t, err, ErrUpdateClientInfoFailed) + + triggerErrUpdateClientInfo(false) + + // 2-2. pushed change is stored in the database + changes, err := packs.FindChanges(ctx, testBackend, docInfo, 2, 2) + assert.NoError(t, err) + assert.Len(t, changes, 1) + + // 2-3. docInfo.ServerSeq increases from 1 to 2 + docInfo, err = documents.FindDocInfoByRefKey(ctx, testBackend, docRefKey) + assert.NoError(t, err) + assert.Equal(t, int64(2), docInfo.ServerSeq) + + // 2-4. clientInfo.Checkpoint has not been updated + clientInfo, err = clients.FindActiveClientInfo(ctx, testBackend.DB, types.ClientRefKey{ + ProjectID: project.ID, + ClientID: types.IDFromActorID(actorID), + }) + assert.NoError(t, err) + assert.Equal(t, int64(1), clientInfo.Checkpoint(docID).ServerSeq) + assert.Equal(t, uint32(1), clientInfo.Checkpoint(docID).ClientSeq) + + // 3-1. A duplicate request is sent + _, err = packs.PushPull(ctx, testBackend, project, clientInfo, docInfo, pack, packs.PushPullOptions{ + Mode: types.SyncModePushPull, + Status: document.StatusAttached, + }) + assert.NoError(t, err) + + // 3-2. duplicated change is not stored in the database + changes, err = packs.FindChanges(ctx, testBackend, docInfo, 3, 3) + assert.NoError(t, err) + assert.Len(t, changes, 0) + + // 3-3. The server should detect the duplication and not update docInfo.ServerSeq + docInfo, err = documents.FindDocInfoByRefKey(ctx, testBackend, docRefKey) + assert.NoError(t, err) + assert.Equal(t, int64(2), docInfo.ServerSeq) + + // 3-4. clientInfo.Checkpoint has been updated properly + clientInfo, err = clients.FindActiveClientInfo(ctx, testBackend.DB, types.ClientRefKey{ + ProjectID: project.ID, + ClientID: types.IDFromActorID(actorID), + }) + assert.NoError(t, err) + assert.Equal(t, int64(2), clientInfo.Checkpoint(docID).ServerSeq) + assert.Equal(t, uint32(2), clientInfo.Checkpoint(docID).ClientSeq) + }) +}