Skip to content

Commit

Permalink
Switch to ObserverContainer (5/x): Start passing observer events via …
Browse files Browse the repository at this point in the history
…`invokeInterfaceMethod`

Summary:
This diff is part of a larger change to use `folly::ObserverContainer` (introduced in D27062840) with `AsyncSocket`.

This diff:
- generates observer events for the non-byteEvent calls through `ObserverContainer::invokeInterfaceMethod`
- adds unit tests for the above events

Reviewed By: cristianlumezanu

Differential Revision: D42646344

fbshipit-source-id: 7d5c6be05735ff2a57dce6640fae3eb89306bcc3
  • Loading branch information
Brandon Schlinker authored and facebook-github-bot committed Jun 26, 2023
1 parent 925b435 commit d59f984
Show file tree
Hide file tree
Showing 3 changed files with 543 additions and 0 deletions.
82 changes: 82 additions & 0 deletions folly/io/async/AsyncSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,8 @@ AsyncSocket::AsyncSocket(AsyncSocket* oldAsyncSocket)
// inform lifecycle observers to give them an opportunity to unsubscribe from
// events for the old socket and subscribe to the new socket; we do not move
// the subscription ourselves

// legacy observer support
for (const auto& cb : oldAsyncSocket->lifecycleObservers_) {
cb->move(oldAsyncSocket, this);
}
Expand Down Expand Up @@ -789,9 +791,17 @@ NetworkSocket AsyncSocket::detachNetworkSocket() {
VLOG(6) << "AsyncSocket::detachFd(this=" << this << ", fd=" << fd_
<< ", evb=" << eventBase_ << ", state=" << state_
<< ", events=" << std::hex << eventFlags_ << ")";
// legacy observer support
for (const auto& cb : lifecycleObservers_) {
cb->fdDetach(this);
}

// folly::ObserverContainer observer support
if (auto list = getAsyncSocketObserverContainer()) {
list->invokeInterfaceMethodAllObservers(
[](auto observer, auto observed) { observer->fdDetach(observed); });
}

// Extract the fd, and set fd_ to -1 first, so closeNow() won't
// actually close the descriptor.
if (const auto socketSet = wShutdownSocketSet_.lock()) {
Expand Down Expand Up @@ -2116,9 +2126,18 @@ void AsyncSocket::attachEventBase(EventBase* eventBase) {
if (evbChangeCb_) {
evbChangeCb_->evbAttached(this);
}

// legacy observer support
for (const auto& cb : lifecycleObservers_) {
cb->evbAttach(this, eventBase_);
}

// folly::ObserverContainer observer support
if (auto list = getAsyncSocketObserverContainer()) {
list->invokeInterfaceMethodAllObservers([&](auto observer, auto observed) {
observer->evbAttach(observed, eventBase_);
});
}
}

void AsyncSocket::detachEventBase() {
Expand All @@ -2141,9 +2160,19 @@ void AsyncSocket::detachEventBase() {
if (evbChangeCb_) {
evbChangeCb_->evbDetached(this);
}

// legacy observer support
for (const auto& cb : lifecycleObservers_) {
cb->evbDetach(this, existingEvb);
}

// folly::ObserverContainer observer support
if (auto list = getAsyncSocketObserverContainer()) {
list->invokeInterfaceMethodAllObservers(
[existingEvb](auto observer, auto observed) {
observer->evbDetach(observed, existingEvb);
});
}
}

bool AsyncSocket::isDetachable() const {
Expand Down Expand Up @@ -3451,10 +3480,18 @@ void AsyncSocket::handleNetworkSocketAttached() {
VLOG(6) << "AsyncSocket::attachFd(this=" << this << ", fd=" << fd_
<< ", evb=" << eventBase_ << " , state=" << state_
<< ", events=" << std::hex << eventFlags_ << ")";

// legacy observer support
for (const auto& cb : lifecycleObservers_) {
cb->fdAttach(this);
}

// folly::ObserverContainer observer support
if (auto list = getAsyncSocketObserverContainer()) {
list->invokeInterfaceMethodAllObservers(
[](auto observer, auto observed) { observer->fdAttach(observed); });
}

if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
shutdownSocketSet->add(fd_);
}
Expand Down Expand Up @@ -4032,10 +4069,19 @@ void AsyncSocket::invalidState(ConnectCallback* callback) {
"connect() called with socket in invalid state");
connectEndTime_ = std::chrono::steady_clock::now();
if ((state_ == StateEnum::CONNECTING) || (state_ == StateEnum::ERROR)) {
// legacy observer support
for (const auto& cb : lifecycleObservers_) {
// inform any lifecycle observes that the connection failed
cb->connectError(this, ex);
}

// folly::ObserverContainer observer support
if (auto list = getAsyncSocketObserverContainer()) {
list->invokeInterfaceMethodAllObservers(
[ex](auto observer, auto observed) {
observer->connectError(observed, ex);
});
}
}
if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
if (callback) {
Expand Down Expand Up @@ -4085,9 +4131,19 @@ void AsyncSocket::invokeConnectErr(const AsyncSocketException& ex) {
// ESTABLISHED} (!?) and a bunch of other places that are not what this
// call back wants. This seems like a bug but work around here while we
// explore it independently

// legacy observer support
for (const auto& cb : lifecycleObservers_) {
cb->connectError(this, ex);
}

// folly::ObserverContainer observer support
if (auto list = getAsyncSocketObserverContainer()) {
list->invokeInterfaceMethodAllObservers(
[ex](auto observer, auto observed) {
observer->connectError(observed, ex);
});
}
}
if (connectCallback_) {
ConnectCallback* callback = connectCallback_;
Expand All @@ -4101,10 +4157,20 @@ void AsyncSocket::invokeConnectSuccess() {
<< "): connect success invoked";
connectEndTime_ = std::chrono::steady_clock::now();
bool enableByteEventsForObserver = false;

// legacy observer support
for (const auto& cb : lifecycleObservers_) {
cb->connectSuccess(this);
enableByteEventsForObserver |= ((cb->getConfig().byteEvents) ? 1 : 0);
}

// folly::ObserverContainer observer support
if (auto list = getAsyncSocketObserverContainer()) {
list->invokeInterfaceMethodAllObservers([](auto observer, auto observed) {
observer->connectSuccess(observed);
});
}

if (enableByteEventsForObserver) {
enableByteEvents();
}
Expand All @@ -4118,9 +4184,17 @@ void AsyncSocket::invokeConnectSuccess() {
void AsyncSocket::invokeConnectAttempt() {
VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_
<< "): connect attempt";
// legacy observer support
for (const auto& cb : lifecycleObservers_) {
cb->connectAttempt(this);
}

// folly::ObserverContainer observer support
if (auto list = getAsyncSocketObserverContainer()) {
list->invokeInterfaceMethodAllObservers([](auto observer, auto observed) {
observer->connectAttempt(observed);
});
}
}

void AsyncSocket::invalidState(ReadCallback* callback) {
Expand Down Expand Up @@ -4166,9 +4240,17 @@ void AsyncSocket::invalidState(WriteCallback* callback) {
}

void AsyncSocket::doClose() {
// legacy observer support
for (const auto& cb : lifecycleObservers_) {
cb->close(this);
}

// folly::ObserverContainer support
if (auto list = getAsyncSocketObserverContainer()) {
list->invokeInterfaceMethodAllObservers(
[](auto observer, auto observed) { observer->close(observed); });
}

if (fd_ == NetworkSocket()) {
return;
}
Expand Down
Loading

0 comments on commit d59f984

Please sign in to comment.