
program suspends when destroying an async client in requestError() callback #70

Closed
zach-yu opened this issue Jan 5, 2015 · 6 comments

@zach-yu

zach-yu commented Jan 5, 2015

In my program, I need to reconnect to the server when a request error occurs, and then insert the new client into a map shared by several worker threads; the map is locked before the insertion. However, I found that when the server has closed the connection, my program hangs at the point where the old client is destroyed. At first sight it looks like a deadlock, but what is strange is that requestError() is called twice by the same thread, and the first call does not seem to complete, so the lock is never released!
Please see the short program below, which reproduces the problem.

#include <thrift/lib/cpp2/server/ThriftServer.h>
#include <thrift/lib/cpp2/async/HeaderClientChannel.h>
#include <thrift/lib/cpp/async/TEventBase.h>
#include <thrift/lib/cpp/async/TAsyncSocket.h>
#include <thrift/lib/cpp/util/ScopedServerThread.h>
#include <unistd.h>  // sleep()
#include <atomic>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include <TestService.h>  // generated by the Thrift compiler

using namespace apache::thrift;
using namespace apache::thrift::test::cpp2;
using namespace apache::thrift::util;
using namespace apache::thrift::async;

std::shared_ptr<TEventBase> base;
std::shared_ptr<TestServiceAsyncClient> client;

static std::atomic<uint32_t> nCreate(0);
static std::atomic<uint32_t> nDelete(0);
static std::atomic<uint32_t> nError(0);

// Protects `client`; stands in for the lock on the map shared by worker threads.
std::mutex gLock;

// Test server handler: closes the connection from the server side on every request.
class TestServerInterface : public TestServiceSvIf {
    void async_tm_test(std::unique_ptr<apache::thrift::HandlerCallback<void>> callback) {
        RequestChannel* channel = callback->getConnectionContext()
                ->getConnectionContext()
                ->getDuplexClient<TestServiceAsyncClient>()
                ->getChannel();
        TAsyncTransport* transport = dynamic_cast<HeaderClientChannel*>(channel)->getTransport();
        // Close the connection so that the client sees a request error.
        callback->getEventBase()->runInEventBaseThread([=]() {
            transport->close();
        });
    }
};

class TestCallback : public RequestCallback {
public:
    std::shared_ptr<TestServiceAsyncClient> _client;

    TestCallback() : _client(nullptr) {
        std::cout << "TestCallback" << std::endl;
        ++nCreate;
    }

    // Variant that keeps the client alive for the lifetime of the callback.
    explicit TestCallback(std::shared_ptr<TestServiceAsyncClient> client) : _client(client) {
        std::cout << "TestCallback" << std::endl;
        ++nCreate;
    }

    ~TestCallback() {
        std::cout << "~TestCallback" << std::endl;
        ++nDelete;
    }

    virtual void requestSent() {
        std::lock_guard<std::mutex> lck(gLock);
        std::cerr << "requestSent" << std::endl;
    }

    virtual void requestError(ClientReceiveState&& state) {
        ++nError;
        std::cout << "before renew" << std::endl;
        {
            std::lock_guard<std::mutex> lck(gLock);
            std::shared_ptr<TAsyncSocket> socket(TAsyncSocket::newSocket(base.get(), "127.0.0.1", 18888));
            std::shared_ptr<HeaderClientChannel> channel = HeaderClientChannel::newChannel(socket);
            // reset() destroys the old client while gLock is still held.
            client.reset(new TestServiceAsyncClient(std::move(channel)));  // will hang here!
        }
        std::cout << "after renew" << std::endl;

        // Issue a new request on the fresh client.
        std::unique_ptr<TestCallback> cb(
            // new TestCallback(client)
            new TestCallback());
        client->test(std::move(cb));

        std::cout << "requestError" << std::endl;
    }

    virtual void replyReceived(ClientReceiveState&& state) {
        std::lock_guard<std::mutex> lck(gLock);
        std::cerr << "replyReceived" << std::endl;
    }
};

