
UnboundedQueue::minTicket() assertion failed on enqueue #1272

Closed
tabokie opened this issue Dec 4, 2019 · 4 comments

tabokie commented Dec 4, 2019

I am currently using UMPSCQueue<Task, true> to implement a rather simple scheduler where multiple threads enqueue tasks and a single worker thread consumes them.
After a recent change in my user code, the queue now consistently crashes at:

#0 0x00005555556fd85e in folly::UnboundedQueue<txservice::queuedkv::Scheduler::Task, false, true, true, 8ul, 7ul, std::atomic>::Segment::minTicket (this=0x0)
at /usr/local/include/folly/concurrency/UnboundedQueue.h:829
#1 folly::UnboundedQueue<txservice::queuedkv::Scheduler::Task, false, true, true, 8ul, 7ul, std::atomic>::findSegment (t=93824999817696, s=0x0, this=0x555555d75af8)
at /usr/local/include/folly/concurrency/UnboundedQueue.h:527
#2 folly::UnboundedQueue<txservice::queuedkv::Scheduler::Task, false, true, true, 8ul, 7ul, std::atomic>::enqueueCommon<txservice::queuedkv::Scheduler::Task> (arg=..., s=0x0,
this=0x555555d75af8) at /usr/local/include/folly/concurrency/UnboundedQueue.h:385
#3 folly::UnboundedQueue<txservice::queuedkv::Scheduler::Task, false, true, true, 8ul, 7ul, std::atomic>::enqueueImpl<txservice::queuedkv::Scheduler::Task> (arg=..., this=0x555555d75af8)
at /usr/local/include/folly/concurrency/UnboundedQueue.h:376
#4 folly::UnboundedQueue<txservice::queuedkv::Scheduler::Task, false, true, true, 8ul, 7ul, std::atomic>::enqueue (arg=..., this=0x555555d75af8)
at /usr/local/include/folly/concurrency/UnboundedQueue.h:271

Suspecting the queue had exceeded some size limit, I used DMPSCQueue to implement the same logic, and it crashed at the exact same spot.

Am I doing anything wrong here? What behaviour could possibly break this assertion? Thanks!

P.S. the scheduler code:

class Scheduler
{
    using Executor = std::thread;
    class Task
    {
        std::function<void(void)> inner_;
    public:
        Task() : inner_([]{}) {}
        Task(std::function<void(void)> &&job) : inner_(std::move(job)) {}
        Task(Task &&rhs) : inner_(std::move(rhs.inner_)) {}
        Task &operator=(Task &&rhs) {
            inner_.swap(rhs.inner_);
            return *this;  // required: flowing off the end of a value-returning function is UB
        }
        void run() {
            inner_();
        }
    };

    using MPSCQueue = folly::UMPSCQueue<Task, true /*MayBlock*/>;
    // using MPSCQueue = folly::DMPSCQueue<Task, true>;

public:
    Scheduler(bool start_on_creation = true) {
        if (start_on_creation)
            Start();
    }

    ~Scheduler() {
        Stop();
    }

    bool Schedule(std::function<void(void)> &&job) {
        queue_.enqueue({std::move(job)});
        return true;
    }

    void Start(void) {
        worker_ = std::thread{&Scheduler::Run, this};  // std::move on a prvalue is redundant
    }

    // return false if scheduler is already shutdown.
    bool Stop(void) {
        if (!stopped_.exchange(true, std::memory_order_relaxed)) {
            // first time shutdown
            worker_.join();
            return true;
        } else {
            return false;
        }
    }

private:
    MPSCQueue queue_;
    std::atomic<bool> stopped_{false};
    std::thread worker_;

    void Run() {
        while (!stopped_.load(std::memory_order_relaxed)) {
            auto p = queue_.try_dequeue_for(std::chrono::milliseconds(1000));
            if (p) {
                Task &task = *p;
                task.run();
            } else {
                std::this_thread::yield();
            }
        }
    }
};
@tabokie tabokie changed the title UnboundedQueue::minTicket() assertion failed UnboundedQueue::minTicket() assertion failed on enqueue Dec 4, 2019

tabokie commented Dec 4, 2019

I also added a flag to ensure that the worker thread is only started once.

@yfeldblum (Contributor) commented:

Is there a test program that would reproduce the crash when run?


tabokie commented Dec 5, 2019

@yfeldblum I can't provide a minimal test case at the moment. But some debugging shows that hptr.get_protected(p_.tail); returns a null segment, which leads to the assertion failure.


tabokie commented Dec 5, 2019

This turned out to be memory corruption on the user side; closing this.

@tabokie tabokie closed this as completed Dec 5, 2019