diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc index eba2f901330..884c8ace7aa 100644 --- a/src/cluster/slot_migrate.cc +++ b/src/cluster/slot_migrate.cc @@ -290,24 +290,36 @@ Status SlotMigrator::startMigration() { return s.Prefixed(errFailedToSetImportStatus); } + migration_type_ = srv_->GetConfig()->migrate_type; + + // If the APPLYBATCH command is not supported on the destination, + // we will fall back to the redis-command migration type. + if (migration_type_ == MigrationType::kRawKeyValue) { + bool supported = GET_OR_RET(supportedApplyBatchCommandOnDstNode(*dst_fd_)); + if (!supported) { + LOG(INFO) << "APPLYBATCH command is not supported, use redis command for migration"; + migration_type_ = MigrationType::kRedisCommand; + } + } + LOG(INFO) << "[migrate] Start migrating slot " << migrating_slot_ << ", connect destination fd " << *dst_fd_; return Status::OK(); } Status SlotMigrator::sendSnapshot() { - if (srv_->GetConfig()->migrate_type == MigrationType::kRedisCommand) { + if (migration_type_ == MigrationType::kRedisCommand) { return sendSnapshotByCmd(); - } else if (srv_->GetConfig()->migrate_type == MigrationType::kRawKeyValue) { + } else if (migration_type_ == MigrationType::kRawKeyValue) { return sendSnapshotByRawKV(); } return {Status::NotOK, errUnsupportedMigrationType}; } Status SlotMigrator::syncWAL() { - if (srv_->GetConfig()->migrate_type == MigrationType::kRedisCommand) { + if (migration_type_ == MigrationType::kRedisCommand) { return syncWALByCmd(); - } else if (srv_->GetConfig()->migrate_type == MigrationType::kRawKeyValue) { + } else if (migration_type_ == MigrationType::kRawKeyValue) { return syncWALByRawKV(); } return {Status::NotOK, errUnsupportedMigrationType}; @@ -485,6 +497,33 @@ Status SlotMigrator::setImportStatusOnDstNode(int sock_fd, int status) { return Status::OK(); } +StatusOr SlotMigrator::supportedApplyBatchCommandOnDstNode(int sock_fd) { + std::string cmd = redis::ArrayOfBulkStrings({"command", "info", "applybatch"}); + auto s = util::SockSend(sock_fd, cmd); + if (!s.IsOK()) { + return s.Prefixed("failed to send command info to the destination node"); + } + + UniqueEvbuf evbuf; + if (evbuffer_read(evbuf.get(), sock_fd, -1) <= 0) { + return Status::FromErrno("read response error"); + } + + UniqueEvbufReadln line(evbuf.get(), EVBUFFER_EOL_CRLF_STRICT); + if (!line) { + return Status::FromErrno("read empty response"); + } + + if (line[0] == '*') { + line = UniqueEvbufReadln(evbuf.get(), EVBUFFER_EOL_LF); + if (line && line[0] == '*') { + return true; + } + } + + return false; +} + Status SlotMigrator::checkSingleResponse(int sock_fd) { return checkMultipleResponses(sock_fd, 1); } // Commands | Response | Instance diff --git a/src/cluster/slot_migrate.h b/src/cluster/slot_migrate.h index 8fefdbc9305..e1faf404682 100644 --- a/src/cluster/slot_migrate.h +++ b/src/cluster/slot_migrate.h @@ -119,6 +119,7 @@ class SlotMigrator : public redis::Database { Status authOnDstNode(int sock_fd, const std::string &password); Status setImportStatusOnDstNode(int sock_fd, int status); + static StatusOr supportedApplyBatchCommandOnDstNode(int sock_fd); Status sendSnapshotByCmd(); Status syncWALByCmd(); @@ -187,6 +188,7 @@ class SlotMigrator : public redis::Database { int dst_port_ = -1; UniqueFD dst_fd_; + MigrationType migration_type_ = MigrationType::kRedisCommand; std::atomic forbidden_slot_ = -1; std::atomic migrating_slot_ = -1; int16_t migrate_failed_slot_ = -1; diff --git a/tests/gocase/integration/slotmigrate/slotmigrate_test.go b/tests/gocase/integration/slotmigrate/slotmigrate_test.go index 6901f5f2d08..d782e64d3c5 100644 --- a/tests/gocase/integration/slotmigrate/slotmigrate_test.go +++ b/tests/gocase/integration/slotmigrate/slotmigrate_test.go @@ -1116,6 +1116,50 @@ func TestSlotMigrateDataType(t *testing.T) { }) } +func TestSlotMigrateTypeFallback(t *testing.T) { + ctx := context.Background() + + srv0 := util.StartServer(t, map[string]string{ + "cluster-enabled": "yes", + "migrate-type": "raw-key-value", + }) + + defer srv0.Close() + rdb0 := srv0.NewClient() + defer func() { require.NoError(t, rdb0.Close()) }() + id0 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00" + require.NoError(t, rdb0.Do(ctx, "clusterx", "setnodeid", id0).Err()) + + srv1 := util.StartServer(t, map[string]string{ + "cluster-enabled": "yes", + "rename-command": "APPLYBATCH APPLYBATCH_RENAMED", + }) + defer srv1.Close() + rdb1 := srv1.NewClient() + defer func() { require.NoError(t, rdb1.Close()) }() + id1 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01" + require.NoError(t, rdb1.Do(ctx, "clusterx", "setnodeid", id1).Err()) + + clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", id0, srv0.Host(), srv0.Port()) + clusterNodes += fmt.Sprintf("%s %s %d master -", id1, srv1.Host(), srv1.Port()) + require.NoError(t, rdb0.Do(ctx, "clusterx", "setnodes", clusterNodes, "1").Err()) + require.NoError(t, rdb1.Do(ctx, "clusterx", "setnodes", clusterNodes, "1").Err()) + + t.Run("MIGRATE - Fall back to redis-command migration type when the destination does not support APPLYBATCH", func(t *testing.T) { + info, err := rdb1.Do(ctx, "command", "info", "applybatch").Slice() + require.NoError(t, err) + require.Len(t, info, 1) + require.Nil(t, info[0]) + testSlot += 1 + key := util.SlotTable[testSlot] + value := "value" + require.NoError(t, rdb0.Set(ctx, key, value, 0).Err()) + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", testSlot, id1).Val()) + waitForMigrateState(t, rdb0, testSlot, SlotMigrationStateSuccess) + require.Equal(t, value, rdb1.Get(ctx, key).Val()) + }) +} + func waitForMigrateState(t testing.TB, client *redis.Client, slot int, state SlotMigrationState) { waitForMigrateStateInDuration(t, client, slot, state, 5*time.Second) }