[#19359] DocDB: Do not remove large transactions that are still applying intents during bootstrap

Summary:
Consider the following scenario:
- A large transaction is started, its intents are written, and the transaction is committed. (A large transaction is one that has more than txn_max_apply_batch_records intents.)
- A tablet starts applying the intents of the transaction but does not finish, and writes an apply state into regular RocksDB.
- None of the WAL records of the transaction on the tablet are flushed to intents DB yet.
- The regular RocksDB has flushed recently enough so that its flushed op id is >= the op id of the UPDATE_TRANSACTION_OP (APPLYING status) record of the transaction.
- Tablet server restarts.
- During bootstrap, we see that the UPDATE_TRANSACTION_OP has already been applied to RocksDB, based on its op id and the flushed op id of regular RocksDB, but we ignore its apply state in regular DB, and the transaction gets cleaned up. As a result, only part of the transaction's data ends up in the regular RocksDB.
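
The "already applied" decision in the last step comes down to an op id comparison. A minimal sketch, assuming a simplified (term, index) op id; the `OpId` struct and `NeedsReplay` function here are hypothetical stand-ins, not the actual tablet bootstrap code:

```cpp
#include <cstdint>
#include <tuple>

// Hypothetical, simplified op id: ordered by term first, then by index.
struct OpId {
  int64_t term;
  int64_t index;
};

inline bool operator<=(const OpId& a, const OpId& b) {
  return std::tie(a.term, a.index) <= std::tie(b.term, b.index);
}

// Returns true if a WAL record still has to be replayed, i.e. its op id is
// newer than the op id that the target RocksDB has already flushed.
inline bool NeedsReplay(const OpId& record_op_id, const OpId& flushed_op_id) {
  return !(record_op_id <= flushed_op_id);
}
```

In the scenario above, the op id of the UPDATE_TRANSACTION_OP record is <= the flushed op id of regular RocksDB, so a check of this shape reports that no replay is needed, even though only part of the intents have been applied.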

To fix this bug, we modify TransactionParticipant's Add method to check if there is a pending apply record for the transaction in the loader, and initialize the same information (local commit data and apply data) as is already being done in LoadTransaction.
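
The idea of the fix can be illustrated with a toy model. This is a sketch only: `Loader`, `Participant`, `Txn`, and `ApplyState` below are hypothetical simplifications, and they do not mirror the real TransactionParticipant or its loader.

```cpp
#include <map>
#include <optional>
#include <string>
#include <utility>

// Hypothetical apply state: records where applying intents should resume.
struct ApplyState {
  std::string resume_key;
};

// Hypothetical transaction entry kept by the participant.
struct Txn {
  std::optional<ApplyState> apply_state;

  // A transaction with an unfinished apply must not be cleaned up.
  bool CanBeCleanedUp() const { return !apply_state.has_value(); }
};

// Hypothetical loader that remembers apply states found in regular RocksDB.
class Loader {
 public:
  void AddPendingApply(const std::string& txn_id, ApplyState state) {
    pending_applies_[txn_id] = std::move(state);
  }

  std::optional<ApplyState> FindPendingApply(const std::string& txn_id) const {
    auto it = pending_applies_.find(txn_id);
    if (it == pending_applies_.end()) return std::nullopt;
    return it->second;
  }

 private:
  std::map<std::string, ApplyState> pending_applies_;
};

class Participant {
 public:
  explicit Participant(const Loader& loader) : loader_(loader) {}

  // On Add, consult the loader: if an apply is still pending for this
  // transaction, carry its apply state over instead of starting clean.
  Txn& Add(const std::string& txn_id) {
    Txn& txn = txns_[txn_id];
    if (auto pending = loader_.FindPendingApply(txn_id)) {
      txn.apply_state = *pending;
    }
    return txn;
  }

 private:
  const Loader& loader_;
  std::map<std::string, Txn> txns_;
};
```

In this model, a transaction added while the loader holds a pending apply for it keeps its apply state and is therefore not eligible for cleanup, whereas a transaction without a pending apply behaves as before.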

Other changes:
- Using _WITH_PREFIX versions of logging macros in ShouldReplayOperation in tablet bootstrap.
- Refactoring and simplifying single-node test base class in pg_wrapper-test.
- To facilitate the test scenario needed by this fix, add a regular_only flag to the flush/compaction RPC to flush only regular RocksDB.
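
The effect of the regular_only flag can be sketched as a choice of which RocksDB instances a flush request touches. The `Db` enum and `DbsToFlush` function are hypothetical names used for illustration, not the tablet server's actual flush code:

```cpp
#include <vector>

// Hypothetical model: a tablet has a regular RocksDB and an intents RocksDB.
enum class Db { kRegular, kIntents };

// Returns the databases a flush request should cover. With regular_only set,
// the intents RocksDB is left alone.
inline std::vector<Db> DbsToFlush(bool regular_only) {
  if (regular_only) {
    return {Db::kRegular};
  }
  return {Db::kRegular, Db::kIntents};
}
```

Flushing only regular RocksDB is what lets a test reach the state described in the summary: the flushed op id of regular RocksDB advances while the transaction's records have not been flushed to the intents DB.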

Test Plan: Jenkins

Reviewers: timur, sergei

Reviewed By: timur

Subscribers: yql, ybase, bogdan

Differential Revision: https://phorge.dev.yugabyte.com/D29000
mbautin committed Oct 11, 2023
1 parent 028b623 commit 5159eb3
Showing 25 changed files with 353 additions and 150 deletions.
2 changes: 1 addition & 1 deletion java/yb-cdc/src/test/java/org/yb/cdc/TestCDCStream.java
@@ -126,7 +126,7 @@ public void testCreateStreamWithInvalidNamespace() {
  } catch (Exception e) {
  // The above try block would throw an exception since we are trying to create a stream
  // on a namespace which doesn't exist.
- assertTrue(e.getMessage().contains("Keyspace name not found: non_existing_namespace"));
+ assertTrue(e.getMessage().contains("YSQL keyspace name not found: non_existing_namespace"));
  exceptionThrown = true;
  }
  assertTrue(exceptionThrown);
4 changes: 2 additions & 2 deletions src/yb/client/client-internal.cc
@@ -1099,8 +1099,8 @@ Status YBClient::Data::WaitForAlterTableToFinish(YBClient* client,
  }

  Status YBClient::Data::FlushTablesHelper(YBClient* client,
- const CoarseTimePoint deadline,
- const FlushTablesRequestPB& req) {
+ const CoarseTimePoint deadline,
+ const FlushTablesRequestPB& req) {
  FlushTablesResponsePB resp;

  RETURN_NOT_OK(SyncLeaderMasterRpc(
12 changes: 6 additions & 6 deletions src/yb/client/snapshot-schedule-test.cc
@@ -194,13 +194,13 @@ TEST_F(SnapshotScheduleTest, TablegroupGC) {
  NamespaceId namespace_id;
  TablegroupId tablegroup_id = "11223344556677889900aabbccddeeff";
  TablespaceId tablespace_id = "";
- auto client_ = ASSERT_RESULT(cluster_->CreateClient());
+ auto client = ASSERT_RESULT(cluster_->CreateClient());

- ASSERT_OK(client_->CreateNamespace(namespace_name, YQL_DATABASE_PGSQL, "" /* creator */,
+ ASSERT_OK(client->CreateNamespace(namespace_name, YQL_DATABASE_PGSQL, "" /* creator */,
  "" /* ns_id */, "" /* src_ns_id */,
  boost::none /* next_pg_oid */, nullptr /* txn */, false));
  {
- auto namespaces = ASSERT_RESULT(client_->ListNamespaces(boost::none));
+ auto namespaces = ASSERT_RESULT(client->ListNamespaces(boost::none));
  for (const auto& ns : namespaces) {
  if (ns.id.name() == namespace_name) {
  namespace_id = ns.id.id();
@@ -211,14 +211,14 @@ TEST_F(SnapshotScheduleTest, TablegroupGC) {
  }

  // Since this is just for testing purposes, we do not bother generating a valid PgsqlTablegroupId.
- ASSERT_OK(client_->CreateTablegroup(namespace_name,
+ ASSERT_OK(client->CreateTablegroup(namespace_name,
  namespace_id,
  tablegroup_id,
  tablespace_id,
  nullptr /* txn */));

  // Ensure that the newly created tablegroup shows up in the list.
- auto exist = ASSERT_RESULT(client_->TablegroupExists(namespace_name, tablegroup_id));
+ auto exist = ASSERT_RESULT(client->TablegroupExists(namespace_name, tablegroup_id));
  ASSERT_TRUE(exist);
  TableId parent_table_id = GetTablegroupParentTableId(tablegroup_id);

@@ -227,7 +227,7 @@ TEST_F(SnapshotScheduleTest, TablegroupGC) {
  nullptr, YQLDatabase::YQL_DATABASE_PGSQL, namespace_name, WaitSnapshot::kTrue,
  kSnapshotInterval, kSnapshotInterval * 2));

- ASSERT_OK(client_->DeleteTablegroup(tablegroup_id, nullptr /* txn */));
+ ASSERT_OK(client->DeleteTablegroup(tablegroup_id, nullptr /* txn */));

  // We give 2 rounds of retention period for cleanup.
  ASSERT_OK(WaitFor([&]() -> Result<bool> {
2 changes: 1 addition & 1 deletion src/yb/client/snapshot-txn-test.cc
@@ -705,7 +705,7 @@ bool IntermittentTxnFailure(const Status& status) {
  return false;
  }

- // Concurrently execute multiple transaction, each of them writes the same key multiple times.
+ // Concurrently execute multiple transactions, each of them writes the same key multiple times.
  // And perform tserver restarts in parallel to it.
  // This test checks that transaction participant state correctly restored after restart.
  void SnapshotTxnTest::TestMultiWriteWithRestart() {
5 changes: 4 additions & 1 deletion src/yb/common/transaction.proto
@@ -35,9 +35,12 @@ enum TransactionStatus {

  PROMOTED = 9;

+ // The APPLYING status is used in Raft in transaction participant tablets but not in status
+ // tablets.
+ APPLYING = 20;
+
  // All following entries are not used in RAFT, but as events between status tablet and involved
  // tablets:
- APPLYING = 20;
  APPLIED_IN_ONE_OF_INVOLVED_TABLETS = 21;
  IMMEDIATE_CLEANUP = 22;
  GRACEFUL_CLEANUP = 23;
8 changes: 8 additions & 0 deletions src/yb/integration-tests/yb_mini_cluster_test_base.cc
@@ -113,6 +113,14 @@ Status MiniClusterTestWithClient<T>::CreateClient() {
  return Status::OK();
  }

+ template <class T>
+ Status MiniClusterTestWithClient<T>::EnsureClientCreated() {
+ if (!client_) {
+ return CreateClient();
+ }
+ return Status::OK();
+ }
+
  template <class T>
  void MiniClusterTestWithClient<T>::DoTearDown() {
  client_.reset();
4 changes: 4 additions & 0 deletions src/yb/integration-tests/yb_mini_cluster_test_base.h
@@ -63,6 +63,10 @@ class MiniClusterTestWithClient : public YBMiniClusterTestBase<T> {

  protected:
  virtual Status CreateClient();
+
+ // Creates the client only if it has not been created before.
+ virtual Status EnsureClientCreated();
+
  void DoTearDown() override;

  std::unique_ptr<client::YBClient> client_;
5 changes: 4 additions & 1 deletion src/yb/master/async_flush_tablets_task.cc
@@ -37,12 +37,14 @@ AsyncFlushTablets::AsyncFlushTablets(Master *master,
  const vector<TabletId>& tablet_ids,
  const FlushRequestId& flush_id,
  bool is_compaction,
+ bool regular_only,
  LeaderEpoch epoch)
  : RetrySpecificTSRpcTask(master, callback_pool, ts_uuid, table, std::move(epoch),
  /* async_task_throttler */ nullptr),
  tablet_ids_(tablet_ids),
  flush_id_(flush_id),
- is_compaction_(is_compaction) {
+ is_compaction_(is_compaction),
+ regular_only_(regular_only) {
  }

  string AsyncFlushTablets::description() const {
@@ -97,6 +99,7 @@ bool AsyncFlushTablets::SendRequest(int attempt) {
  for (const TabletId& id : tablet_ids_) {
  req.add_tablet_ids(id);
  }
+ req.set_regular_only(regular_only_);

  ts_admin_proxy_->FlushTabletsAsync(req, &resp_, &rpc_, BindRpcCallback());
  VLOG(1) << "Send flush tablets request to " << permanent_uuid_
2 changes: 2 additions & 0 deletions src/yb/master/async_flush_tablets_task.h
@@ -28,6 +28,7 @@ class AsyncFlushTablets : public RetrySpecificTSRpcTask {
  const std::vector<TabletId>& tablet_ids,
  const FlushRequestId& flush_id,
  bool is_compaction,
+ bool regular_only,
  LeaderEpoch epoch);

  server::MonitoredTaskType type() const override {
@@ -49,6 +50,7 @@ class AsyncFlushTablets : public RetrySpecificTSRpcTask {
  const FlushRequestId flush_id_;
  tserver::FlushTabletsResponsePB resp_;
  bool is_compaction_ = false;
+ bool regular_only_ = false;
  };

  } // namespace master
10 changes: 8 additions & 2 deletions src/yb/master/catalog_manager.cc
@@ -5424,8 +5424,11 @@ Result<scoped_refptr<NamespaceInfo>> CatalogManager::FindNamespaceUnlocked(
  auto db = GetDatabaseType(ns_identifier);
  auto it = namespace_names_mapper_[db].find(ns_identifier.name());
  if (it == namespace_names_mapper_[db].end()) {
- return STATUS(NotFound, "Keyspace name not found", ns_identifier.name(),
- MasterError(MasterErrorPB::NAMESPACE_NOT_FOUND));
+ return STATUS(
+ NotFound,
+ Format("$0 keyspace name not found", ShortDatabaseType(db)),
+ ns_identifier.name(),
+ MasterError(MasterErrorPB::NAMESPACE_NOT_FOUND));
  }
  return it->second;
  }
@@ -12684,6 +12687,9 @@ Result<vector<TableDescription>> CatalogManager::CollectTables(
  table_id_pb.has_namespace_()) {
  auto namespace_info = FindNamespaceUnlocked(table_id_pb.namespace_());
  if (!namespace_info.ok()) {
+ VLOG_WITH_PREFIX_AND_FUNC(1)
+ << "Namespace not found: " << table_id_pb.namespace_().ShortDebugString()
+ << ", status: "<< namespace_info.status().ToString();
  if (namespace_info.status().IsNotFound()) {
  continue;
  }
6 changes: 4 additions & 2 deletions src/yb/master/flush_manager.cc
@@ -105,7 +105,8 @@ Status FlushManager::FlushTables(const FlushTablesRequestPB* req,
  // Send FlushTablets requests to all Tablet Servers (one TS - one request).
  for (const auto& ts : ts_tablet_map) {
  // Using last table async task queue.
- SendFlushTabletsRequest(ts.first, table, ts.second, flush_id, is_compaction, epoch);
+ SendFlushTabletsRequest(
+ ts.first, table, ts.second, flush_id, is_compaction, req->regular_only(), epoch);
  }

  resp->set_flush_request_id(flush_id);
@@ -145,10 +146,11 @@ void FlushManager::SendFlushTabletsRequest(const TabletServerId& ts_uuid,
  const vector<TabletId>& tablet_ids,
  const FlushRequestId& flush_id,
  const bool is_compaction,
+ const bool regular_only,
  const LeaderEpoch& epoch) {
  auto call = std::make_shared<AsyncFlushTablets>(
  master_, catalog_manager_->AsyncTaskPool(), ts_uuid, table, tablet_ids, flush_id,
- is_compaction, epoch);
+ is_compaction, regular_only, epoch);
  table->AddTask(call);
  WARN_NOT_OK(catalog_manager_->ScheduleTask(call), "Failed to send flush tablets request");
  }
1 change: 1 addition & 0 deletions src/yb/master/flush_manager.h
@@ -67,6 +67,7 @@ class FlushManager {
  const std::vector<TabletId>& tablet_ids,
  const FlushRequestId& flush_id,
  bool is_compaction,
+ bool regular_only,
  const LeaderEpoch& epoch);

  void DeleteCompleteFlushRequests();
6 changes: 3 additions & 3 deletions src/yb/master/master-test.cc
@@ -1349,7 +1349,7 @@ TEST_F(MasterTest, TestNamespaces) {
  ASSERT_TRUE(resp.has_error());
  ASSERT_EQ(resp.error().code(), MasterErrorPB::NAMESPACE_NOT_FOUND);
  ASSERT_EQ(resp.error().status().code(), AppStatusPB::NOT_FOUND);
- ASSERT_STR_CONTAINS(resp.error().status().ShortDebugString(), "Keyspace name not found");
+ ASSERT_STR_CONTAINS(resp.error().status().ShortDebugString(), "YCQL keyspace name not found");
  }
  {
  ASSERT_NO_FATALS(DoListAllNamespaces(&namespaces));
@@ -1656,7 +1656,7 @@ TEST_F(MasterTest, TestTablesWithNamespace) {
  {
  Status s = CreateTable("nonexistingns", kTableName, kTableSchema);
  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
- ASSERT_STR_CONTAINS(s.ToString(), "Keyspace name not found");
+ ASSERT_STR_CONTAINS(s.ToString(), "YCQL keyspace name not found");
  }

  // List tables, should show 1 table.
@@ -1710,7 +1710,7 @@ TEST_F(MasterTest, TestTablesWithNamespace) {
  ASSERT_TRUE(resp.has_error());
  ASSERT_EQ(resp.error().code(), MasterErrorPB::NAMESPACE_NOT_FOUND);
  ASSERT_EQ(resp.error().status().code(), AppStatusPB::NOT_FOUND);
- ASSERT_STR_CONTAINS(resp.error().status().ShortDebugString(), "Keyspace name not found");
+ ASSERT_STR_CONTAINS(resp.error().status().ShortDebugString(), "YCQL keyspace name not found");
  }
  ASSERT_NO_FATALS(DoListAllTables(&tables));
  ASSERT_EQ(1 + kNumSystemTables, tables.tables_size());
3 changes: 3 additions & 0 deletions src/yb/master/master_admin.proto
@@ -32,6 +32,9 @@ message FlushTablesRequestPB {
  // Whether we want to include index tables in this flush. If there are index tables in the tables
  // field this field must be set to false, otherwise the request will fail.
  optional bool add_indexes = 3;
+
+ // Whether the operation only applies to regular RocksDB but not intents RocksDB.
+ optional bool regular_only = 4;
  }

  message FlushTablesResponsePB {
11 changes: 11 additions & 0 deletions src/yb/master/master_util.cc
@@ -28,6 +28,7 @@
  #include "yb/rpc/rpc_controller.h"

  #include "yb/util/countdown_latch.h"
+ #include "yb/util/format.h"
  #include "yb/util/net/net_util.h"
  #include "yb/util/result.h"
  #include "yb/util/status_format.h"
@@ -46,6 +47,16 @@ const char* DatabasePrefix(YQLDatabase db) {
  return kDBTypePrefixUnknown;
  }

+ std::string ShortDatabaseType(YQLDatabase db_type) {
+ switch(db_type) {
+ case YQL_DATABASE_UNKNOWN: return "UNKNOWN";
+ case YQL_DATABASE_CQL: return "YCQL";
+ case YQL_DATABASE_PGSQL: return "YSQL";
+ case YQL_DATABASE_REDIS: return "YEDIS";
+ }
+ return Format("<invalid database type $0>", to_underlying(db_type));
+ }
+
  namespace master {

  namespace {
4 changes: 4 additions & 0 deletions src/yb/master/master_util.h
@@ -36,6 +36,10 @@ namespace yb {

  const char* DatabasePrefix(YQLDatabase db);

+ // A short version of the given database type, such as "YCQL" or "YSQL". Used
+ // for human-readable messages.
+ std::string ShortDatabaseType(YQLDatabase db_type);
+
  namespace consensus {

  class RaftPeerPB;
4 changes: 2 additions & 2 deletions src/yb/tablet/running_transaction.cc
@@ -536,8 +536,8 @@ void RunningTransaction::SetApplyData(const docdb::ApplyTransactionState& apply_
  apply_state_ = apply_state;
  bool active = apply_state_.active();
  if (active) {
- // We are trying to assign set processing apply before starting actual process, and unset
- // after we complete processing.
+ // We are trying to set processing_apply before starting the actual process of applying, and
+ // unset it after we complete processing.
  processing_apply_.store(true, std::memory_order_release);
  }