// Keeps queueing requests on the current client from a separate thread.
void request() {
    while (true) {
        base->runInEventBaseThread([]() {
            std::unique_ptr<TestCallback> cb(
                // new TestCallback(client)
                new TestCallback());
            client->test(std::move(cb));
        });
    }
}

int main(int argc, char** argv) {
    // Start a duplex test server.
    std::shared_ptr<ThriftServer> server(new ThriftServer);
    server->setInterface(std::unique_ptr<TestServerInterface>(new TestServerInterface));
    server->setPort(18888);
    server->setDuplex(true);
    ScopedServerThread sst(server);

    // Create the test client.
    base.reset(new TEventBase);
    std::shared_ptr<TAsyncSocket> socket(TAsyncSocket::newSocket(base.get(), "127.0.0.1", 18888));
    std::shared_ptr<HeaderClientChannel> channel = HeaderClientChannel::newChannel(socket);
    client.reset(new TestServiceAsyncClient(std::move(channel)));

    // Run the event base loop on its own thread.
    std::thread loopThread([]() { base->loopForever(); });
    loopThread.detach();

    // Issue requests (and trigger reconnects) from another thread.
    std::thread reqThread(request);
    reqThread.detach();

    // Report progress once per second.
    while (true) {
        std::cerr << "callback create: " << nCreate << " delete: " << nDelete
                  << " Error: " << nError << std::endl;
        sleep(1);
    }
    return 0;
}
@zach-yu
Author

zach-yu commented Jan 5, 2015

When I pass the client (a shared_ptr) into the callback as a member variable, the problem seems to disappear. I hope that clue is useful to you.

@zach-yu
Author

zach-yu commented Jan 6, 2015

It seems that when I destroy the client in requestError(), Thrift calls TAsyncSocket::failAllWrites(), which in turn calls requestError(); in other words, requestError() is called recursively, and this is why the deadlock occurs. Is this by design? If so, could you document it? If not, could you improve it? Thanks!

@djwatson

djwatson commented Jan 6, 2015

It looks like there was an attempt at a fix in this commit (it was later reverted): 4418d1e

Does that solve this issue?

@zach-yu
Author

zach-yu commented Jan 7, 2015

