Wednesday, June 26, 2013

A simple, fast, unbounded, batched, thread-safe queue

In my current project, I needed to pass items from one thread to another. While this is a classic problem in multi-threaded programs, I wanted to handle items in batches, keep the critical sections as small as possible, and reduce dynamic memory allocation churn. Examples of this pattern came up in a watchdog implementation (where multiple threads queue "heartbeats" for the watchdog to consume), and in recording status (multiple threads provide reports at different frequencies, and those reports are consumed to update the global status table).

The solution I settled on uses a mutex and two vectors: one for other threads to push new items onto, the other for the worker thread to consume. When the worker is done with one batch, it simply swaps its (now empty) working vector with the queue. The only time that the worker can block another thread is during the swap, and for C++ vectors that is guaranteed to be O(1).

(The enqueue can block if it has to reallocate, but that should be progressively rarer as the program runs. Since we never actually create new vectors -- we just swap them back and forth -- any time either vector increases capacity, that capacity is preserved. This can be bootstrapped by using reserve() during startup.)
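
To make the capacity argument concrete, here is a small single-threaded sketch. It is not part of the Service class below; the function name countGrowthRounds and the use of int items are made up for illustration. It pushes fixed-size batches through a queue/work pair and counts how many batches forced the producer-side vector to reallocate.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper (not from the original post): pushes `rounds`
// batches of `batchSize` ints through a queue/work pair and returns how
// many batches forced the producer-side vector to grow.
std::size_t countGrowthRounds( std::size_t rounds, std::size_t batchSize )
{
    std::vector< int > queue;
    std::vector< int > work;
    std::size_t growthRounds = 0;

    for ( std::size_t r = 0; r < rounds; ++r )
    {
        const std::size_t before = queue.capacity();
        for ( std::size_t i = 0; i < batchSize; ++i )
            queue.push_back( static_cast< int >( i ) );
        if ( queue.capacity() != before )
            ++growthRounds;

        queue.swap( work );      // O(1): exchanges buffers and capacities
        assert( queue.empty() ); // work was cleared, so the queue restarts empty
        work.clear();            // size drops to 0, capacity is kept
    }
    return growthRounds;
}
```

With a steady batch size, only the first two batches should cause growth (one per vector); after that, both buffers are already large enough. Calling reserve() on both vectors up front would remove even those two.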

#include <vector>
#include <boost/thread.hpp>

class Service
{

public:

    enum { INIT_QUEUE_SIZE = 100 };

    struct Item { ... };

    Service()
        : queue(),
          running( true ),
          worker( &Service::runWorker, this )  // member function needs &Class::name
    {
        {
            Lock lock( mutex );
            queue.reserve( INIT_QUEUE_SIZE );
        }
    }

    ~Service()
    {
        {
            Lock lock( mutex );
            running = false;
        }
        worker.join();
    }

    void enqueue( const Item & item )
    {
        Lock lock( mutex );
        if ( running )
            queue.push_back( item );
    }

private:

    typedef std::vector< Item > ItemVec;
    typedef boost::mutex Mutex;
    typedef boost::unique_lock< Mutex > Lock;
    typedef boost::thread Thread;

    Mutex mutex;
    ItemVec queue;
    bool running;
    Thread worker;

    void runWorker()
    {
        ItemVec work;
        work.reserve( INIT_QUEUE_SIZE );
        bool done( false );
        while ( ! done )
        {
            {
                Lock lock( mutex );
                queue.swap( work );
                done = !running;
            }

            if ( work.empty() )
            {
                // sleep or wait on a condition variable
                continue;
            }

            // process elements in work

            work.clear();
        }
    }

};
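
The "sleep or wait on a condition variable" comment in the worker loop can be fleshed out in several ways. The following is one possible sketch, under some loud assumptions: it uses the C++11 standard library (std::thread, std::mutex, std::condition_variable) instead of Boost, the class name SummingService and its stop() method are invented for this example, and "processing" a batch just means summing ints so that the code is self-contained.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical variant of Service: items are ints, processing sums them.
class SummingService
{
public:
    SummingService()
        : running( true ), total( 0 )
    {
        queue.reserve( 100 );
        // Start the worker only after every other member is initialized.
        worker = std::thread( &SummingService::runWorker, this );
    }

    ~SummingService() { stop(); }

    void enqueue( int item )
    {
        {
            std::lock_guard< std::mutex > lock( mutex );
            if ( !running )
                return;
            queue.push_back( item );
        }
        wakeup.notify_one();  // wake the worker after releasing the lock
    }

    // Stops accepting items, lets the worker drain the queue, joins it,
    // and returns the sum of everything processed.
    long stop()
    {
        {
            std::lock_guard< std::mutex > lock( mutex );
            running = false;
        }
        wakeup.notify_one();
        if ( worker.joinable() )
            worker.join();
        return total;
    }

private:
    std::mutex mutex;
    std::condition_variable wakeup;
    std::vector< int > queue;
    bool running;
    long total;           // written only by the worker; read after join()
    std::thread worker;

    void runWorker()
    {
        std::vector< int > work;
        work.reserve( 100 );
        bool done = false;
        while ( !done )
        {
            {
                std::unique_lock< std::mutex > lock( mutex );
                // Block until there is a batch to take or we are shutting down.
                wakeup.wait( lock, [this] { return !queue.empty() || !running; } );
                queue.swap( work );
                assert( queue.empty() );  // work was cleared before the swap
                done = !running;
            }
            for ( std::size_t i = 0; i < work.size(); ++i )
                total += work[ i ];
            work.clear();
        }
    }
};
```

The critical section is still just the swap. The predicate passed to wait() guards against spurious wakeups and against notifications that arrive before the worker starts waiting. A typical use would be to construct a SummingService, call enqueue() from any number of threads, and call stop() (or let the destructor run) to drain and join.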