Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust order of region check and lock by region scanner creation #136

Merged
merged 4 commits into from
Jul 24, 2019
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 50 additions & 34 deletions dbms/src/Storages/Transaction/PartitionStreams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,29 @@ void RegionTable::writeBlockByRegion(

UInt64 region_read_cost = -1, region_decode_cost = -1, write_part_cost = -1;

/// Read raw KVs from region cache.
RegionDataReadInfoList data_list_read;
{
auto start_time = Clock::now();
auto scanner = region->createCommittedScanner(table_id);
// Shortcut for empty region.
if (!scanner->hasNext())
return;
do

/// Some sanity checks for region meta.
{
data_list_read.emplace_back(scanner->next());
} while (scanner->hasNext());
region_read_cost = std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - start_time).count();
}
if (region->isPendingRemove())
return;
}

/// Read raw KVs from region cache.
{
// Shortcut for empty region.
if (!scanner->hasNext())
return;
auto start_time = Clock::now();
do
{
data_list_read.emplace_back(scanner->next());
} while (scanner->hasNext());
region_read_cost = std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - start_time).count();
}
}

/// Declare lambda of atomic read then write to call multiple times.
auto atomicReadWrite = [&](bool force_decode) {
Expand Down Expand Up @@ -87,17 +95,21 @@ void RegionTable::writeBlockByRegion(
};

/// Try read then write once.
if (atomicReadWrite(false))
return;
{
if (atomicReadWrite(false))
return;
}

/// If first try failed, sync schema and force read then write.
tmt.getSchemaSyncer()->syncSchemas(context);
{
tmt.getSchemaSyncer()->syncSchemas(context);

if (!atomicReadWrite(true))
// Failure won't be tolerated this time.
// TODO: Enrich exception message.
throw Exception("Write region " + std::to_string(region->id()) + " to table " + std::to_string(table_id) + " failed",
ErrorCodes::LOGICAL_ERROR);
if (!atomicReadWrite(true))
// Failure won't be tolerated this time.
// TODO: Enrich exception message.
throw Exception("Write region " + std::to_string(region->id()) + " to table " + std::to_string(table_id) + " failed",
ErrorCodes::LOGICAL_ERROR);
}

LOG_TRACE(log,
__PRETTY_FUNCTION__ << ": table " << table_id << ", region " << region->id() << ", cost [region read " << region_read_cost
Expand Down Expand Up @@ -126,19 +138,19 @@ std::tuple<BlockOption, RegionTable::RegionReadStatus> RegionTable::readBlockByR
wait_index_cost = std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - start_time).count();
}

/// Some sanity checks for region meta.
{
if (region->isPendingRemove())
return {BlockOption{}, PENDING_REMOVE};

if (region_version != InvalidRegionVersion && (region->version() != region_version || region->confVer() != conf_version))
return {BlockOption{}, VERSION_ERROR};
}

RegionDataReadInfoList data_list_read;
{
auto scanner = region->createCommittedScanner(table_info.id);

/// Some sanity checks for region meta.
{
if (region->isPendingRemove())
return {BlockOption{}, PENDING_REMOVE};

if (region_version != InvalidRegionVersion && (region->version() != region_version || region->confVer() != conf_version))
zanmato1984 marked this conversation as resolved.
Show resolved Hide resolved
return {BlockOption{}, VERSION_ERROR};
}

/// Deal with locks.
{
if (resolve_locks)
Expand Down Expand Up @@ -170,13 +182,17 @@ std::tuple<BlockOption, RegionTable::RegionReadStatus> RegionTable::readBlockByR
}

/// Read region data as block.
auto start_time = Clock::now();
auto [block, ok] = readRegionBlock(table_info, columns, column_names_to_read, data_list_read, start_ts, true);
region_decode_cost = std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - start_time).count();
if (!ok)
// TODO: Enrich exception message.
throw Exception("Read region " + std::to_string(region->id()) + " of table " + std::to_string(table_info.id) + " failed",
ErrorCodes::LOGICAL_ERROR);
Block block;
{
bool ok = false;
auto start_time = Clock::now();
std::tie(block, ok) = readRegionBlock(table_info, columns, column_names_to_read, data_list_read, start_ts, true);
region_decode_cost = std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - start_time).count();
if (!ok)
// TODO: Enrich exception message.
throw Exception("Read region " + std::to_string(region->id()) + " of table " + std::to_string(table_info.id) + " failed",
ErrorCodes::LOGICAL_ERROR);
}

LOG_TRACE(log,
__PRETTY_FUNCTION__ << ": table " << table_info.id << ", region " << region->id() << ", cost [wait index " << wait_index_cost
Expand Down