Skip to content

Commit

Permalink
Resolve conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
JaySon-Huang committed Dec 29, 2023
1 parent 20ead31 commit 3ab2f27
Show file tree
Hide file tree
Showing 12 changed files with 14 additions and 355 deletions.
20 changes: 1 addition & 19 deletions dbms/src/Flash/Coprocessor/DAGContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,8 @@
#include <Flash/Coprocessor/collectOutputFieldTypes.h>
#include <Flash/Mpp/ExchangeReceiver.h>
#include <Flash/Statistics/traverseExecutors.h>
<<<<<<< HEAD
#include <Storages/Transaction/TMTContext.h>
=======
#include <Storages/DeltaMerge/ScanContext.h>
#include <Storages/KVStore/TMTContext.h>
>>>>>>> ce42814e49 (*: Add table scan details logging; change default logging level to "info" (#8616))
#include <Storages/Transaction/TMTContext.h>
#include <kvproto/disaggregated.pb.h>
#include <tipb/executor.pb.h>

Expand Down Expand Up @@ -344,19 +340,5 @@ const SingleTableRegions & DAGContext::getTableRegionsInfoByTableID(Int64 table_
{
return tables_regions_info.getTableRegionInfoByTableID(table_id);
}
<<<<<<< HEAD
=======

RU DAGContext::getReadRU() const
{
UInt64 read_bytes = 0;
for (const auto & [id, sc] : scan_context_map)
{
(void)id; // Disable unused variable warnning.
read_bytes += sc->user_read_bytes;
}
return bytesToRU(read_bytes);
}

>>>>>>> ce42814e49 (*: Add table scan details logging; change default logging level to "info" (#8616))
} // namespace DB
7 changes: 1 addition & 6 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,9 @@
#include <Interpreters/SubqueryForSet.h>
#include <Parsers/makeDummyQuery.h>
#include <Storages/DeltaMerge/Remote/DisaggTaskId.h>
<<<<<<< HEAD
#include <Storages/DeltaMerge/ScanContext.h>
#include <Storages/Transaction/TiDB.h>

=======
#include <Storages/DeltaMerge/ScanContext_fwd.h>
#include <TiDB/Schema/TiDB.h>
>>>>>>> ce42814e49 (*: Add table scan details logging; change default logging level to "info" (#8616))
#include <Storages/Transaction/TiDB.h>
namespace DB
{
class Context;
Expand Down
7 changes: 0 additions & 7 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -396,22 +396,15 @@ void DAGStorageInterpreter::prepare()
learner_read_snapshot = doBatchCopLearnerRead();
else
learner_read_snapshot = doCopLearnerRead();
<<<<<<< HEAD
=======
scan_context->learner_read_ns += watch.elapsed();
>>>>>>> ce42814e49 (*: Add table scan details logging; change default logging level to "info" (#8616))

// Acquire read lock on `alter lock` and build the requested inputstreams
storages_with_structure_lock = getAndLockStorages(context.getSettingsRef().schema_version);
assert(storages_with_structure_lock.find(logical_table_id) != storages_with_structure_lock.end());
storage_for_logical_table = storages_with_structure_lock[logical_table_id].storage;

<<<<<<< HEAD
std::tie(required_columns, is_need_add_cast_column) = getColumnsForTableScan();
=======
std::tie(required_columns, may_need_add_cast_column) = getColumnsForTableScan();
scan_context->num_columns = required_columns.size();
>>>>>>> ce42814e49 (*: Add table scan details logging; change default logging level to "info" (#8616))
}

void DAGStorageInterpreter::executeCastAfterTableScan(
Expand Down
3 changes: 0 additions & 3 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@
#pragma once

#include <Storages/DeltaMerge/ColumnFile/ColumnFileSetSnapshot.h>
<<<<<<< HEAD
=======
#include <Storages/DeltaMerge/DMContext.h>
#include <Storages/DeltaMerge/ScanContext.h>
>>>>>>> ce42814e49 (*: Add table scan details logging; change default logging level to "info" (#8616))
#include <Storages/DeltaMerge/SkippableBlockInputStream.h>

namespace DB
Expand Down
19 changes: 3 additions & 16 deletions dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,32 +45,19 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream
constexpr static const char * COMPACT_FILTER_NAME = "mode=COMPACT";

public:
<<<<<<< HEAD
DMVersionFilterBlockInputStream(const BlockInputStreamPtr & input,
const ColumnDefines & read_columns,
UInt64 version_limit_,
bool is_common_handle_,
const String & tracing_id = "")
=======
DMVersionFilterBlockInputStream(
const BlockInputStreamPtr & input,
const ColumnDefines & read_columns,
UInt64 version_limit_,
bool is_common_handle_,
const String & tracing_id = "",
const ScanContextPtr & scan_context_ = nullptr)
>>>>>>> ce42814e49 (*: Add table scan details logging; change default logging level to "info" (#8616))
const String & tracing_id = "",
const ScanContextPtr & scan_context_ = nullptr)
: version_limit(version_limit_)
, is_common_handle(is_common_handle_)
, header(toEmptyBlock(read_columns))
, select_by_colid_action(input->getHeader(), header)
<<<<<<< HEAD
, scan_context(scan_context_)
, log(Logger::get((MODE == DM_VERSION_FILTER_MODE_MVCC ? MVCC_FILTER_NAME : COMPACT_FILTER_NAME),
tracing_id))
=======
, scan_context(scan_context_)
, log(Logger::get((MODE == DM_VERSION_FILTER_MODE_MVCC ? MVCC_FILTER_NAME : COMPACT_FILTER_NAME), tracing_id))
>>>>>>> ce42814e49 (*: Add table scan details logging; change default logging level to "info" (#8616))
{
children.push_back(input);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,27 +43,14 @@ void SegmentReadTaskScheduler::add(const SegmentReadTaskPoolPtr & pool)
auto seg_id = pa.first;
merging_segments[pool->tableId()][seg_id].push_back(pool->poolId());
}
<<<<<<< HEAD
auto block_slots = pool->getFreeBlockSlots();
LOG_DEBUG(log, "Added, pool_id={} table_id={} block_slots={} segment_count={} pool_count={} cost={}ns do_add_cost={}ns", //
pool->poolId(),
pool->tableId(),
block_slots,
tasks.size(),
read_pools.size(),
sw_add.elapsed(),
sw_do_add.elapsed());
=======
LOG_INFO(
req_log,
"Added, pool_id={} block_slots={} segment_count={} pool_count={} cost={:.3f}us do_add_cost={:.3f}us", //
pool->pool_id,
pool->getFreeBlockSlots(),
tasks.size(),
read_pools.size(),
sw_add.elapsed() / 1000.0,
sw_do_add.elapsed() / 1000.0);
>>>>>>> ce42814e49 (*: Add table scan details logging; change default logging level to "info" (#8616))
LOG_INFO(log, "Added, pool_id={} table_id={} block_slots={} segment_count={} pool_count={} cost={:3f}us do_add_cost={:.3f}us", //
pool->poolId(),
pool->tableId(),
pool->getFreeBlockSlots(),
tasks.size(),
read_pools.size(),
sw_add.elapsed() / 1000.0,
sw_do_add.elapsed() / 1000.0);
}

std::pair<MergedTaskPtr, bool> SegmentReadTaskScheduler::scheduleMergedTask()
Expand Down
11 changes: 1 addition & 10 deletions dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,27 +53,18 @@ class UnorderedInputStream : public IProfilingBlockInputStream
header.insert(extra_table_id_index, col);
}
ref_no = task_pool->increaseUnorderedInputStreamRefCount();
<<<<<<< HEAD
LOG_DEBUG(log, "Created, pool_id={} ref_no={}", task_pool->poolId(), ref_no);
=======
>>>>>>> ce42814e49 (*: Add table scan details logging; change default logging level to "info" (#8616))
}

~UnorderedInputStream() override
{
<<<<<<< HEAD
task_pool->decreaseUnorderedInputStreamRefCount();
LOG_DEBUG(log, "Destroy, pool_id={} ref_no={}", task_pool->poolId(), ref_no);
=======
if (const auto rc_before_decr = task_pool->decreaseUnorderedInputStreamRefCount(); rc_before_decr == 1)
{
LOG_INFO(
log,
"All unordered input streams are finished, pool_id={} last_stream_ref_no={}",
task_pool->pool_id,
task_pool->poolId(),
ref_no);
}
>>>>>>> ce42814e49 (*: Add table scan details logging; change default logging level to "info" (#8616))
}

String getName() const override { return NAME; }
Expand Down
9 changes: 0 additions & 9 deletions dbms/src/Storages/DeltaMerge/Remote/RNLocalPageCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,17 +350,8 @@ RNLocalPageCache::OccupySpaceResult RNLocalPageCache::occupySpace(const std::vec
// Pages may be occupied but not written yet, so we always return missing pages according
// to the storage.
if (const auto & page_entry = storage->getEntry(keys[i], snapshot); page_entry.isValid())
<<<<<<< HEAD
=======
{
scan_context->disagg_read_cache_hit_size += page_sizes[i];
>>>>>>> ce42814e49 (*: Add table scan details logging; change default logging level to "info" (#8616))
continue;
missing_ids.push_back(pages[i]);
<<<<<<< HEAD
=======
scan_context->disagg_read_cache_miss_size += page_sizes[i];
>>>>>>> ce42814e49 (*: Add table scan details logging; change default logging level to "info" (#8616))
}
GET_METRIC(tiflash_storage_remote_cache, type_page_miss).Increment(missing_ids.size());
GET_METRIC(tiflash_storage_remote_cache, type_page_hit).Increment(pages.size() - missing_ids.size());
Expand Down
4 changes: 0 additions & 4 deletions dbms/src/Storages/DeltaMerge/Remote/RNLocalPageCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,7 @@
#include <Interpreters/Context_fwd.h>
#include <Storages/DeltaMerge/Remote/ObjectId.h>
#include <Storages/DeltaMerge/Remote/RNLocalPageCache_fwd.h>
<<<<<<< HEAD
=======
#include <Storages/DeltaMerge/ScanContext_fwd.h>
#include <Storages/KVStore/Types.h>
>>>>>>> ce42814e49 (*: Add table scan details logging; change default logging level to "info" (#8616))
#include <Storages/Page/Page.h>
#include <Storages/Page/PageStorage_fwd.h>
#include <Storages/Page/V3/Universal/UniversalPageId.h>
Expand Down
Loading

0 comments on commit 3ab2f27

Please sign in to comment.