Skip to content

Commit

Permalink
Merge branch 'master' of github.com:pingcap/tics into mpp_partition_t…
Browse files Browse the repository at this point in the history
…able
  • Loading branch information
windtalker committed Mar 14, 2022
2 parents c637731 + 18b4b04 commit f7065bf
Show file tree
Hide file tree
Showing 13 changed files with 412 additions and 55 deletions.
37 changes: 36 additions & 1 deletion dbms/src/Debug/MockTiDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,10 +263,45 @@ int MockTiDB::newTables(
Timestamp tso,
const String & engine_type)
{
std::lock_guard lock(tables_mutex);
if (databases.find(database_name) == databases.end())
{
throw Exception("MockTiDB not found db: " + database_name, ErrorCodes::LOGICAL_ERROR);
}

version++;
SchemaDiff diff;
diff.type = SchemaActionType::CreateTables;
for (const auto & [table_name, columns, handle_pk_name] : tables)
{
newTable(database_name, table_name, columns, tso, handle_pk_name, engine_type);
String qualified_name = database_name + "." + table_name;
if (tables_by_name.find(qualified_name) != tables_by_name.end())
{
throw Exception("Mock TiDB table " + qualified_name + " already exists", ErrorCodes::TABLE_ALREADY_EXISTS);
}

auto table_info = *parseColumns(table_name, columns, handle_pk_name, engine_type);
table_info.id = table_id_allocator++;
table_info.update_timestamp = tso;

auto table = std::make_shared<Table>(database_name, databases[database_name], table_info.name, std::move(table_info));
tables_by_id.emplace(table->table_info.id, table);
tables_by_name.emplace(qualified_name, table);

AffectedOption opt;
opt.schema_id = table->database_id;
opt.table_id = table->id();
opt.old_schema_id = table->database_id;
opt.old_table_id = table->id();
diff.affected_opts.push_back(std::move(opt));
}

if (diff.affected_opts.empty())
throw Exception("MockTiDB CreateTables should have at lease 1 table", ErrorCodes::LOGICAL_ERROR);

diff.schema_id = diff.affected_opts[0].schema_id;
diff.version = version;
version_diff[version] = diff;
return 0;
}

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,8 @@ struct Settings
M(SettingUInt64, dt_segment_delta_small_column_file_size, 8388608, "Determine whether a column file in delta is small or not. 8MB by default.") \
M(SettingUInt64, dt_segment_stable_pack_rows, DEFAULT_MERGE_BLOCK_SIZE, "Expected stable pack rows in DeltaTree Engine.") \
M(SettingFloat, dt_segment_wait_duration_factor, 1, "The factor of wait duration in a write stall.") \
M(SettingUInt64, dt_bg_gc_check_interval, 600, "Background gc thread check interval, the unit is second.") \
M(SettingInt64, dt_bg_gc_max_segments_to_check_every_round, 15, "Max segments to check in every gc round, value less than or equal to 0 means gc no segments.") \
M(SettingUInt64, dt_bg_gc_check_interval, 5, "Background gc thread check interval, the unit is second.") \
M(SettingInt64, dt_bg_gc_max_segments_to_check_every_round, 100, "Max segments to check in every gc round, value less than or equal to 0 means gc no segments.") \
M(SettingFloat, dt_bg_gc_ratio_threhold_to_trigger_gc, 1.2, "Trigger segment's gc when the ratio of invalid version exceed this threhold. Values smaller than or equal to 1.0 means gc all " \
"segments") \
M(SettingFloat, dt_bg_gc_delta_delete_ratio_to_trigger_gc, 0.3, "Trigger segment's gc when the ratio of delta delete range to stable exceeds this ratio.") \
Expand Down
46 changes: 21 additions & 25 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,12 @@ void VersionedPageEntries::createNewEntry(const PageVersionType & ver, const Pag

if (type == EditRecordType::VAR_ENTRY)
{
auto last_iter = entries.rbegin();
if (last_iter->second.isDelete())
auto last_iter = MapUtils::findLess(entries, PageVersionType(ver.sequence + 1, 0));
if (last_iter == entries.end())
{
entries.emplace(ver, EntryOrDelete::newNormalEntry(entry));
}
else if (last_iter->second.isDelete())
{
entries.emplace(ver, EntryOrDelete::newNormalEntry(entry));
}
Expand Down Expand Up @@ -302,10 +306,8 @@ VersionedPageEntries::resolveToPageId(UInt64 seq, bool check_prev, PageEntryV3 *
}
else if (type == EditRecordType::VAR_EXTERNAL)
{
// If we applied write batches like this: [ver=1]{putExternal 10}, [ver=2]{ref 11->10, del 10}
// then by ver=2, we should not able to read 10, but able to read 11 (resolving 11 ref to 10).
// when resolving 11 to 10, we need to set `check_prev` to true
bool ok = !is_deleted || (is_deleted && (check_prev ? (seq <= delete_ver.sequence) : (seq < delete_ver.sequence)));
// We may add reference to an external id even if it is logically deleted.
bool ok = check_prev ? true : (!is_deleted || (is_deleted && seq < delete_ver.sequence));
if (create_ver.sequence <= seq && ok)
{
return {RESOLVE_TO_NORMAL, buildV3Id(0, 0), PageVersionType(0)};
Expand Down Expand Up @@ -354,8 +356,9 @@ Int64 VersionedPageEntries::incrRefCount(const PageVersionType & ver)
}
else if (type == EditRecordType::VAR_EXTERNAL)
{
if (create_ver <= ver && (!is_deleted || (is_deleted && ver < delete_ver)))
if (create_ver <= ver)
{
// We may add reference to an external id even if it is logically deleted.
return ++being_ref_count;
}
}
Expand Down Expand Up @@ -397,7 +400,6 @@ PageSize VersionedPageEntries::getEntriesByBlobIds(

bool VersionedPageEntries::cleanOutdatedEntries(
UInt64 lowest_seq,
PageIdV3Internal page_id,
std::map<PageIdV3Internal, std::pair<PageVersionType, Int64>> * normal_entries_to_deref,
PageEntriesV3 & entries_removed,
const PageLock & /*page_lock*/)
Expand All @@ -416,15 +418,6 @@ bool VersionedPageEntries::cleanOutdatedEntries(
// need to decrease the ref count by <id=iter->second.origin_page_id, ver=iter->first, num=1>
if (auto [deref_counter, new_created] = normal_entries_to_deref->emplace(std::make_pair(ori_page_id, std::make_pair(/*ver=*/create_ver, /*count=*/1))); !new_created)
{
if (deref_counter->second.first.sequence != create_ver.sequence)
{
throw Exception(fmt::format(
"There exist two different version of ref, should not happen [page_id={}] [ori_page_id={}] [ver={}] [another_ver={}]",
page_id,
ori_page_id,
create_ver,
deref_counter->second.first));
}
// the id is already exist in deref map, increase the num to decrease ref count
deref_counter->second.second += 1;
}
Expand Down Expand Up @@ -508,7 +501,8 @@ bool VersionedPageEntries::derefAndClean(UInt64 lowest_seq, PageIdV3Internal pag
}
else if (type == EditRecordType::VAR_ENTRY)
{
// decrease the ref-counter
// Decrease the ref-counter. The entry may be moved to a newer entry with same sequence but higher epoch,
// so we need to find the one less than <seq+1, 0> and decrease the ref-counter of it.
auto iter = MapUtils::findMutLess(entries, PageVersionType(deref_ver.sequence + 1, 0));
if (iter == entries.end())
{
Expand All @@ -530,7 +524,7 @@ bool VersionedPageEntries::derefAndClean(UInt64 lowest_seq, PageIdV3Internal pag

// Clean outdated entries after decreased the ref-counter
// set `normal_entries_to_deref` to be nullptr to ignore cleaning ref-var-entries
return cleanOutdatedEntries(lowest_seq, page_id, /*normal_entries_to_deref*/ nullptr, entries_removed, page_lock);
return cleanOutdatedEntries(lowest_seq, /*normal_entries_to_deref*/ nullptr, entries_removed, page_lock);
}

throw Exception(fmt::format("calling derefAndClean with invalid state [state={}]", toDebugString()));
Expand Down Expand Up @@ -863,7 +857,10 @@ void PageDirectory::applyRefEditRecord(
const VersionedPageEntriesPtr & resolve_version_list = resolve_ver_iter->second;
// If we already hold the lock from `id_to_resolve`, then we should not request it again.
// This can happen when `id_to_resolve` have other operating in current writebatch
auto [need_collapse, next_id_to_resolve, next_ver_to_resolve] = resolve_version_list->resolveToPageId(ver_to_resolve.sequence, false, nullptr);
auto [need_collapse, next_id_to_resolve, next_ver_to_resolve] = resolve_version_list->resolveToPageId(
ver_to_resolve.sequence,
/*check_prev=*/true,
nullptr);
switch (need_collapse)
{
case VersionedPageEntries::RESOLVE_FAIL:
Expand Down Expand Up @@ -966,7 +963,7 @@ void PageDirectory::apply(PageEntriesEdit && edit, const WriteLimiterPtr & write
case EditRecordType::VAR_ENTRY:
case EditRecordType::VAR_EXTERNAL:
case EditRecordType::VAR_REF:
throw Exception(fmt::format("should not handle {} edit", r.type));
throw Exception(fmt::format("should not handle edit with invalid type [type={}]", r.type));
}
}
catch (DB::Exception & e)
Expand Down Expand Up @@ -1000,7 +997,7 @@ void PageDirectory::gcApply(PageEntriesEdit && migrated_edit, const WriteLimiter
iter = mvcc_table_directory.find(record.page_id);
if (unlikely(iter == mvcc_table_directory.end()))
{
throw Exception(fmt::format("Can't found [page_id={}] while doing gcApply", record.page_id), ErrorCodes::LOGICAL_ERROR);
throw Exception(fmt::format("Can't find [page_id={}] while doing gcApply", record.page_id), ErrorCodes::LOGICAL_ERROR);
}
} // release the read lock on `table_rw_mutex`

Expand Down Expand Up @@ -1126,7 +1123,6 @@ PageEntriesV3 PageDirectory::gcInMemEntries()
// do gc on the version list without lock on `mvcc_table_directory`.
const bool all_deleted = iter->second->cleanOutdatedEntries(
lowest_seq,
/*page_id=*/iter->first,
&normal_entries_to_deref,
all_del_entries,
iter->second->acquireLock());
Expand Down Expand Up @@ -1161,8 +1157,8 @@ PageEntriesV3 PageDirectory::gcInMemEntries()
const bool all_deleted = iter->second->derefAndClean(
lowest_seq,
page_id,
deref_counter.first,
deref_counter.second,
/*deref_ver=*/deref_counter.first,
/*deref_count=*/deref_counter.second,
all_del_entries);

if (all_deleted)
Expand Down
4 changes: 1 addition & 3 deletions dbms/src/Storages/Page/V3/PageDirectory.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <Common/CurrentMetrics.h>
#include <Common/LogWithPrefix.h>
#include <Encryption/FileProvider.h>
#include <Poco/Ext/ThreadNumber.h>
#include <Storages/Page/Page.h>
#include <Storages/Page/Snapshot.h>
Expand All @@ -17,8 +18,6 @@
#include <shared_mutex>
#include <unordered_map>

#include "Encryption/FileProvider.h"

namespace CurrentMetrics
{
extern const Metric PSMVCCNumSnapshots;
Expand Down Expand Up @@ -205,7 +204,6 @@ class VersionedPageEntries
*/
bool cleanOutdatedEntries(
UInt64 lowest_seq,
PageIdV3Internal page_id,
std::map<PageIdV3Internal, std::pair<PageVersionType, Int64>> * normal_entries_to_deref,
PageEntriesV3 & entries_removed,
const PageLock & page_lock);
Expand Down
30 changes: 21 additions & 9 deletions dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,35 @@ PageDirectoryPtr PageDirectoryFactory::create(FileProviderPtr & file_provider, P
auto [wal, reader] = WALStore::create(file_provider, delegator);
PageDirectoryPtr dir = std::make_unique<PageDirectory>(std::move(wal));
loadFromDisk(dir, std::move(reader));
// TODO: After restored ends, set the last offset of log file for `wal`
if (blob_stats)
blob_stats->restore();
// Reset the `sequence` to the maximum of persisted.
dir->sequence = max_applied_ver.sequence;

if (blob_stats)
{
// After all entries restored to `mvcc_table_directory`, only apply
// the latest entry to `blob_stats`, or we may meet error since
// some entries may be removed in memory but not get compacted
// in the log file.
for (const auto & [page_id, entries] : dir->mvcc_table_directory)
{
(void)page_id;
if (auto entry = entries->getEntry(max_applied_ver.sequence); entry)
{
blob_stats->restoreByEntry(*entry);
}
}

blob_stats->restore();
}

// TODO: After restored ends, set the last offset of log file for `wal`
return dir;
}

PageDirectoryPtr PageDirectoryFactory::createFromEdit(FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator, const PageEntriesEdit & edit)
{
auto [wal, reader] = WALStore::create(file_provider, delegator);
(void)reader;
PageDirectoryPtr dir = std::make_unique<PageDirectory>(std::move(wal));
loadEdit(dir, edit);
if (blob_stats)
Expand Down Expand Up @@ -67,8 +85,6 @@ void PageDirectoryFactory::loadEdit(const PageDirectoryPtr & dir, const PageEntr
}
case EditRecordType::VAR_ENTRY:
version_list->fromRestored(r);
if (blob_stats)
blob_stats->restoreByEntry(r.entry);
break;
case EditRecordType::PUT_EXTERNAL:
{
Expand All @@ -82,8 +98,6 @@ void PageDirectoryFactory::loadEdit(const PageDirectoryPtr & dir, const PageEntr
}
case EditRecordType::PUT:
version_list->createNewEntry(restored_version, r.entry);
if (blob_stats)
blob_stats->restoreByEntry(r.entry);
break;
case EditRecordType::DEL:
case EditRecordType::VAR_DELETE: // nothing different from `DEL`
Expand All @@ -94,8 +108,6 @@ void PageDirectoryFactory::loadEdit(const PageDirectoryPtr & dir, const PageEntr
break;
case EditRecordType::UPSERT:
version_list->createNewEntry(restored_version, r.entry);
if (blob_stats)
blob_stats->restoreByEntry(r.entry);
break;
}
}
Expand Down
45 changes: 44 additions & 1 deletion dbms/src/Storages/Page/V3/PageEntriesEdit.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,31 @@ enum class EditRecordType
VAR_DELETE,
};

inline const char * typeToString(EditRecordType t)
{
switch (t)
{
case EditRecordType::PUT:
return "PUT ";
case EditRecordType::PUT_EXTERNAL:
return "EXT ";
case EditRecordType::REF:
return "REF ";
case EditRecordType::DEL:
return "DEL ";
case EditRecordType::UPSERT:
return "UPSERT ";
case EditRecordType::VAR_ENTRY:
return "VAR_ENT";
case EditRecordType::VAR_REF:
return "VAR_REF";
case EditRecordType::VAR_EXTERNAL:
return "VAR_EXT";
case EditRecordType::VAR_DELETE:
return "VAR_DEL";
}
}

/// Page entries change to apply to PageDirectory
class PageEntriesEdit
{
Expand Down Expand Up @@ -176,10 +201,28 @@ class PageEntriesEdit
PageIdV3Internal ori_page_id;
PageVersionType version;
PageEntryV3 entry;
Int64 being_ref_count = 1;
Int64 being_ref_count;

EditRecord()
: page_id(0)
, ori_page_id(0)
, being_ref_count(1)
{}
};
using EditRecords = std::vector<EditRecord>;

static String toDebugString(const EditRecord & rec)
{
return fmt::format(
"{{type:{}, page_id:{}, ori_id:{}, version:{}, entry:{}, being_ref_count:{}}}",
typeToString(rec.type),
rec.page_id,
rec.ori_page_id,
rec.version,
DB::PS::V3::toDebugString(rec.entry),
rec.being_ref_count);
}

void appendRecord(const EditRecord & rec)
{
records.emplace_back(rec);
Expand Down
14 changes: 12 additions & 2 deletions dbms/src/Storages/Page/V3/WAL/WALReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,18 @@ WALStoreReader::findCheckpoint(LogFilenameSet && all_files)
LogFilename latest_checkpoint = *latest_checkpoint_iter;
for (auto iter = all_files.cbegin(); iter != all_files.cend(); /*empty*/)
{
if (iter->log_num < latest_checkpoint.log_num)
// We use <largest_num, 1> as the checkpoint, so all files less than or equal
// to latest_checkpoint.log_num can be erase
if (iter->log_num <= latest_checkpoint.log_num)
{
// TODO: clean useless file that is older than `checkpoint`
if (iter->log_num == latest_checkpoint.log_num && iter->level_num != 0)
{
// the checkpoint file, not remove
}
else
{
// TODO: clean useless file that is older than `checkpoint`
}
iter = all_files.erase(iter);
}
else
Expand Down Expand Up @@ -186,6 +195,7 @@ bool WALStoreReader::openNextFile()
if (!checkpoint_read_done)
{
do_open(*checkpoint_file);
checkpoint_read_done = true;
}
else
{
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/WAL/serialize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ void serializePutTo(const PageEntriesEdit::EditRecord & record, WriteBuffer & bu
{
assert(record.type == EditRecordType::PUT || record.type == EditRecordType::UPSERT || record.type == EditRecordType::VAR_ENTRY);

writeIntBinary(EditRecordType::PUT, buf);
writeIntBinary(record.type, buf);

UInt32 flags = 0;
writeIntBinary(flags, buf);
Expand Down
Loading

0 comments on commit f7065bf

Please sign in to comment.