fix a bug that ExchangeReceiver can't be canceled #4436

Closed
wants to merge 6 commits into from
1 change: 1 addition & 0 deletions dbms/src/DataStreams/TiRemoteBlockInputStream.h
@@ -241,6 +241,7 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
    virtual void readSuffix() override
    {
        LOG_FMT_DEBUG(log, "finish read {} rows from remote", total_rows);
        remote_reader->close();
Contributor

how about local-reader?

Contributor Author

can you explain more? I didn't get it.

Contributor

does local-reader not need to close?

Contributor Author

where is local-reader?
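
For context on why `close()` matters for every reader type: if the stream is generic over its reader (an assumption here, since the hunk does not show a template header), then calling `remote_reader->close()` in `readSuffix()` requires each reader type to provide that method, even as a no-op. That would explain the empty `close()` this PR adds to `CoprocessorReader`. A minimal sketch, with `StreamSketch`, `NoopReader`, and `TrackingReader` as hypothetical stand-ins rather than TiFlash classes:

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Hypothetical reader with nothing to release: close() exists only so the
// generic stream below can call it unconditionally.
struct NoopReader
{
    void close() {}
};

// Hypothetical reader that owns something worth shutting down.
struct TrackingReader
{
    bool closed = false;
    void close() { closed = true; }
};

// Hypothetical stream, generic over its reader type.
template <typename Reader>
class StreamSketch
{
public:
    explicit StreamSketch(std::shared_ptr<Reader> reader_)
        : reader(std::move(reader_))
    {
        assert(reader != nullptr); // the stream always needs a reader
    }

    // Mirrors the shape of readSuffix() in the diff: always close the reader.
    void readSuffix() { reader->close(); }

private:
    std::shared_ptr<Reader> reader;
};
```

Under this reading, a reader type without `close()` would fail to compile as soon as the stream is instantiated with it, so the no-op keeps both instantiations valid.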

    }
};

3 changes: 3 additions & 0 deletions dbms/src/Flash/Coprocessor/CoprocessorReader.h
@@ -95,6 +95,7 @@ class CoprocessorReader

    void cancel() { resp_iter.cancel(); }


    static DecodeDetail decodeChunks(
        const std::shared_ptr<tipb::SelectResponse> & resp,
        std::queue<Block> & block_queue,
@@ -187,6 +188,8 @@ class CoprocessorReader
        collected = false;
    }

    void close() {}

    bool collected = false;
    int concurrency_;
};
8 changes: 8 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.cpp
@@ -221,6 +221,14 @@ const std::unordered_map<String, std::shared_ptr<ExchangeReceiver>> & DAGContext
    return mpp_exchange_receiver_map;
}

void DAGContext::cancelAllExchangeReceiver()
{
    for (auto & it : mpp_exchange_receiver_map)
    {
        it.second->cancel();
    }
}

int DAGContext::getNewThreadCountOfExchangeReceiver() const
{
    return new_thread_count_of_exchange_receiver;
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.h
@@ -266,6 +266,8 @@ class DAGContext
        return sql_mode & f;
    }

    void cancelAllExchangeReceiver();

    void initExchangeReceiverIfMPP(Context & context, size_t max_streams);
    const std::unordered_map<String, std::shared_ptr<ExchangeReceiver>> & getMPPExchangeReceiverMap() const;

20 changes: 18 additions & 2 deletions dbms/src/Flash/Mpp/ExchangeReceiver.cpp
@@ -311,18 +311,34 @@ ExchangeReceiverBase<RPCContext>::ExchangeReceiverBase(
template <typename RPCContext>
ExchangeReceiverBase<RPCContext>::~ExchangeReceiverBase()
{
    setState(ExchangeReceiverState::CLOSED);
    msg_channel.finish();
    close();
    thread_manager->wait();
}

template <typename RPCContext>
void ExchangeReceiverBase<RPCContext>::cancel()
{
    auto cur_state = getState();
    if (cur_state == ExchangeReceiverState::CANCELED || cur_state == ExchangeReceiverState::CLOSED)
    {
        return;
    }
    setState(ExchangeReceiverState::CANCELED);
    msg_channel.finish();
}

template <typename RPCContext>
void ExchangeReceiverBase<RPCContext>::close()
{
    auto cur_state = getState();
    if (cur_state == ExchangeReceiverState::CANCELED || cur_state == ExchangeReceiverState::CLOSED)
    {
        return;
    }
Comment on lines +334 to +337
Contributor

this check should be moved into setState and under protection of mu.
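
One way to read this suggestion, as a minimal sketch rather than the actual TiFlash change: the state setter takes the lock, refuses the transition if the receiver is already in a terminal state, and reports whether it succeeded. `ReceiverStateHolder`, `trySetState`, and `usageExample` are hypothetical names; only the state names and the mutex name `mu` come from this page.

```cpp
#include <cassert>
#include <mutex>

// State names mirror the ones in the diff; everything else is hypothetical.
enum class ExchangeReceiverState { NORMAL, ERROR, CANCELED, CLOSED };

class ReceiverStateHolder
{
public:
    // Moves to `target` only if the current state is not yet terminal.
    // The check and the write happen under the same lock, so two threads
    // cannot both pass the check and both perform a transition.
    bool trySetState(ExchangeReceiverState target)
    {
        std::lock_guard<std::mutex> lock(mu);
        if (state == ExchangeReceiverState::CANCELED || state == ExchangeReceiverState::CLOSED)
            return false;
        state = target;
        return true;
    }

    ExchangeReceiverState getState() const
    {
        std::lock_guard<std::mutex> lock(mu);
        return state;
    }

private:
    mutable std::mutex mu;
    ExchangeReceiverState state = ExchangeReceiverState::NORMAL;
};

// Usage: the first terminal transition wins; later ones are rejected.
inline void usageExample()
{
    ReceiverStateHolder holder;
    const bool first = holder.trySetState(ExchangeReceiverState::CANCELED);
    const bool second = holder.trySetState(ExchangeReceiverState::CLOSED);
    assert(first && !second);
    assert(holder.getState() == ExchangeReceiverState::CANCELED);
}
```

With this shape, `cancel()` and `close()` would call `msg_channel.finish()` only when the transition succeeds, and a CLOSED receiver could no longer flip to CANCELED or back.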

    setState(ExchangeReceiverState::CLOSED);
Contributor

Make sure close and cancel only work if the state is normal?

Contributor Author

@bestwoody, Mar 25, 2022

I think it's safe to call it multiple times since they are idempotent.
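
To make the idempotence claim concrete, here is a small sketch of a finishable queue in the spirit of `msg_channel`. `FinishableQueue`, its methods, and `usageExample` are illustrative stand-ins, not the real TiFlash channel API. In this sketch `finish()` may be called any number of times, and after it `pop()` drains what is left and then reports end-of-stream instead of blocking.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <utility>

// Hypothetical stand-in for a message channel that can be finished.
template <typename T>
class FinishableQueue
{
public:
    // Returns false once the queue has been finished.
    bool push(T value)
    {
        {
            std::lock_guard<std::mutex> lock(mu);
            if (finished)
                return false;
            items.push(std::move(value));
        }
        cv.notify_one();
        return true;
    }

    // Blocks until an item is available or the queue is finished.
    // Returns std::nullopt only when finished and fully drained.
    std::optional<T> pop()
    {
        std::unique_lock<std::mutex> lock(mu);
        cv.wait(lock, [this] { return finished || !items.empty(); });
        if (items.empty())
            return std::nullopt;
        T value = std::move(items.front());
        items.pop();
        return value;
    }

    // Idempotent: the second and later calls change nothing observable.
    void finish()
    {
        {
            std::lock_guard<std::mutex> lock(mu);
            finished = true;
        }
        cv.notify_all(); // wake every reader blocked in pop()
    }

private:
    std::mutex mu;
    std::condition_variable cv;
    std::queue<T> items;
    bool finished = false;
};

// Usage: finishing twice is harmless, and readers see end-of-stream.
inline void usageExample()
{
    FinishableQueue<int> queue;
    queue.finish();
    queue.finish();
    const bool drained = !queue.pop().has_value();
    assert(drained);
}
```

Idempotent `finish()` does not by itself settle the state question raised in this thread: repeated calls are safe for the channel, but the receiver's recorded state can still change between CANCELED and CLOSED unless the transition is guarded.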

Contributor

But it's weird that you can change the status from CLOSED to CANCELED, and CANCELED to CLOSED

Contributor Author

Another annoying point is that ExchangeReceiver also has an ExchangeReceiverState::ERROR state. Any suggestions about state checking?

Contributor Author

> But it's weird that you can change the status from CLOSED to CANCELED, and CANCELED to CLOSED

The current version of TiFlash may also suffer from the same problem. How about refining it in the future?

Contributor Author

check added.

    msg_channel.finish();
}
Comment on lines +332 to +340
Contributor

it seems that those lines are almost identical to L321-327

Contributor Author

@bestwoody, Mar 25, 2022

Refined in another PR, since my local repo is broken; see #4441.


template <typename RPCContext>
void ExchangeReceiverBase<RPCContext>::setUpConnection()
{
2 changes: 2 additions & 0 deletions dbms/src/Flash/Mpp/ExchangeReceiver.h
@@ -98,6 +98,8 @@ class ExchangeReceiverBase

    void cancel();

    void close();

    const DAGSchema & getOutputSchema() const { return schema; }

    ExchangeReceiverResult nextResult(
2 changes: 2 additions & 0 deletions dbms/src/Flash/Mpp/MPPTask.cpp
@@ -352,6 +352,8 @@ void MPPTask::runImpl()
    else
    {
        context->getProcessList().sendCancelToQuery(context->getCurrentQueryId(), context->getClientInfo().current_user, true);
        if (dag_context)
            dag_context->cancelAllExchangeReceiver();
Comment on lines +355 to +356
Contributor

why before L357? is it better to write errors to the tunnels first?

Contributor Author

Referring to the chat history in our OOT group, xufei has explained the reason: the receiver needs to be closed first.

        writeErrToAllTunnels(err_msg);
    }
    LOG_FMT_INFO(log, "task ends, time cost is {} ms.", stopwatch.elapsedMilliseconds());