From d44497894a3d2786b3acc6b77692c4cea9373c57 Mon Sep 17 00:00:00 2001 From: raararaara Date: Mon, 11 Nov 2024 14:59:33 +0900 Subject: [PATCH 1/5] Add migration for detaching documents from deactivated clients --- cmd/yorkie/migration.go | 2 + migrations/v0.5.6/document-detach.go | 129 +++++++++++++++++++++++++++ migrations/v0.5.6/main.go | 34 +++++++ 3 files changed, 165 insertions(+) create mode 100644 migrations/v0.5.6/document-detach.go create mode 100644 migrations/v0.5.6/main.go diff --git a/cmd/yorkie/migration.go b/cmd/yorkie/migration.go index a99ea0ca7..7ab31cdd7 100644 --- a/cmd/yorkie/migration.go +++ b/cmd/yorkie/migration.go @@ -30,6 +30,7 @@ import ( "github.com/yorkie-team/yorkie/cmd/yorkie/config" v053 "github.com/yorkie-team/yorkie/migrations/v0.5.3" + v056 "github.com/yorkie-team/yorkie/migrations/v0.5.6" yorkiemongo "github.com/yorkie-team/yorkie/server/backend/database/mongo" ) @@ -43,6 +44,7 @@ var ( // migrationMap is a map of migration functions for each version. var migrationMap = map[string]func(ctx context.Context, db *mongo.Client, dbName string, batchSize int) error{ "v0.5.3": v053.RunMigration, + "v0.5.6": v056.RunMigration, } // runMigration runs the migration for the given version. diff --git a/migrations/v0.5.6/document-detach.go b/migrations/v0.5.6/document-detach.go new file mode 100644 index 000000000..0f0771fed --- /dev/null +++ b/migrations/v0.5.6/document-detach.go @@ -0,0 +1,129 @@ +/* + * Copyright 2024 The Yorkie Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package v056 + +import ( + "context" + "fmt" + "github.com/yorkie-team/yorkie/server/backend/database" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" +) + +func progressMigrationBach( + _ context.Context, + _ *mongo.Collection, + _ []database.ClientInfo, +) error { + return nil +} + +// DetachDocumentsFromDeactivatedClients migrates the client collection +// to detach documents attached to a deactivated client. +func DetachDocumentsFromDeactivatedClients( + ctx context.Context, + db *mongo.Client, + databaseName string, + batchSize int, +) error { + collection := db.Database(databaseName).Collection("clients") + + filter := bson.M{ + "status": "deactivated", + "documents": bson.M{ + "$ne": nil, + "$exists": true, + }, + "$expr": bson.M{ + "$gt": bson.A{ + bson.M{ + "$size": bson.M{ + "$filter": bson.M{ + "input": bson.M{ + "$objectToArray": "$documents", + }, + "as": "doc", + "cond": bson.M{ + "$eq": bson.A{"$$doc.v.status", "attached"}, + }, + }, + }, + }, + 0, + }, + }, + } + + // 01. Count the number of target clients + totalCount, err := collection.CountDocuments(ctx, filter) + if err != nil { + return err + } + fmt.Printf("total clients: %d\n", totalCount) + + //res := collection.FindOne(ctx, filter) + //if res.Err() == mongo.ErrNoDocuments { + // return fmt.Errorf("no doc") + //} + //if res.Err() != nil { + // return fmt.Errorf("error occurs") + //} + // + //clientInfo := database.ClientInfo{} + //if err := res.Decode(&clientInfo); err != nil { + // return fmt.Errorf("decode error") + //} + // + //fmt.Printf("%d %s\n", batchSize, clientInfo.ID.String()) + + cursor, err := collection.Find(ctx, filter) + if err != nil { + return err + } + + var infos []database.ClientInfo + + batchCount := 1 + for cursor.Next(ctx) { + var clientInfo database.ClientInfo + if err := cursor.Decode(&clientInfo); err != nil { + return fmt.Errorf("decode client info: %w", err) + } + + infos = append(infos, clientInfo) + + if len(infos) >= batchSize { + if err := progressMigrationBach(ctx, collection, infos); err != nil { + return err + } + + // TODO(raararaara): print progress + infos = infos[:0] + batchCount++ + } + } + if len(infos) > 0 { + if err := progressMigrationBach(ctx, collection, infos); err != nil { + return err + } + } + + // TODO(raararaara): validation check + + return nil +} diff --git a/migrations/v0.5.6/main.go b/migrations/v0.5.6/main.go new file mode 100644 index 000000000..a39b43a56 --- /dev/null +++ b/migrations/v0.5.6/main.go @@ -0,0 +1,34 @@ +/* + * Copyright 2024 The Yorkie Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Package v056 provides migration for v0.5.6 +package v056 + +import ( + "context" + + "go.mongodb.org/mongo-driver/mongo" +) + +// RunMigration runs migrations for v0.5.6 +func RunMigration(ctx context.Context, db *mongo.Client, databaseName string, batchSize int) error { + err := DetachDocumentsFromDeactivatedClients(ctx, db, databaseName, batchSize) + if err != nil { + return err + } + + return nil +} From 5482729415e18faa6d6754e8fa34b8549999f1f0 Mon Sep 17 00:00:00 2001 From: raararaara Date: Mon, 11 Nov 2024 16:12:25 +0900 Subject: [PATCH 2/5] Add detach documents script --- migrations/v0.5.6/document-detach.go | 119 ++++++++++++++++++++++----- 1 file changed, 97 insertions(+), 22 deletions(-) diff --git a/migrations/v0.5.6/document-detach.go b/migrations/v0.5.6/document-detach.go index 0f0771fed..1bef10c70 100644 --- a/migrations/v0.5.6/document-detach.go +++ b/migrations/v0.5.6/document-detach.go @@ -19,17 +19,95 @@ package v056 import ( "context" "fmt" - "github.com/yorkie-team/yorkie/server/backend/database" + "time" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" + + "github.com/yorkie-team/yorkie/api/types" + "github.com/yorkie-team/yorkie/server/backend/database" ) +const ( + // StatusKey is the key of the status field. + StatusKey = "status" +) + +func validateDetach(ctx context.Context, collection *mongo.Collection) ([]string, error) { + var failedClients []string + totalCount, err := collection.CountDocuments(ctx, bson.M{}) + if err != nil { + return nil, err + } + fmt.Printf("Validation check for %d clients\n", totalCount) + + cursor, err := collection.Find(ctx, bson.M{}) + if err != nil { + return nil, err + } + + for cursor.Next(ctx) { + var info database.ClientInfo + if err := cursor.Decode(&info); err != nil { + return nil, fmt.Errorf("decode client info: %w", err) + } + + // 01. ensure deactivated + if info.Status != database.ClientDeactivated { + continue + } + + // 02. ensure detached + hasAttachedDocs := false + for _, clientDocInfo := range info.Documents { + if clientDocInfo.Status == database.DocumentAttached { + hasAttachedDocs = true + break + } + } + if hasAttachedDocs { + failedClients = append(failedClients, info.ID.String()) + } + } + + return failedClients, nil +} + func progressMigrationBach( - _ context.Context, - _ *mongo.Collection, - _ []database.ClientInfo, + ctx context.Context, + collection *mongo.Collection, + infos []*database.ClientInfo, ) error { + for _, info := range infos { + fmt.Printf("clientID: %s\n", info.ID) + for docID, clientDocInfo := range info.Documents { + if clientDocInfo.Status != database.DocumentAttached { + continue + } + + updater := bson.M{ + "$set": bson.M{ + clientDocInfoKey(docID, "server_seq"): 0, + clientDocInfoKey(docID, "client_seq"): 0, + clientDocInfoKey(docID, StatusKey): database.DocumentDetached, + "updated_at": time.Now(), + }, + } + + result := collection.FindOneAndUpdate(ctx, bson.M{ + "project_id": info.ProjectID, + "_id": info.ID, + }, updater) + + if result.Err() != nil { + if result.Err() == mongo.ErrNoDocuments { + return fmt.Errorf("%s: %w", info.Key, database.ErrClientNotFound) + } + return fmt.Errorf("update client info: %w", result.Err()) + } + } + + } return nil } @@ -76,27 +154,12 @@ func DetachDocumentsFromDeactivatedClients( } fmt.Printf("total clients: %d\n", totalCount) - //res := collection.FindOne(ctx, filter) - //if res.Err() == mongo.ErrNoDocuments { - // return fmt.Errorf("no doc") - //} - //if res.Err() != nil { - // return fmt.Errorf("error occurs") - //} - // - //clientInfo := database.ClientInfo{} - //if err := res.Decode(&clientInfo); err != nil { - // return fmt.Errorf("decode error") - //} - // - //fmt.Printf("%d %s\n", batchSize, clientInfo.ID.String()) - cursor, err := collection.Find(ctx, filter) if err != nil { return err } - var infos []database.ClientInfo + var infos []*database.ClientInfo batchCount := 1 for cursor.Next(ctx) { @@ -105,7 +168,7 @@ func DetachDocumentsFromDeactivatedClients( return fmt.Errorf("decode client info: %w", err) } - infos = append(infos, clientInfo) + infos = append(infos, &clientInfo) if len(infos) >= batchSize { if err := progressMigrationBach(ctx, collection, infos); err != nil { @@ -123,7 +186,19 @@ func DetachDocumentsFromDeactivatedClients( } } - // TODO(raararaara): validation check + res, err := validateDetach(ctx, collection) + if err != nil { + return err + } + fmt.Printf("Number of failed clients: %d\n", len(res)) + if 0 < len(res) && len(res) < 100 { + fmt.Print(res) + } return nil } + +// clientDocInfoKey returns the key for the client document info. +func clientDocInfoKey(docID types.ID, prefix string) string { + return fmt.Sprintf("documents.%s.%s", docID, prefix) +} From 0fa59bc3066332a7f2d3dcbab1fabcda9c816c5d Mon Sep 17 00:00:00 2001 From: raararaara Date: Wed, 13 Nov 2024 19:16:22 +0900 Subject: [PATCH 3/5] Fix migration batch to activate the deactivated clients that have attached documents --- migrations/v0.5.6/document-detach.go | 63 ++++++++++------------------ migrations/v0.5.6/main.go | 2 +- 2 files changed, 23 insertions(+), 42 deletions(-) diff --git a/migrations/v0.5.6/document-detach.go b/migrations/v0.5.6/document-detach.go index 1bef10c70..298d4ac3c 100644 --- a/migrations/v0.5.6/document-detach.go +++ b/migrations/v0.5.6/document-detach.go @@ -24,7 +24,6 @@ import ( "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" - "github.com/yorkie-team/yorkie/api/types" "github.com/yorkie-team/yorkie/server/backend/database" ) @@ -33,7 +32,7 @@ const ( StatusKey = "status" ) -func validateDetach(ctx context.Context, collection *mongo.Collection) ([]string, error) { +func validateDetach(ctx context.Context, collection *mongo.Collection, filter bson.M) ([]string, error) { var failedClients []string totalCount, err := collection.CountDocuments(ctx, bson.M{}) if err != nil { @@ -41,7 +40,7 @@ func validateDetach(ctx context.Context, collection *mongo.Collection) ([]string } fmt.Printf("Validation check for %d clients\n", totalCount) - cursor, err := collection.Find(ctx, bson.M{}) + cursor, err := collection.Find(ctx, filter) if err != nil { return nil, err } @@ -73,47 +72,34 @@ func validateDetach(ctx context.Context, collection *mongo.Collection) ([]string return failedClients, nil } -func progressMigrationBach( +func progressMigrationBatch( ctx context.Context, collection *mongo.Collection, infos []*database.ClientInfo, ) error { for _, info := range infos { - fmt.Printf("clientID: %s\n", info.ID) - for docID, clientDocInfo := range info.Documents { - if clientDocInfo.Status != database.DocumentAttached { - continue - } - - updater := bson.M{ - "$set": bson.M{ - clientDocInfoKey(docID, "server_seq"): 0, - clientDocInfoKey(docID, "client_seq"): 0, - clientDocInfoKey(docID, StatusKey): database.DocumentDetached, - "updated_at": time.Now(), - }, - } - - result := collection.FindOneAndUpdate(ctx, bson.M{ - "project_id": info.ProjectID, - "_id": info.ID, - }, updater) - - if result.Err() != nil { - if result.Err() == mongo.ErrNoDocuments { - return fmt.Errorf("%s: %w", info.Key, database.ErrClientNotFound) - } - return fmt.Errorf("update client info: %w", result.Err()) + result := collection.FindOneAndUpdate(ctx, bson.M{ + "project_id": info.ProjectID, + "_id": info.ID, + }, bson.M{ + "$set": bson.M{ + StatusKey: database.ClientActivated, + "updated_at": time.Now().AddDate(-1, 0, 0), + }, + }) + if result.Err() != nil { + if result.Err() == mongo.ErrNoDocuments { + return fmt.Errorf("%s: %w", info.Key, database.ErrClientNotFound) } + return fmt.Errorf("update client info: %w", result.Err()) } - } return nil } -// DetachDocumentsFromDeactivatedClients migrates the client collection -// to detach documents attached to a deactivated client. -func DetachDocumentsFromDeactivatedClients( +// ReactivateClients migrates the client collection to activate the clients +// that are in a deactivated but have attached documents. +func ReactivateClients( ctx context.Context, db *mongo.Client, databaseName string, @@ -171,7 +157,7 @@ func DetachDocumentsFromDeactivatedClients( infos = append(infos, &clientInfo) if len(infos) >= batchSize { - if err := progressMigrationBach(ctx, collection, infos); err != nil { + if err := progressMigrationBatch(ctx, collection, infos); err != nil { return err } @@ -181,12 +167,12 @@ func DetachDocumentsFromDeactivatedClients( } } if len(infos) > 0 { - if err := progressMigrationBach(ctx, collection, infos); err != nil { + if err := progressMigrationBatch(ctx, collection, infos); err != nil { return err } } - res, err := validateDetach(ctx, collection) + res, err := validateDetach(ctx, collection, filter) if err != nil { return err } @@ -197,8 +183,3 @@ func DetachDocumentsFromDeactivatedClients( return nil } - -// clientDocInfoKey returns the key for the client document info. -func clientDocInfoKey(docID types.ID, prefix string) string { - return fmt.Sprintf("documents.%s.%s", docID, prefix) -} diff --git a/migrations/v0.5.6/main.go b/migrations/v0.5.6/main.go index a39b43a56..679102838 100644 --- a/migrations/v0.5.6/main.go +++ b/migrations/v0.5.6/main.go @@ -25,7 +25,7 @@ import ( // RunMigration runs migrations for v0.5.6 func RunMigration(ctx context.Context, db *mongo.Client, databaseName string, batchSize int) error { - err := DetachDocumentsFromDeactivatedClients(ctx, db, databaseName, batchSize) + err := ReactivateClients(ctx, db, databaseName, batchSize) if err != nil { return err } From 8459181d26ffb6111b016baa9771a81bf060021e Mon Sep 17 00:00:00 2001 From: raararaara Date: Wed, 13 Nov 2024 23:36:11 +0900 Subject: [PATCH 4/5] Add comments to display progress --- migrations/v0.5.6/document-detach.go | 31 +++++++++++++++------------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/migrations/v0.5.6/document-detach.go b/migrations/v0.5.6/document-detach.go index 298d4ac3c..d53c468ee 100644 --- a/migrations/v0.5.6/document-detach.go +++ b/migrations/v0.5.6/document-detach.go @@ -18,6 +18,7 @@ package v056 import ( "context" + "errors" "fmt" "time" @@ -32,6 +33,7 @@ const ( StatusKey = "status" ) +// validateDetach checks whether there are deactivated clients with attached documents. func validateDetach(ctx context.Context, collection *mongo.Collection, filter bson.M) ([]string, error) { var failedClients []string totalCount, err := collection.CountDocuments(ctx, bson.M{}) @@ -72,11 +74,12 @@ func validateDetach(ctx context.Context, collection *mongo.Collection, filter bs return failedClients, nil } -func progressMigrationBatch( +// processMigrationBatch processes the migration batch. +func processMigrationBatch( ctx context.Context, collection *mongo.Collection, infos []*database.ClientInfo, -) error { +) { for _, info := range infos { result := collection.FindOneAndUpdate(ctx, bson.M{ "project_id": info.ProjectID, @@ -88,13 +91,13 @@ func progressMigrationBatch( }, }) if result.Err() != nil { - if result.Err() == mongo.ErrNoDocuments { - return fmt.Errorf("%s: %w", info.Key, database.ErrClientNotFound) + if errors.Is(result.Err(), mongo.ErrNoDocuments) { + _ = fmt.Errorf("%s: %w", info.Key, database.ErrClientNotFound) + } else { + _ = fmt.Errorf("update client info: %w", result.Err()) } - return fmt.Errorf("update client info: %w", result.Err()) } } - return nil } // ReactivateClients migrates the client collection to activate the clients @@ -133,7 +136,6 @@ func ReactivateClients( }, } - // 01. Count the number of target clients totalCount, err := collection.CountDocuments(ctx, filter) if err != nil { return err @@ -148,6 +150,7 @@ func ReactivateClients( var infos []*database.ClientInfo batchCount := 1 + done := 0 for cursor.Next(ctx) { var clientInfo database.ClientInfo if err := cursor.Decode(&clientInfo); err != nil { @@ -157,20 +160,20 @@ func ReactivateClients( infos = append(infos, &clientInfo) if len(infos) >= batchSize { - if err := progressMigrationBatch(ctx, collection, infos); err != nil { - return err - } + processMigrationBatch(ctx, collection, infos) + done += len(infos) - // TODO(raararaara): print progress + percentage := int(float64(batchSize*batchCount) / float64(totalCount) * 100) + fmt.Printf("%s.clients migration progress: %d%%(%d/%d)\n", databaseName, percentage, done, totalCount) infos = infos[:0] batchCount++ } } if len(infos) > 0 { - if err := progressMigrationBatch(ctx, collection, infos); err != nil { - return err - } + processMigrationBatch(ctx, collection, infos) + done += len(infos) } + fmt.Printf("%s.clients migration progress: %d%%(%d/%d)\n", databaseName, 100, done, totalCount) res, err := validateDetach(ctx, collection, filter) if err != nil { From d91a5f7602880aa5b88f64ecd1070e282aabddb0 Mon Sep 17 00:00:00 2001 From: raararaara Date: Thu, 14 Nov 2024 14:21:16 +0900 Subject: [PATCH 5/5] Apply coderabbit's feedback --- migrations/v0.5.6/document-detach.go | 38 +++++++++++++--------------- 1 file changed, 17 insertions(+), 21 deletions(-) diff --git a/migrations/v0.5.6/document-detach.go b/migrations/v0.5.6/document-detach.go index d53c468ee..a86383c9c 100644 --- a/migrations/v0.5.6/document-detach.go +++ b/migrations/v0.5.6/document-detach.go @@ -20,7 +20,6 @@ import ( "context" "errors" "fmt" - "time" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" @@ -34,23 +33,27 @@ const ( ) // validateDetach checks whether there are deactivated clients with attached documents. -func validateDetach(ctx context.Context, collection *mongo.Collection, filter bson.M) ([]string, error) { - var failedClients []string +func validateDetach(ctx context.Context, collection *mongo.Collection, filter bson.M) int { + var failCount int totalCount, err := collection.CountDocuments(ctx, bson.M{}) if err != nil { - return nil, err + fmt.Printf("[Validation] Count total document failed\n") + return 0 } - fmt.Printf("Validation check for %d clients\n", totalCount) + fmt.Printf("[Validation] Validation check for %d clients\n", totalCount) cursor, err := collection.Find(ctx, filter) if err != nil { - return nil, err + fmt.Printf("[Validation] Find document failed\n") + return 0 } for cursor.Next(ctx) { var info database.ClientInfo if err := cursor.Decode(&info); err != nil { - return nil, fmt.Errorf("decode client info: %w", err) + fmt.Printf("[Validation] decode client info failed\n") + failCount++ + continue } // 01. ensure deactivated @@ -67,11 +70,12 @@ func validateDetach(ctx context.Context, collection *mongo.Collection, filter bs } } if hasAttachedDocs { - failedClients = append(failedClients, info.ID.String()) + fmt.Printf("[Validation] Client %s has attached documents\n", info.Key) + failCount++ } } - return failedClients, nil + return failCount } // processMigrationBatch processes the migration batch. @@ -86,15 +90,14 @@ func processMigrationBatch( "_id": info.ID, }, bson.M{ "$set": bson.M{ - StatusKey: database.ClientActivated, - "updated_at": time.Now().AddDate(-1, 0, 0), + StatusKey: database.ClientActivated, }, }) if result.Err() != nil { if errors.Is(result.Err(), mongo.ErrNoDocuments) { - _ = fmt.Errorf("%s: %w", info.Key, database.ErrClientNotFound) + fmt.Printf("[Migration Batch] Client not found: %s\n", info.Key) } else { - _ = fmt.Errorf("update client info: %w", result.Err()) + fmt.Printf("[Migration Batch] Failed to update client info: %v\n", result.Err()) } } } @@ -175,14 +178,7 @@ func ReactivateClients( } fmt.Printf("%s.clients migration progress: %d%%(%d/%d)\n", databaseName, 100, done, totalCount) - res, err := validateDetach(ctx, collection, filter) - if err != nil { - return err - } - fmt.Printf("Number of failed clients: %d\n", len(res)) - if 0 < len(res) && len(res) < 100 { - fmt.Print(res) - } + fmt.Printf("Number of failed clients: %d\n", validateDetach(ctx, collection, filter)) return nil }