Skip to content

Commit

Permalink
[#7126] PITR: Create snapshot for schedule
Browse files Browse the repository at this point in the history
Summary:
This diff is a follow-up to D10726 / 0fde8db.

With this diff, the interval setting of schedules will be respected. We will now take snapshots:
- when a schedule is first created
- repeatedly thereafter, each time the interval configured for that schedule elapses

Test Plan: ybd --gtest_filter SnapshotScheduleTest.Snapshot

Reviewers: nicolas, amitanand, bogdan

Reviewed By: bogdan

Subscribers: ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D10781
  • Loading branch information
spolitov committed Mar 8, 2021
1 parent 51942f6 commit 5a18fb9
Show file tree
Hide file tree
Showing 20 changed files with 582 additions and 217 deletions.
7 changes: 7 additions & 0 deletions ent/src/yb/master/catalog_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,13 @@ class CatalogManager : public yb::master::CatalogManager, SnapshotCoordinatorCon

TabletInfos GetTabletInfos(const std::vector<TabletId>& ids) override;

Result<SysRowEntries> CollectEntries(
const google::protobuf::RepeatedPtrField<TableIdentifierPB>& tables,
bool add_indexes,
bool include_parent_colocated_table) override;

server::Clock* Clock() override;

const Schema& schema() override;

void Submit(std::unique_ptr<tablet::Operation> operation) override;
Expand Down
25 changes: 19 additions & 6 deletions ent/src/yb/master/catalog_manager_ent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -379,19 +379,32 @@ void CatalogManager::Submit(std::unique_ptr<tablet::Operation> operation) {
tablet_peer()->Submit(std::move(operation), leader_ready_term());
}

Status CatalogManager::CreateTransactionAwareSnapshot(
const CreateSnapshotRequestPB& req, CreateSnapshotResponsePB* resp, rpc::RpcContext* rpc) {
Result<SysRowEntries> CatalogManager::CollectEntries(
const google::protobuf::RepeatedPtrField<TableIdentifierPB>& table_identifiers,
bool add_indexes,
bool include_parent_colocated_table) {
SysRowEntries entries;
auto tables = VERIFY_RESULT(CollectTables(req.tables(),
req.add_indexes(),
true /* include_parent_colocated_table */));
auto tables = VERIFY_RESULT(CollectTables(
table_identifiers, add_indexes, include_parent_colocated_table));
for (const auto& table : tables) {
// TODO(txn_snapshot) use single lock to resolve all tables to tablets
SnapshotInfo::AddEntries(table, entries.mutable_entries(), /* tablet_infos= */ nullptr);
}

return entries;
}

// Returns the clock obtained from the master (master_->clock()).
// Declared as an override in catalog_manager.h.
server::Clock* CatalogManager::Clock() {
return master_->clock();
}

Status CatalogManager::CreateTransactionAwareSnapshot(
const CreateSnapshotRequestPB& req, CreateSnapshotResponsePB* resp, rpc::RpcContext* rpc) {
SysRowEntries entries = VERIFY_RESULT(CollectEntries(
req.tables(), req.add_indexes(), true /* include_parent_colocated_table */));

auto snapshot_id = VERIFY_RESULT(snapshot_coordinator_.Create(
entries, req.imported(), master_->clock()->MaxGlobalNow(), rpc->GetClientDeadline()));
entries, req.imported(), rpc->GetClientDeadline()));
resp->set_snapshot_id(snapshot_id.data(), snapshot_id.size());
return Status::OK();
}
Expand Down
3 changes: 3 additions & 0 deletions ent/src/yb/master/master_backup.proto
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ message SysSnapshotEntryPB {
optional fixed64 snapshot_hybrid_time = 4;

optional int64 version = 5;

optional bytes schedule_id = 6;
}

////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -188,6 +190,7 @@ message ListSnapshotSchedulesRequestPB {
// Information about a single snapshot schedule.
message SnapshotScheduleInfoPB {
// Schedule id, as raw bytes.
optional bytes id = 1;
// Options the schedule was configured with.
optional SnapshotScheduleOptionsPB options = 2;
// Snapshots reported together with this schedule.
// NOTE(review): presumably the snapshots created by this schedule - confirm against the code
// that fills this field.
repeated SnapshotInfoPB snapshots = 3;
}

