aboutsummaryrefslogblamecommitdiff
path: root/include/salticidae/queue.h
blob: 3df1529f351e08f79919736a63a47a6f11163a18 (plain) (tree)
1
2
3
4
5
6
7





                           
                 








                                        
                                   
                                           











                                                                           

                
                                                         

                                                                               
                                                        
                                                        
                                                                           







                                                              










                                                         
                                                               



                                                                               
                                                                                     



                                                                             

                                                                                















                                                                           

                               

                    
              
                                         



                                                           








                                                      













































                                                                                               





                                          
                                                       



                                              










                                                              
                                            
                                                              


                                                  
                        
                                                
                                      



                             
                                  




                            








                                                                                            
             




                                                                                          
                                    
                         
             









                                                                                       
         
                    


     


                                       

                                                            
                            




















                                                                     




                        
                                                            















                                                                                   



                    


      
#ifndef _SALTICIDAE_QUEUE_H
#define _SALTICIDAE_QUEUE_H

#include <atomic>
#include <vector>
#include <cassert>
#include <thread>

namespace salticidae {

// Cache-line size in bytes used with alignas() to keep hot atomics on their
// own cache line. NOTE(review): 64 is an assumption about the target CPU.
static size_t const cacheline_size = 64;

/* A lock-free, intrusive LIFO free list of Nodes (CAS on `top`).
 *
 * The list never allocates or frees Nodes. A per-node reference count
 * decides when a node may be linked into the stack:
 *   - a node held by a single owner has refcnt == 1;
 *   - pop() temporarily increments refcnt while it inspects a candidate;
 *   - whoever drops refcnt to 0 in release_ref() performs the real push, so
 *     a push() issued while a stale pop() still holds a reference is
 *     deferred to that pop();
 *   - a node at the top with refcnt == 0 is being (re-)pushed ("ghost");
 *     pop() waits until release_ref() resets it to 1.
 *
 * Memory ordering of the refcnt operations:
 *   - the decrement in release_ref() is acq_rel, so writes made to the node
 *     by an earlier reference holder (e.g. the caller of push()) are visible
 *     to the thread that finally links it, and hence to the next pop();
 *   - refcnt is reset to 1 with a release store and incremented in pop()
 *     with an acquire CAS, so a pop() racing with a pop/push cycle of the
 *     same node (ABA) reads the freshly written `next` pointer. */
class FreeList {
    public:
    struct Node {
        std::atomic<Node *> next;
        std::atomic<size_t> refcnt;
        Node(): next(nullptr), refcnt(1) {}
    };

    private:
    alignas(cacheline_size) std::atomic<Node *> top;

    public:
    FreeList(): top(nullptr) {}
    FreeList(const FreeList &) = delete;
    FreeList(FreeList &&) = delete;

    /* Drop one reference to u. If it was the last one, link u onto the top
     * of the list and reset its refcnt to 1. */
    void release_ref(Node *u) {
        // release: publish this thread's prior writes to *u to whichever
        // thread performs the final decrement; acquire: let that final
        // thread observe them before it makes u reachable again
        if (u->refcnt.fetch_sub(1, std::memory_order_acq_rel) != 1) return;
        for (;;)
        {
            auto t = top.load(std::memory_order_relaxed);
            // repair the next pointer before CAS, otherwise u->next == nullptr
            // could lead to skipping elements
            u->next.store(t, std::memory_order_relaxed);
            // the replacement is ok even if ABA happens
            if (top.compare_exchange_weak(t, u, std::memory_order_release))
            {
                // release: pairs with the acquire increment in pop(), so a
                // popper that observes this refcnt also observes u->next
                u->refcnt.store(1, std::memory_order_release);
                break;
            }
        }
    }

    /* Return u to the list (possibly deferred; see class comment).
     * Always returns true. */
    bool push(Node *u) {
        release_ref(u);
        return true;
    }

    /* Pop the top node into r. Returns false (r untouched) if the list is
     * empty, true otherwise. */
    bool pop(Node *&r) {
        bool loop = true;
        while (loop)
        {
            auto u = top.load(std::memory_order_acquire);
            /* the list is now empty */
            if (u == nullptr) return false;
            auto t = u->refcnt.load(std::memory_order_relaxed);
            /* let's wait for another round if u is a ghost (already popped) */
            if (!t) continue;
            /* otherwise t > 0, so with CAS, the invariant that zero refcnt can
             * never be increased is guaranteed; acquire on success pairs with
             * the release store of refcnt in release_ref() so that u->next
             * below is the value written by the latest push of u */
            if (u->refcnt.compare_exchange_weak(t, t + 1,
                    std::memory_order_acquire, std::memory_order_relaxed))
            {
                /* here, nobody is able to change v->next (because v->next is
                 * only changed when pushed) even when ABA happens */
                auto v = u;
                auto nv = u->next.load(std::memory_order_relaxed);
                if (top.compare_exchange_weak(v, nv, std::memory_order_relaxed))
                {
                    /* manage to pop the head */
                    r = u;
                    loop = false;
                    /* do not need to try cas_push here because the current
                     * thread is the only one who can push u back */
                }
                /* release the refcnt and execute the delayed push call if
                 * necessary */
                release_ref(u);
            }
        }
        return true;
    }
};

/* Number of element slots (elem[] / avail[]) in each MPMCQueue block. */
constexpr size_t MPMCQ_SIZE{4096};

template<typename T>
class MPMCQueue {
    protected:
    struct Block: public FreeList::Node {
        alignas(cacheline_size) std::atomic<uint32_t> head;
        alignas(cacheline_size) std::atomic<uint32_t> tail;
        T elem[MPMCQ_SIZE];
        std::atomic<bool> avail[MPMCQ_SIZE];
        std::atomic<Block *> next;
    };

    FreeList blks;

    alignas(cacheline_size) std::atomic<Block *> head;
    alignas(cacheline_size) std::atomic<Block *> tail;

    template<typename U>
    bool _enqueue(U &&e, bool unbounded = true) {
        for (;;)
        {
            auto t = tail.load(std::memory_order_relaxed);
            auto tcnt = t->refcnt.load(std::memory_order_relaxed);
            if (!tcnt) c