I am afraid this commit does not solve the problem. Here is the stack trace captured from my test; note that in frames #0 and #20, requestError() is called recursively:
#0 TestCallback::requestError(apache::thrift::ClientReceiveState&&) (this=0x7fff5c009040, state=<unknown type in /home/ze.yu/test_yz/test, CU 0x0, DIE 0x63e04>) at Test.cpp:60
#1 0x00007ffff665a760 in apache::thrift::HeaderClientChannel::TwowayCallback::requestError (this=0x7fff5c0093e0, ex=...) at ../../../thrift/lib/cpp2/async/HeaderClientChannel.h:308
#2 0x00007ffff665520b in apache::thrift::HeaderClientChannel::messageReceiveErrorWrapped(folly::exception_wrapper&&) (this=0x6b5550, ex=<unknown type in /home/ze.yu/fbthrift/thrift/lib/cpp2/.libs/libthriftcpp2.so.17, CU 0x30ed, DIE 0x5d06d>) at async/HeaderClientChannel.cpp:544
#3 0x00007ffff6654fe8 in apache::thrift::HeaderClientChannel::messageChannelEOF (this=0x6b5550) at async/HeaderClientChannel.cpp:519
#4 0x00007ffff66a9905 in apache::thrift::Cpp2Channel::processReadEOF (this=0x6b5700) at async/Cpp2Channel.cpp:233
#5 0x00007ffff66a87fe in apache::thrift::Cpp2Channel::closeNow (this=0x6b5700) at async/Cpp2Channel.cpp:71
#6 0x00007ffff6652547 in apache::thrift::HeaderClientChannel::destroy (this=0x6b5550) at async/HeaderClientChannel.cpp:78
#7 0x0000000000451ea1 in folly::DelayedDestruction::Destructor::operator() (this=0x6ba1d8, dd=0x6b56c0) at /home/ze.yu/fbthrift/thrift/folly/folly/io/async/DelayedDestruction.h:55
#8 0x0000000000459e72 in std::_Sp_counted_deleter<apache::thrift::HeaderClientChannel*, folly::DelayedDestruction::Destructor, std::allocator, (__gnu_cxx::_Lock_policy)2>::_M_dispose (this=0x6ba1c0) at /usr/include/c++/4.8.2/bits/shared_ptr_base.h:347
#9 0x0000000000455c84 in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release (this=0x6ba1c0) at /usr/include/c++/4.8.2/bits/shared_ptr_base.h:144
#10 0x00000000004545e9 in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count (this=0x6ba238, __in_chrg=<optimized out>) at /usr/include/c++/4.8.2/bits/shared_ptr_base.h:546
#11 0x0000000000453b8a in std::__shared_ptr<apache::thrift::RequestChannel, (__gnu_cxx::_Lock_policy)2>::~__shared_ptr (this=0x6ba230, __in_chrg=<optimized out>) at /usr/include/c++/4.8.2/bits/shared_ptr_base.h:781
#12 0x0000000000453ba4 in std::shared_ptr<apache::thrift::RequestChannel>::~shared_ptr (this=0x6ba230, __in_chrg=<optimized out>) at /usr/include/c++/4.8.2/bits/shared_ptr.h:93
#13 0x0000000000465f27 in apache::thrift::test::cpp2::TestServiceAsyncClient::~TestServiceAsyncClient (this=0x6ba1f0, __in_chrg=<optimized out>) at gen-cpp2/TestService.h:82
#14 0x0000000000465f72 in apache::thrift::test::cpp2::TestServiceAsyncClient::~TestServiceAsyncClient (this=0x6ba1f0, __in_chrg=<optimized out>) at gen-cpp2/TestService.h:82
#15 0x0000000000459d8a in std::_Sp_counted_ptr<apache::thrift::test::cpp2::TestServiceAsyncClient*, (__gnu_cxx::_Lock_policy)2>::_M_dispose (this=0x6ba310) at /usr/include/c++/4.8.2/bits/shared_ptr_base.h:290
#16 0x0000000000455c84 in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release (this=0x6ba310) at /usr/include/c++/4.8.2/bits/shared_ptr_base.h:144
#17 0x00000000004545e9 in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count (this=0x7fff677fd3f8, __in_chrg=<optimized out>) at /usr/include/c++/4.8.2/bits/shared_ptr_base.h:546
#18 0x0000000000453dba in std::__shared_ptr<apache::thrift::test::cpp2::TestServiceAsyncClient, (__gnu_cxx::_Lock_policy)2>::~__shared_ptr (this=0x7fff677fd3f0, __in_chrg=<optimized out>) at /usr/include/c++/4.8.2/bits/shared_ptr_base.h:781
#19 0x0000000000455446 in std::__shared_ptr<apache::thrift::test::cpp2::TestServiceAsyncClient, (__gnu_cxx::_Lock_policy)2>::reset<apache::thrift::test::cpp2::TestServiceAsyncClient> (this=0x6acb10, p=0x7fff5c00e5a0) at /usr/include/c++/4.8.2/bits/shared_ptr_base.h:890
#20 0x0000000000454216 in TestCallback::requestError(apache::thrift::ClientReceiveState&&) (this=0x7fff5c007990, state=<unknown type in /home/ze.yu/test_yz/test, CU 0x0, DIE 0x63e04>) at Test.cpp:65
#21 0x00007ffff6659e5a in apache::thrift::HeaderClientChannel::TwowayCallback::messageSendError(folly::exception_wrapper&&) (this=0x7fff5c0047c0, ex=<unknown type in /home/ze.yu/fbthrift/thrift/lib/cpp2/.libs/libthriftcpp2.so.17, CU 0x30ed, DIE 0x5d06d>) at ../../../thrift/lib/cpp2/async/HeaderClientChannel.h:272
#22 0x00007ffff66a972f in apache::thrift::Cpp2Channel::writeError (this=0x6b5700, bytesWritten=0, ex=...) at async/Cpp2Channel.cpp:223
#23 0x00007ffff66aad69 in apache::thrift::async::TAsyncTransport::WriteCallback::writeErr (this=0x6b5738, bytes=0, ex=...) at ../../../thrift/lib/cpp/async/TAsyncTransport.h:73
#24 0x00007ffff6b79607 in folly::AsyncSocket::failWrite (this=this@entry=0x6b52c8, fn=fn@entry=0x7ffff6c338d0 <folly::AsyncSocket::writeImpl(folly::AsyncTransportWrapper::WriteCallback*, iovec const*, unsigned long, std::unique_ptr<folly::IOBuf, std::default_delete<folly::IOBuf> >&&, folly::WriteFlags)::__func__> "writeImpl", callback=callback@entry=0x6b5738, bytesWritten=bytesWritten@entry=0, ex=...) at io/async/AsyncSocket.cpp:1861
#25 0x00007ffff6b7b89b in folly::AsyncSocket::writeImpl(folly::AsyncTransportWrapper::WriteCallback*, iovec const*, unsigned long, std::unique_ptr<folly::IOBuf, std::default_delete<folly::IOBuf> >&&, folly::WriteFlags) (this=0x6b52c8, callback=0x6b5738, vec=0x7fff677fd950, count=20, buf=buf@entry=<unknown type in /home/ze.yu/fbthrift/thrift/folly/folly/.libs/libfolly.so.20, CU 0x2e2f86, DIE 0x32be79>, flags=<optimized out>) at io/async/AsyncSocket.cpp:685
#26 0x00007ffff6b7bdf2 in folly::AsyncSocket::writeChainImpl(folly::AsyncTransportWrapper::WriteCallback*, iovec*, unsigned long, std::unique_ptr<folly::IOBuf, std::default_delete<folly::IOBuf> >&&, folly::WriteFlags) (this=this@entry=0x6b52c8, callback=callback@entry=0x6b5738, vec=vec@entry=0x7fff677fd950, count=count@entry=20, buf=buf@entry=<unknown type in /home/ze.yu/fbthrift/thrift/folly/folly/.libs/libfolly.so.20, CU 0x2e2f86, DIE 0x32c437>, flags=flags@entry=folly::NONE) at io/async/AsyncSocket.cpp:644
#27 0x00007ffff6b7beb6 in folly::AsyncSocket::writeChain(folly::AsyncTransportWrapper::WriteCallback*, std::unique_ptr<folly::IOBuf, std::default_delete<folly::IOBuf> >&&, folly::WriteFlags) (this=0x6b52c8, callback=0x6b5738, buf=<unknown type in /home/ze.yu/fbthrift/thrift/folly/folly/.libs/libfolly.so.20, CU 0x2e2f86, DIE 0x32c5c7>, flags=folly::NONE) at io/async/AsyncSocket.cpp:620
#28 0x00007ffff66aa12d in apache::thrift::Cpp2Channel::runLoopCallback (this=0x6b5700) at async/Cpp2Channel.cpp:292
#29 0x00007ffff6b88bc5 in folly::EventBase::runLoopCallbacks (this=this@entry=0x6b4570, setContext=setContext@entry=true) at io/async/EventBase.cpp:599

@ghost

ghost commented Aug 5, 2015

Thank you for reporting this issue; we appreciate your patience. We've notified the core team for an update on this issue. We're looking for a response within the next 30 days or the issue may be closed.

@eduardo-elizondo
Contributor

Closing to clean and provide better support to new issues. We are rewriting docs and guides on how to support this.