message ListSnapshotSchedulesResponsePB {
Expand Down
2 changes: 1 addition & 1 deletion src/yb/client/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ target_link_libraries(yb_client_test_util
gmock
yb_client)

add_library(ql-dml-test-base ql-dml-test-base.cc txn-test-base.cc)
add_library(ql-dml-test-base ql-dml-test-base.cc txn-test-base.cc snapshot_test_base.cc)
target_link_libraries(ql-dml-test-base integration-tests)

# Tests
Expand Down
112 changes: 2 additions & 110 deletions src/yb/client/backup-txn-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
//

#include "yb/client/session.h"
#include "yb/client/snapshot_test_base.h"
#include "yb/client/transaction.h"
#include "yb/client/txn-test-base.h"

#include "yb/common/transaction_error.h"

Expand Down Expand Up @@ -45,13 +45,10 @@ DECLARE_uint64(snapshot_coordinator_poll_interval_ms);
namespace yb {
namespace client {

using Snapshots = google::protobuf::RepeatedPtrField<master::SnapshotInfoPB>;
using ImportedSnapshotData = google::protobuf::RepeatedPtrField<
master::ImportSnapshotMetaResponsePB::TableMetaPB>;

constexpr auto kWaitTimeout = 15s;

class BackupTxnTest : public TransactionTestBase<MiniCluster> {
class BackupTxnTest : public SnapshotTestBase {
protected:
void SetUp() override {
FLAGS_enable_history_cutoff_propagation = true;
Expand All @@ -69,11 +66,6 @@ class BackupTxnTest : public TransactionTestBase<MiniCluster> {
TransactionTestBase::DoBeforeTearDown();
}

// Builds a backup service proxy that talks to the RPC address the leader mini master is bound
// to, using the client's proxy cache.
master::MasterBackupServiceProxy MakeBackupServiceProxy() {
  auto* const proxy_cache = &client_->proxy_cache();
  return master::MasterBackupServiceProxy(
      proxy_cache, cluster_->leader_mini_master()->bound_rpc_addr());
}

Result<TxnSnapshotId> StartSnapshot() {
rpc::RpcController controller;
controller.set_timeout(60s);
Expand All @@ -86,20 +78,6 @@ class BackupTxnTest : public TransactionTestBase<MiniCluster> {
return FullyDecodeTxnSnapshotId(resp.snapshot_id());
}

// Looks up the snapshot with the given id and returns the state stored in its entry.
// Fails with RuntimeError unless the listing contains exactly one snapshot.
Result<SysSnapshotEntryPB::State> SnapshotState(const TxnSnapshotId& snapshot_id) {
  auto listed = VERIFY_RESULT(ListSnapshots(snapshot_id));
  const auto found = listed.size();
  if (found != 1) {
    return STATUS_FORMAT(
        RuntimeError, "Wrong number of snapshots, one expected but $0 found", found);
  }
  const auto& only_snapshot = listed[0];
  LOG(INFO) << "Snapshot state: " << only_snapshot.ShortDebugString();
  return only_snapshot.entry().state();
}

// Returns true when the snapshot's current state is COMPLETE; propagates lookup errors.
Result<bool> IsSnapshotDone(const TxnSnapshotId& snapshot_id) {
  const auto current_state = VERIFY_RESULT(SnapshotState(snapshot_id));
  return current_state == SysSnapshotEntryPB::COMPLETE;
}

Result<TxnSnapshotRestorationId> StartRestoration(
const TxnSnapshotId& snapshot_id, HybridTime restore_at = HybridTime(),
int64_t interval = 0) {
Expand Down Expand Up @@ -147,98 +125,12 @@ class BackupTxnTest : public TransactionTestBase<MiniCluster> {
}, kWaitTimeout * kTimeMultiplier, "Restoration done");
}

// Lists snapshots through the master backup service.
//
// snapshot_id - when not Nil, it is set on the request; the default Nil value leaves the
//               request's snapshot_id unset.
//
// The request always sets list_deleted_snapshots. Returns the RPC status if the call itself
// fails, the status carried in the response if the response reports an error, and otherwise the
// snapshots from the response.
Result<Snapshots> ListSnapshots(
    const TxnSnapshotId& snapshot_id = TxnSnapshotId::Nil()) {
  master::ListSnapshotsRequestPB req;
  master::ListSnapshotsResponsePB resp;

  req.set_list_deleted_snapshots(true);
  if (!snapshot_id.IsNil()) {
    req.set_snapshot_id(snapshot_id.data(), snapshot_id.size());
  }

  rpc::RpcController controller;
  controller.set_timeout(60s);
  RETURN_NOT_OK(MakeBackupServiceProxy().ListSnapshots(req, &resp, &controller));
  if (resp.has_error()) {
    return StatusFromPB(resp.error().status());
  }
  LOG(INFO) << "Snapshots: " << resp.ShortDebugString();
  // resp.snapshots() is a const accessor, so std::move on it silently degrades to a copy.
  // resp is a local that is discarded on return, so move out of the mutable field instead.
  return std::move(*resp.mutable_snapshots());
}

// Verifies that the cluster reports exactly one snapshot and that it matches expectations.
//
// snapshot_id - id the single listed snapshot must have.
// state       - state the snapshot's entry must be in.
//
// Returns OK when all checks pass, otherwise an IllegalState status describing the first
// mismatch (errors from listing or id decoding are propagated as is).
CHECKED_STATUS VerifySnapshot(
const TxnSnapshotId& snapshot_id, SysSnapshotEntryPB::State state) {
// List without an id filter, so any extra snapshot makes the size check below fail.
auto snapshots = VERIFY_RESULT(ListSnapshots());
SCHECK_EQ(snapshots.size(), 1, IllegalState, "Wrong number of snapshots");
const auto& snapshot = snapshots[0];
auto listed_snapshot_id = VERIFY_RESULT(FullyDecodeTxnSnapshotId(snapshot.id()));
if (listed_snapshot_id != snapshot_id) {
return STATUS_FORMAT(
IllegalState, "Wrong snapshot id returned $0, expected $1", listed_snapshot_id,
snapshot_id);
}
if (snapshot.entry().state() != state) {
return STATUS_FORMAT(
IllegalState, "Wrong snapshot state: $0 vs $1",
SysSnapshotEntryPB::State_Name(snapshot.entry().state()),
SysSnapshotEntryPB::State_Name(state));
}
// Count the entries by type; only namespaces, tables and tablets are accepted.
size_t num_namespaces = 0, num_tables = 0, num_tablets = 0;
for (const auto& entry : snapshot.entry().entries()) {
switch (entry.type()) {
case master::SysRowEntry::TABLET:
++num_tablets;
break;
case master::SysRowEntry::TABLE:
++num_tables;
break;
case master::SysRowEntry::NAMESPACE:
++num_namespaces;
break;
default:
return STATUS_FORMAT(
IllegalState, "Unexpected entry type: $0",
master::SysRowEntry::Type_Name(entry.type()));
}
}
// Exactly one namespace and one table are expected, with one tablet entry per partition of
// the test table.
SCHECK_EQ(num_namespaces, 1, IllegalState, "Wrong number of namespaces");
SCHECK_EQ(num_tables, 1, IllegalState, "Wrong number of tables");
SCHECK_EQ(num_tablets, table_.table()->GetPartitionCount(), IllegalState,
"Wrong number of tablets");

return Status::OK();
}

// Starts a snapshot and waits until it is done; returns the id of the new snapshot.
// Errors from starting or from waiting are propagated to the caller.
Result<TxnSnapshotId> CreateSnapshot() {
  auto started_id = VERIFY_RESULT(StartSnapshot());
  RETURN_NOT_OK(WaitSnapshotDone(started_id));
  return started_id;
}

// Polls the snapshot until it reaches the requested state.
//
// snapshot_id - snapshot to watch.
// state       - state to wait for.
// duration    - base wait time; it is multiplied by kTimeMultiplier before being used.
//
// A timeout is reported as IllegalState naming the last observed state and the expected one;
// any other outcome of the wait is returned unchanged.
CHECKED_STATUS WaitSnapshotInState(
    const TxnSnapshotId& snapshot_id, SysSnapshotEntryPB::State state,
    MonoDelta duration = kWaitTimeout) {
  auto expected_name = SysSnapshotEntryPB::State_Name(state);
  SysSnapshotEntryPB::State observed = SysSnapshotEntryPB::UNKNOWN;

  auto wait_status = WaitFor(
      [this, &snapshot_id, state, &observed]() -> Result<bool> {
        // Remember the most recent state so that a timeout can report it.
        observed = VERIFY_RESULT(SnapshotState(snapshot_id));
        return observed == state;
      },
      duration * kTimeMultiplier, "Snapshot in state " + expected_name);

  // Anything other than a timeout (including success) is passed through as is.
  if (wait_status.ok() || !wait_status.IsTimedOut()) {
    return wait_status;
  }
  return STATUS_FORMAT(
      IllegalState, "Wrong snapshot state: $0, while $1 expected",
      SysSnapshotEntryPB::State_Name(observed), expected_name);
}

// Waits for the snapshot to reach the COMPLETE state; see WaitSnapshotInState for how the
// duration and timeouts are handled.
CHECKED_STATUS WaitSnapshotDone(
    const TxnSnapshotId& snapshot_id, MonoDelta duration = kWaitTimeout) {
  const auto done_state = SysSnapshotEntryPB::COMPLETE;
  return WaitSnapshotInState(snapshot_id, done_state, duration);
}

CHECKED_STATUS DeleteSnapshot(const TxnSnapshotId& snapshot_id) {
master::DeleteSnapshotRequestPB req;
master::DeleteSnapshotResponsePB resp;
Expand Down
52 changes: 45 additions & 7 deletions src/yb/client/snapshot-schedule-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,30 +11,37 @@
// under the License.
//

#include "yb/client/txn-test-base.h"
#include "yb/client/snapshot_test_base.h"

#include "yb/master/master_backup.proxy.h"

using namespace std::literals;

DECLARE_uint64(snapshot_coordinator_poll_interval_ms);


namespace yb {
namespace client {

using Schedules = google::protobuf::RepeatedPtrField<master::SnapshotScheduleInfoPB>;
constexpr auto kSnapshotInterval = 10s * kTimeMultiplier;

class SnapshotScheduleTest : public TransactionTestBase<MiniCluster> {
class SnapshotScheduleTest : public SnapshotTestBase {
public:
master::MasterBackupServiceProxy MakeBackupServiceProxy() {
return master::MasterBackupServiceProxy(
&client_->proxy_cache(), cluster_->leader_mini_master()->bound_rpc_addr());
// Sets the snapshot coordinator poll interval flag to 250 (milliseconds, per the flag name)
// before running the base fixture's SetUp.
// NOTE(review): presumably this lets the coordinator act on schedules well within
// kSnapshotInterval - confirm against the coordinator's polling code.
void SetUp() override {
FLAGS_snapshot_coordinator_poll_interval_ms = 250;
SnapshotTestBase::SetUp();
}

Result<SnapshotScheduleId> CreateSchedule() {
rpc::RpcController controller;
controller.set_timeout(60s);
master::CreateSnapshotScheduleRequestPB req;
req.mutable_options()->set_interval_sec(60);
req.mutable_options()->set_retention_duration_sec(1200);
auto& options = *req.mutable_options();
options.set_interval_sec(std::chrono::seconds(kSnapshotInterval).count());
options.set_retention_duration_sec(1200);
auto& tables = *options.mutable_filter()->mutable_tables()->mutable_tables();
tables.Add()->set_table_id(table_.table()->id());
master::CreateSnapshotScheduleResponsePB resp;
RETURN_NOT_OK(MakeBackupServiceProxy().CreateSnapshotSchedule(req, &resp, &controller));
return FullyDecodeSnapshotScheduleId(resp.snapshot_schedule_id());
Expand Down Expand Up @@ -86,5 +93,36 @@ TEST_F(SnapshotScheduleTest, Create) {
}
}

// Checks that a schedule produces a first snapshot soon after it is created, no extra snapshot
// shortly afterwards, and a second snapshot within one further schedule interval.
TEST_F(SnapshotScheduleTest, Snapshot) {
ASSERT_NO_FATALS(WriteData());
auto schedule_id = ASSERT_RESULT(CreateSchedule());
// Phase 1: within half an interval a snapshot tagged with this schedule's id must reach
// COMPLETE, and at no point may more than one snapshot be listed.
ASSERT_OK(WaitFor([this, schedule_id]() -> Result<bool> {
auto snapshots = VERIFY_RESULT(ListSnapshots());
EXPECT_LE(snapshots.size(), 1);
LOG(INFO) << "Snapshots: " << AsString(snapshots);
for (const auto& snapshot : snapshots) {
EXPECT_EQ(TryFullyDecodeSnapshotScheduleId(snapshot.entry().schedule_id()), schedule_id);
if (snapshot.entry().state() == master::SysSnapshotEntryPB::COMPLETE) {
return true;
}
}
return false;
}, kSnapshotInterval / 2, "First snapshot"));

// The schedule listing must report that same single, completed snapshot.
auto schedules = ASSERT_RESULT(ListSchedules());
ASSERT_EQ(schedules.size(), 1);
ASSERT_EQ(schedules[0].snapshots().size(), 1);
ASSERT_EQ(schedules[0].snapshots()[0].entry().state(), master::SysSnapshotEntryPB::COMPLETE);

// Phase 2: a quarter interval later there must still be exactly one snapshot, i.e. the next
// one is not taken before the interval has elapsed.
std::this_thread::sleep_for(kSnapshotInterval / 4);
auto snapshots = ASSERT_RESULT(ListSnapshots());
ASSERT_EQ(snapshots.size(), 1);

// Phase 3: allow up to one full interval for the second snapshot to show up.
ASSERT_OK(WaitFor([this]() -> Result<bool> {
auto snapshots = VERIFY_RESULT(ListSnapshots());
return snapshots.size() == 2;
}, kSnapshotInterval, "Second snapshot"));
}

} // namespace client
} // namespace yb
Loading

0 comments on commit 5a18fb9

Please sign in to comment.