Skip to content

Commit

Permalink
Move core functionality to QuicTransportBaseLite [8/n]
Browse files Browse the repository at this point in the history
Summary: See title.

Reviewed By: mjoras

Differential Revision: D64065153

fbshipit-source-id: 5c9515dcaba1ef1f30d49f701e366f715854527a
  • Loading branch information
Aman Sharma authored and facebook-github-bot committed Oct 10, 2024
1 parent c62dac1 commit b7169e3
Show file tree
Hide file tree
Showing 6 changed files with 247 additions and 277 deletions.
215 changes: 1 addition & 214 deletions quic/api/QuicTransportBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,8 @@ QuicTransportBase::QuicTransportBase(
useConnectionEndWithErrorCallback),
ackTimeout_(this),
pathValidationTimeout_(this),
keepaliveTimeout_(this),
drainTimeout_(this),
pingTimeout_(this),
readLooper_(new FunctionLooper(
evb_,
[this]() { invokeReadDataAndCallbacks(); },
LooperType::ReadLooper)),
peekLooper_(new FunctionLooper(
evb_,
[this]() { invokePeekDataAndCallbacks(); },
LooperType::PeekLooper)) {
pingTimeout_(this) {
writeLooper_->setPacingFunction([this]() -> auto {
if (isConnectionPaced(*conn_)) {
return conn_->pacer->getTimeUntilNextWrite();
Expand Down Expand Up @@ -734,34 +725,6 @@ QuicTransportBase::pauseOrResumeRead(StreamId id, bool resume) {
return folly::unit;
}

void QuicTransportBase::updateReadLooper() {
if (closeState_ != CloseState::OPEN) {
VLOG(10) << "Stopping read looper " << *this;
readLooper_->stop();
return;
}
auto iter = std::find_if(
conn_->streamManager->readableStreams().begin(),
conn_->streamManager->readableStreams().end(),
[&readCallbacks = readCallbacks_](StreamId s) {
auto readCb = readCallbacks.find(s);
if (readCb == readCallbacks.end()) {
return false;
}
// TODO: if the stream has an error and it is also paused we should
// still return an error
return readCb->second.readCb && readCb->second.resumed;
});
if (iter != conn_->streamManager->readableStreams().end() ||
!conn_->datagramState.readBuffer.empty()) {
VLOG(10) << "Scheduling read looper " << *this;
readLooper_->run();
} else {
VLOG(10) << "Stopping read looper " << *this;
readLooper_->stop();
}
}

folly::Expected<folly::Unit, LocalErrorCode> QuicTransportBase::setPeekCallback(
StreamId id,
PeekCallback* cb) {
Expand Down Expand Up @@ -848,89 +811,6 @@ void QuicTransportBase::invokeStreamsAvailableCallbacks() {
}
}

void QuicTransportBase::updatePeekLooper() {
if (peekCallbacks_.empty() || closeState_ != CloseState::OPEN) {
VLOG(10) << "Stopping peek looper " << *this;
peekLooper_->stop();
return;
}
VLOG(10) << "Updating peek looper, has "
<< conn_->streamManager->peekableStreams().size()
<< " peekable streams";
auto iter = std::find_if(
conn_->streamManager->peekableStreams().begin(),
conn_->streamManager->peekableStreams().end(),
[&peekCallbacks = peekCallbacks_](StreamId s) {
VLOG(10) << "Checking stream=" << s;
auto peekCb = peekCallbacks.find(s);
if (peekCb == peekCallbacks.end()) {
VLOG(10) << "No peek callbacks for stream=" << s;
return false;
}
if (!peekCb->second.resumed) {
VLOG(10) << "peek callback for stream=" << s << " not resumed";
}

if (!peekCb->second.peekCb) {
VLOG(10) << "no peekCb in peekCb stream=" << s;
}
return peekCb->second.peekCb && peekCb->second.resumed;
});
if (iter != conn_->streamManager->peekableStreams().end()) {
VLOG(10) << "Scheduling peek looper " << *this;
peekLooper_->run();
} else {
VLOG(10) << "Stopping peek looper " << *this;
peekLooper_->stop();
}
}

void QuicTransportBase::updateWriteLooper(bool thisIteration, bool runInline) {
if (closeState_ == CloseState::CLOSED) {
VLOG(10) << nodeToString(conn_->nodeType)
<< " stopping write looper because conn closed " << *this;
writeLooper_->stop();
return;
}

if (conn_->transportSettings.checkIdleTimerOnWrite) {
checkIdleTimer(Clock::now());
if (closeState_ == CloseState::CLOSED) {
return;
}
}

// If socket writable events are in use, do nothing if we are already waiting
// for the write event.
if (conn_->transportSettings.useSockWritableEvents &&
socket_->isWritableCallbackSet()) {
return;
}

auto writeDataReason = shouldWriteData(*conn_);
if (writeDataReason != WriteDataReason::NO_WRITE) {
VLOG(10) << nodeToString(conn_->nodeType)
<< " running write looper thisIteration=" << thisIteration << " "
<< *this;
writeLooper_->run(thisIteration, runInline);
if (conn_->loopDetectorCallback) {
conn_->writeDebugState.needsWriteLoopDetect =
(conn_->loopDetectorCallback != nullptr);
}
} else {
VLOG(10) << nodeToString(conn_->nodeType) << " stopping write looper "
<< *this;
writeLooper_->stop();
if (conn_->loopDetectorCallback) {
conn_->writeDebugState.needsWriteLoopDetect = false;
conn_->writeDebugState.currentEmptyLoopCount = 0;
}
}
if (conn_->loopDetectorCallback) {
conn_->writeDebugState.writeDataReason = writeDataReason;
}
}

void QuicTransportBase::cancelDeliveryCallbacksForStream(StreamId id) {
cancelByteEventCallbacksForStream(ByteEvent::Type::ACK, id);
}
Expand Down Expand Up @@ -1235,62 +1115,6 @@ void QuicTransportBase::handlePingCallbacks() {
conn_->pendingEvents.cancelPingTimeout = false;
}

void QuicTransportBase::processCallbacksAfterWriteData() {
if (closeState_ != CloseState::OPEN) {
return;
}

auto txStreamId = conn_->streamManager->popTx();
while (txStreamId.has_value()) {
auto streamId = *txStreamId;
auto stream = CHECK_NOTNULL(conn_->streamManager->getStream(streamId));
auto largestOffsetTxed = getLargestWriteOffsetTxed(*stream);
// if it's in the set of streams with TX, we should have a valid offset
CHECK(largestOffsetTxed.has_value());

// lambda to help get the next callback to call for this stream
auto getNextTxCallbackForStreamAndCleanup =
[this, &largestOffsetTxed](
const auto& streamId) -> Optional<ByteEventDetail> {
auto txCallbacksForStreamIt = txCallbacks_.find(streamId);
if (txCallbacksForStreamIt == txCallbacks_.end() ||
txCallbacksForStreamIt->second.empty()) {
return none;
}

auto& txCallbacksForStream = txCallbacksForStreamIt->second;
if (txCallbacksForStream.front().offset > *largestOffsetTxed) {
return none;
}

// extract the callback, pop from the queue, then check for cleanup
auto result = txCallbacksForStream.front();
txCallbacksForStream.pop_front();
if (txCallbacksForStream.empty()) {
txCallbacks_.erase(txCallbacksForStreamIt);
}
return result;
};

Optional<ByteEventDetail> nextOffsetAndCallback;
while (
(nextOffsetAndCallback =
getNextTxCallbackForStreamAndCleanup(streamId))) {
ByteEvent byteEvent{
streamId, nextOffsetAndCallback->offset, ByteEvent::Type::TX};
nextOffsetAndCallback->callback->onByteEvent(byteEvent);

// connection may be closed by callback
if (closeState_ != CloseState::OPEN) {
return;
}
}

// pop the next stream
txStreamId = conn_->streamManager->popTx();
}
}

void QuicTransportBase::handleKnobCallbacks() {
if (!conn_->transportSettings.advertisedKnobFrameSupport) {
VLOG(4) << "Received knob frames without advertising support";
Expand Down Expand Up @@ -1770,33 +1594,6 @@ void QuicTransportBase::onNetworkData(
}
}

void QuicTransportBase::setIdleTimer() {
if (closeState_ == CloseState::CLOSED) {
return;
}
cancelTimeout(&idleTimeout_);
cancelTimeout(&keepaliveTimeout_);
auto localIdleTimeout = conn_->transportSettings.idleTimeout;
// The local idle timeout being zero means it is disabled.
if (localIdleTimeout == 0ms) {
return;
}
auto peerIdleTimeout =
conn_->peerIdleTimeout > 0ms ? conn_->peerIdleTimeout : localIdleTimeout;
auto idleTimeout = timeMin(localIdleTimeout, peerIdleTimeout);

idleTimeoutCheck_.idleTimeoutMs = idleTimeout;
idleTimeoutCheck_.lastTimeIdleTimeoutScheduled_ = Clock::now();

scheduleTimeout(&idleTimeout_, idleTimeout);
auto idleTimeoutCount = idleTimeout.count();
if (conn_->transportSettings.enableKeepalive) {
std::chrono::milliseconds keepaliveTimeout = std::chrono::milliseconds(
idleTimeoutCount - static_cast<int64_t>(idleTimeoutCount * .15));
scheduleTimeout(&keepaliveTimeout_, keepaliveTimeout);
}
}

uint64_t QuicTransportBase::getNumOpenableBidirectionalStreams() const {
return conn_->streamManager->openableLocalBidirectionalStreams();
}
Expand Down Expand Up @@ -2198,12 +1995,6 @@ void QuicTransportBase::pathValidationTimeoutExpired() noexcept {
std::string("Path validation timed out")));
}

void QuicTransportBase::keepaliveTimeoutExpired() noexcept {
[[maybe_unused]] auto self = sharedGuard();
conn_->pendingEvents.sendPing = true;
updateWriteLooper(true, conn_->transportSettings.inlineWriteAfterRead);
}

void QuicTransportBase::scheduleAckTimeout() {
if (closeState_ == CloseState::CLOSED) {
return;
Expand Down Expand Up @@ -2272,10 +2063,6 @@ void QuicTransportBase::schedulePathValidationTimeout() {
}
}

bool QuicTransportBase::isLossTimeoutScheduled() {
return isTimeoutScheduled(&lossTimeout_);
}

void QuicTransportBase::setSupportedVersions(
const std::vector<QuicVersion>& versions) {
conn_->originalVersion = versions.at(0);
Expand Down
29 changes: 0 additions & 29 deletions quic/api/QuicTransportBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -433,24 +433,6 @@ class QuicTransportBase : public QuicSocket,
QuicTransportBase* transport_;
};

class KeepaliveTimeout : public QuicTimerCallback {
public:
~KeepaliveTimeout() override = default;

explicit KeepaliveTimeout(QuicTransportBase* transport)
: transport_(transport) {}

void timeoutExpired() noexcept override {
transport_->keepaliveTimeoutExpired();
}
void callbackCanceled() noexcept override {
// Specifically do nothing since if we got canceled we shouldn't write.
}

private:
QuicTransportBase* transport_;
};

// DrainTimeout is a bit different from other timeouts. It needs to hold a
// shared_ptr to the transport, since if a DrainTimeout is scheduled,
// transport cannot die.
Expand All @@ -469,8 +451,6 @@ class QuicTransportBase : public QuicSocket,
QuicTransportBase* transport_;
};

bool isLossTimeoutScheduled() override; // TODO: make this const again

// If you don't set it, the default is Cubic
void setCongestionControl(CongestionControlType type) override;

Expand Down Expand Up @@ -552,12 +532,8 @@ class QuicTransportBase : public QuicSocket,
void updateCongestionControlSettings(
const TransportSettings& transportSettings);
void updateSocketTosSettings(uint8_t dscpValue);
void processCallbacksAfterWriteData() override;
void processCallbacksAfterNetworkData();
void invokeStreamsAvailableCallbacks();
void updateReadLooper() override;
void updatePeekLooper() override;
void updateWriteLooper(bool thisIteration, bool runInline = false) override;
void handlePingCallbacks();
void handleKnobCallbacks();
void handleAckEventCallbacks();
Expand Down Expand Up @@ -604,11 +580,9 @@ class QuicTransportBase : public QuicSocket,

void ackTimeoutExpired() noexcept;
void pathValidationTimeoutExpired() noexcept;
void keepaliveTimeoutExpired() noexcept;
void drainTimeoutExpired() noexcept;
void pingTimeoutExpired() noexcept;

void setIdleTimer() override;
void scheduleAckTimeout() override;
void schedulePathValidationTimeout() override;
void schedulePingTimeout(
Expand Down Expand Up @@ -662,11 +636,8 @@ class QuicTransportBase : public QuicSocket,

AckTimeout ackTimeout_;
PathValidationTimeout pathValidationTimeout_;
KeepaliveTimeout keepaliveTimeout_;
DrainTimeout drainTimeout_;
PingTimeout pingTimeout_;
FunctionLooper::Ptr readLooper_;
FunctionLooper::Ptr peekLooper_;

// TODO: This is silly. We need a better solution.
// Uninitialied local address as a fallback answer when socket isn't bound.
Expand Down
Loading

0 comments on commit b7169e3

Please sign in to comment.