Skip to content

Commit

Permalink
Fixed memory barrier issues in thread_base and use thread_interrupt s…
Browse files Browse the repository at this point in the history
…ocket for all archs.
  • Loading branch information
rakshasa committed Apr 3, 2013
1 parent 2ba037b commit 6fb96e3
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 40 deletions.
42 changes: 14 additions & 28 deletions src/torrent/utils/thread_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ thread_base::thread_base() :
{
std::memset(&m_thread, 0, sizeof(pthread_t));

#ifdef USE_INTERRUPT_SOCKET
// #ifdef USE_INTERRUPT_SOCKET
thread_interrupt::pair_type interrupt_sockets = thread_interrupt::create_pair();

m_interrupt_sender = interrupt_sockets.first;
m_interrupt_receiver = interrupt_sockets.second;
#endif
// #endif
}

thread_base::~thread_base() {
Expand All @@ -77,8 +77,7 @@ thread_base::start_thread() {
if (m_poll == NULL)
throw internal_error("No poll object for thread defined.");

if (m_state != STATE_INITIALIZED ||
pthread_create(&m_thread, NULL, (pthread_func)&thread_base::event_loop, this))
if (!is_initialized() || pthread_create(&m_thread, NULL, (pthread_func)&thread_base::event_loop, this))
throw internal_error("Failed to create thread.");
}

Expand All @@ -101,31 +100,17 @@ thread_base::stop_thread_wait() {
acquire_global_lock();
}

// Fix interrupting when shutting down thread.
void
thread_base::interrupt() {
#ifndef USE_INTERRUPT_SOCKET
__sync_fetch_and_or(&m_flags, flag_no_timeout);

while (is_polling() && has_no_timeout()) {
pthread_kill(m_thread, SIGUSR1);

if (!(is_polling() && has_no_timeout()))
return;

usleep(0);
}
#else
m_interrupt_sender->poke();
#endif
// Only poke when polling, set no_timeout
if (is_polling())
m_interrupt_sender->poke();
}

bool
thread_base::should_handle_sigusr1() {
#ifndef USE_INTERRUPT_SOCKET
return true;
#else
return false;
#endif
}

void*
Expand All @@ -142,9 +127,9 @@ thread_base::event_loop(thread_base* thread) {

try {

#ifdef USE_INTERRUPT_SOCKET
// #ifdef USE_INTERRUPT_SOCKET
thread->m_poll->insert_read(thread->m_interrupt_receiver);
#endif
// #endif

while (true) {
if (thread->m_slot_do_work)
Expand All @@ -165,7 +150,7 @@ thread_base::event_loop(thread_base* thread) {

uint64_t next_timeout = 0;

if (!(thread->m_flags & flag_no_timeout)) {
if (!thread->has_no_timeout()) {
next_timeout = thread->next_timeout_usec();

if (thread->m_slot_next_timeout)
Expand All @@ -177,16 +162,17 @@ thread_base::event_loop(thread_base* thread) {

int poll_flags = 0;

if (!(thread->m_flags & flag_main_thread))
if (!(thread->flags() & flag_main_thread))
poll_flags = torrent::Poll::poll_worker_thread;

thread->m_poll->do_poll(next_timeout, poll_flags);

__sync_fetch_and_and(&thread->m_flags, ~(flag_polling | flag_no_timeout));
}

#ifdef USE_INTERRUPT_SOCKET
// #ifdef USE_INTERRUPT_SOCKET
thread->m_poll->remove_write(thread->m_interrupt_receiver);
#endif
// #endif

} catch (torrent::shutdown_exception& e) {
lt_log_print(torrent::LOG_THREAD_NOTICE, "%s: Shutting down thread.", thread->name());
Expand Down
36 changes: 24 additions & 12 deletions src/torrent/utils/thread_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,18 @@ class LIBTORRENT_EXPORT lt_cacheline_aligned thread_base {
thread_base();
virtual ~thread_base();

bool is_initialized() const { return m_state == STATE_INITIALIZED; }
bool is_active() const { return m_state == STATE_ACTIVE; }
bool is_inactive() const { return m_state == STATE_INACTIVE; }
bool is_initialized() const { return state() == STATE_INITIALIZED; }
bool is_active() const { return state() == STATE_ACTIVE; }
bool is_inactive() const { return state() == STATE_INACTIVE; }

bool is_polling() const { return (m_flags & flag_polling); }
bool is_polling() const { return (flags() & flag_polling); }

bool has_no_timeout() const { return (m_flags & flag_no_timeout); }
bool has_do_shutdown() const { return (m_flags & flag_do_shutdown); }
bool has_did_shutdown() const { return (m_flags & flag_did_shutdown); }
bool has_no_timeout() const { return (flags() & flag_no_timeout); }
bool has_do_shutdown() const { return (flags() & flag_do_shutdown); }
bool has_did_shutdown() const { return (flags() & flag_did_shutdown); }

state_type state() const { return m_state; }
int flags() const { return m_flags; }
state_type state() const;
int flags() const;

virtual const char* name() const = 0;

Expand Down Expand Up @@ -119,7 +119,7 @@ class LIBTORRENT_EXPORT lt_cacheline_aligned thread_base {
static void* event_loop(thread_base* thread);

protected:
struct global_lock_type {
struct lt_cacheline_aligned global_lock_type {
int waiting;
int main_polling;
pthread_mutex_t lock;
Expand All @@ -131,8 +131,8 @@ class LIBTORRENT_EXPORT lt_cacheline_aligned thread_base {
static global_lock_type m_global;

pthread_t m_thread;
state_type m_state;
int m_flags;
state_type m_state lt_cacheline_aligned;
int m_flags lt_cacheline_aligned;

Poll* m_poll;
signal_type m_signal_bitfield;
Expand All @@ -144,6 +144,18 @@ class LIBTORRENT_EXPORT lt_cacheline_aligned thread_base {
thread_interrupt* m_interrupt_receiver;
};

inline int
thread_base::flags() const {
__sync_synchronize();
return m_flags;
}

inline thread_base::state_type
thread_base::state() const {
__sync_synchronize();
return m_state;
}

inline void
thread_base::send_event_signal(unsigned int index, bool do_interrupt) {
m_signal_bitfield.signal(index);
Expand Down

0 comments on commit 6fb96e3

Please sign in to comment.