Skip to content

Commit

Permalink
Adjust order of region check and lock by region scanner creation (#136)
Browse files Browse the repository at this point in the history
* Fix dead lock in learner read

* Adjust order of region check and lock by region scanner creation

* Remove useless invalid region version check
  • Loading branch information
zanmato1984 authored Jul 24, 2019
1 parent 22679aa commit bb200fa
Showing 1 changed file with 50 additions and 34 deletions.
84 changes: 50 additions & 34 deletions dbms/src/Storages/Transaction/PartitionStreams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,29 @@ void RegionTable::writeBlockByRegion(

UInt64 region_read_cost = -1, region_decode_cost = -1, write_part_cost = -1;

/// Read raw KVs from region cache.
RegionDataReadInfoList data_list_read;
{
auto start_time = Clock::now();
auto scanner = region->createCommittedScanner(table_id);
// Shortcut for empty region.
if (!scanner->hasNext())
return;
do

/// Some sanity checks for region meta.
{
data_list_read.emplace_back(scanner->next());
} while (scanner->hasNext());
region_read_cost = std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - start_time).count();
}
if (region->isPendingRemove())
return;
}

/// Read raw KVs from region cache.
{
// Shortcut for empty region.
if (!scanner->hasNext())
return;
auto start_time = Clock::now();
do
{
data_list_read.emplace_back(scanner->next());
} while (scanner->hasNext());
region_read_cost = std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - start_time).count();
}
}

/// Declare lambda of atomic read then write to call multiple times.
auto atomicReadWrite = [&](bool force_decode) {
Expand Down Expand Up @@ -87,17 +95,21 @@ void RegionTable::writeBlockByRegion(
};

/// Try read then write once.
if (atomicReadWrite(false))
return;
{
if (atomicReadWrite(false))
return;
}

/// If first try failed, sync schema and force read then write.
tmt.getSchemaSyncer()->syncSchemas(context);
{
tmt.getSchemaSyncer()->syncSchemas(context);

if (!atomicReadWrite(true))
// Failure won't be tolerated this time.
// TODO: Enrich exception message.
throw Exception("Write region " + std::to_string(region->id()) + " to table " + std::to_string(table_id) + " failed",
ErrorCodes::LOGICAL_ERROR);
if (!atomicReadWrite(true))
// Failure won't be tolerated this time.
// TODO: Enrich exception message.
throw Exception("Write region " + std::to_string(region->id()) + " to table " + std::to_string(table_id) + " failed",
ErrorCodes::LOGICAL_ERROR);
}

LOG_TRACE(log,
__PRETTY_FUNCTION__ << ": table " << table_id << ", region " << region->id() << ", cost [region read " << region_read_cost
Expand Down Expand Up @@ -126,19 +138,19 @@ std::tuple<BlockOption, RegionTable::RegionReadStatus> RegionTable::readBlockByR
wait_index_cost = std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - start_time).count();
}

/// Some sanity checks for region meta.
{
if (region->isPendingRemove())
return {BlockOption{}, PENDING_REMOVE};

if (region_version != InvalidRegionVersion && (region->version() != region_version || region->confVer() != conf_version))
return {BlockOption{}, VERSION_ERROR};
}

RegionDataReadInfoList data_list_read;
{
auto scanner = region->createCommittedScanner(table_info.id);

/// Some sanity checks for region meta.
{
if (region->isPendingRemove())
return {BlockOption{}, PENDING_REMOVE};

if (region->version() != region_version || region->confVer() != conf_version)
return {BlockOption{}, VERSION_ERROR};
}

/// Deal with locks.
{
if (resolve_locks)
Expand Down Expand Up @@ -170,13 +182,17 @@ std::tuple<BlockOption, RegionTable::RegionReadStatus> RegionTable::readBlockByR
}

/// Read region data as block.
auto start_time = Clock::now();
auto [block, ok] = readRegionBlock(table_info, columns, column_names_to_read, data_list_read, start_ts, true);
region_decode_cost = std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - start_time).count();
if (!ok)
// TODO: Enrich exception message.
throw Exception("Read region " + std::to_string(region->id()) + " of table " + std::to_string(table_info.id) + " failed",
ErrorCodes::LOGICAL_ERROR);
Block block;
{
bool ok = false;
auto start_time = Clock::now();
std::tie(block, ok) = readRegionBlock(table_info, columns, column_names_to_read, data_list_read, start_ts, true);
region_decode_cost = std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - start_time).count();
if (!ok)
// TODO: Enrich exception message.
throw Exception("Read region " + std::to_string(region->id()) + " of table " + std::to_string(table_info.id) + " failed",
ErrorCodes::LOGICAL_ERROR);
}

LOG_TRACE(log,
__PRETTY_FUNCTION__ << ": table " << table_info.id << ", region " << region->id() << ", cost [wait index " << wait_index_cost
Expand Down

0 comments on commit bb200fa

Please sign in to comment.